Python 的異步 IO:Asyncio 簡介

所謂「異步 IO」,就是你發起一個 IO 操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。python

Asyncio 是併發(concurrency)的一種方式。對 Python 來講,併發還能夠經過線程(threading)和多進程(multiprocessing)來實現。多線程

Asyncio 並不能帶來真正的並行(parallelism)。固然,由於 GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的並行。併發

可交給 asyncio 執行的任務,稱爲協程(coroutine)。一個協程能夠放棄執行,把機會讓給其它協程(即 yield fromawait)。`異步

定義協程

協程的定義,須要使用 async def 語句。async

async def do_some_work(x): pass

do_some_work 即是一個協程。
準確來講,do_some_work 是一個協程函數,能夠經過 asyncio.iscoroutinefunction 來驗證:函數

print(asyncio.iscoroutinefunction(do_some_work))  # True

這個協程什麼都沒作,咱們讓它睡眠幾秒,以模擬實際的工做量 :oop

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

在解釋 await 以前,有必要說明一下協程能夠作哪些事。協程能夠:線程

* 等待一個 future 結束
* 等待另外一個協程(產生一個結果,或引起一個異常)
* 產生一個結果給正在等它的協程
* 引起一個異常給正在等它的協程

asyncio.sleep 也是一個協程,因此 await asyncio.sleep(x) 就是等待另外一個協程。可參見 asyncio.sleep 的文檔:code

sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).

運行協程

調用協程函數,協程並不會開始運行,只是返回一個協程對象,能夠經過 asyncio.iscoroutine 來驗證:協程

print(asyncio.iscoroutine(do_some_work(3)))  # True

此處還會引起一條警告:

async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited
  print(asyncio.iscoroutine(do_some_work(3)))

要讓這個協程對象運行的話,有兩種方式:

* 在另外一個已經運行的協程中用 `await` 等待它
* 經過 `ensure_future` 函數計劃它的執行

簡單來講,只有 loop 運行了,協程纔可能運行。
下面先拿到當前線程缺省的 loop ,而後把協程對象交給 loop.run_until_complete,協程對象隨後會在 loop 裏獲得運行。

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

run_until_complete 是一個阻塞(blocking)調用,直到協程運行結束,它才返回。這一點從函數名不難看出。
run_until_complete 的參數是一個 future,可是咱們這裏傳給它的倒是協程對象,之因此能這樣,是由於它在內部作了檢查,經過 ensure_future 函數把協程對象包裝(wrap)成了 future。因此,咱們能夠寫得更明顯一些:

loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))

完整代碼:

import asyncio

async def do_some_work(x):
    print("Waiting " + str(x))
    await asyncio.sleep(x)

loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))

運行結果:

Waiting 3
<三秒鐘後程序結束>

回調

假如協程是一個 IO 的讀操做,等它讀完數據後,咱們但願獲得通知,以便下一步數據的處理。這一需求能夠經過往 future 添加回調來實現。

def done_callback(futu):
    print('Done')

futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)

loop.run_until_complete(futu)

多個協程

實際項目中,每每有多個協程,同時在一個 loop 裏運行。爲了把多個協程交給 loop,須要藉助 asyncio.gather 函數。

loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))

或者先把協程存在列表裏:

coros = [do_some_work(1), do_some_work(3)]
loop.run_until_complete(asyncio.gather(*coros))

運行結果:

Waiting 3
Waiting 1
<等待三秒鐘>
Done

這兩個協程是併發運行的,因此等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程爲準。

參考函數 gather 的文檔:

gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.

發現也能夠傳 futures 給它:

futus = [asyncio.ensure_future(do_some_work(1)),
             asyncio.ensure_future(do_some_work(3))]

loop.run_until_complete(asyncio.gather(*futus))

gather 起聚合的做用,把多個 futures 包裝成單個 future,由於 loop.run_until_complete 只接受單個 future。

run_until_complete 和 run_forever

咱們一直經過 run_until_complete 來運行 loop ,等到 future 完成,run_until_complete 也就返回了。

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
loop.run_until_complete(coro)

輸出:

Waiting 3
<等待三秒鐘>
Done
<程序退出>

如今改用 run_forever

async def do_some_work(x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

loop = asyncio.get_event_loop()

coro = do_some_work(3)
asyncio.ensure_future(coro)

loop.run_forever()

輸出:

Waiting 3
<等待三秒鐘>
Done
<程序沒有退出>

三秒鐘事後,future 結束,可是程序並不會退出。run_forever 會一直運行,直到 stop 被調用,可是你不能像下面這樣調 stop

loop.run_forever()
loop.stop()

run_forever 不返回,stop 永遠也不會被調用。因此,只能在協程中調 stop

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')
    loop.stop()

這樣並不是沒有問題,假若有多個協程在 loop 裏運行:

asyncio.ensure_future(do_some_work(loop, 1))
asyncio.ensure_future(do_some_work(loop, 3))

loop.run_forever()

第二個協程沒結束,loop 就中止了——被先結束的那個協程給停掉的。
要解決這個問題,能夠用 gather 把多個協程合併成一個 future,並添加回調,而後在回調裏再去中止 loop。

async def do_some_work(loop, x):
    print('Waiting ' + str(x))
    await asyncio.sleep(x)
    print('Done')

def done_callback(loop, futu):
    loop.stop()

loop = asyncio.get_event_loop()

futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3))
futus.add_done_callback(functools.partial(done_callback, loop))

loop.run_forever()

其實這基本上就是 run_until_complete 的實現了,run_until_complete 在內部也是調用 run_forever

Close Loop?

以上示例都沒有調用 loop.close,好像也沒有什麼問題。因此到底要不要調 loop.close 呢?
簡單來講,loop 只要不關閉,就還能夠再運行。:

loop.run_until_complete(do_some_work(loop, 1))
loop.run_until_complete(do_some_work(loop, 3))
loop.close()

可是若是關閉了,就不能再運行了:

loop.run_until_complete(do_some_work(loop, 1))
loop.close()
loop.run_until_complete(do_some_work(loop, 3))  # 此處異常

建議調用 loop.close,以完全清理 loop 對象防止誤用。

gather vs. wait

asyncio.gatherasyncio.wait 功能類似。

coros = [do_some_work(loop, 1), do_some_work(loop, 3)]
loop.run_until_complete(asyncio.wait(coros))

具體差異可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait

Timer

C++ Boost.Asio 提供了 IO 對象 timer,可是 Python 並無原生支持 timer,不過能夠用 asyncio.sleep 模擬。

async def timer(x, cb):
    futu = asyncio.ensure_future(asyncio.sleep(x))
    futu.add_done_callback(cb)
    await futu

t = timer(3, lambda futu: print('Done'))
loop.run_until_complete(t)
相關文章
相關標籤/搜索