所謂「異步 IO」,就是你發起一個 IO 操做,卻不用等它結束,你能夠繼續作其餘事情,當它結束時,你會獲得通知。python
Asyncio 是併發(concurrency)的一種方式。對 Python 來講,併發還能夠經過線程(threading)和多進程(multiprocessing)來實現。多線程
Asyncio 並不能帶來真正的並行(parallelism)。固然,由於 GIL(全局解釋器鎖)的存在,Python 的多線程也不能帶來真正的並行。併發
可交給 asyncio 執行的任務,稱爲協程(coroutine)。一個協程能夠放棄執行,把機會讓給其它協程(即 yield from
或 await
)。`異步
協程的定義,須要使用 async def
語句。async
async def do_some_work(x): pass
do_some_work
即是一個協程。
準確來講,do_some_work
是一個協程函數,能夠經過 asyncio.iscoroutinefunction
來驗證:函數
print(asyncio.iscoroutinefunction(do_some_work)) # True
這個協程什麼都沒作,咱們讓它睡眠幾秒,以模擬實際的工做量 :oop
async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x)
在解釋 await
以前,有必要說明一下協程能夠作哪些事。協程能夠:線程
* 等待一個 future 結束 * 等待另外一個協程(產生一個結果,或引起一個異常) * 產生一個結果給正在等它的協程 * 引起一個異常給正在等它的協程
asyncio.sleep
也是一個協程,因此 await asyncio.sleep(x)
就是等待另外一個協程。可參見 asyncio.sleep
的文檔:code
sleep(delay, result=None, *, loop=None) Coroutine that completes after a given time (in seconds).
調用協程函數,協程並不會開始運行,只是返回一個協程對象,能夠經過 asyncio.iscoroutine
來驗證:協程
print(asyncio.iscoroutine(do_some_work(3))) # True
此處還會引起一條警告:
async1.py:16: RuntimeWarning: coroutine 'do_some_work' was never awaited print(asyncio.iscoroutine(do_some_work(3)))
要讓這個協程對象運行的話,有兩種方式:
* 在另外一個已經運行的協程中用 `await` 等待它 * 經過 `ensure_future` 函數計劃它的執行
簡單來講,只有 loop 運行了,協程纔可能運行。
下面先拿到當前線程缺省的 loop ,而後把協程對象交給 loop.run_until_complete
,協程對象隨後會在 loop 裏獲得運行。
loop = asyncio.get_event_loop() loop.run_until_complete(do_some_work(3))
run_until_complete
是一個阻塞(blocking)調用,直到協程運行結束,它才返回。這一點從函數名不難看出。run_until_complete
的參數是一個 future,可是咱們這裏傳給它的倒是協程對象,之因此能這樣,是由於它在內部作了檢查,經過 ensure_future
函數把協程對象包裝(wrap)成了 future。因此,咱們能夠寫得更明顯一些:
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
完整代碼:
import asyncio async def do_some_work(x): print("Waiting " + str(x)) await asyncio.sleep(x) loop = asyncio.get_event_loop() loop.run_until_complete(do_some_work(3))
運行結果:
Waiting 3 <三秒鐘後程序結束>
假如協程是一個 IO 的讀操做,等它讀完數據後,咱們但願獲得通知,以便下一步數據的處理。這一需求能夠經過往 future 添加回調來實現。
def done_callback(futu): print('Done') futu = asyncio.ensure_future(do_some_work(3)) futu.add_done_callback(done_callback) loop.run_until_complete(futu)
實際項目中,每每有多個協程,同時在一個 loop 裏運行。爲了把多個協程交給 loop,須要藉助 asyncio.gather
函數。
loop.run_until_complete(asyncio.gather(do_some_work(1), do_some_work(3)))
或者先把協程存在列表裏:
coros = [do_some_work(1), do_some_work(3)] loop.run_until_complete(asyncio.gather(*coros))
運行結果:
Waiting 3 Waiting 1 <等待三秒鐘> Done
這兩個協程是併發運行的,因此等待的時間不是 1 + 3 = 4 秒,而是以耗時較長的那個協程爲準。
參考函數 gather
的文檔:
gather(*coros_or_futures, loop=None, return_exceptions=False)
Return a future aggregating results from the given coroutines or futures.
發現也能夠傳 futures 給它:
futus = [asyncio.ensure_future(do_some_work(1)), asyncio.ensure_future(do_some_work(3))] loop.run_until_complete(asyncio.gather(*futus))
gather
起聚合的做用,把多個 futures 包裝成單個 future,由於 loop.run_until_complete
只接受單個 future。
咱們一直經過 run_until_complete
來運行 loop ,等到 future 完成,run_until_complete
也就返回了。
async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop = asyncio.get_event_loop() coro = do_some_work(3) loop.run_until_complete(coro)
輸出:
Waiting 3 <等待三秒鐘> Done <程序退出>
如今改用 run_forever
:
async def do_some_work(x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop = asyncio.get_event_loop() coro = do_some_work(3) asyncio.ensure_future(coro) loop.run_forever()
輸出:
Waiting 3 <等待三秒鐘> Done <程序沒有退出>
三秒鐘事後,future 結束,可是程序並不會退出。run_forever
會一直運行,直到 stop
被調用,可是你不能像下面這樣調 stop
:
loop.run_forever() loop.stop()
run_forever
不返回,stop
永遠也不會被調用。因此,只能在協程中調 stop
:
async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') loop.stop()
這樣並不是沒有問題,假若有多個協程在 loop 裏運行:
asyncio.ensure_future(do_some_work(loop, 1)) asyncio.ensure_future(do_some_work(loop, 3)) loop.run_forever()
第二個協程沒結束,loop 就中止了——被先結束的那個協程給停掉的。
要解決這個問題,能夠用 gather
把多個協程合併成一個 future,並添加回調,而後在回調裏再去中止 loop。
async def do_some_work(loop, x): print('Waiting ' + str(x)) await asyncio.sleep(x) print('Done') def done_callback(loop, futu): loop.stop() loop = asyncio.get_event_loop() futus = asyncio.gather(do_some_work(loop, 1), do_some_work(loop, 3)) futus.add_done_callback(functools.partial(done_callback, loop)) loop.run_forever()
其實這基本上就是 run_until_complete
的實現了,run_until_complete
在內部也是調用 run_forever
。
以上示例都沒有調用 loop.close
,好像也沒有什麼問題。因此到底要不要調 loop.close
呢?
簡單來講,loop 只要不關閉,就還能夠再運行。:
loop.run_until_complete(do_some_work(loop, 1)) loop.run_until_complete(do_some_work(loop, 3)) loop.close()
可是若是關閉了,就不能再運行了:
loop.run_until_complete(do_some_work(loop, 1)) loop.close() loop.run_until_complete(do_some_work(loop, 3)) # 此處異常
建議調用 loop.close
,以完全清理 loop 對象防止誤用。
asyncio.gather
和 asyncio.wait
功能類似。
coros = [do_some_work(loop, 1), do_some_work(loop, 3)] loop.run_until_complete(asyncio.wait(coros))
具體差異可請參見 StackOverflow 的討論:Asyncio.gather vs asyncio.wait。
C++ Boost.Asio 提供了 IO 對象 timer,可是 Python 並無原生支持 timer,不過能夠用 asyncio.sleep
模擬。
async def timer(x, cb): futu = asyncio.ensure_future(asyncio.sleep(x)) futu.add_done_callback(cb) await futu t = timer(3, lambda futu: print('Done')) loop.run_until_complete(t)