Python異步爬蟲

Python異步爬蟲進階

爬蟲是 IO 密集型任務,好比咱們使用 requests 庫來爬取某個站點的話,發出一個請求以後,程序必需要等待網站返回響應以後才能接着運行,而在等待響應的過程當中,整個爬蟲程序是一直在等待的,實際上沒有作任何的事情。html

所以,有必要提升程序的運行效率,異步就是其中有效的一種方法。python

今天咱們一塊兒來學習下異步爬蟲的相關內容。編程

1、基本概念

  • 阻塞

阻塞狀態指程序未獲得所需計算資源時被掛起的狀態。程序在等待某個操做完成期間,自身沒法繼續處理其餘的事情,則稱該程序在該操做上是阻塞的。常見的阻塞形式有:網絡 I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,全部的進程都沒法真正處理事情,它們也會被阻塞。若是是多核 CPU 則正在執行上下文切換操做的核不可被利用。api

  • 非阻塞

程序在等待某操做過程當中,自身不被阻塞,能夠繼續處理其餘的事情,則稱該程序在該操做上是非阻塞的。非阻塞並非在任何程序級別、任何狀況下均可以存在的。僅當程序封裝的級別能夠囊括獨立的子程序單元時,它纔可能存在非阻塞狀態。非阻塞的存在是由於阻塞存在,正由於某個操做阻塞致使的耗時與效率低下,咱們纔要把它變成非阻塞的。服務器

  • 同步

不一樣程序單元爲了完成某個任務,在執行過程當中需靠某種通訊方式以協調一致,咱們稱這些程序單元是同步執行的。例如購物系統中更新商品庫存,須要用「行鎖」做爲通訊信號,讓不一樣的更新請求強制排隊順序執行,那更新庫存的操做是同步的。簡言之,同步意味着有序。markdown

  • 異步

爲完成某個任務,不一樣程序單元之間過程當中無需通訊協調,也能完成任務的方式,不相關的程序單元之間能夠是異步的。例如,爬蟲下載網頁。調度程序調用下載程序後,便可調度其餘任務,而無需與該下載任務保持通訊以協調行爲。不一樣網頁的下載、保存等操做都是無關的,也無需相互通知協調。這些異步操做的完成時刻並不肯定。簡言之,異步意味着無序。網絡

  • 多進程

多進程就是利用 CPU 的多核優點,在同一時間並行地執行多個任務,能夠大大提升執行效率。session

  • 協程

協程,英文叫做 Coroutine,又稱微線程、纖程,協程是一種用戶態的輕量級線程。協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以協程能保留上一次調用時的狀態,即全部局部狀態的一個特定組合,每次過程重入時,就至關於進入上一次調用的狀態。協程本質上是個單進程,協程相對於多進程來講,無需線程上下文切換的開銷,無需原子操做鎖定及同步的開銷,編程模型也很是簡單。咱們可使用協程來實現異步操做,好比在網絡爬蟲場景下,咱們發出一個請求以後,須要等待必定的時間才能獲得響應,但其實在這個等待過程當中,程序能夠幹許多其餘的事情,等到響應獲得以後才切換回來繼續處理,這樣能夠充分利用 CPU 和其餘資源,這就是協程的優點。多線程

2、協程用法

從 Python 3.4 開始,Python 中加入了協程的概念,但這個版本的協程仍是以生成器對象爲基礎的,在 Python 3.5 則增長了 async/await,使得協程的實現更加方便。併發

asyncio

Python 中使用協程最經常使用的庫莫過於 asyncio

  • event_loop:事件循環,至關於一個無限循環,咱們能夠把一些函數註冊到這個事件循環上,當知足條件發生的時候,就會調用對應的處理方法。
  • coroutine:中文翻譯叫協程,在 Python 中常指代爲協程對象類型,咱們能夠將協程對象註冊到時間循環中,它會被事件循環調用。咱們可使用 async 關鍵字來定義一個方法,這個方法在調用時不會當即被執行,而是返回一個協程對象。
  • task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。
  • future:表明未來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。

async/await 關鍵字,是從 Python 3.5 纔出現的,專門用於定義協程。其中,async 定義一個協程,await 用來掛起阻塞方法的執行。

定義協程

定義一個協程,感覺它和普通進程在實現上的不一樣之處,代碼以下:

import asyncio

async def execute(x):
    print('Number:', x)

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine)
print('After calling loop')
複製代碼

運行結果以下:

Coroutine: <coroutine object execute at 0x0000027808F5BE48>
After calling execute
Number: 666
After calling loop

Process finished with exit code 0
複製代碼

首先導入 asyncio 這個包,這樣纔可使用 async 和 await,而後使用 async 定義了一個 execute 方法,方法接收一個數字參數,方法執行以後會打印這個數字。

隨後咱們直接調用了這個方法,然而這個方法並無執行,而是返回了一個 coroutine 協程對象。隨後咱們使用 get_event_loop 方法建立了一個事件循環 loop,並調用了 loop 對象的 run_until_complete 方法將協程註冊到事件循環 loop 中,而後啓動。最後咱們纔看到了 execute 方法打印了輸出結果。

可見,async 定義的方法就會變成一個沒法直接執行的 coroutine 對象,必須將其註冊到事件循環中才能夠執行。

前面還提到了 task,它是對 coroutine 對象的進一步封裝,它裏面相比 coroutine 對象多了運行狀態,好比 running、finished 等,咱們能夠用這些狀態來獲取協程對象的執行狀況。在上面的例子中,當咱們將 coroutine 對象傳遞給 run_until_complete 方法的時候,實際上它進行了一個操做就是將 coroutine 封裝成了 task 對象。task也能夠顯式地進行聲明,以下所示:

import asyncio

async def execute(x):
    print('Number:', x)
    return x
    
coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)
print('Task:', task)
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
複製代碼

運行結果以下

Coroutine: <coroutine object execute at 0x000001CB3F90BE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
複製代碼

這裏咱們定義了 loop 對象以後,接着調用了它的 create_task 方法將 coroutine 對象轉化爲了 task 對象,隨後咱們打印輸出一下,發現它是 pending 狀態。接着咱們將 task 對象添加到事件循環中獲得執行,隨後咱們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還能夠看到其 result 變成了 666,也就是咱們定義的 execute 方法的返回結果。

定義 task 對象還有一種經常使用方式,就是直接經過 asyncio 的 ensure_future 方法,返回結果也是 task 對象,這樣的話咱們就能夠不借助於 loop 來定義,即便尚未聲明 loop 也能夠提早定義好 task 對象,寫法以下:

import asyncio

async def execute(x):
    print('Number:', x)
    return x

coroutine = execute(666)
print('Coroutine:', coroutine)
print('After calling execute')
task = asyncio.ensure_future(coroutine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('After calling loop')
複製代碼

運行效果以下:

Coroutine: <coroutine object execute at 0x0000019794EBBE48>
After calling execute
Task: <Task pending coro=<execute() running at D:/python/pycharm2020/program/test_003.py:3>>
Number: 666
Task: <Task finished coro=<execute() done, defined at D:/python/pycharm2020/program/test_003.py:3> result=666>
After calling loop

Process finished with exit code 0
複製代碼

發現其運行效果都是同樣的

task對象的綁定回調操做

能夠爲某個 task 綁定一個回調方法,舉以下例子:

import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

def call_back(task):
    print('Status:', task.result())

corountine = call_on()
task = asyncio.ensure_future(corountine)
task.add_done_callback(call_back)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
複製代碼

定義了一個call_on 方法,請求了百度,獲取其狀態碼,可是這個方法裏面咱們沒有任何 print 語句。隨後咱們定義了一個 call_back 方法,這個方法接收一個參數,是 task 對象,而後調用 print打印了 task 對象的結果。這樣咱們就定義好了一個 coroutine 對象和一個回調方法,

但願達到的效果是,當 coroutine 對象執行完畢以後,就去執行聲明的 callback 方法。實現這樣的效果只須要調用 add_done_callback 方法便可,咱們將 callback 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢以後就能夠調用 callback 方法了,同時 task 對象還會做爲參數傳遞給 callback 方法,調用 task 對象的 result 方法就能夠獲取返回結果了。

運行結果以下:

Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4> cb=[call_back() at D:/python/pycharm2020/program/test_003.py:8]>
Status: <Response [200]>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
複製代碼

也能夠不用回調方法,直接在 task 運行完畢以後也能直接調用 result 方法獲取結果,以下所示:

import asyncio
import requests

async def call_on():
    status = requests.get('https://www.baidu.com')
    return status

def call_back(task):
    print('Status:', task.result())

corountine = call_on()
task = asyncio.ensure_future(corountine)
print('Task:', task)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:', task)
print('Task:', task.result())
複製代碼

運行效果同樣:

Task: <Task pending coro=<call_on() running at D:/python/pycharm2020/program/test_003.py:4>>
Task: <Task finished coro=<call_on() done, defined at D:/python/pycharm2020/program/test_003.py:4> result=<Response [200]>>
Task: <Response [200]>
複製代碼

3、異步爬蟲實現

要實現異步處理,得先要有掛起的操做,當一個任務須要等待 IO 結果的時候,能夠掛起當前任務,轉而去執行其餘任務,這樣才能充分利用好資源,要實現異步,須要瞭解一下 await 的用法,使用 await 能夠將耗時等待的操做掛起,讓出控制權。當協程執行的時候遇到 await,時間循環就會將本協程掛起,轉而去執行別的協程,直到其餘的協程掛起或執行完畢。

await 後面的對象必須是以下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine,一個由 types.coroutine 修飾的生成器,這個生成器能夠返回 coroutine 對象。
  • An object with an await method returning an iterator,一個包含 await 方法的對象返回的一個迭代器。

aiohttp 的使用

aiohttp 是一個支持異步請求的庫,利用它和 asyncio 配合咱們能夠很是方便地實現異步請求操做。下面以訪問博客裏面的文章,並返回 reponse.text() 爲例,實現異步爬蟲。

from lxml import etree
import requests
import logging
import time
import aiohttp
import asyncio

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
start_time = time.time()

# 先獲取博客裏的文章連接
def get_urls():
    headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

async def request_page(url):
    logging.info('scraping %s', url)
    async with aiohttp.ClientSession() as session:
        response = await session.get(url)
        return await response.text()

def main():
    url_list = get_urls()
    tasks = [asyncio.ensure_future(request_page(url)) for url in url_list]
    loop = asyncio.get_event_loop()
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
複製代碼

實例中將請求庫由 requests 改爲了 aiohttp,經過 aiohttp 的 ClientSession 類的 get 方法進行請求,運行效果以下:

異步操做的便捷之處在於,當遇到阻塞式操做時,任務被掛起,程序接着去執行其餘的任務,而不是傻傻地等待,這樣能夠充分利用 CPU 時間,而沒必要把時間浪費在等待 IO 上。

上面的例子與單線程版和多線程版的比較以下:

多線程版

import requests
import logging
import time
from lxml import etree
from concurrent.futures import ThreadPoolExecutor

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

# 先獲取博客裏的文章連接
def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    with ThreadPoolExecutor(max_workers=6) as executor:
        executor.map(request_page, url_list)

if __name__ == '__main__':
    main()
    end_time = time.time()
    logging.info('total time %s seconds', end_time - start_time)
複製代碼

運行結果以下:

單線程版:

import requests
import logging
import time
from lxml import etree

logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s: %(message)s')
url = 'https://blog.csdn.net/?spm=1001.2014.3001.4477'
headers = {"user-agent": "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1"}
start_time = time.time()

# 先獲取博客裏的文章連接
def get_urls():
    resp = requests.get(url, headers=headers)
    html = etree.HTML(resp.text)
    url_list = html.xpath("//div[@class='list_con']/div[@class='title']/h2/a/@href")
    return url_list

def request_page(url):
    logging.info('scraping %s', url)
    resp = requests.get(url, headers=headers)
    return resp.text

def main():
    url_list = get_urls()
    for url in url_list:
        request_page(url)

if __name__ == '__main__':
    main()
    end_time = time.time()
複製代碼

運行效果以下:

通過測試能夠發現,若是能將異步請求靈活運用在爬蟲中,在服務器能承受高併發的前提下增長併發數量,爬取效率提高是很是可觀的。

相關文章
相關標籤/搜索