高性能異步爬蟲
引入html
不少同窗對於異步這個概念只是停留在了「據說很NB」的認知層面上,不多有人可以在項目中真正的使用異步實現高性能的相關操做。本節課,我們就一塊兒來學習一下,爬蟲中如何使用異步實現高性能的數據爬取操做。python
背景程序員
其實爬蟲的本質就是client發請求批量獲取server的響應數據,若是咱們有多個url待爬取,只用一個線程且採用串行的方式執行,那隻能等待爬取一個結束後才能繼續下一個,效率會很是低。須要強調的是:對於單線程下串行N個任務,並不徹底等同於低效,若是這N個任務都是純計算的任務,那麼該線程對cpu的利用率仍然會很高,之因此單線程下串行多個爬蟲任務低效,是由於爬蟲任務是明顯的IO密集型(阻塞)程序。那麼該如何提升爬取性能呢?sql
分析處理express
- 同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低下flask
import requests def parse_page(res): print('解析 %s' %(len(res))) def get_page(url): print('下載 %s' %url) response=requests.get(url) if response.status_code == 200: return response.text urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org'] for url in urls: res=get_page(url) #調用一個任務,就在原地等待任務結束拿到結果後才繼續日後執行 parse_page(res)
- a. 解決同步調用方案之多線程/多進程服務器
-
-
好處:在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。
- 弊端:開啓多進程或都線程的方式,咱們是沒法無限制地開啓多進程或多線程的:在遇到要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進程自己也更容易進入假死狀態。
-
- b. 解決同步調用方案之線程/進程池網絡
-
- 好處:不少程序員可能會考慮使用「線程池」或「鏈接池」。「線程池」旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。能夠很好的下降系統開銷。
- 弊端:「線程池」和「鏈接池」技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂「池」始終有其上限,當請求大大超過上限時,「池」構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用「池」必須考慮其面臨的響應規模,並根據響應規模調整「池」的大小。
- 案例:基於multiprocessing.dummy線程池爬取梨視頻的視頻信息
import requests import random from lxml import etree import re from fake_useragent import UserAgent #安裝fake-useragent庫:pip install fake-useragent #導入線程池模塊 from multiprocessing.dummy import Pool #實例化線程池對象 pool = Pool() url = 'http://www.pearvideo.com/category_1' #隨機產生UA ua = UserAgent().random headers = { 'User-Agent':ua } #獲取首頁頁面數據 page_text = requests.get(url=url,headers=headers).text #對獲取的首頁頁面數據中的相關視頻詳情連接進行解析 tree = etree.HTML(page_text) li_list = tree.xpath('//div[@id="listvideoList"]/ul/li') detail_urls = []#存儲二級頁面的url for li in li_list: detail_url = 'http://www.pearvideo.com/'+li.xpath('./div/a/@href')[0] title = li.xpath('.//div[@class="vervideo-title"]/text()')[0] detail_urls.append(detail_url) vedio_urls = []#存儲視頻的url for url in detail_urls: page_text = requests.get(url=url,headers=headers).text vedio_url = re.findall('srcUrl="(.*?)"',page_text,re.S)[0] vedio_urls.append(vedio_url) #使用線程池進行視頻數據下載 func_request = lambda link:requests.get(url=link,headers=headers).content video_data_list = pool.map(func_request,vedio_urls) #使用線程池進行視頻數據保存 func_saveData = lambda data:save(data) pool.map(func_saveData,video_data_list) def save(data): fileName = str(random.randint(1,10000))+'.mp4' with open(fileName,'wb') as fp: fp.write(data) print(fileName+'已存儲') pool.close() pool.join()
- 總結:對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,「線程池」或「鏈接池」或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。session
終極處理方案多線程
- 上述不管哪一種解決方案其實沒有解決一個性能相關的問題:IO阻塞,不管是多進程仍是多線程,在遇到IO阻塞時都會被操做系統強行剝奪走CPU的執行權限,程序的執行效率所以就下降了下來。
- 解決這一問題的關鍵在於,咱們本身從應用程序級別檢測IO阻塞而後切換到咱們本身程序的其餘任務執行,這樣把咱們程序的IO降到最低,咱們的程序處於就緒態就會增多,以此來迷惑操做系統,操做系統便覺得咱們的程序是IO比較少的程序,從而會盡量多的分配CPU給咱們,這樣也就達到了提高程序執行效率的目的。
- 在python3.4以後新增了asyncio模塊,能夠幫咱們檢測IO(只能是網絡IO【HTTP鏈接就是網絡IO操做】),實現應用程序級別的切換(異步IO)。注意:asyncio只能發tcp級別的請求,不能發http協議。
- 異步IO:所謂「異步 IO」,就是你發起一個 網絡IO 操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。
- 實現方式:單線程+協程實現異步IO操做。
- 異步協程用法
接下來讓咱們來了解下協程的實現,從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程仍是以生成器對象爲基礎的,在 Python 3.5 則增長了 async/await,使得協程的實現更加方便。首先咱們須要瞭解下面幾個概念:
-
event_loop:事件循環,至關於一個無限循環,咱們能夠把一些函數註冊到這個事件循環上,當知足條件發生的時候,就會調用對應的處理方法。
-
coroutine:中文翻譯叫協程,在 Python 中常指代爲協程對象類型,咱們能夠將協程對象註冊到時間循環中,它會被事件循環調用。咱們可使用 async 關鍵字來定義一個方法,這個方法在調用時不會當即被執行,而是返回一個協程對象。
-
task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。
-
future:表明未來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。
另外咱們還須要瞭解 async/await 關鍵字,它是從 Python 3.5 纔出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。
- 定義一個協程
import asyncio async def execute(x): print('Number:', x) coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() loop.run_until_complete(coroutine) print('After calling loop')
運行結果:
Coroutine: <coroutine object execute at 0x1034cf830> After calling execute Number: 1 After calling loop
首先咱們引入了 asyncio 這個包,這樣咱們纔可使用 async 和 await,而後咱們使用 async 定義了一個 execute() 方法,方法接收一個數字參數,方法執行以後會打印這個數字。
隨後咱們直接調用了這個方法,然而這個方法並無執行,而是返回了一個 coroutine 協程對象。隨後咱們使用 get_event_loop() 方法建立了一個事件循環 loop,並調用了 loop 對象的 run_until_complete() 方法將協程註冊到事件循環 loop 中,而後啓動。最後咱們纔看到了 execute() 方法打印了輸出結果。
可見,async 定義的方法就會變成一個沒法直接執行的 coroutine 對象,必須將其註冊到事件循環中才能夠執行。
上文咱們還提到了 task,它是對 coroutine 對象的進一步封裝,它裏面相比 coroutine 對象多了運行狀態,好比 running、finished 等,咱們能夠用這些狀態來獲取協程對象的執行狀況。
在上面的例子中,當咱們將 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操做就是將 coroutine 封裝成了 task 對象,咱們也能夠顯式地進行聲明,以下所示:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') loop = asyncio.get_event_loop() task = loop.create_task(coroutine) print('Task:', task) loop.run_until_complete(task) print('Task:', task) print('After calling loop')
運行結果:
Coroutine: <coroutine object execute at 0x10e0f7830> After calling execute Task: <Task pending coro=<execute() running at demo.py:4>> Number: 1 Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1> After calling loop
這裏咱們定義了 loop 對象以後,接着調用了它的 create_task() 方法將 coroutine 對象轉化爲了 task 對象,隨後咱們打印輸出一下,發現它是 pending 狀態。接着咱們將 task 對象添加到事件循環中獲得執行,隨後咱們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還能夠看到其 result 變成了 1,也就是咱們定義的 execute() 方法的返回結果。
另外定義 task 對象還有一種方式,就是直接經過 asyncio 的 ensure_future() 方法,返回結果也是 task 對象,這樣的話咱們就能夠不借助於 loop 來定義,即便咱們尚未聲明 loop 也能夠提早定義好 task 對象,寫法以下:
import asyncio async def execute(x): print('Number:', x) return x coroutine = execute(1) print('Coroutine:', coroutine) print('After calling execute') task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task) print('After calling loop')
- 綁定回調:也能夠爲某個 task 綁定一個回調方法,來看下面的例子:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url).status_code return status def callback(task): print('Status:', task.result()) coroutine = request() task = asyncio.ensure_future(coroutine) task.add_done_callback(callback) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task)
在這裏咱們定義了一個 request() 方法,請求了百度,返回狀態碼,可是這個方法裏面咱們沒有任何 print() 語句。隨後咱們定義了一個 callback() 方法,這個方法接收一個參數,是 task 對象,而後調用 print() 方法打印了 task 對象的結果。這樣咱們就定義好了一個 coroutine 對象和一個回調方法,咱們如今但願的效果是,當 coroutine 對象執行完畢以後,就去執行聲明的 callback() 方法。
那麼它們兩者怎樣關聯起來呢?很簡單,只須要調用 add_done_callback() 方法便可,咱們將 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢以後就能夠調用 callback() 方法了,同時 task 對象還會做爲參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就能夠獲取返回結果了。
運行結果:
Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]> Status: <Response [200]> Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>
實際上不用回調方法,直接在 task 運行完畢以後也能夠直接調用 result() 方法獲取結果,運行結果是同樣的。以下所示:
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url).status_code return status coroutine = request() task = asyncio.ensure_future(coroutine) print('Task:', task) loop = asyncio.get_event_loop() loop.run_until_complete(task) print('Task:', task) print('Task Result:', task.result())
- 多任務協程:上面的例子咱們只執行了一次請求,若是咱們想執行屢次請求應該怎麼辦呢?咱們能夠定義一個 task 列表,而後使用 asyncio 的 wait() 方法便可執行。
import asyncio import requests async def request(): url = 'https://www.baidu.com' status = requests.get(url).status_code return status tasks = [asyncio.ensure_future(request()) for _ in range(5)] print('Tasks:', tasks) loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) for task in tasks: print('Task Result:', task.result())
這裏咱們使用一個 for 循環建立了五個 task,組成了一個列表,而後把這個列表首先傳遞給了 asyncio 的 wait() 方法,而後再將其註冊到時間循環中,就能夠發起五個任務了。最後咱們再將任務的運行結果輸出出來,運行結果以下:
Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>] Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]> Task Result: <Response [200]>
- 協程實現:前面說了這麼一通,又是 async,又是 coroutine,又是 task,又是 callback,但彷佛並無看出協程的優點啊?反而寫法上更加奇怪和麻煩了,別急,上面的案例只是爲後面的使用做鋪墊,接下來咱們正式來看下協程在解決 IO 密集型任務上有怎樣的優點吧!
爲了表現出協程的優點,咱們須要先建立一個合適的實驗環境,最好的方法就是模擬一個須要等待必定時間才能夠獲取返回結果的網頁,上面的代碼中使用了百度,但百度的響應太快了,並且響應速度也會受本機網速影響,因此最好的方式是本身在本地模擬一個慢速服務器,這裏咱們選用 Flask。
而後編寫服務器代碼以下:
from flask import Flask import time app = Flask(__name__) @app.route('/') def index(): time.sleep(3) return 'Hello!' if __name__ == '__main__': app.run(threaded=True)#這代表 Flask 啓動了多線程模式,否則默認是隻有一個線程的。
接下來咱們再從新使用上面的方法請求一遍:
import asyncio import requests import time start = time.time() async def request(): url = 'http://127.0.0.1:5000' print('Waiting for', url) response = requests.get(url) print('Get response from', url, 'Result:', response.text) tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start)
在這裏咱們仍是建立了五個 task,而後將 task 列表傳給 wait() 方法並註冊到時間循環中執行。
運行結果以下:
Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 15.049368143081665
能夠發現和正常的請求並無什麼兩樣,依然仍是順次執行的,耗時 15 秒,平均一個請求耗時 3 秒,說好的異步處理呢?
其實,要實現異步處理,咱們得先要有掛起的操做,當一個任務須要等待 IO 結果的時候,能夠掛起當前任務,轉而去執行其餘任務,這樣咱們才能充分利用好資源,上面方法都是一本正經的串行走下來,連個掛起都沒有,怎麼可能實現異步?
要實現異步,接下來咱們再瞭解一下 await 的用法,使用 await 能夠將耗時等待的操做掛起,讓出控制權。當協程執行的時候遇到 await,時間循環就會將本協程掛起,轉而去執行別的協程,直到其餘的協程掛起或執行完畢。
因此,咱們可能會將代碼中的 request() 方法改爲以下的樣子:
async def request(): url = 'http://127.0.0.1:5000' print('Waiting for', url) response = await requests.get(url) print('Get response from', url, 'Result:', response.text)
僅僅是在 requests 前面加了一個 await,然而執行如下代碼,會獲得以下報錯:
Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Cost time: 15.048935890197754 Task exception was never retrieved future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)> Traceback (most recent call last): File "demo.py", line 10, in request status = await requests.get(url) TypeError: object Response can't be used in 'await' expression
此次它遇到 await 方法確實掛起了,也等待了,可是最後卻報了這麼個錯,這個錯誤的意思是 requests 返回的 Response 對象不能和 await 一塊兒使用,爲何呢?由於根據官方文檔說明,await 後面的對象必須是以下格式之一:
-
A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
-
A generator-based coroutine object returned from a function decorated with types.coroutine(),一個由 types.coroutine() 修飾的生成器,這個生成器能夠返回 coroutine 對象。
-
An object with an await__ method returning an iterator,一個包含 __await 方法的對象返回的一個迭代器。
reqeusts 返回的 Response 不符合上面任一條件,所以就會報上面的錯誤了。
那麼有的小夥伴就發現了,既然 await 後面能夠跟一個 coroutine 對象,那麼我用 async 把請求的方法改爲 coroutine 對象不就能夠了嗎?因此就改寫成以下的樣子:
import asyncio import requests import time start = time.time() async def get(url): return requests.get(url) async def request(): url = 'http://127.0.0.1:5000' print('Waiting for', url) response = await get(url) print('Get response from', url, 'Result:', response.text) tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start)
這裏咱們將請求頁面的方法獨立出來,並用 async 修飾,這樣就獲得了一個 coroutine 對象,咱們運行一下看看:
Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 15.134317874908447
仍是不行,它還不是異步執行,也就是說咱們僅僅將涉及 IO 操做的代碼封裝到 async 修飾的方法裏面是不可行的!咱們必需要使用支持異步操做的請求方式才能夠實現真正的異步,因此這裏就須要 aiohttp 派上用場了。
- 使用aiohttp
- 環境安裝:pip install aiohttp
- 下面咱們將 aiohttp 用上來,將代碼改爲以下樣子:
import asyncio import aiohttp import time start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) result = await response.text() session.close() return result async def request(): url = 'http://127.0.0.1:5000' print('Waiting for', url) result = await get(url) print('Get response from', url, 'Result:', result) tasks = [asyncio.ensure_future(request()) for _ in range(5)] loop = asyncio.get_event_loop() loop.run_until_complete(asyncio.wait(tasks)) end = time.time() print('Cost time:', end - start)
在這裏咱們將請求庫由 requests 改爲了 aiohttp,經過 aiohttp 的 ClientSession 類的 get() 方法進行請求,結果以下:
Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 3.0199508666992188
成功了!咱們發現此次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5。
代碼裏面咱們使用了 await,後面跟了 get() 方法,在執行這五個協程的時候,若是遇到了 await,那麼就會將當前協程掛起,轉而去執行其餘的協程,直到其餘的協程也掛起或執行完畢,再進行下一個協程的執行。
開始運行時,時間循環會運行第一個 task,針對第一個 task 來講,當執行到第一個 await 跟着的 get() 方法時,它被掛起,但這個 get() 方法第一步的執行是非阻塞的,掛起以後立馬被喚醒,因此當即又進入執行,建立了 ClientSession 對象,接着遇到了第二個 await,調用了 session.get() 請求方法,而後就被掛起了,因爲請求須要耗時好久,因此一直沒有被喚醒,好第一個 task 被掛起了,那接下來該怎麼辦呢?事件循環會尋找當前未被掛起的協程繼續執行,因而就轉而執行第二個 task 了,也是同樣的流程操做,直到執行了第五個 task 的 session.get() 方法以後,所有的 task 都被掛起了。全部 task 都已經處於掛起狀態,那咋辦?只好等待了。3 秒以後,幾個請求幾乎同時都有了響應,而後幾個 task 也被喚醒接着執行,輸出請求結果,最後耗時,3 秒!
怎麼樣?這就是異步操做的便捷之處,當遇到阻塞式操做時,任務被掛起,程序接着去執行其餘的任務,而不是傻傻地等着,這樣能夠充分利用 CPU 時間,而沒必要把時間浪費在等待 IO 上。
有人就會說了,既然這樣的話,在上面的例子中,在發出網絡請求後,既然接下來的 3 秒都是在等待的,在 3 秒以內,CPU 能夠處理的 task 數量遠不止這些,那麼豈不是咱們放 10 個、20 個、50 個、100 個、1000 個 task 一塊兒執行,最後獲得全部結果的耗時不都是 3 秒左右嗎?由於這幾個任務被掛起後都是一塊兒等待的。
理論來講確實是這樣的,不過有個前提,那就是服務器在同一時刻接受無限次請求都能保證正常返回結果,也就是服務器無限抗壓,另外還要忽略 IO 傳輸時延,確實能夠作到無限 task 一塊兒執行且在預想時間內獲得結果。
咱們這裏將 task 數量設置成 100,再試一下:
tasks = [asyncio.ensure_future(request()) for _ in range(100)]
耗時結果以下:
Cost time: 3.106252670288086
最後運行時間也是在 3 秒左右,固然多出來的時間就是 IO 時延了。
可見,使用了異步協程以後,咱們幾乎能夠在相同的時間內實現成百上千倍次的網絡請求,把這個運用在爬蟲中,速度提高可謂是很是可觀了。
- 與多進程結合
在最新的 PyCon 2018 上,來自 Facebook 的 John Reese 介紹了 asyncio 和 multiprocessing 各自的特色,並開發了一個新的庫,叫作 aiomultiprocess,感興趣的能夠了解下:https://www.youtube.com/watch?v=0kXaLh8Fz3k。
這個庫的安裝方式是:pip install aiomultiprocess
須要 Python 3.6 及更高版本纔可以使用。
使用這個庫,咱們能夠將上面的例子改寫以下:
import asyncio import aiohttp import time from aiomultiprocess import Pool start = time.time() async def get(url): session = aiohttp.ClientSession() response = await session.get(url) result = await response.text() session.close() return result async def request(): url = 'http://127.0.0.1:5000' urls = [url for _ in range(100)] async with Pool() as pool: result = await pool.map(get, urls) return result coroutine = request() task = asyncio.ensure_future(coroutine) loop = asyncio.get_event_loop() loop.run_until_complete(task) end = time.time() print('Cost time:', end - start)
這樣就會同時使用多進程和異步協程進行請求,固然最後的結果其實和異步是差很少的:
Cost time: 3.1156570434570312
由於個人測試接口的緣由,最快的響應也是 3 秒,因此這部分多餘的時間基本都是 IO 傳輸時延。但在真實狀況下,咱們在作爬取的時候遇到的狀況變幻無窮,一方面咱們使用異步協程來防止阻塞,另外一方面咱們使用 multiprocessing 來利用多核成倍加速,節省時間其實仍是很是可觀的。