Python爬蟲進階 | 異步協程

1、背景

  以前爬蟲使用的是requests+多線程/多進程,後來隨着前幾天的深刻了解,才發現,對於爬蟲來講,真正的瓶頸並非CPU的處理速度,而是對於網頁抓取時候的往返時間,由於若是採用requests+多線程/多進程,他自己是阻塞式的編程,因此時間都花費在了等待網頁結果的返回和對爬取到的數據的寫入上面。而若是採用非阻塞編程,那麼就沒有這個困擾。這邊首先要理解一下阻塞和非阻塞的區別。php

(1)阻塞調用是指調用結果返回以前,當前線程會被掛起(線程進入非可執行狀態,在這個狀態下,CPU不會給線程分配時間片,即線程暫停運行)。函數只有在獲得結果以後纔會返回。python

(2)對於非阻塞則不會掛起,直接執行接下去的程序,返回結果後再回來處理返回值。程序員

 

  其實爬蟲的本質就是client發請求,批量獲取server的響應數據,若是咱們有多個url待爬取,只用一個線程且採用串行的方式執行,那隻能等待爬取一個結束後才能繼續下一個,效率會很是低。須要強調的是:對於單線程下串行N個任務,並不徹底等同於低效,若是這N個任務都是純計算的任務,那麼該線程對cpu的利用率仍然會很高,之因此單線程下串行多個爬蟲任務低效,是由於爬蟲任務是明顯的IO密集型(阻塞)程序。那麼該如何提升爬取性能呢?express

 

2、基本概念

2.1 阻塞編程

阻塞狀態指程序未獲得所需計算資源時被掛起的狀態。程序在等待某個操做完成期間,自身沒法繼續幹別的事情,則稱該程序在該操做上是阻塞的。flask

常見的阻塞形式有:網絡 I/O 阻塞、磁盤 I/O 阻塞、用戶輸入阻塞等。阻塞是無處不在的,包括 CPU 切換上下文時,全部的進程都沒法真正幹事情,它們也會被阻塞。若是是多核 CPU 則正在執行上下文切換操做的核不可被利用。服務器

 

2.2 非阻塞網絡

程序在等待某操做過程當中,自身不被阻塞,能夠繼續運行幹別的事情,則稱該程序在該操做上是非阻塞的。session

非阻塞並非在任何程序級別、任何狀況下均可以存在的。僅當程序封裝的級別能夠囊括獨立的子程序單元時,它纔可能存在非阻塞狀態。多線程

非阻塞的存在是由於阻塞存在,正由於某個操做阻塞致使的耗時與效率低下,咱們纔要把它變成非阻塞的。

 

2.3 同步

不一樣程序單元爲了完成某個任務,在執行過程當中需靠某種通訊方式以協調一致,稱這些程序單元是同步執行的。例如購物系統中更新商品庫存,須要用「行鎖」做爲通訊信號,讓不一樣的更新請求強制排隊順序執行,那更新庫存的操做是同步的。簡言之,同步意味着有序。

 

2.4 異步

爲完成某個任務,不一樣程序單元之間過程當中無需通訊協調,也能完成任務的方式,不相關的程序單元之間能夠是異步的例如,爬蟲下載網頁。調度程序調用下載程序後,便可調度其餘任務,而無需與該下載任務保持通訊以協調行爲。不一樣網頁的下載、保存等操做都是無關的,也無需相互通知協調。這些異步操做的完成時刻並不肯定。簡言之,異步意味着無序。

 

2.5 多進程

多進程就是利用 CPU 的多核優點,在同一時間並行地執行多個任務,能夠大大提升執行效率。

 

2.6 協程

協程,英文叫作 Coroutine,又稱微線程,纖程,協程是一種用戶態的輕量級線程。

協程擁有本身的寄存器上下文和棧。協程調度切換時,將寄存器上下文和棧保存到其餘地方,在切回來的時候,恢復先前保存的寄存器上下文和棧。所以協程能保留上一次調用時的狀態,即全部局部狀態的一個特定組合,每次過程重入時,就至關於進入上一次調用的狀態。

協程本質上是個單進程,協程相對於多進程來講,無需線程上下文切換的開銷,無需原子操做鎖定及同步的開銷,編程模型也很是簡單。

咱們可使用協程來實現異步操做,好比在網絡爬蟲場景下,咱們發出一個請求以後,須要等待必定的時間才能獲得響應,但其實在這個等待過程當中,程序能夠幹許多其餘的事情,等到響應獲得以後才切換回來繼續處理,這樣能夠充分利用 CPU 和其餘資源,這就是異步協程的優點。

 

3、分析處理 

  同步調用:即提交一個任務後就在原地等待任務結束,等到拿到任務的結果後再繼續下一行代碼,效率低

import requests

def get_page(url):
    print('下載 %s' %url)
    response=requests.get(url)
    if response.status_code == 200:
        return response.text

def parse_page(res):
    print('解析 %s' %(len(res)))


def main():
    urls=['https://www.baidu.com/','http://www.sina.com.cn/','https://www.python.org']
    for url in urls:
        res=get_page(url)                         #調用一個任務,就在原地等待任務結束拿到結果後才繼續日後執行
        parse_page(res)

if __name__ == "__main__":
    main()

a. 解決同步調用方案之多線程/多進程

好處:在服務器端使用多線程(或多進程)。多線程(或多進程)的目的是讓每一個鏈接都擁有獨立的線程(或進程),這樣任何一個鏈接的阻塞都不會影響其餘的鏈接。

弊端:開啓多進程或都線程的方式,咱們是沒法無限制地開啓多進程或多線程的:在遇到要同時響應成百上千路的鏈接請求,則不管多線程仍是多進程都會嚴重佔據系統資源,下降系統對外界響應效率,並且線程與進程自己也更容易進入假死狀態。

b. 解決同步調用方案之線程/進程池

好處:不少程序員可能會考慮使用線程池鏈接池線程池旨在減小建立和銷燬線程的頻率,其維持必定合理數量的線程,並讓空閒的線程從新承擔新的執行任務。能夠很好的下降系統開銷。

弊端:線程池鏈接池技術也只是在必定程度上緩解了頻繁調用IO接口帶來的資源佔用。並且,所謂始終有其上限,當請求大大超過上限時,構成的系統對外界的響應並不比沒有池的時候效果好多少。因此使用必須考慮其面臨的響應規模,並根據響應規模調整的大小。

 

案例:基於multiprocessing.dummy線程池爬取梨視頻的視頻信息

import requests
import re
from lxml import etree
from multiprocessing.dummy import Pool
import random

header = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/72.0.3626.119 Safari/537.36'
}

def get_page(url):
    response = requests.get(url=url,headers=header)
    if response.status_code == 200:
        return response.text
    return None

def parse_page(res):
    tree = etree.HTML(res)
    li_list = tree.xpath('//div[@id="listvideoList"]/ul/li')

    video_url_list = []
    for li in li_list:
        detail_url = 'https://www.pearvideo.com/' + li.xpath('./div/a/@href')[0]
        detail_page = requests.get(url=detail_url, headers=header).text
        video_url = re.findall('srcUrl="(.*?)",vdoUrl', detail_page, re.S)[0]
        video_url_list.append(video_url)

    return video_url_list

# 獲取視頻
def getVideoData(url):
    return requests.get(url=url, headers=header).content

# 持久化存儲
def saveVideo(data):
    fileName = str(random.randint(0, 5000)) + '.mp4'  # 因回調函數只能傳一個參數,因此沒辦法再傳名字了,只能本身取名
    with open(fileName, 'wb') as fp:
        fp.write(data)

def main():
    url = 'https://www.pearvideo.com/category_1'
    res = get_page(url)
    links = parse_page(res)


    pool = Pool(5)  # 實例化一個線程池對象

    #  pool.map(回調函數,可迭代對象)函數依次執行對象
    video_data_list = pool.map(getVideoData, links)
    pool.map(saveVideo, video_data_list)
    
 pool.close() pool.join() if __name__== "__main__":
    main()

總結:對應上例中的所面臨的可能同時出現的上千甚至上萬次的客戶端請求,線程池鏈接池或許能夠緩解部分壓力,可是不能解決全部問題。總之,多線程模型能夠方便高效的解決小規模的服務請求,但面對大規模的服務請求,多線程模型也會遇到瓶頸,能夠用非阻塞接口來嘗試解決這個問題。

 

終極處理方案

  上述不管哪一種方案都沒有解決一個性能相關的問題:IO阻塞,不管是多進程仍是多線程,在遇到IO阻塞時都會被操做系統強行剝奪走CPU的執行權限,程序的執行效率所以就下降了下來。

  解決這一問題的關鍵在於,咱們本身從應用程序級別檢測IO阻塞而後切換到咱們本身程序的其餘任務執行,這樣把咱們程序的IO降到最低,咱們的程序處於就緒態就會增多,以此來迷惑操做系統,操做系統便覺得咱們的程序是IO比較少的程序,從而會盡量多的分配CPU給咱們,這樣也就達到了提高程序執行效率的目的。

  實現方式:單線程+協程實現異步IO操做。

  異步IO:就是你發起一個 網絡IO 操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。

 

4、 異步協程

python3.4以後新增了asyncio模塊,能夠幫咱們檢測IO(只能是網絡IOHTTP鏈接就是網絡IO操做】),實現應用程序級別的切換(異步IO)。注意:asyncio只能發tcp級別的請求,不能發http協議。

asyncio 是幹什麼的?

  • 異步網絡操做
  • 併發
  • 協程 

 

幾個概念:

event_loop:事件循環,至關於一個無限循環,咱們能夠把一些函數註冊到這個事件循環上,當知足條件發生的時候,就會調用對應的處理方法。

coroutine:中文翻譯叫協程,在 Python 中常指代爲協程對象類型,咱們能夠將協程對象註冊到時間循環中,它會被事件循環調用。咱們可使用 async 關鍵字來定義一個方法,這個方法在調用時不會當即被執行,而是返回一個協程對象。

task:任務,它是對協程對象的進一步封裝,包含了任務的各個狀態。

future:表明未來執行或沒有執行的任務的結果,實際上和 task 沒有本質區別。

async關鍵字async 定義一個協程;

await 關鍵字用來掛起阻塞方法的執行。

 

注意事項:在特殊函數內部不能夠出現不支持異步模塊相關的代碼。(例:time,request)

 

1.定義一個協程

import asyncio

async def execute(x):
    print('Number:', x)


coroutine = execute(1)
print('Coroutine:', coroutine)
print('After calling execute')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine) print('After calling loop'

運行結果:

Coroutine: <coroutine object execute at 0x1034cf830>

After calling execute

Number: 1

After calling loop

可見,async 定義的方法就會變成一個沒法直接執行的 coroutine 對象,必須將其註冊到事件循環中才能夠執行。

上文咱們還提到了 task,它是對 coroutine 對象的進一步封裝,它裏面相比 coroutine 對象多了運行狀態,好比 runningfinished 等,咱們能夠用這些狀態來獲取協程對象的執行狀況。

 

在上面的例子中,當咱們 coroutine 對象傳遞給 run_until_complete() 方法的時候,實際上它進行了一個操做就是將 coroutine 封裝成了 task 對象咱們也能夠顯式地進行聲明,以下所示:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)print('After calling execute')

loop = asyncio.get_event_loop()
task = loop.create_task(coroutine) print('Task:',task)

loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

運行結果:

Coroutine: <coroutine object execute at 0x10e0f7830>

After calling execute

Task: <Task pending coro=<execute() running at demo.py:4>>

Number: 1

Task: <Task finished coro=<execute() done, defined at demo.py:4> result=1>

After calling loop

這裏咱們定義了 loop 對象以後,接着調用了它的 create_task() 方法將 coroutine 對象轉化爲了 task 對象,隨後咱們打印輸出一下,發現它是 pending 狀態。接着咱們將 task 對象添加到事件循環中獲得執行,隨後咱們再打印輸出一下 task 對象,發現它的狀態就變成了 finished,同時還能夠看到其 result 變成了 1,也就是咱們定義的 execute() 方法的返回結果。

 

另外,定義 task 對象還有一種方式,就是直接經過 asyncio ensure_future() 方法,返回結果也是 task 對象,這樣的話咱們就能夠不借助於 loop 來定義,即便咱們尚未聲明 loop 也能夠提早定義好 task 對象,寫法以下:

import asyncio

async def execute(x):
    print('Number:',x)
    return x

coroutine = execute(1)
print('Coroutine:',coroutine)
print('After calling execute')

task=asyncio.ensure_future(coroutine) print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('After calling loop')

2.綁定回調:也能夠爲某個 task 綁定一個回調方法.

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status = requests.get(url).status_code
    return status

def  callback(task):
    print('Status:',task.result())

coroutine = request()
task = asyncio.ensure_future(coroutine)
task.add_done_callback(callback) print('Task:',task)

loop = asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)

 運行結果:

Task: <Task pending coro=<request() running at demo.py:5> cb=[callback() at demo.py:11]>

Status: <Response [200]>

Task: <Task finished coro=<request() done, defined at demo.py:5> result=<Response [200]>>

在這裏咱們定義了一個 request() 方法,請求了百度,返回狀態碼,可是這個方法裏面咱們沒有任何 print() 語句。隨後咱們定義了一個 callback() 方法,這個方法接收一個參數,是 task 對象,而後調用 print() 方法打印了 task 對象的結果。這樣咱們就定義好了一個 coroutine 對象和一個回調方法,咱們如今但願的效果是,當 coroutine 對象執行完畢以後,就去執行聲明的 callback() 方法。

那麼它們兩者怎樣關聯起來呢?很簡單,只須要調用 add_done_callback() 方法便可,咱們將 callback() 方法傳遞給了封裝好的 task 對象,這樣當 task 執行完畢以後就能夠調用 callback() 方法了,同時 task 對象還會做爲參數傳遞給 callback() 方法,調用 task 對象的 result() 方法就能夠獲取返回結果了。

實際上不用回調方法,直接在 task 運行完畢以後也能夠直接調用 result() 方法獲取結果,運行結果是同樣的。以下所示:

import asyncio
import requests

async def request():
    url='https://www.baidu.com'
    status=requests.get(url).status_code
    return status

coroutine=request()
task=asyncio.ensure_future(coroutine)
print('Task:',task)

loop=asyncio.get_event_loop()
loop.run_until_complete(task)
print('Task:',task)
print('Task Result:',task.result())

 

3.多任務協程

  上面的例子咱們只執行了一次請求,若是咱們想執行屢次請求應該怎麼辦呢?咱們能夠定義一個 task 列表,而後使用 asyncio 的 wait() 方法便可執行。

import asyncio
import requests

async def request():
    url = 'https://www.baidu.com'
    status = requests.get(url).status_code
    return status

tasks = [asyncio.ensure_future(request()) for _ in range(5)] print('Tasks:',tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
for task in tasks:
    print('Task Result:',task.result())

運行結果:

Tasks: [<Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>, <Task pending coro=<request() running at demo.py:5>>]

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

Task Result: <Response [200]>

這裏咱們使用一個 for 循環建立了五個 task,組成了一個列表,而後把這個列表首先傳遞給了 asyncio wait() 方法,而後再將其註冊到時間循環中,就能夠發起五個任務了。

 

4.協程實現

  上面的案例只是爲後面的使用做鋪墊,接下來咱們正式來看下協程在解決 IO 密集型任務上有怎樣的優點吧!

  爲了表現出協程的優點,咱們須要先建立一個合適的實驗環境,最好的方法就是模擬一個須要等待必定時間才能夠獲取返回結果的網頁,上面的代碼中使用了百度,但百度的響應太快了,並且響應速度也會受本機網速影響,因此最好的方式是本身在本地模擬一個慢速服務器,這裏咱們選用 Flask

服務器代碼:
from flask import Flask
import time
 
app = Flask(__name__)
 
@app.route('/')
def index():
    time.sleep(3)
return 'Hello!'

if __name__ == '__main__':
    app.run(threaded=True)                #這代表 Flask 啓動了多線程模式,否則默認是隻有一個線程的。

接下來咱們再從新使用上面的方法請求一遍:

import asyncio
import requests
import time
 
start = time.time()
 
async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = requests.get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)
運行結果以下:
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.049368143081665

在這裏咱們仍是建立了五個 task,而後將 task 列表傳給 wait() 方法並註冊到時間循環中執行。

其實,要實現異步處理,咱們得先要有掛起的操做,當一個任務須要等待 IO 結果的時候,能夠掛起當前任務,轉而去執行其餘任務,這樣咱們才能充分利用好資源,上面方法都是一本正經的串行走下來,連個掛起都沒有,怎麼可能實現異步?

要實現異步,接下來咱們再瞭解一下 await 的用法,使用 await 能夠將耗時等待的操做掛起,讓出控制權。當協程執行的時候遇到 await,時間循環就會將本協程掛起,轉而去執行別的協程,直到其餘的協程掛起或執行完畢。

因此,咱們可能會將代碼中的 request() 方法改爲以下的樣子:

async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await requests.get(url)
print('Get response from', url, 'Result:', response.text)

僅僅是在 requests 前面加了一個 await,然而執行如下代碼,會獲得以下報錯:

Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Cost time: 15.048935890197754 Task exception was never retrieved future: <Task finished coro=<request() done, defined at demo.py:7> exception=TypeError("object Response can't be used in 'await' expression",)> Traceback (most recent call last):   File "demo.py", line 10, in request     status = await requests.get(url) TypeError: object Response can't be used in 'await' expression

此次它遇到 await 方法確實掛起了,也等待了,可是最後卻報了這麼個錯,這個錯誤的意思是 requests 返回的 Response 對象不能和 await 一塊兒使用,爲何呢?由於根據官方文檔說明,await 後面的對象必須是以下格式之一:

  • A native coroutine object returned from a native coroutine function,一個原生 coroutine 對象。
  • A generator-based coroutine object returned from a function decorated with types.coroutine(),一個由 types.coroutine() 修飾的生成器,這個生成器能夠返回 coroutine 對象。
  • An object with an await__ method returning an iterator,一個包含 __await 方法的對象返回的一個迭代器。

reqeusts 返回的 Response 不符合上面任一條件,所以就會報上面的錯誤了。既然 await 後面能夠跟一個 coroutine 對象,那麼我將請求頁面的方法獨立出來,並用 async 修飾,這樣就獲得了一個 coroutine 對象

import asyncio
import requests
import time
 
start = time.time()
 
async def get(url):
    return requests.get(url)
 
async def request():
    url 'http://127.0.0.1:5000'
    print('Waiting for', url)
    response = await get(url)
    print('Get response from', url, 'Result:', response.text)
 
tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
 
end = time.time()
print('Cost time:', end - start)

這裏咱們,咱們運行一下看看:

Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Waiting for http://127.0.0.1:5000
Get response from http://127.0.0.1:5000 Result: Hello!
Cost time: 15.134317874908447

仍是不行,它還不是異步執行,也就是說咱們僅僅將涉及 IO 操做的代碼封裝到 async 修飾的方法裏面是不可行的!咱們必需要使用支持異步操做的請求方式才能夠實現真正的異步,因此這裏就須要 aiohttp 派上用場了。(因爲requests 模塊不支持異步,因此用aiohttp 模塊)

 

5.使用 aiohttp 

-環境安裝:pip install aiohttp

咱們將 aiohttp 用上來將請求庫由 requests 改爲了 aiohttp,經過 aiohttp ClientSession 類的 get() 方法進行請求

import asyncio
import aiohttp
import time

start= time.time()

async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result

async def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for',url)
    result = await get(url)
    print('Get response from',url,'Result:',result)

tasks = [asyncio.ensure_future(request()) for _ in range(5)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

end = time.time()
print('Cost time:', end - start)

結果以下:咱們發現此次請求的耗時由 15 秒變成了 3 秒,耗時直接變成了原來的 1/5

Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Waiting for http://127.0.0.1:5000 Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Get response from http://127.0.0.1:5000 Result: Hello! Cost time: 3.0199508666992188

代碼裏面咱們使用了 await,後面跟了 get() 方法,在執行這五個協程的時候,若是遇到了 await,那麼就會將當前協程掛起,轉而去執行其餘的協程,直到其餘的協程也掛起或執行完畢,再進行下一個協程的執行。充分利用 CPU 時間,而沒必要把時間浪費在等待 IO 上

開始運行時,時間循環會運行第一個 task,針對第一個 task 來講,當執行到第一個 await 跟着的 get() 方法時,它被掛起,但這個 get() 方法第一步的執行是非阻塞的,掛起以後立馬被喚醒,因此當即又進入執行,建立了 ClientSession 對象,接着遇到了第二個 await,調用了 session.get() 請求方法,而後就被掛起了,因爲請求須要耗時好久,因此一直沒有被喚醒,好第一個 task 被掛起了,那接下來該怎麼辦呢?事件循環會尋找當前未被掛起的協程繼續執行,因而就轉而執行第二個 task 了,也是同樣的流程操做,直到執行了第五個 task 的 session.get() 方法以後,所有的 task 都被掛起了。全部 task 都已經處於掛起狀態,那咋辦?只好等待了。3 秒以後,幾個請求幾乎同時都有了響應,而後幾個 task 也被喚醒接着執行,輸出請求結果,最後耗時,3 秒!

在上面的例子中,在發出網絡請求後,既然接下來的 3 秒都是在等待的,在 3 秒以內,CPU 能夠處理的 task 數量遠不止這些,那麼豈不是咱們放 不少 個 task 一塊兒執行,最後獲得全部結果的耗時不都是 3 秒左右嗎?由於這幾個任務被掛起後都是一塊兒等待的。理論來講確實是這樣的,不過有個前提,那就是服務器在同一時刻接受無限次請求都能保證正常返回結果,也就是服務器無限抗壓,另外還要忽略 IO 傳輸時延,確實能夠作到無限 task 一塊兒執行且在預想時間內獲得結果。

咱們這裏將 task 數量設置成 100,再試一下:

tasks = [asyncio.ensure_future(request()) for _ in range(100)]
耗時結果以下:
Cost time: 3.106252670288086

最後運行時間也是在 3 秒左右,固然多出來的時間就是 IO 時延了。可見,使用了異步協程以後,咱們幾乎能夠在相同的時間內實現成百上千倍次的網絡請求,把這個運用在爬蟲中,速度提高可謂是很是可觀了。

 

6. 與單進程、多進程對比

單進程

import requests
import time

start = time.time()


def request():
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


for _ in range(100):
    request()

end = time.time()
print('Cost time:', end - start)
最後耗時:
Cost time: 305.16639709472656

多進程
import requests
import time
import multiprocessing

start = time.time()


def request(_):
    url = 'http://127.0.0.1:5000'
    print('Waiting for', url)
    result = requests.get(url).text
    print('Get response from', url, 'Result:', result)


cpu_count = multiprocessing.cpu_count()
print('Cpu count:', cpu_count)
pool = multiprocessing.Pool(cpu_count)
pool.map(request, range(100))

end = time.time()
print('Cost time:', end - start)

這裏我使用了multiprocessing 裏面的 Pool 類,即進程池。個人電腦的 CPU 個數是 8 個,這裏的進程池的大小就是 8。

耗時:
Cost time: 48.17306900024414

 

7.與多進程結合

在最新的 PyCon 2018 上,來自 Facebook John Reese 介紹了 asyncio multiprocessing 各自的特色,並開發了一個新的庫,叫作 aiomultiprocess須要 Python 3.6 及更高版本纔可以使用。

安裝:pip install aiomultiprocess

使用這個庫,咱們能夠將上面的例子改寫以下:

import asyncio
import aiohttp
import time
from aiomultiprocess import Pool
 
start = time.time()
 
async def get(url):
    session = aiohttp.ClientSession()
    response = await session.get(url)
    result = await response.text()
    session.close()
    return result
 
async def request():
    url = 'http://127.0.0.1:5000'
    urls = [url for _ in range(100)]
    async with Pool() as pool:
        result = await pool.map(get, urls) return result
 
coroutine = request()
task = asyncio.ensure_future(coroutine)
loop = asyncio.get_event_loop()
loop.run_until_complete(task)
 
end = time.time()
print('Cost time:', end - start)

這樣就會同時使用多進程和異步協程進行請求,固然最後的結果其實和異步是差很少的:

Cost time: 3.1156570434570312

由於個人測試接口的緣由,最快的響應也是 3 秒,因此這部分多餘的時間基本都是 IO 傳輸時延。但在真實狀況下,咱們在作爬取的時候遇到的狀況變幻無窮,一方面咱們使用異步協程來防止阻塞,另外一方面咱們使用 multiprocessing 來利用多核成倍加速,節省時間其實仍是很是可觀的。

 

更多案例

import aiohttp
import asyncio
from lxml import etree

all_titles = []

headers = {
    'User-Agent':'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/70.0.3538.77 Safari/537.36'

}
async def request(url):
    async with aiohttp.ClientSession() as s:
        async with await s.get(url,headers=headers) as response:
            page_text = await response.text()
            return page_text


def parse(task):
    page_text = task.result()
    page_text = page_text.encode('gb2312').decode('gbk')
    tree = etree.HTML(page_text)
    tr_list = tree.xpath('//*[@id="morelist"]/div/table[2]//tr/td/table//tr')
    for tr in tr_list:
        title = tr.xpath('./td[2]/a[2]/text()')[0]
        print(title)
        all_titles.append(title)

urls = []
url = 'http://wz.sun0769.com/index.php/question/questionType?type=4&page=%d'
for page in range(100):
    u_page = page * 30
    new_url = format(url%u_page)
    urls.append(new_url)

tasks = []
for url in urls:
    c = request(url)
    task = asyncio.ensure_future(c)
    task.add_done_callback(parse)
    tasks.append(task)

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

 

 

參考連接: https://blog.csdn.net/zhusongziye/article/details/81637088

相關文章
相關標籤/搜索