《流暢的Python》筆記。javascript
本篇主要討論asyncio包,這個包使用事件循環驅動的協程實現併發。java
本篇主要介紹若是使用asyncio
包將上一篇中線程版的「國旗下載」程序改成協程版本,經過異步非阻塞來實現併發。python
說實話,我在讀這部份內容的時候是懵逼的,書中阻塞非阻塞、同步異步的概念和我以前的理解有很大差別。以前一直覺得同步就意味着阻塞,異步就意味着非阻塞。但其實,阻塞非阻塞與同步異步並無本質的聯繫。編程
同步(Synchronizing)異步(Asynchronizing)是對指令而言的,也就是程序(理解成「函數」會更好一些)。以含有I/O操做的函數爲例(被調用方),若是這個函數要等到I/O操做結束,獲取了數據,才返回到調用方,這就叫同步(絕大部分函數都同步);反之,不等I/O執行完畢就返回到調用方,獲取的數據以其餘方式轉給調用方,這就叫異步。api
阻塞(Blocking)非阻塞(Non-Blocking)是對進程線程而言(爲了簡潔,只以「線程」爲例)。由於某些緣由(好比I/O),線程被掛起(被移出CPU),這就叫阻塞;反之,即便由於這些緣由,線程依然不被掛起(不被移出CPU),這就叫非阻塞。服務器
可見,這兩組概念一共能夠組成四種不一樣狀況:同步阻塞(常見),同步非阻塞(不常見),異步阻塞(不常見),異步非阻塞(常見)。微信
仍以上述I/O函數爲例:網絡
因爲以前並無遇到代碼世界中的同步非阻塞和異步阻塞這兩種狀況,因此我也不肯定上述這兩種狀況的例子是否準確,歡迎大佬留言指導。但這四種狀況在現實生活中就很常見了,下面舉個在某處看到的例子:session
從這四個例子能夠看出,阻不阻塞是對老張而言的,在計算機中對應的就是進程線程;同步異步是對水壺而言的,在計算機中對應的就是函數。閉包
有了上述概念後,咱們接下來將使用asyncio
包,將以前下載國旗的程序改成協程版本。
以前咱們使用線程實現了併發下載數據,它是同步阻塞的,由於一到I/O操做,線程就被阻塞,而後調入新的線程。如今,咱們將實現一個異步非阻塞版本。但從上述介紹知道,異步有兩種方式:回調和協程。本文並不會實現回調版本的「下載國旗」,提出回調只是爲了和協程進行比較。
舉個例子說明回調。在調用函數A時除了傳入必要的參數外,還傳入一個參數:函數B。A中有一些費時的操做,好比I/O,A在沒獲得結果以前就返回,而將等待結果以及進行後續處理的事情交給函數B。這個過程就是回調,函數B就稱爲回調函數。
這種編程方式不太符合人的思惟習慣,代碼也不易於理解,狀況一複雜,就極可能遇到**「回調地獄」**:多層嵌套回調。下面是一個JavaScript中使用回調的例子,它嵌套了3層:
// 代碼2.1
api_call1(request1, function (response1){ // 多麼痛的領悟
var request2 = step1(response1); // 第一步
api_call2(request2, function (response2){
var request3 = step2(response2); // 第二步
api_call3(request3, function (response3){
step(response3); // 第三步
})
})
})
複製代碼
api_call1
、api_call2
和api_call3
都是庫函數,用於異步獲取結果。JavaScript中經常使用匿名函數做爲回調函數。下面咱們使用Python來實現上述代碼,上述三個匿名函數分別命名爲stage1
,stage2
和stage3
:
# 代碼2.2
def stage1(response1):
request2 = step1(response1)
api_call2(request2, stage2)
def stage2(response2):
request3 = step2(response2)
api_call3(request3, stage3)
def stage3(response3):
step3(response3)
api_call1(request1, stage1) # 代碼從這裏開始執行
複製代碼
可見,即便用Python寫,也不容易理解,這要是再多嵌套幾層,不逼瘋已經不錯了。並且,若是要在stage2
中使用request2
,還得使用閉包,這就又變成了嵌套定義函數的狀況。而且上述代碼尚未考慮拋出異常的狀況:在基於回調的API中,這個問題的解決辦法是爲每一個異步調用註冊兩個回調,一個用於處理操做成功時返回的結果,一個用於處理錯誤。能夠看出,一旦涉及錯誤處理,回調將更可怕。
如今咱們用協程來改寫上述代碼:
# 代碼2.3
import asyncio
@asyncio.coroutine
def three_stages(request1):
response1 = yield from api_call1(request1)
request2 = step1(response1)
response2 = yield from api_call2(request2)
request3 = step2(response2)
response3 = yield from api_call3(request3)
step3(response3)
loop = asyncio.get_event_loop()
loop.create_task(three_stages(request1))
複製代碼
與前面兩個版本的回調相比,這個版本的代碼將3個步驟依次寫在同一函數中,易於理解,這樣看起來是否是也更像同步函數?若是要處理異常,只須要相應的yield from
語句處添加try/except
便可。
但也別急着把這稱爲「協程天堂」,由於:
yield from
語句;api_call1(request1)
那樣直接調用three_stages(request1)
,必須使用事件循環(上面的loop
)來驅動協程。但無論怎樣,代碼讀起來和寫起來比回調簡單多了,尤爲是嵌套回調。
小技巧:讀協程的代碼時,爲了便於理解代碼的意思,能夠直接將yield from
關鍵字忽略掉。
下面咱們開始實現協程版本的「下載國旗」。
爲了將其改成協程版本,咱們不能使用以前的requests
包,由於它會阻塞線程,改成使用aiohttp
包。爲了儘可能保持代碼的簡潔,這裏不處理異常。下方是完整的代碼,代碼中咱們使用了新語法。如下代碼的基本思路是:在一個單線程程序中使用主循環一次激活隊列中的協程,各個協程向前執行幾步,而後把控制權讓給主循環,主循環再激活隊列中的下一個協程。
# 代碼2.4
import aiohttp, os, sys, time, asyncio # 代碼中請勿這麼寫,這裏只是爲了減小行數
POP20_CC = ("CN IN US ID BR PK NG BD RU JP MX PH VN ET EG DE IR TR CD FR").split()
BASE_URL = "http://flupy.org/data/flags"
DEST_DIR = "downloads/"
def save_flag(img, filename):
path = os.path.join(DEST_DIR, filename)
with open(path, "wb") as fp:
fp.write(img)
def show(text):
print(text, end=" ")
sys.stdout.flush()
async def get_flag(cc): # aiohttp只支持TCP和UDP請求
url = "{}/{cc}/{cc}.gif".format(BASE_URL, cc=cc.lower())
async with aiohttp.ClientSession() as session: # <1> 開啓一個會話
async with session.get(url) as resp: # 發送請求
image = await resp.read() # 讀取請求
return image
async def download_one(cc):
image = await get_flag(cc)
show(cc)
save_flag(image, cc.lower() + ".gif")
return cc
def download_many(cc_list):
loop = asyncio.get_event_loop() # 獲取事件循環
to_do = [download_one(cc) for cc in sorted(cc_list)] # 生成協程列表
wait_coro = asyncio.wait(to_do) # 將協程包裝成Task類,wait_coro並非運行結果!而是協程!
res, _ = loop.run_until_complete(wait_coro) # 驅動每一個協程運行
loop.close() # 循環結束
return len(res)
def main(download_many):
t0 = time.time()
count = download_many(POP20_CC)
elapsed = time.time() - t0
msg = "\n{} flags downloaded in {:.2f}s"
print(msg.format(count, elapsed))
if __name__ == "__main__":
main(download_many)
# 結果:
VN TR FR DE IN ID RU NG CN EG BR MX PH CD IR PK ET JP BD US
20 flags downloaded in 1.27s
複製代碼
解釋:
①這裏使用了新的語法async/await
。再Python3.5以前,若是想定義一個協程只能延用函數的定義方式def
,而後在定義體裏面使用yield
或yield from
。若是想把一個函數更明確地聲明爲協程(或者說異步函數),還可使用asyncio
中的coroutine
裝飾器,但這麼作是否是挺麻煩的?從Python3.5起,能夠明確**使用async
來定義協程(異步函數)**和異步生成器。使用async
則能夠省略掉@asyncio.coroutine
裝飾器;在用async
修飾的協程的定義體中可使用yield
關鍵字,但不能使用yield from
,它必須被替換爲await
,即便yield from
後面只是一個普通的生成器;從由async修飾
的協程或生成器中獲取數據時,必須使用await
。
②若是要使用@asyncio.coroutine
裝飾器明確聲明協程,那麼在協程定義體內部只能使用yield from
,不能使用yield
,由於使用到yield
的地方已經在asyncio
中所有封裝成了函數或者方法。最新版的@asyncio.coroutine
也能夠裝飾async
修飾的協程,這種狀況下coroutine
不作任何事,只是原封不動的返回被裝飾的協程。
③ <1>處的代碼之因此改用async with
(異步上下文管理器),是由於新版asyncio
並不支持書中的舊語法yield from aiohttp.request("GET", url)
。關於async/await
,async with/async for
的相關內容將在後續文章中介紹,這裏只須要知道async
對應於@asyncio.coroutine
,await
對應於yield from
便可。
④咱們將get_flag
改爲了協程版本,並使用aiohttp
來實現異步請求;download_one
函數也隨之變成了協程版本。
⑤download_many
只是一個普通函數,它要驅動協程運行。在這個函數中,咱們經過asyncio.get_event_loop()
建立事件循環(實質就是一個線程)來驅動協程的運行。接着生成含20個download_one
協程的協程列表to_do
,隨後再調用asyncio.wait(to_do)
將這個協程列表包裝成一個wait
協程,取名爲wait_coro
。wait
協程會將to_do
中全部的協程包裝成Task
對象(Future
的子類),再造成列表。最後,咱們經過loop.run_until_complete(wait_coro)
驅動協程wait_coro
運行。整個的驅動鏈是這樣的:loop.run_until_complete
驅動協程wait_coro
,wait_coro
再在內部驅動20個協程。
⑥wait
協程最後會返回一個元組,第一個元素是完成的協程數,第二個是未完成的協程數,loop.run_until_complete
返回傳入的協程的返回值(實際代碼是Future.result())。有點繞,其實就是wait_coro
最後返回一個元組給run_until_complete
,run_until_complete
再把這個值返回給調用方。
⑦在上一篇中,咱們知道concurrent.futures
中有一個Future
,且經過它的result
方法獲取最後運行的結果;在asyncio
包中,不光有Future
,還有它的子類Task
,但獲取結果一般並非調用result
方法,而是經過yield from
或await
,即yield from future
獲取結果。asyncio.Future
類的result
方法沒有參數,不能設置超時時間;若是調用result
時future
還未運行完畢,它並不會阻塞去等待結果,而是拋出asyncio.InvalidStateError
異常。
上一篇中,咱們除了使用Executor.map()
批量處理線程以外,咱們還使用了concurrent.futures.as_completed()
挨個迭代運行完的線程返回的結果。asyncio
也實現了這個方法,咱們將使用這個函數改寫上方的代碼。
還有一個問題:咱們每每只關注了網絡I/O請求,經常忽略本地的I/O操做。線程版本中的save_flag
函數也是會阻塞線程的,由於它操做了磁盤。但因爲圖片過小,速度太快,咱們感受並不明顯,若是換成更高像素的圖片,這種速度差別就會很明顯。咱們將會以某種方式使其避免阻塞線程。下面是改寫的代碼:
# 代碼2.5
import asyncio, os, sys, time, aiohttp
async def download_one(cc, semaphore):
async with semaphore:
image = await get_flag(cc)
loop = asyncio.get_event_loop()
loop.run_in_executor(None, save_flag, image, cc + ".gif")
return cc
async def download_coro(cc_list, concur_req):
semaphore = asyncio.Semaphore(concur_req) # 它是一個信號量,用於控制併發量
to_do = [download_one(cc, semaphore) for cc in sorted(cc_list)]
to_do_iter = asyncio.as_completed(to_do)
for future in to_do_iter:
res = await future
print("Downloaded", res)
def download_many(cc_list, concur_req): # 變化不大
loop = asyncio.get_event_loop()
coro = download_coro(cc_list, concur_req)
loop.run_until_complete(coro)
loop.close()
if __name__ == "__main__":
t0 = time.time()
download_many(POP20_CC, 1000) # 第二個參數表示最大併發數
print("\nDone! Time elapsed {:.2f}s.".format(time.time() - t0))
# 結果:
Downloaded BD
Downloaded CN
-- snip --
Downloaded US
Done! Time elapsed 1.21s.
複製代碼
上述代碼有3個地方值得關注:
asyncio.as_completed()
以元素爲協程的可迭代對象爲參數,但自身並非協程,只是一個生成器。它在內部將傳入的協程包裝成Task
,而後返回一個生成器,產出協程的返回值。這個生成器按協程完成的順序生成值(先完成先產出),而不是按協程在迭代器中的順序生成值。asyncio.Semaphore
是個信號量類,內部維護這一個計數器,調用它的acquire
方法(這個方法是個協程),計數器減一;對其調用release
方法(這個方法不是協程),計數器加一;當計數器爲0時,會阻塞調用這個方法的協程。save_flag
函數放到了其餘線程中,loop.run_in_executor()
的第一個參數是Executor
實例,若是爲None
,則使用事件循環的默認ThreadPoolExecutor
實例。餘下的參數是可調用對象,以及可調用對象的位置參數。本章開篇介紹了阻塞非阻塞、同步異步的概念,而後介紹了異步的兩種實現方式:回調和協程。並經過代碼比較了回調和協程的實現方式。而後咱們使用asyncio
和aiohttp
兩個庫,將以前線程版本的下載國旗程序改成了協程版本。惋惜我也是剛接觸協程不久,寫的內容不必定準確,尤爲是關於asyncio
的內容,這個庫以前是一點都沒接觸過。後面我會專門研究Python中的協程,以及asyncio
的實現,爭取把這部份內容完全搞懂。
迎你們關注個人微信公衆號"代碼港" & 我的網站 www.vpointer.net ~