8、asynicio模塊以及爬蟲應用asynicio模塊(高性能爬蟲)

asynicio模塊以及爬蟲應用asynicio模塊(高性能爬蟲)

1、背景知識html

 爬蟲的本質就是一個socket客戶端與服務端的通訊過程,若是咱們有多個url待爬取,只用一個線程且採用串行的方式執行,那隻能等待爬取一個結束後才能繼續下一個,效率會很是低。python

須要強調的是:對於單線程下串行N個任務,並不徹底等同於低效,若是這N個任務都是純計算的任務,那麼該線程對cpu的利用率仍然會很高,之因此單線程下串行多個爬蟲任務低效,是由於爬蟲任務是明顯的IO密集型程序。react

2、同步、異步、回調機制程序員

一、同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低下web

複製代碼
import requests

def parse_page(res):
    print('解析 %s' %(len(res)))

def get_page(url):
    print('下載 %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
for url in urls:
    res=get_page(url) #調用一個任務,就在原地等待任務結束拿到結果後才繼續日後執行
    parse_page(res)
複製代碼

二、一個簡單的解決方案:多線程或多進程數據庫

#在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),
這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。
複製代碼
#IO密集型程序應該用多線程
import requests
from threading import Thread,current_thread

def parse_page(res):
    print('%s 解析 %s' %(current_thread().getName(),len(res)))

def get_page(url,callback=parse_page):
    print('%s 下載 %s' %(current_thread().getName(),url))
    response=requests.get(url)
    if response.status_code == 200:
        callback(response.text)

if __name__ == '__main__':
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        t=Thread(target=get_page,args=(url,))
        t.start()
複製代碼

   該方案的問題是:緩存

開啓多進程或都線程的方式,咱們是沒法無限制地開啓多進程或多線程的:在遇到要同時響應成百上千路的鏈接請求,
則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進程自己也更容易進入假死狀態。
三、改進方案: 線程池或進程池+異步調用:提交一個任務後並不會等待任務結束,而是繼續下一行代碼
#不少程序員可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,
並讓空閒的線程從新承擔新的執行任務。「鏈接池」維持鏈接的緩存池,儘可能重用已有的鏈接、減小建立和關閉鏈接的頻率。
這兩種技術均可以很好的下降系統開銷,都被普遍應用不少大型系統,如websphere、tomcat和各類數據庫等。
 1 #IO密集型程序應該用多線程,因此此時咱們使用線程池
 2 import requests
 3 from threading import current_thread
 4 from concurrent.futures import ThreadPoolExecutor,ProcessPoolExecutor
 5 
 6 def parse_page(res):
 7     res=res.result()
 8     print('%s 解析 %s' %(current_thread().getName(),len(res)))
 9 
10 def get_page(url):
11     print('%s 下載 %s' %(current_thread().getName(),url))
12     response=requests.get(url)
13     if response.status_code == 200:
14         return response.text
15 
16 if __name__ == '__main__':
17     urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
18 
19     pool=ThreadPoolExecutor(50)
20     # pool=ProcessPoolExecutor(50)
21     for url in urls:
22         pool.submit(get_page,url).add_done_callback(parse_page)
23 
24     pool.shutdown(wait=True)
進程池或線程池:異步調用+回調機制

    改進後方案其實也存在着問題:tomcat

#「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂「池」始終有其上限,
當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,
並根據響應規模調整「池」的大小

對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。服務器

總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。網絡

3、高性能

    上述不管哪一種解決方案其實沒有解決一個性能相關的問題:IO阻塞,不管是多進程仍是多線程,在遇到IO阻塞時都會被操做系統強行剝奪走CPU的執行權限,程序的執行效率所以就下降了下來。

    解決這一問題的關鍵在於,咱們本身從應用程序級別檢測IO阻塞而後切換到咱們本身程序的其餘任務執行,這樣把咱們程序的IO降到最低,咱們的程序處於就緒態就會增多,以此來迷惑操做系統,操做系統便覺得咱們的程序是IO比較少的程序,從而會盡量多的分配CPU給咱們,這樣也就達到了提高程序執行效率的目的

    一、在python3.3以後新增了asyncio模塊,能夠幫咱們檢測IO(只能是網絡IO),實現應用程序級別的切換

複製代碼
import asyncio
#當程序遇到IO的時候不阻塞了,讓這個裝飾器去檢測有沒有IO,當有IO的時候提醒一下,切到其餘的地方去
@asyncio.coroutine
def task(task_id,seconds):
    print("%s is start"%task_id)
    yield from asyncio.sleep(seconds)  #自動檢測IO,  #遇到IO就切,而且保存狀態
    print("%s id end" %task_id)

tasks = [
    task(task_id="任務1",seconds=3),
    task(task_id="任務2",seconds=2),
    task(task_id="任務3",seconds=1),
]
loop = asyncio.get_event_loop()  #建立事件循環
loop.run_until_complete(asyncio.wait(tasks))  #運行事件循環,直到任務完成
loop.close()  #一旦任務結束,就獲取到任務的結果
複製代碼

  二、但asyncio模塊只能發tcp級別的請求,不能發http協議,所以,在咱們須要發送http請求的時候,須要咱們自定義http報頭

 1 import requests
 2 import asyncio
 3 import uuid
 4 User_Agent='Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/63.0.3239.132 Safari/537.36'
 5 
 6 def parse_page(res):
 7     with open("%s.html"%uuid.uuid1(),"wb") as f:
 8         f.write(res)
 9 
10 def get_pager(host,port=80,url="/",ssl=False,callback=parse_page):
11 
12     #一、創建鏈接
13     if ssl:
14         port = 443
15     print("下載:https:%s:%s:%s"%(host,port,url))
16     recv,send = yield from asyncio.open_connection(host=host,port=port,ssl=ssl)
17 
18     #二、封裝請求頭
19     request_headers="""GET %s HTTP/1.0\r\nHost: %s\r\nUser-Agent: %s\r\n\r\n""" %(url,host,User_Agent)# http / 1.0省去了拼接太多的東西
20     request_headers=request_headers.encode('utf-8')
21 
22     #三、發送請求頭
23     send.write(request_headers) #套接字不能發字符串,要發bytes
24     yield from send.drain() # 發送請求頭 #遇到IO就切,而且保存狀態
25     #四、接收響應頭
26     # recv.read()  # 接收所有的,可是不能區分響應頭和響應體
27     # recv.readline()  # 一次收一行,可是你也不肯定一次收幾行,因此搞個循環
28     while True:
29         line = yield from recv.readline()
30         if line == b'\r\n':  # 最後一行是\r\n,就結束了
31             break
32     #五、接受響應體
33     text = yield from recv.read()
34     #六、調用回調函數,完成解析功能
35     #看一下效果,保存起來,吧返回的值給一個回調函數
36     callback(text)
37     #七、關閉鏈接
38     send.close()
39     # 三次握手創建好以後,必定是四次以後才斷開鏈接
40     # 發送端決定接受數據的何時關閉,
41     # 沒有recv.close()
42 
43 if __name__ == '__main__':
44     tasks = [
45         get_pager(host='www.baidu.com', url='/s?wd=唐詩三百首', ssl=True),
46         get_pager(host='www.cnblogs.com', url='/haiyan123/p/7445542.html', ssl=True)
47     ]
48     loop = asyncio.get_event_loop()
49     loop.run_until_complete(asyncio.wait(tasks))
50     loop.close()
爬蟲應用asyncio模塊

  三、自定義http報頭多少有點麻煩,因而有了aiohttp模塊,專門幫咱們封裝http報頭,而後咱們還須要用asyncio檢測IO實現切換

 1 import aiohttp
 2 import asyncio
 3 
 4 @asyncio.coroutine
 5 def get_page(url):
 6     print('GET:%s' %url)
 7     response=yield from aiohttp.request('GET',url)
 8 
 9     data=yield from response.read()
10 
11     print(url,data)
12     response.close()
13     return 1
14 
15 tasks=[
16     get_page('https://www.python.org/doc'),
17     get_page('https://www.cnblogs.com/linhaifeng'),
18     get_page('https://www.openstack.org')
19 ]
20 
21 loop=asyncio.get_event_loop()
22 results=loop.run_until_complete(asyncio.gather(*tasks))
23 loop.close()
24 
25 print('=====>',results) #[1, 1, 1]
asyncio+aiohttp

 四、此外,還能夠將requests.get函數傳給asyncio,就可以被檢測了

 1 import requests
 2 import asyncio
 3 
 4 @asyncio.coroutine
 5 def get_page(func,*args):
 6     print('GET:%s' %args[0])
 7     loog=asyncio.get_event_loop()
 8     furture=loop.run_in_executor(None,func,*args)
 9     response=yield from furture
10 
11     print(response.url,len(response.text))
12     return 1
13 
14 tasks=[
15     get_page(requests.get,'https://www.python.org/doc'),
16     get_page(requests.get,'https://www.cnblogs.com/linhaifeng'),
17     get_page(requests.get,'https://www.openstack.org')
18 ]
19 
20 loop=asyncio.get_event_loop()
21 results=loop.run_until_complete(asyncio.gather(*tasks))
22 loop.close()
23 
24 print('=====>',results) #[1, 1, 1]
asyncio+requests模塊的方法

五、還有以前在協程時介紹的gevent模塊

 1 from gevent import monkey;monkey.patch_all()
 2 import gevent
 3 import requests
 4 
 5 def get_page(url):
 6     print('GET:%s' %url)
 7     response=requests.get(url)
 8     print(url,len(response.text))
 9     return 1
10 
11 # g1=gevent.spawn(get_page,'https://www.python.org/doc')
12 # g2=gevent.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
13 # g3=gevent.spawn(get_page,'https://www.openstack.org')
14 # gevent.joinall([g1,g2,g3,])
15 # print(g1.value,g2.value,g3.value) #拿到返回值
16 
17 
18 #協程池
19 from gevent.pool import Pool
20 pool=Pool(2)
21 g1=pool.spawn(get_page,'https://www.python.org/doc')
22 g2=pool.spawn(get_page,'https://www.cnblogs.com/linhaifeng')
23 g3=pool.spawn(get_page,'https://www.openstack.org')
24 gevent.joinall([g1,g2,g3,])
25 print(g1.value,g2.value,g3.value) #拿到返回值
gevent+requests

六、封裝了gevent+requests模塊的grequests模塊

1 #pip3 install grequests
 2 
 3 import grequests
 4 
 5 request_list=[
 6     grequests.get('https://wwww.xxxx.org/doc1'),
 7     grequests.get('https://www.cnblogs.com/linhaifeng'),
 8     grequests.get('https://www.openstack.org')
 9 ]
10 
11 
12 ##### 執行並獲取響應列表 #####
13 # response_list = grequests.map(request_list)
14 # print(response_list)
15 
16 ##### 執行並獲取響應列表(處理異常) #####
17 def exception_handler(request, exception):
18     # print(request,exception)
19     print("%s Request failed" %request.url)
20 
21 response_list = grequests.map(request_list, exception_handler=exception_handler)
22 print(response_list)
grequests

 七、twisted:是一個網絡框架,其中一個功能是發送異步請求,檢測IO並自動切換

 1 from twisted.web.client import getPage,defer
 2 from twisted.internet import reactor
 3 #pip install pypiwin32
 4 
 5 
 6 def all_done(res):  #這裏的res ,接收的是全部函D:\pywin32+twisted\Twisted-17.9.0-cp36-cp36m-win_amd64.whl數的返回值,
 7     '''等到全部的任務都結束了才觸發這個函數'''
 8     print(res) # #打印結果[(回調函數是否拋出異常<True,False>,回調函數的返回值),(),()]
 9     reactor.stop()
10 
11 def callback(res):
12     print(len(res))  #obj
13     return 1
14 
15 
16 urls = [
17     'http://www.baidu.com',
18     'http://www.bing.com',
19     'http://www.python.org',
20 ]
21 task = []
22 for url in urls:
23     obj = getPage(url.encode("utf-8"),)  #請求url頁面,要傳bytes類型的
24     obj.addCallback(callback)  #吧結果給了回調函數
25     task.append(obj)
26 # defer.Deferred(task)  #建立循環,開始檢測IO
27 defer.DeferredList(task).addBoth(all_done)  #給全部任務綁定一個回調函數,等全部的任務都結束了之後關閉鏈接
28 reactor.run()
twisted

八、tornado

 1 from tornado.httpclient import AsyncHTTPClient
 2 from tornado.httpclient import HTTPRequest
 3 from tornado import ioloop
 4 
 5 
 6 def handle_response(response):
 7     """
 8     處理返回值內容(須要維護計數器,來中止IO循環),調用 ioloop.IOLoop.current().stop()
 9     :param response:
10     :return:
11     """
12     if response.error:
13         print("Error:", response.error)
14     else:
15         print(response.body)
16 
17 
18 def func():
19     url_list = [
20         'http://www.baidu.com',
21         'http://www.bing.com',
22     ]
23     for url in url_list:
24         print(url)
25         http_client = AsyncHTTPClient()
26         http_client.fetch(HTTPRequest(url), handle_response)
27 
28 
29 ioloop.IOLoop.current().add_callback(func)
30 ioloop.IOLoop.current().start()
31 
32 
33 
34 
35 #發現上例在全部任務都完畢後也不能正常結束,爲了解決該問題,讓咱們來加上計數器
36 from tornado.httpclient import AsyncHTTPClient
37 from tornado.httpclient import HTTPRequest
38 from tornado import ioloop
39 
40 count=0
41 
42 def handle_response(response):
43     """
44     處理返回值內容(須要維護計數器,來中止IO循環),調用 ioloop.IOLoop.current().stop()
45     :param response:
46     :return:
47     """
48     if response.error:
49         print("Error:", response.error)
50     else:
51         print(len(response.body))
52 
53     global count
54     count-=1 #完成一次回調,計數減1
55     if count == 0:
56         ioloop.IOLoop.current().stop() 
57 
58 def func():
59     url_list = [
60         'http://www.baidu.com',
61         'http://www.bing.com',
62     ]
63 
64     global count
65     for url in url_list:
66         print(url)
67         http_client = AsyncHTTPClient()
68         http_client.fetch(HTTPRequest(url), handle_response)
69         count+=1 #計數加1
70 
71 ioloop.IOLoop.current().add_callback(func)
72 ioloop.IOLoop.current().start()
Tornado
相關文章
相關標籤/搜索