asyncio是一個使用async / await語法編寫併發代碼的庫。html
asyncio用做多個Python異步框架的基礎,這些框架提供高性能的網絡和Web服務器,數據庫鏈接庫,分佈式任務隊列等。python
asyncio一般很是適合IO綁定和高級 結構化網絡代碼。數據庫
asyncio提供了一組高級 API:express
此外,還有一些用於庫和框架開發人員的低級 API :api
networking
,運行subprocesses
,處理等;OS signals
使用async / await語法聲明的協同程序是編寫asyncio應用程序的首選方法。例如,如下代碼片斷(須要Python 3.7+)打印「hello」,等待1秒,而後打印「world」:安全
import asyncio async def main(): print('hello') await asyncio.sleep(1) print('world') asyncio.run(main()) # hello # world
上面代碼等同於下面(不推薦使用基於生成器的協同程序的支持,並計劃在Python 3.10中刪除。)服務器
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') asyncio.run(main())
asyncio實際等同於下面的工做(參數爲An asyncio.Future, a coroutine or an awaitable is required)網絡
import asyncio @asyncio.coroutine def main(): print('hello') yield from asyncio.sleep(1) print('world') # asyncio.run(main()) loop = asyncio.events.new_event_loop() asyncio.events.set_event_loop(loop) loop.run_until_complete(main()) # hello # world
1 This function runs the passed coroutine, taking care of 2 managing the asyncio event loop and finalizing asynchronous 3 generators. 4 5 This function cannot be called when another asyncio event loop is 6 running in the same thread. 7 8 If debug is True, the event loop will be run in debug mode. 9 10 This function always creates a new event loop and closes it at the end. 11 It should be used as a main entry point for asyncio programs, and should 12 ideally only be called once.
實際運行協程asyncio提供了三種主要機制:多線程
一、The asyncio.run()函數來運行頂層入口點「main()」函數(見上面的例子)併發
二、Awaiting on a coroutine 如下代碼片斷將在等待1秒後打印「hello」,而後在等待另外 2秒後打印「world」 :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 11:54:48 # hello # world # finished at 11:54:51
三、asyncio.create_task()
與asyncio同時運行協同程序的功能Tasks;
讓咱們修改上面的例子並同時運行兩個say_after
協同程序 :
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await task1 await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:27:22 # hello at 14:27:23 # world at 14:27:24 # finished at 14:27:24
稍微改變一下形式,能夠理解的更多
import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(f"{what} at {time.strftime('%X')}") async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # Wait until both tasks are completed (should take # around 2 seconds.) await asyncio.sleep(3) # await task1 # await task2 print(f"finished at {time.strftime('%X')}") asyncio.run(main()) # started at 14:29:41 # hello at 14:29:42 # world at 14:29:43 # finished at 14:29:44
咱們說若是一個對象能夠在表達式中使用,那麼它就是一個等待對象await
。許多asyncio API旨在接受等待。
有三種主要類型的等待對象:coroutines, Tasks, and Futures.
Coroutines
Python coroutines are awaitables and therefore can be awaited from other coroutines:
import asyncio async def nested(): return 42 async def main(): # Nothing happens if we just call "nested()". # A coroutine object is created but not awaited, # so it *won't run at all*. nested() # Let's do it differently now and await it: print(await nested()) # will print "42". asyncio.run(main()) # 42
重要
在本文檔中,術語「coroutine」可用於兩個密切相關的概念:
async def
Tasks
任務用於調度協同程序併發。
當一個協程被包裝到一個Task中時,會像asyncio.create_task()
同樣 conroutine自動安排很快運行:
import asyncio async def nested(): return 42 async def main(): # Schedule nested() to run soon concurrently # with "main()". task = asyncio.create_task(nested()) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task asyncio.run(main())
Futures
A Future
是一個特殊的低級別等待對象,它表示異步操做的最終結果。
當等待 Future對象時,它意味着協程將等到Future在其餘地方解析。
須要asyncio中的將來對象以容許基於回調的代碼與async / await一塊兒使用。
一般,不須要在應用程序級代碼中建立Future對象。
能夠等待有時經過庫和一些asyncio API公開的將來對象:
async def main(): await function_that_returns_a_future_object() # this is also valid: await asyncio.gather( function_that_returns_a_future_object(), some_python_coroutine() )
返回Future對象的低級函數的一個很好的例子是loop.run_in_executor()
。
一、運行asyncio程序
asyncio.run(coro,*,debug = False )
此函數運行傳遞的協同程序,負責管理asyncio事件循環並最終肯定異步生成器。
當另外一個asyncio事件循環在同一個線程中運行時,沒法調用此函數。
若是是debugTrue
,則事件循環將以調試模式運行。
此函數始終建立一個新的事件循環並在結束時將其關閉。它應該用做asyncio程序的主要入口點,理想狀況下應該只調用一次。
版本3.7中的新功能:重要:此功能已臨時添加到Python 3.7中的asyncio中。
二、建立任務
asyncio.create_task(coro)
將coro coroutine包裝成a Task
並安排執行。返回Task對象。
任務在返回的循環中執行,若是當前線程中沒有運行循環get_running_loop()
, RuntimeError
則引起該任務。
Python 3.7中添加了此功能。在Python 3.7以前,asyncio.ensure_future()
可使用低級函數:
async def coro(): ... # In Python 3.7+ task = asyncio.create_task(coro()) ... # This works in all Python versions but is less readable task = asyncio.ensure_future(coro())
三、sleeping
coroutine asyncio.
sleep
(delay, result=None, *, loop=None)
阻止 delay seconds.。
若是提供了result ,則在協程完成時將其返回給調用者。
leep()
始終掛起當前任務,容許其餘任務運行。
該loop 參數已被棄用,並定於去除在Python 3.10。
協程示例每秒顯示當前日期5秒:
import asyncio import datetime async def display_date(): loop = asyncio.get_running_loop() end_time = loop.time() + 5.0 while True: print(datetime.datetime.now()) if (loop.time() + 1.0) >= end_time: break await asyncio.sleep(1) asyncio.run(display_date())
四、同時運行任務
awaitable asyncio.
gather
(*aws, loop=None, return_exceptions=False)
同時在aws 序列中運行 awaitable objects
若是在aws中任何awaitable 是協程,它將自動安排爲任務
若是全部等待成功完成,則結果是返回值的彙總列表。結果值的順序對應於aws中的等待順序
若是return_exceptions是False
(默認),則第一個引起的異常會當即傳播到等待的任務gather()
。
若是return_exceptions是True
,異常的處理方式同樣成功的結果,並在結果列表彙總。
若是gather()
被取消,全部提交的awaitables(還沒有完成)也被取消。
若是aws序列中的Task or Future被取消,則將其視爲已引起CancelledError
- 在這種狀況下不會取消gather()
呼叫。這是爲了防止取消一個提交的Tasks/Futures 以致使其餘任務/期貨被取消。
import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), ) asyncio.run(main()) # Expected output: # # Task A: Compute factorial(2)... # Task B: Compute factorial(2)... # Task C: Compute factorial(2)... # Task A: factorial(2) = 2 # Task B: Compute factorial(3)... # Task C: Compute factorial(3)... # Task B: factorial(3) = 6 # Task C: Compute factorial(4)... # Task C: factorial(4) = 24
獲取返回結果,異常狀況
import asyncio async def factorial(name, number): print(name) if name == 'A': return name elif name == 'B': raise SyntaxError(name) await asyncio.sleep(number) async def main(): # Schedule three calls *concurrently*: result = await asyncio.gather( factorial("A", 2), factorial("B", 3), factorial("C", 4), return_exceptions=True ) print(result) asyncio.run(main()) # A # B # C # ['A', SyntaxError('B'), None]
版本3.7中已更改:若是取消了彙集自己,則不管return_exceptions如何,都會傳播取消。
五、Shielding From Cancellation
awaitable asyncio.
shield
(aw, *, loop=None)
Protect an awaitable object from being cancelled
.
If aw is a coroutine it is automatically scheduled as a Task.
The statement:
res = await shield(something())
等同於
res = await something()
除非取消包含它的協程,不然something()
不會取消運行的任務。從觀點來看something()
,取消沒有發生。雖然它的來電者仍然被取消,因此「等待」表達仍然提出了一個CancelledError
。
若是something()
經過其餘方式取消(即從內部取消)也會取消shield()
。
若是但願徹底忽略取消(不推薦),則該shield()
函數應與try / except子句結合使用,以下所示:
try: res = await shield(something()) except CancelledError: res = None
六、超時
coroutine asyncio.
wait_for
(aw, timeout, *, loop=None)
Wait for the aw awaitable to complete with a timeout.
If aw is a coroutine it is automatically scheduled as a Task.
timeout能夠是None
或等待的float或int秒數。若是超時是None
,將等到完成
若是發生超時,它將取消任務並加註 asyncio.TimeoutError
。
要避免該任務cancellation
,請將其包裝shield()
。
該函數將一直等到未來實際取消,所以總等待時間可能會超過超時。
若是等待被取消,則將來的aw也會被取消。
該循環參數已被棄用,並定於去除在Python 3.10。
async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!
改變在3.7版本:當AW被取消,因爲超時,wait_for
等待AW被取消。之前,它asyncio.TimeoutError
當即提出 。
七、超時原語
coroutine asyncio.
wait
(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)
同時運行aws中的等待對象 並阻塞,直到return_when指定的條件爲止。
若是在aws中任何等待的是協程,它將自動安排爲任務。wait()
直接傳遞協同程序對象 已被棄用,由於它會致使 混亂的行爲。
返回兩組任務/期貨:。(done, pending)
用法:
done, pending = await asyncio.wait(aws)
該循環參數已被棄用,並定於去除在Python 3.10。
timeout(浮點數或整數),若是指定,可用於控制返回前等待的最大秒數。
請注意,此功能不會引起asyncio.TimeoutError
。超時發生時未完成的期貨或任務僅在第二組中返回。
return_when表示此函數什麼時候返回。它必須是如下常量之一:
不變 | 描述 |
---|---|
FIRST_COMPLETED |
當任何將來完成或取消時,該函數將返回。 |
FIRST_EXCEPTION |
當任何將來經過引起異常完成時,函數將返回。若是沒有將來引起異常則等同於 ALL_COMPLETED 。 |
ALL_COMPLETED |
全部期貨結束或取消時,該功能將返回。 |
不像wait_for()
,wait()
當發生超時不會取消期貨。
注意 wait()
將協同程序自動調度爲任務,而後在 集合中返回隱式建立的任務對象。所以,如下代碼將沒法按預期方式工做:(done, pending)
async def foo(): return 42 coro = foo() done, pending = await asyncio.wait({coro}) if coro in done: # This branch will never be run!
如下是如何修復上述代碼段:
async def foo(): return 42 task = asyncio.create_task(foo()) done, pending = await asyncio.wait({task}) if task in done: # Everything will work as expected now.
wait()
不推薦直接傳遞協程對象。
八、 Scheduling From Other Threads
asyncio.
run_coroutine_threadsafe
(coro, loop)
將協程提交給給定的事件循環。線程安全的。
返回a concurrent.futures.Future
以等待另外一個OS線程的結果。
此函數旨在從與運行事件循環的OS線程不一樣的OS線程調用。例:
# Create a coroutine coro = asyncio.sleep(1, result=3) # Submit the coroutine to a given loop future = asyncio.run_coroutine_threadsafe(coro, loop) # Wait for the result with an optional timeout argument assert future.result(timeout) == 3
若是在協程中引起異常,則會通知返回的Future。它還能夠用於取消事件循環中的任務:
try: result = future.result(timeout) except asyncio.TimeoutError: print('The coroutine took too long, cancelling the task...') future.cancel() except Exception as exc: print(f'The coroutine raised an exception: {exc!r}') else: print(f'The coroutine returned: {result!r}')
請參閱 文檔的併發和多線程部分。
與其餘asyncio函數不一樣,此函數須要 顯式傳遞循環參數。
3.5.1版中的新功能。
九、自省
asyncio.
current_task
(loop = None )
返回當前正在運行的Task
實例,或者None
沒有正在運行的任務。
若是loop是None
get_running_loop()
用來獲取loop。
版本3.7中的新功能。
asyncio.
all_tasks
(loop = None )
返回Task
循環運行的一組還沒有完成的對象。
若是loop是None
,get_running_loop()
則用於獲取當前循環。
版本3.7中的新功能。
class asyncio.
Task
(coro,*,loop = None )
A Future-like
object that runs a Python coroutine. Not thread-safe.
任務用於在事件循環中運行協同程序。若是一個協程在Future上等待,則Task暫停執行協程並等待Future的完成。當Future 完成後,包裝協程的執行將恢復。
事件循環使用協做調度:事件循環一次運行一個任務。當一個Task等待完成Future時,事件循環運行其餘任務,回調或執行IO操做。
使用高級asyncio.create_task()
功能建立任務,或低級別loop.create_task()
或 ensure_future()
功能。不鼓勵手動實例化任務。
要取消正在運行的任務,請使用該cancel()
方法。調用它將致使Task將CancelledError
異常拋出到包裝的協同程序中。若是在取消期間協程正在等待Future對象,則Future對象將被取消。
cancelled()
可用於檢查任務是否被取消。True
若是包裝的協程沒有抑制CancelledError
異常而且實際上被取消,則該方法返回。
asyncio.Task
繼承自Future
其全部API,除了Future.set_result()
和 Future.set_exception()
。
任務支持該contextvars
模塊。建立任務時,它會複製當前上下文,而後在複製的上下文中運行其協程。
版本3.7中已更改:添加了對contextvars
模塊的支持。
cancel
()
請求取消任務。
這會安排CancelledError
在事件循環的下一個循環中將異常拋入包裝的協程。
協程則有機會經過抑制異常與清理,甚至拒絕請求try
... ... ... ... ... ... 塊。所以,不一樣於,不保證任務將被取消,儘管徹底抑制取消並不常見,而且積極地不鼓勵。exceptCancelledError
finally
Future.cancel()
Task.cancel()
如下示例說明了協同程序如何攔截取消請求:
async def cancel_me(): print('cancel_me(): before sleep') try: # Wait for 1 hour await asyncio.sleep(3600) except asyncio.CancelledError: print('cancel_me(): cancel sleep') raise finally: print('cancel_me(): after sleep') async def main(): # Create a "cancel_me" Task task = asyncio.create_task(cancel_me()) # Wait for 1 second await asyncio.sleep(1) task.cancel() try: await task except asyncio.CancelledError: print("main(): cancel_me is cancelled now") asyncio.run(main()) # Expected output: # # cancel_me(): before sleep # cancel_me(): cancel sleep # cancel_me(): after sleep # main(): cancel_me is cancelled now
cancelled
()
True
若是任務被
取消,則返回。
cancel()
而且包裝的協同程序將
CancelledError
異常傳播 到其中。
done
()
True
若是任務
完成則返回。
result
()
CancelledError
異常。
InvalidStateError
異常。
exception
()
None
。
CancelledError
異常。
InvalidStateError
異常。
add_done_callback
(回調,*,上下文=無)
Future.add_done_callback()
詳細信息,請參閱文檔。
remove_done_callback
(回調)
Future.remove_done_callback()
詳細信息,請參閱文檔。
get_stack
(*,limit = None )
print_stack
(*,limit = None,file = None )
get_stack()
。
get_stack()
直接。
sys.stderr
。
all_tasks
(loop = None )
None
,則該
get_event_loop()
函數用於獲取當前循環。
asyncio.all_tasks()
功能。
current_task
(loop = None )
None
。
None
,則該
get_event_loop()
函數用於獲取當前循環。
asyncio.current_task()
功能。
一、async for 運用
import asyncio class AsyncIter: def __init__(self, items): self.items = items async def __aiter__(self): for item in self.items: await asyncio.sleep(1) yield item async def print_iter(things): async for item in things: print(item) if __name__ == '__main__': loop = asyncio.get_event_loop() things = AsyncIter([1, 2, 3]) loop.run_until_complete(print_iter(things)) loop.close()
Python異步IO實現全過程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ
Python異步IO實現全過程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA
Python異步IO實現全過程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ