爬蟲是 IO 密集型任務,好比咱們使用 requests 庫來爬取某個站點的話,發出一個請求以後,程序必需要等待網站返回響應以後才能接着運行,而在等待響應的過程當中,整個爬蟲程序是一直在等待的,實際上沒有作任何的事情。html
所以,有必要提升程序的運行效率,異步就是其中有效的一種方法。python
今天咱們一塊兒來學習下異步爬蟲的相關內容。編程
阻塞狀態指程序未獲得所需計算資源時被掛起的狀態。程序在等待某個操做完成期間,自身沒法繼續處理其餘的事情,則稱該程序在該操做上是阻塞的。常見的阻塞形式有:網絡 I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,全部的進程都沒法真正處理事情,它們也會被阻塞。若是是多核 CPU 則正在執行上下文切換操做的核不可被利用。api
程序在等待某操做過程當中,自身不被阻塞,能夠繼續處理其餘的事情,則稱該程序在該操做上是非阻塞的。非阻塞並非在任何程序級別、任何狀況下均可以存在的。僅當程序封裝的級別能夠囊括獨立的子程序單元時,它纔可能存在非阻塞狀態。非阻塞的存在是由於阻塞存在,正由於某個操做阻塞致使的耗時與效率低下,咱們纔要把它變成非阻塞的。服務器
不一樣程序單元爲了完成某個任務,在執行過程當中需靠某種通訊方式以協調一致,咱們稱這些程序單元是同步執行的。例如購物系統中更新商品庫存,須要用「行鎖」做爲通訊信號,讓不一樣的更新請求強制排隊順序執行,那更新庫存的操做是同步的。簡言之,同步意味着有序。markdown
爲完成某個任務,不一樣程序單元之間過程當中無需通訊協調,也能完成任務的方式,不相關的程序單元之間能夠是異步的。例如,爬蟲下載網頁。調度程序調用下載程序後,便可調度其餘任務,而無需與該下載任務保持通訊以協調行爲。不一樣網頁的下載、保存等操做都是無關的,也無需相互通知協調。這些異步操做的完成時刻並不肯定。簡言之,異步意味着無序。網絡
多進程就是利用 CPU 的多核優點,在同一時間並行地執行多個任務,能夠大大提升執行效率。session
協程,英文叫做 Coroutine,又稱微線程、纖程,協程是一種用戶態的輕量級線程。協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以協程能保留上一次調用時的狀態,即全部局部狀態的一個特定組合,每次過程重入時,就至關於進入上一次調用的狀態。協程本質上是個單進程,協程相對於多進程來講,無需線程上下文切換的開銷,無需原子操做鎖定及同步的開銷,編程模型也很是簡單。咱們可使用協程來實現異步操做,好比在網絡爬蟲場景下,咱們發出一個請求以後,須要等待必定的時間才能獲得響應,但其實在這個等待過程當中,程序能夠幹許多其餘的事情,等到響應獲得以後才切換回來繼續處理,這樣能夠充分利用 CPU 和其餘資源,這就是協程的優點。多線程
從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程仍是以生成器對象爲基礎的,在 Python 3.5 則增長了 async/await,使得協程的實現更加方便。併發
asyncio
Python 中使用協程最經常使用的庫莫過於 asyncio
async/await 關鍵字,是從 Python 3.5 纔出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。
定義協程
定義一個協程,感覺它和普通進程在實現上的不一樣之處,代碼以下:
import asyncio
async def execute(x):
print('Number:', x)
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
複製代碼
運行結果以下:
Coroutine: <coroutine object execute at 0x0000027808F5BE48>
After calling execute
Number: 666
After calling loop
Process finished with exit code 0
複製代碼
首先導入 asyncio 這個包,這樣纔可使用 async 和 await,而後使用 async 定義了一個 execute 方法,方法接收一個數字參數,方法執行以後會打印這個數字。
隨後咱們直接調用了這個方法,然而這個方法並無執行,而是返回了一個 coroutine 協程對象。隨後咱們使用 get_event_loop 方法建立了一個事件循環 loop,並調用了 loop 對象的 run_until_complete 方法將協程註冊到事件循環 loop 中,而後啓動。最後咱們纔看到了 execute 方法打印了輸出結果。
可見,async 定義的方法就會變成一個沒法直接執行的 coroutine 對象,必須將其註冊到事件循環中才能夠執行。
前面還提到了 task,它是對 coroutine 對象的進一步封裝,它裏面相比 coroutine 對象多了運行狀態,好比 running、finished 等,咱們能夠用這些狀態來獲取協程對象的執行狀況。在上面的例子中,當咱們將 coroutine 對象傳遞給 run_until_complete 方法的時候,實際上它進行了一個操做就是將 coroutine 封裝成了 task 對象。task也能夠顯式地進行聲明,以下所示:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
複製代碼
運行結果以下
Coroutine: <coroutine object execute at 0x000001CB3F90BE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop
Process finished with exit code 0
複製代碼
這裏咱們定義了 loop 對象以後,接着調用了它的 create_task 方法將 coroutine 對象轉化爲了 task 對象,隨後咱們打印輸出一下,發現它是 pending 狀態。接着咱們將 task 對象添加到事件循環中獲得執行,隨後咱們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還能夠看到其 result 變成了 666,也就是咱們定義的 execute 方法的返回結果。
定義 task 對象還有一種經常使用方式,就是直接經過 asyncio 的 ensure_future 方法,返回結果也是 task 對象,這樣的話咱們就能夠不借助於 loop 來定義,即便尚未聲明 loop 也能夠提早定義好 task 對象,寫法以下:
import asyncio
async def execute(x):
print('Number:', x)
return x
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
複製代碼
運行效果以下:
Coroutine: <coroutine object execute at 0x0000019794EBBE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop
Process finished with exit code 0
複製代碼
發現其運行效果都是同樣的
task對象的綁定回調操做
能夠爲某個 task 綁定一個回調方法,舉以下例子:
import asyncio
import requests
async def call_on():
status = requests.get('https://www.baidu.com')
return status
def call_back(task):
print('Status:', task.result())
corountine = call_on()
task = asyncio.ensure_future(corountine)
task.add_done_callback(call_back)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
複製代碼
定義了一個call_on 方法,請求了百度,獲取其狀態碼,可是這個方法裏面咱們沒有任何 print 語句。隨後咱們定義了一個 call_back 方法,這個方法接收一個參數,是 task 對象,而後調用 print打印了 task 對象的結果。這樣咱們就定義好了一個 coroutine 對象和一個回調方法,
但願達到的效果是,當 coroutine 對象執行完畢以後,就去執行聲明的 callback 方法。實現這樣的效果只須要調用 add_done_callback 方法便可,咱們將 callback 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢以後就能夠調用 callback 方法了,同時 task 對象還會做爲參數傳遞給 callback 方法,調用 task 對象的 result 方法就能夠獲取返回結果了。
運行結果以下:
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4> cb=[call_back() at D:/python/pycharm2020/program/test_003.py:8]>
Status: <Response [200]>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
複製代碼
也能夠不用回調方法,直接在 task 運行完畢以後也能直接調用 result 方法獲取結果,以下所示:
import asyncio
import requests
async def call_on():
status = requests.get('https://www.baidu.com')
return status
def call_back(task):
print('Status:', task.result())
corountine = call_on()
task = asyncio.ensure_future(corountine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task:', task.result())
複製代碼
運行效果同樣:
Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4>>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
Task: <Response [200]>
複製代碼
要實現異步處理,得先要有掛起的操做,當一個任務須要等待 IO 結果的時候,能夠掛起當前任務,轉而去執行其餘任務,這樣才能充分利用好資源,要實現異步,須要瞭解一下 await 的用法,使用 await 能夠將耗時等待的操做掛起,讓出控制權。當協程執行的時候遇到 await,時間循環就會將本協程掛起,轉而去執行別的協程,直到其餘的協程掛起或執行完畢。
await 後面的對象必須是以下格式之一:
aiohttp 的使用
aiohttp 是一個支持異步請求的庫,利用它和 asyncio 配合咱們能夠很是方便地實現異步請求操做。下面以訪問博客裏面的文章,並返回 reponse.text()
爲例,實現異步爬蟲。
from lxml import etree
import requests
import logging
import time
import aiohttp
import asyncio
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
start_time = time.time()
# 先獲取博客裏的文章連接
def get_urls():
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
async def request_page(url):
logging.info('scraping %s', url)
async with aiohttp.ClientSession() as session:
response = await session.get(url)
return await response.text()
def main():
url_list = get_urls()
tasks = [asyncio.ensure_future(request_page(url)) for url in url_list]
loop = asyncio.get_event_loop()
tasks = asyncio.gather(*tasks)
loop.run_until_complete(tasks)
if __name__ == '__main__':
main()
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
複製代碼
實例中將請求庫由 requests 改爲了 aiohttp,經過 aiohttp 的 ClientSession 類的 get 方法進行請求,運行效果以下:
異步操做的便捷之處在於,當遇到阻塞式操做時,任務被掛起,程序接着去執行其餘的任務,而不是傻傻地等待,這樣能夠充分利用 CPU 時間,而沒必要把時間浪費在等待 IO 上。
上面的例子與單線程版和多線程版的比較以下:
多線程版
import requests
import logging
import time
from lxml import etree
from concurrent.futures import ThreadPoolExecutor
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()
# 先獲取博客裏的文章連接
def get_urls():
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
def request_page(url):
logging.info('scraping %s', url)
resp = requests.get(url, headers=headers)
return resp.text
def main():
url_list = get_urls()
with ThreadPoolExecutor(max_workers=6) as executor:
executor.map(request_page, url_list)
if __name__ == '__main__':
main()
end_time = time.time()
logging.info('total time %s seconds', end_time - start_time)
複製代碼
運行結果以下:
單線程版:
import requests
import logging
import time
from lxml import etree
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()
# 先獲取博客裏的文章連接
def get_urls():
resp = requests.get(url, headers=headers)
html = etree.HTML(resp.text)
url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
return url_list
def request_page(url):
logging.info('scraping %s', url)
resp = requests.get(url, headers=headers)
return resp.text
def main():
url_list = get_urls()
for url in url_list:
request_page(url)
if __name__ == '__main__':
main()
end_time = time.time()
複製代碼
運行效果以下:
通過測試能夠發現,若是能將異步請求靈活運用在爬蟲中,在服務器能承受高併發的前提下增長併發數量,爬取效率提高是很是可觀的。