莫煩網-爬蟲學習-代碼記錄

from urllib.request import urlopen,urljoin
import re
from bs4 import BeautifulSoup
import random
import requests
import webbrowser
import os
from urllib.request import urlretrieve
import multiprocessing as mp
import time
import asyncio
import aiohttp
from selenium import webdriver
from selenium.webdriver.chrome.options import Options
import scrapy
def url():
    """Download the hard-coded Baidu Baike item page and return its HTML.

    Returns:
        str: The response body decoded as UTF-8.
    """
    base_url = "https://baike.baidu.com"
    his = ["/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711"]
    target = base_url + his[-1]
    # Alternative practice pages that were fetched here at other times:
    #   https://morvanzhou.github.io/static/scraping/basic-structure.html
    #   https://morvanzhou.github.io/static/scraping/list.html
    #   https://morvanzhou.github.io/static/scraping/table.html
    # The context manager closes the HTTP response even if read/decode fails.
    with urlopen(target) as response:
        return response.read().decode('utf-8')

def findobject():
    """Print the title, first paragraph and all href values of the page from url().

    The values are extracted with regular expressions applied to the raw
    HTML. When the page has no <title> or <p> match, None is printed for it
    instead of raising IndexError.
    """
    html = url()
    titles = re.findall(r"<title>(.+?)</title>", html)
    # DOTALL lets a paragraph body span several lines.
    paragraphs = re.findall(r"<p>(.*?)</p>", html, flags=re.DOTALL)
    links = re.findall(r'href="(.*?)"', html)
    print("\nPage title is: ", titles[0] if titles else None)
    print("\nPage paragraph is: ", paragraphs[0] if paragraphs else None)
    print("\nAll links: ", links)

def usesoup():
    """Print selected elements of the page from url() using BeautifulSoup.

    Prints the first <h1>, the first <p>, the href of every anchor that has
    one, the text of each <li class="month">, and the text of each <li>
    inside <ul class="jan">. Anchors without an href and a missing
    <ul class="jan"> are skipped rather than raising.
    """
    html = url()
    soup = BeautifulSoup(html, features='lxml')
    print(soup.h1)
    print('\n', soup.p)
    # Anchors without an href attribute would raise KeyError when indexed.
    all_href = [a['href'] for a in soup.find_all('a') if 'href' in a.attrs]
    print('\n', all_href)
    for month in soup.find_all('li', {"class": "month"}):
        print(month.get_text())
    jan = soup.find('ul', {"class": "jan"})
    # find() returns None when the page has no such list.
    if jan is not None:
        for day in jan.find_all('li'):
            print(day.get_text())

def Rexsoup():
    """Print image sources and course links selected by regular expressions.

    Fetches the page from url(), then prints the src of every <img> whose
    src matches '.*?\\.jpg' and the href of every <a> whose href matches
    'https://morvan.*'.
    """
    html = url()
    soup = BeautifulSoup(html, features='lxml')
    # Raw string: '\.' in a normal literal is an invalid escape sequence.
    jpg_pattern = re.compile(r'.*?\.jpg')
    for img in soup.find_all("img", {"src": jpg_pattern}):
        print(img['src'])
    course_pattern = re.compile('https://morvan.*')
    for anchor in soup.find_all('a', {"href": course_pattern}):
        print(anchor['href'])

def baike():
    """Random-walk through Baidu Baike item pages for at most 20 steps.

    Each step downloads the page at the end of the history, prints the step
    index, the page's <h1> text and its path, then appends a randomly chosen
    matching /item/ link to the history. A page with no matching links is
    removed from the history so the next step revisits the previous page.
    The walk stops early if the history becomes empty.
    """
    base_url = "https://baike.baidu.com"
    his = ["/item/%E7%BD%91%E7%BB%9C%E7%88%AC%E8%99%AB/5162711"]
    # Compiled once; the pattern does not change between iterations.
    item_link = re.compile("/item/(%.{2})+$")
    for i in range(20):
        if not his:
            # Every visited page was a dead end; his[-1] would raise IndexError.
            break
        # The context manager closes the HTTP response after reading.
        with urlopen(base_url + his[-1]) as response:
            html = response.read().decode('utf-8')
        soup = BeautifulSoup(html, features='lxml')
        print(i, soup.find('h1').get_text(), ' url:', his[-1])

        sub_urls = soup.find_all("a", {"target": "_blank", "href": item_link})
        if sub_urls:
            his.append(random.sample(sub_urls, 1)[0]['href'])
        else:
            his.pop()
def getbaidus():
    """Send a GET search request to Baidu, print the final URL and open it in a browser."""
    query = {"wd": "莫煩Python"}
    response = requests.get("http://www.baidu.com/s", params=query)
    search_url = response.url
    print(search_url)
    webbrowser.open(search_url)

def postbaidu():  # flagged "problem" by the original author
    """POST a two-field name form to the pythonscraping.com demo and print the reply body."""
    form = {'firstname': '莫煩', 'lastname': ''}
    reply = requests.post(
        'http://pythonscraping.com/files/processing.php',
        data=form,
    )
    print(reply.text)

def postfiile():  # flagged "problem" by the original author
    """Upload a local image as multipart form data and print the reply body.

    The image is opened in a with-block so the file handle is closed once
    the request has been sent, including when the request raises.
    """
    with open('C:/Users/LX/Pictures/TLP.jpg', 'rb') as image:
        r = requests.post(
            'http://pythonscraping.com/files/processing2.php',
            files={'uploadFile': image},
        )
    print(r.text)

def cookiepage():  # flagged "problem" by the original author
    """Log in with a POST, print the returned cookies, then fetch the profile page with them."""
    credentials = {'username': 'dsfdsfs', 'password': 'password'}
    login = requests.post(
        'http://pythonscraping.com/pages/cookies/welcome.php',
        data=credentials,
    )
    print(login.cookies.get_dict())
    # Pass the login cookies along explicitly on the follow-up request.
    profile = requests.get(
        'http://pythonscraping.com/pages/cookies/profile.php',
        cookies=login.cookies,
    )
    print(profile.text)

def sessioncookies():
    """Log in and fetch the profile page through one requests.Session.

    Prints the cookies returned by the login response, then the body of the
    profile page requested through the same session object.
    """
    credentials = {'username': 'dsfdsfs', 'password': 'password'}
    session = requests.Session()
    login = session.post(
        'http://pythonscraping.com/pages/cookies/welcome.php',
        data=credentials,
    )
    print(login.cookies.get_dict())

    profile = session.get("http://pythonscraping.com/pages/cookies/profile.php")
    print(profile.text)

def uploadfile():
    """Download a fixed image into d:\\yanglele using urlretrieve.

    Despite its name this function downloads; it creates the target
    directory first if it does not exist.
    """
    # Raw strings: '\y' and '\i' are invalid escape sequences in normal literals.
    os.makedirs(r'd:\yanglele', exist_ok=True)
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    urlretrieve(IMAGE_URL, r'd:\yanglele\image1.png')  # performs the download

def requestfile():
    """Download a fixed image with requests and write it to d:\\yanglele\\image2.png.

    The whole response body is held in memory before being written. The
    target directory is created if missing so the function does not depend
    on uploadfile() having run first.
    """
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    # Raw strings: '\y' and '\i' are invalid escape sequences in normal literals.
    os.makedirs(r'd:\yanglele', exist_ok=True)
    r = requests.get(IMAGE_URL)  # performs the download
    with open(r'd:\yanglele\image2.png', 'wb') as f:
        f.write(r.content)

def requestf():
    """Stream a fixed image to d:\\yanglele\\image3.png in 32-byte chunks.

    The response is used as a context manager so the connection is released
    when the download finishes or fails. The target directory is created if
    missing.
    """
    IMAGE_URL = "https://morvanzhou.github.io/static/img/description/learning_step_flowchart.png"
    # Raw strings: '\y' and '\i' are invalid escape sequences in normal literals.
    os.makedirs(r'd:\yanglele', exist_ok=True)
    with requests.get(IMAGE_URL, stream=True) as r:
        with open(r'd:\yanglele\image3.png', 'wb') as f:
            for chunk in r.iter_content(chunk_size=32):  # performs the download
                f.write(chunk)

def downloadimg():
    """Download every image found in the <ul class="img_list"> lists of a fixed page.

    Each image is streamed in 128-byte chunks to d:\\yanglele\\<file name>,
    where the file name is the last path segment of the image URL. A
    'Saved ...' line is printed per image. The target directory is created
    if missing, and each streamed response is closed after use.
    """
    URL = "http://www.nationalgeographic.com.cn/animals/"
    # Raw strings: '\y' and '\%' are invalid escape sequences in normal literals.
    os.makedirs(r'd:\yanglele', exist_ok=True)
    html = requests.get(URL).text
    soup = BeautifulSoup(html, 'lxml')
    for ul in soup.find_all('ul', {'class': 'img_list'}):
        for img in ul.find_all('img'):
            src = img['src']
            image_name = src.split('/')[-1]
            with requests.get(src, stream=True) as r:
                with open(r'd:\yanglele\%s' % image_name, 'wb') as f:
                    for chunk in r.iter_content(chunk_size=128):
                        f.write(chunk)
            print('Saved %s' % image_name)

# Root page from which the crawlers below start.
base_url = 'https://morvanzhou.github.io/'
# True only when base_url has been pointed somewhere other than the default root.
restricted_crawl = base_url != 'https://morvanzhou.github.io/'
def crawl(url):
    """Fetch a URL and return its body as text.

    Args:
        url: Address to open with urlopen.

    Returns:
        str: The response bytes decoded with the default codec (UTF-8).
    """
    # The context manager closes the response even if read/decode fails.
    with urlopen(url) as response:
        return response.read().decode()

def parse(html):
    """Extract the title, same-site links and canonical URL from a page.

    Args:
        html: HTML text of one page.

    Returns:
        tuple: (stripped <h1> text, set of absolute URLs built from hrefs
        matching '^/.+?/$' joined onto base_url, content of the
        <meta property="og:url"> tag).
    """
    soup = BeautifulSoup(html, 'lxml')
    relative_link = re.compile('^/.+?/$')
    anchors = soup.find_all('a', {'href': relative_link})
    heading = soup.find('h1').get_text().strip()
    # A set comprehension removes duplicate links.
    page_urls = {urljoin(base_url, anchor['href']) for anchor in anchors}
    canonical = soup.find('meta', {'property': 'og:url'})['content']
    return heading, page_urls, canonical

def singleuse():
    """Crawl outward from base_url in a single process and print each page.

    Works in rounds: every URL in the unseen set is fetched with crawl() and
    parsed with parse(), then the newly discovered URLs that have not been
    seen become the next round. When the crawl is restricted it stops once
    at least 20 pages have been seen. The elapsed time is printed at the end.
    """
    restricted_crawl = base_url != 'https://morvanzhou.github.io/'
    seen = set()
    unseen = {base_url}
    count = 1
    t1 = time.time()
    while unseen:
        if restricted_crawl and len(seen) >= 20:
            break
        print('\nDistributed Crawling...')
        htmls = [crawl(page) for page in unseen]
        print('\nDistributed Parsing...')
        results = [parse(html) for html in htmls]
        print('\nAnalysing...')
        # Everything fetched this round is now seen; start the next round empty.
        seen |= unseen
        unseen.clear()
        for title, page_urls, page_url in results:
            print(count, title, page_url)
            count += 1
            unseen.update(page_urls - seen)
    print('Total time: %.1f s' % (time.time() - t1,))

def multiuse():
    """Crawl outward from base_url using a pool of 4 worker processes.

    Same round-based crawl as the single-process version, but crawl() and
    parse() calls are submitted to a multiprocessing pool. Must be called
    from under an ``if __name__ == '__main__':`` guard to run correctly.
    The pool is used as a context manager so the worker processes are
    shut down when the crawl finishes or raises.
    """
    unseen = set([base_url, ])
    seen = set()
    count, t1 = 1, time.time()
    with mp.Pool(4) as pool:
        while len(unseen) != 0:
            if restricted_crawl and len(seen) > 20:
                break
            print('\nDistributed Crawling...')
            crawl_jobs = [pool.apply_async(crawl, args=(url,)) for url in unseen]
            htmls = [j.get() for j in crawl_jobs]
            print('\nDistributed Parsing...')
            parse_jobs = [pool.apply_async(parse, args=(html,)) for html in htmls]
            results = [j.get() for j in parse_jobs]
            print('\nAnalysing...')
            seen.update(unseen)
            unseen.clear()
            for title, page_urls, url in results:
                print(count, title, url)
                count += 1
                unseen.update(page_urls - seen)
    print('Total time: %.1f s' % (time.time() - t1,))

def job(x):
    """Return x multiplied by itself."""
    squared = x * x
    return squared

def pooltest():
    """Demonstrate Pool.map and Pool.apply_async with job().

    Prints the squares of 0..9 computed with map, the result of a single
    apply_async call for 2, and the results of ten apply_async calls for
    0..9. The pool is used as a context manager so its workers are shut
    down afterwards.
    """
    with mp.Pool() as pool:
        res = pool.map(job, range(10))
        print(res)
        res = pool.apply_async(job, (2,))
        # Was misspelled 'nulti_res', which made the final print raise NameError.
        multi_res = [pool.apply_async(job, (i,)) for i in range(10)]
        print(res.get())
        print([mures.get() for mures in multi_res])

def job1(t):
    """Block for t seconds, printing a line before and after the sleep."""
    print('Start job', t)
    # Blocking sleep: nothing else runs in this thread meanwhile.
    time.sleep(t)
    print('Job', t, 'takes', t, ' s')

def main():
    """Run job1 for t = 1 and t = 2, one after the other."""
    for seconds in (1, 2):
        job1(seconds)

async def job2(t):
    """Coroutine version of the timed job: sleep t seconds without blocking.

    Prints a line before and after the sleep.
    """
    print('Start job', t)
    # Wait t seconds; other tasks can run during the wait.
    await asyncio.sleep(t)
    print('Job', t, 'takes', t, ' s')

async def main1(loop):
    """Schedule job2 for t = 1 and t = 2 on the given loop and wait for both.

    Args:
        loop: Event loop used to create the tasks.
    """
    # Create the tasks first; they are awaited together below.
    scheduled = [loop.create_task(job2(seconds)) for seconds in (1, 2)]
    # Wait until every task has completed.
    await asyncio.wait(scheduled)

def normal():
    """Request base_url twice, one request after the other, printing the final URL each time."""
    for _ in range(2):
        response = requests.get(base_url)
        print(response.url)

async def job3(session):
    """Request base_url through the given aiohttp session and return the final URL.

    Args:
        session: An open aiohttp client session.

    Returns:
        str: The URL of the response.
    """
    # 'async with' awaits the request (letting other tasks run) and releases
    # the response's connection when the block exits.
    async with session.get(base_url) as response:
        return str(response.url)

async def main2(loop):
    """Run two job3 requests concurrently in one aiohttp session and print their results.

    Args:
        loop: Event loop used to create the tasks.
    """
    async with aiohttp.ClientSession() as session:
        requests_in_flight = []
        for _ in range(2):
            requests_in_flight.append(loop.create_task(job3(session)))
        done, _pending = await asyncio.wait(requests_in_flight)
        all_results = [task.result() for task in done]
        print(all_results)

def asyncdo():
    """Run main2 on a fresh event loop and print the elapsed wall-clock time.

    A new loop is created for each call, so the function can be called more
    than once; reusing the loop from get_event_loop() after closing it
    would fail on the second call. The loop is closed even if main2 raises.
    """
    t1 = time.time()
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main2(loop))
    finally:
        loop.close()
    print("Async total time:", time.time() - t1)

def seleniumweb():
    """Drive Chrome through a fixed click path on morvanzhou.github.io and save a screenshot.

    Opens the site, clicks through a sequence of images/links, then writes
    a screenshot to D:\\yanglele\\jietu2.png. The browser session is shut
    down even if one of the steps raises.
    """
    # Author's note: a headless variant (Options().add_argument("--headless")
    # passed as chrome_options) was tried, but a browser window still appeared.
    # Raw strings: the backslashes in these Windows paths are not escapes.
    driver = webdriver.Chrome(
        executable_path=r"C:\Program Files (x86)\Google\Chrome\Application\chromedriver"
    )
    try:
        driver.get("https://morvanzhou.github.io/")
        driver.find_element_by_xpath(u"//img[@alt='強化學習 (Reinforcement Learning)']").click()
        driver.find_element_by_link_text("About").click()
        driver.find_element_by_link_text(u"贊助").click()
        driver.find_element_by_link_text(u"教程 ▾").click()
        driver.find_element_by_link_text(u"數據處理 ▾").click()
        driver.find_element_by_link_text(u"網頁爬蟲").click()

        driver.get_screenshot_as_file(r"D:\yanglele\jietu2.png")
    finally:
        # quit() ends the whole driver session, not just the current window.
        driver.quit()


# Script entry point: runs the Selenium demo when executed directly.
if __name__ == "__main__":
    seleniumweb()

上面有些代碼執行不成功,姑且全記下。

import scrapy

class QuotesSpider(scrapy.Spider):
    """Spider that yields the text and author of each quote under the 'humor' tag.

    Starts at the humor tag page of quotes.toscrape.com and follows the
    "next" pagination link for as long as one is present.
    """

    name = "quotes"
    start_urls = ['http://quotes.toscrape.com/tag/humor/']

    def parse(self, response):
        """Yield one dict per quote on the page, then a request for the next page if any."""
        for block in response.css('div.quote'):
            text = block.css('span.text::text').extract_first()
            author = block.xpath('span/small/text()').extract_first()
            yield {'text': text, 'author': author}

        next_href = response.css('li.next a::attr("href")').extract_first()
        if next_href is None:
            # Last page: nothing more to follow.
            return
        yield response.follow(next_href, self.parse)

https://docs.scrapy.org/en/latest/intro/overview.html

相關文章
相關標籤/搜索