Python學習之路37-使用asyncio包處理併發

《流暢的Python》筆記。javascript

本篇主要討論asyncio包,這個包使用事件循環驅動的協程實現併發。java

1. 前言

本篇主要介紹若是使用asyncio包將上一篇中線程版的「國旗下載」程序改成協程版本,經過異步非阻塞來實現併發。python

說實話,我在讀這部份內容的時候是懵逼的,書中阻塞非阻塞、同步異步的概念和我以前的理解有很大差別。以前一直覺得同步就意味着阻塞,異步就意味着非阻塞。但其實,阻塞非阻塞與同步異步並無本質的聯繫。編程

同步(Synchronizing)異步(Asynchronizing)是對指令而言的,也就是程序(理解成「函數」會更好一些)。以含有I/O操做的函數爲例(被調用方),若是這個函數要等到I/O操做結束,獲取了數據,才返回到調用方,這就叫同步(絕大部分函數都同步);反之,不等I/O執行完畢就返回到調用方,獲取的數據以其餘方式轉給調用方,這就叫異步。api

阻塞(Blocking)非阻塞(Non-Blocking)是對進程線程而言(爲了簡潔,只以「線程」爲例)。由於某些緣由(好比I/O),線程被掛起(被移出CPU),這就叫阻塞;反之,即便由於這些緣由,線程依然不被掛起(不被移出CPU),這就叫非阻塞。服務器

可見,這兩組概念一共能夠組成四種不一樣狀況:同步阻塞(常見),同步非阻塞(不常見),異步阻塞(不常見),異步非阻塞(常見)。微信

仍以上述I/O函數爲例:網絡

  • 若是這個函數的I/O請求已發出,只是單純地在等服務器發回數據,線程也只是單純地在等這個函數返回結果,CPU將會把這個線程掛起,這就叫作同步阻塞
  • 若是這個函數中調用的是一個執行復雜計算的子函數,此時,函數依然在等結果沒有返回,但線程並非沒有運行,不會被CPU掛起,這就叫作同步非阻塞(「CPU以輪詢的方式查看I/O是否結束」更能說明這種狀況,但這已經是很古老的方式了);
  • 若是這個函數在I/O請求沒獲得結果以前就返回了,但線程依然在等這個結果(在函數體以外等待使用這個數據),這就叫異步阻塞
  • 若是這個函數在沒獲得結果以前返回了,線程繼續執行其餘函數,這就叫作異步非阻塞。更具體一點,這種狀況對應的是使用回調實現異步非阻塞的狀況;而Python中還有一種狀況,也是本篇要講的,就是使用協程實現異步非阻塞:協程在獲得結果前依然不返回,但線程並無等待,而是去執行其餘協程。協程看起來就像同步同樣。

因爲以前並無遇到代碼世界中的同步非阻塞異步阻塞這兩種狀況,因此我也不肯定上述這兩種狀況的例子是否準確,歡迎大佬留言指導。但這四種狀況在現實生活中就很常見了,下面舉個在某處看到的例子:session

  • 老張把一普通水壺接上水放火上,眼睛直勾勾盯着等水開,不幹其餘事,這叫同步阻塞
  • 老張依然用一普通水壺燒水,但把水壺放火上後去客廳看電視,時不時回來看水燒好了沒有,這叫同步非阻塞
  • 老張用一能響的水壺燒水,沒盯着看,但也沒幹其餘事,只是在那兒發愣。水燒好後,壺可勁兒的響,老張一驚,取走水壺,這叫異步阻塞
  • 老張用一能響的水壺燒水,把壺放火上後去客廳看電視,等壺響了再去拿壺,這叫異步非阻塞

從這四個例子能夠看出,阻不阻塞是對老張而言的,在計算機中對應的就是進程線程;同步異步是對水壺而言的,在計算機中對應的就是函數。閉包

有了上述概念後,咱們接下來將使用asyncio包,將以前下載國旗的程序改成協程版本。

2. 異步

以前咱們使用線程實現了併發下載數據,它是同步阻塞的,由於一到I/O操做,線程就被阻塞,而後調入新的線程。如今,咱們將實現一個異步非阻塞版本。但從上述介紹知道,異步有兩種方式:回調和協程。本文並不會實現回調版本的「下載國旗」,提出回調只是爲了和協程進行比較。

2.1 回調

舉個例子說明回調。在調用函數A時除了傳入必要的參數外,還傳入一個參數:函數B。A中有一些費時的操做,好比I/O,A在沒獲得結果以前就返回,而將等待結果以及進行後續處理的事情交給函數B。這個過程就是回調,函數B就稱爲回調函數

這種編程方式不太符合人的思惟習慣,代碼也不易於理解,狀況一複雜,就極可能遇到**「回調地獄」**:多層嵌套回調。下面是一個JavaScript中使用回調的例子,它嵌套了3層:

// 代碼2.1
api_call1(request1, function (response1){  // 多麼痛的領悟
    var request2 = step1(response1);  // 第一步
    api_call2(request2, function (response2){
        var request3 = step2(response2);  // 第二步
        api_call3(request3, function (response3){
            step(response3);  // 第三步
        })
    })
})
複製代碼

api_call1api_call2api_call3都是庫函數,用於異步獲取結果。JavaScript中經常使用匿名函數做爲回調函數。下面咱們使用Python來實現上述代碼,上述三個匿名函數分別命名爲stage1stage2stage3

# 代碼2.2
def stage1(response1):
    request2 = step1(response1)
    api_call2(request2, stage2)

def stage2(response2):
    request3 = step2(response2)
    api_call3(request3, stage3)

def stage3(response3):
    step3(response3)

api_call1(request1, stage1)  # 代碼從這裏開始執行
複製代碼

可見,即便用Python寫,也不容易理解,這要是再多嵌套幾層,不逼瘋已經不錯了。並且,若是要在stage2中使用request2,還得使用閉包,這就又變成了嵌套定義函數的狀況。而且上述代碼尚未考慮拋出異常的狀況:在基於回調的API中,這個問題的解決辦法是爲每一個異步調用註冊兩個回調,一個用於處理操做成功時返回的結果,一個用於處理錯誤。能夠看出,一旦涉及錯誤處理,回調將更可怕。

2.2 協程

如今咱們用協程來改寫上述代碼:

# 代碼2.3
import asyncio

@asyncio.coroutine
def three_stages(request1):
    response1 = yield from api_call1(request1)
    request2 = step1(response1)
    response2 = yield from api_call2(request2)
    request3 = step2(response2)
    response3 = yield from api_call3(request3)
    step3(response3)

loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))
複製代碼

與前面兩個版本的回調相比,這個版本的代碼將3個步驟依次寫在同一函數中,易於理解,這樣看起來是否是也更像同步函數?若是要處理異常,只須要相應的yield from語句處添加try/except便可。

但也別急着把這稱爲「協程天堂」,由於:

  • 不能使用常規函數,必須使用協程,並且要習慣yield from語句;
  • 不能直接調用協程。即,不能像直接調用api_call1(request1)那樣直接調用three_stages(request1),必須使用事件循環(上面的loop)來驅動協程。

但無論怎樣,代碼讀起來和寫起來比回調簡單多了,尤爲是嵌套回調。

小技巧:讀協程的代碼時,爲了便於理解代碼的意思,能夠直接將yield from關鍵字忽略掉。

2.3 下載國旗批量版

下面咱們開始實現協程版本的「下載國旗」。

爲了將其改成協程版本,咱們不能使用以前的requests包,由於它會阻塞線程,改成使用aiohttp包。爲了儘可能保持代碼的簡潔,這裏不處理異常。下方是完整的代碼,代碼中咱們使用了新語法。如下代碼的基本思路是:在一個單線程程序中使用主循環一次激活隊列中的協程,各個協程向前執行幾步,而後把控制權讓給主循環,主循環再激活隊列中的下一個協程

# 代碼2.4
import aiohttp, os, sys, time, asyncio   # 代碼中請勿這麼寫,這裏只是爲了減小行數

POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"

def save_flag(img, filename):
    path = os.path.join(DEST_DIR, filename)
    with open(path, "wb") as fp: 
        fp.write(img)

def show(text):
    print(text, end=" ")
    sys.stdout.flush()

async def get_flag(cc):   # aiohttp只支持TCP和UDP請求
    url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
    async with aiohttp.ClientSession() as session: # <1> 開啓一個會話
        async with session.get(url) as resp:   # 發送請求
            image = await resp.read()   # 讀取請求
    return image

async def download_one(cc):
    image = await get_flag(cc)
    show(cc)
    save_flag(image, cc.lower() + ".gif")
    return cc

def download_many(cc_list):
    loop = asyncio.get_event_loop()   # 獲取事件循環
    to_do = [download_one(cc) for cc in sorted(cc_list)]  # 生成協程列表
    wait_coro = asyncio.wait(to_do)   # 將協程包裝成Task類,wait_coro並非運行結果!而是協程!
    res, _ = loop.run_until_complete(wait_coro) # 驅動每一個協程運行
    loop.close()   # 循環結束
    return len(res)

def main(download_many):
    t0 = time.time()
    count = download_many(POP20_CC)
    elapsed = time.time() - t0
    msg = "\n{} flags downloaded in {:.2f}s"
    print(msg.format(count, elapsed))

if __name__ == "__main__":
    main(download_many)

# 結果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US 
20 flags downloaded in 1.27s
複製代碼

解釋:

①這裏使用了新的語法async/await。再Python3.5以前,若是想定義一個協程只能延用函數的定義方式def,而後在定義體裏面使用yieldyield from。若是想把一個函數更明確地聲明爲協程(或者說異步函數),還可使用asyncio中的coroutine裝飾器,但這麼作是否是挺麻煩的?從Python3.5起,能夠明確**使用async來定義協程(異步函數)**和異步生成器。使用async則能夠省略掉@asyncio.coroutine裝飾器;在用async修飾的協程的定義體中可使用yield關鍵字,但不能使用yield from,它必須被替換爲await,即便yield from後面只是一個普通的生成器;從由async修飾的協程或生成器中獲取數據時,必須使用await

②若是要使用@asyncio.coroutine裝飾器明確聲明協程,那麼在協程定義體內部只能使用yield from,不能使用yield,由於使用到yield的地方已經在asyncio中所有封裝成了函數或者方法。最新版的@asyncio.coroutine也能夠裝飾async修飾的協程,這種狀況下coroutine不作任何事,只是原封不動的返回被裝飾的協程。

③ <1>處的代碼之因此改用async with(異步上下文管理器),是由於新版asyncio並不支持書中的舊語法yield from aiohttp.request("GET", url)。關於async/awaitasync with/async for的相關內容將在後續文章中介紹,這裏只須要知道async對應於@asyncio.coroutineawait對應於yield from便可。

④咱們將get_flag改爲了協程版本,並使用aiohttp來實現異步請求;download_one函數也隨之變成了協程版本。

download_many只是一個普通函數,它要驅動協程運行。在這個函數中,咱們經過asyncio.get_event_loop()建立事件循環(實質就是一個線程)來驅動協程的運行。接着生成含20個download_one協程的協程列表to_do,隨後再調用asyncio.wait(to_do)將這個協程列表包裝成一個wait協程,取名爲wait_corowait協程會將to_do中全部的協程包裝成Task對象(Future的子類),再造成列表。最後,咱們經過loop.run_until_complete(wait_coro)驅動協程wait_coro運行。整個的驅動鏈是這樣的:loop.run_until_complete驅動協程wait_corowait_coro再在內部驅動20個協程。

wait協程最後會返回一個元組,第一個元素是完成的協程數,第二個是未完成的協程數loop.run_until_complete返回傳入的協程的返回值(實際代碼是Future.result())。有點繞,其實就是wait_coro最後返回一個元組給run_until_completerun_until_complete再把這個值返回給調用方。

⑦在上一篇中,咱們知道concurrent.futures中有一個Future,且經過它的result方法獲取最後運行的結果;在asyncio包中,不光有Future,還有它的子類Task,但獲取結果一般並非調用result方法,而是經過yield fromawait,即yield from future獲取結果。asyncio.Future類的result方法沒有參數,不能設置超時時間;若是調用resultfuture還未運行完畢,它並不會阻塞去等待結果,而是拋出asyncio.InvalidStateError異常。

2.4 下載國旗改進版

上一篇中,咱們除了使用Executor.map()批量處理線程以外,咱們還使用了concurrent.futures.as_completed()挨個迭代運行完的線程返回的結果。asyncio也實現了這個方法,咱們將使用這個函數改寫上方的代碼。

還有一個問題:咱們每每只關注了網絡I/O請求,經常忽略本地的I/O操做。線程版本中的save_flag函數也是會阻塞線程的,由於它操做了磁盤。但因爲圖片過小,速度太快,咱們感受並不明顯,若是換成更高像素的圖片,這種速度差別就會很明顯。咱們將會以某種方式使其避免阻塞線程。下面是改寫的代碼:

# 代碼2.5
import asyncio, os, sys, time, aiohttp

async def download_one(cc, semaphore):
    async with semaphore:
        image = await get_flag(cc)
    loop = asyncio.get_event_loop()
    loop.run_in_executor(None, save_flag, image, cc + ".gif")
    return cc

async def download_coro(cc_list, concur_req):
    semaphore = asyncio.Semaphore(concur_req)  # 它是一個信號量,用於控制併發量
    to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
    to_do_iter = asyncio.as_completed(to_do)
    for future in to_do_iter:
        res = await future
        print("Downloaded", res)

def download_many(cc_list, concur_req):  # 變化不大
    loop = asyncio.get_event_loop()
    coro = download_coro(cc_list, concur_req)
    loop.run_until_complete(coro)
    loop.close()

if __name__ == "__main__":
    t0 = time.time()
    download_many(POP20_CC, 1000)  # 第二個參數表示最大併發數
    print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))

# 結果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US

Done! Time elapsed 1.21s.
複製代碼

上述代碼有3個地方值得關注:

  • asyncio.as_completed()元素爲協程的可迭代對象爲參數,但自身並非協程,只是一個生成器。它在內部將傳入的協程包裝成Task,而後返回一個生成器,產出協程的返回值。這個生成器按協程完成的順序生成值(先完成先產出),而不是按協程在迭代器中的順序生成值。
  • asyncio.Semaphore是個信號量類,內部維護這一個計數器,調用它的acquire方法(這個方法是個協程),計數器減一;對其調用release方法(這個方法不是協程),計數器加一;當計數器爲0時,會阻塞調用這個方法的協程。
  • 咱們將save_flag函數放到了其餘線程中,loop.run_in_executor()的第一個參數是Executor實例,若是爲None,則使用事件循環的默認ThreadPoolExecutor實例。餘下的參數是可調用對象,以及可調用對象的位置參數。

3. 總結

本章開篇介紹了阻塞非阻塞、同步異步的概念,而後介紹了異步的兩種實現方式:回調和協程。並經過代碼比較了回調和協程的實現方式。而後咱們使用asyncioaiohttp兩個庫,將以前線程版本的下載國旗程序改成了協程版本。惋惜我也是剛接觸協程不久,寫的內容不必定準確,尤爲是關於asyncio的內容,這個庫以前是一點都沒接觸過。後面我會專門研究Python中的協程,以及asyncio的實現,爭取把這部份內容完全搞懂。


迎你們關注個人微信公衆號"代碼港" & 我的網站 www.vpointer.net ~

相關文章
相關標籤/搜索