asyncio之Coroutines,Tasks and Future

asyncio之Coroutines,Tasks and Future

Coroutines and Tasks屬於High-level APIs,也就是高級層的api。html

本節概述用於協程和任務的高級異步api。python

Coroutines

Coroutines翻譯過來意思是協程,
使用async/await語法聲明的協程是編寫asyncio應用程序的首選方法。nginx

import asyncio async def main(): print("hello") await asyncio.sleep(1) print("world") if __name__ == '__main__': # asyncio.run(main()) # 3.7的用法 # 阻塞直到hello world()協程結束時返回 loop = asyncio.get_event_loop() loop.run_until_complete(main())

第一個異步函數是經過建立loop循環去調用,其餘異步函數之間經過await進行調用。
像下面的一個例子django

import asyncio import time async def say_after(delay, what): await asyncio.sleep(delay) print(what) async def main(): print(f"started at {time.strftime('%X')}") await say_after(1, 'hello') await say_after(2, 'world') print(f"finished at {time.strftime('%X')}") if __name__ == '__main__': loop = asyncio.get_event_loop() # 阻塞直到hello world()協程結束時返回 loop.run_until_complete(main()) loop.close()

或者咱們能夠經過asyncio.create_task()將協程say_after封裝任務去調用就像下面這樣。api

async def main(): task1 = asyncio.create_task( say_after(1, 'hello')) task2 = asyncio.create_task( say_after(2, 'world')) print(f"started at {time.strftime('%X')}") # 等待兩個子任務完成 await task1 await task2 print(f"finished at {time.strftime('%X')}")

若是報錯async沒有create_task,能夠用
ensure_future代替
 

Awaitables

咱們說,若是一個對象能夠用在await表達式中,那麼它就是Awaitables的對象。
可等待對象主要有三種類型:coroutines, Tasks, and Futures.安全

Coroutines

 前面的代碼中演示了協程的運做方式,這裏主要強調兩點。微信

  • 協程函數:asyc def定義的函數;
  • 協程對象:經過調用協程函數返回的對象。併發

    Tasks

    任務對協程進一步封裝,其中包含任務的各類狀態。
    協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象。
import asyncio async def nested(): await asyncio.sleep(2) print("等待2s") async def main(): # 將協程包裝成任務含有狀態 # task = asyncio.create_task(nested()) task = asyncio.ensure_future(nested()) print(task) # "task" can now be used to cancel "nested()", or # can simply be awaited to wait until it is complete: await task print(task) print(task.done()) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): print(task) task.cancel() print(task) loop.run_forever() # restart loop finally: loop.close() 

能夠看到app

<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>> 等待2s <Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None> True

建立task後,task在加入事件循環以前是pending狀態而後調用nested函數等待2s以後打印task爲finished狀態。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)均可以建立一個task,python3.7增長了asyncio.create_task(coro)。其中task是Future的一個子類less

Future

future:表明未來執行或沒有執行的任務的結果。它和task上沒有本質的區別
一般不須要在應用程序級別代碼中建立Future對象。
future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cancelled

經過上面的代碼能夠知道建立future的時候,task爲pending,事件循環調用執行的時候是running,調用完畢天然就是done因而調用task.done()打印了true。

若是在命令行中運行上述代碼,ctrl+c後會發現
輸出如下內容

<Task pending coro=<nested() running at 1-2-1.py:9>> ^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>> <Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>> <Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]> <Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>

由於咱們調用了task.cancel() 因此能夠看到此時的任務狀態爲取消狀態。

併發的執行任務

經過使用await+asyncio.gather能夠完成併發的操做。
asyncio.gather用法以下。
**asyncio.gather(*aws, loop=None, return_exceptions=False)
aws是一系列協程,協程都成功完成,就返回值一個結果列表。結果值的順序與aws中添加協程的順序相對應。
return_exceptions=False,其實就是若是有一個任務失敗了,就直接拋出異常。若是等於True就把錯誤信息做爲結果返回回來。
首先來一個正常狀況不出錯的例子:

import asyncio async def factorial(name, number): f = 1 for i in range(2, number + 1): print(f"Task {name}: Compute factorial({i})...") if number == 2: 1 / 0 await asyncio.sleep(1) f *= i print(f"Task {name}: factorial({number}) = {f}") async def main(): # Schedule three calls *concurrently*: res = await asyncio.gather( *[factorial("A", 2), factorial("B", 3), factorial("C", 4)] , return_exceptions=True) for item in res: print(item) if __name__ == '__main__': loop = asyncio.get_event_loop() try: loop.run_until_complete(main()) except KeyboardInterrupt as e: for task in asyncio.Task.all_tasks(): print(task) task.cancel() print(task) loop.run_forever() # restart loop finally: loop.close()

輸入如下內容:

Task A: Compute factorial(2)... Task B: Compute factorial(2)... Task C: Compute factorial(2)... Task B: Compute factorial(3)... Task C: Compute factorial(3)... Task B: factorial(3) = 6 Task C: Compute factorial(4)... Task C: factorial(4) = 24 division by zero None None

能夠發現async.gather最後會返回一系列的結果,若是出現了錯誤就把錯誤信息做爲返回結果,這裏我當數字爲2時人爲加了異常操做1/0,因而返回告終果division by zero,對於其餘的任務由於沒有返回值因此是None。這裏return_exceptions=True來保證了若是其中一個任務出現異常,其餘任務不會受其影響會執行到結束。

asyncio.wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

asyncio.wait和async.gather用法差很少只是async.wait接收的是個列表。
第三個參數和async.gather有點區別.

參數名 含義
FIRST_COMPLETED 任何一個future完成或取消時返回
FIRST_EXCEPTION 任何一個future出現錯誤將返回,若是出現異常等價於ALL_COMPLETED
ALL_COMPLETED 當全部任務完成或者被取消時返回結果,默認值。

Timeouts

經過使用asyncio.wait_for來完成一個超時函數回調操做,若是函數規定時間內未完成則報錯。
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw表明一個協程,timeout單位秒。

async def eternity(): # Sleep for one hour await asyncio.sleep(3600) print('yay!') async def main(): # Wait for at most 1 second try: await asyncio.wait_for(eternity(), timeout=1.0) except asyncio.TimeoutError: print('timeout!') asyncio.run(main()) # Expected output: # # timeout!

1秒內eternity沒有完成就報錯了。
python3.7中發生更改:當aw因爲超時而被取消時,再也不顯示異常而是等待aw被取消。
說到timeout的,若是僅僅是對一個代碼塊作timeout操做而不是等待某個協程此時推薦第三方模塊async_timeout

async_timeout

安裝

pip installa async_timeout

使用方法很簡單以下

async with async_timeout.timeout(1.5) as cm: await inner() print(cm.expired)

若是1.5s能夠運行完打印true,不然打印false,表示超時。

asyncio.as_completed

**asyncio.as_completed(aws, *, loop=None, timeout=None)**
使用as_completed會返回一個能夠迭代的future對象,一樣能夠獲取協程的運行結果,使用方法以下:

async def main(): coroutine1 = do_some_work(1) coroutine2 = do_some_work(2) coroutine3 = do_some_work(4) tasks = [ asyncio.ensure_future(coroutine1), asyncio.ensure_future(coroutine2), asyncio.ensure_future(coroutine3) ] for task in asyncio.as_completed(tasks): result = await task print('Task ret: {}'.format(result)) start = now() loop = asyncio.get_event_loop() done = loop.run_until_complete(main()) print('TIME: ', now() - start)

協程嵌套

使用async能夠定義協程,協程用於耗時的io操做,咱們也能夠封裝更多的io操做過程,這樣就實現了嵌套的協程,即一個協程中await了另一個協程,如此鏈接起來
官網實例:

圖解:

 一、run_until_complete運行,會註冊task(協程:print_sum)並開啓事件循環 →

 二、print_sum協程中嵌套了子協程,此時print_sum協程暫停(相似委託生成器),轉到子協程(協程:compute)中運行代碼,期間子協程需sleep1秒鐘,直接將結果反饋到event loop中,即將控制權轉回調用方,而中間的print_sum暫停不操做 →

 三、1秒後,調用方將控制權給到子協程(調用方與子協程直接通訊),子協程執行接下來的代碼,直到再遇到wait(此實例沒有)→

 四、 最後執行到return語句,子協程向上級協程(print_sum拋出異常:StopIteration),同時將return返回的值返回給上級協程(print_sum中的result接收值),print_sum繼續執行暫時時後續的代碼,直到遇到return語句 →

 五、向 event loop 拋出StopIteration異常,此時協程任務都已經執行完畢,事件循環執行完成(event loop :the loop is stopped),close事件循環。

調度線程

asyncio.run_coroutine_threadsafe(coro, loop)
等待其餘線程返回一個concurrent.futures.Future對象,這是一個線程安全的方法。
這個函數應該從不一樣的OS線程調用,而不是從事件循環所在的線程調用。

def start_loop(loop): asyncio.set_event_loop(loop) loop.run_forever() async def do_some_work(x): print('Waiting {}'.format(x)) await asyncio.sleep(x) print('Done after {}s'.format(x)) def more_work(x): print('More work {}'.format(x)) time.sleep(x) print('Finished more work {}'.format(x)) start = now() new_loop = asyncio.new_event_loop() t = Thread(target=start_loop, args=(new_loop,)) t.start() print('TIME: {}'.format(time.time() - start)) asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop) asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中建立一個new_loop,而後在另外的子線程中開啓一個無限事件循環。主線程經過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的併發操做,同時主線程又不會被block。一共執行的時間大概在6s左右。
run_in_executor

import time import asyncio async def main(): print(f'{time.ctime()} Hello') await asyncio.sleep(1.0) print(f'{time.ctime()} Goodbye') loop.stop() def blocking(): # 1 time.sleep(0.5) # 2 print(f'{time.ctime()} Hello from a thread!') loop = asyncio.get_event_loop() loop.create_task(main()) loop.run_in_executor(None, blocking) # 3 loop.run_forever() pending = asyncio.Task.all_tasks(loop=loop) # 4 group = asyncio.gather(*pending) loop.run_until_complete(group) loop.close() 

輸出

Fri Jan 4 15:32:03 2019 Hello Fri Jan 4 15:32:04 2019 Hello from a thread! Fri Jan 4 15:32:04 2019 Goodbye

下面對上面的函數的序號進行講解:

1 這個函數調用了常規的sleep(),這會阻塞主線程並阻止loop運行,咱們不能使這個函數變成協程,更糟糕的是不能在主線程運行loop時調用它,解決辦法是用一個executor來運行它;
2 注意一點,這個sleep運行時間比協程中的sleep運行時間要短,後文再討論若是長的話會發生什麼;
3 該方法幫助咱們在事件loop裏用額外的線程或進程執行函數,這個方法的返回值是一個Future對象,意味着能夠用await來切換它;
4 掛起的task中不包含前面的阻塞函數,而且這個方法只返回task對象,絕對不會返回Future對象。

綁定回調

綁定回調,在task執行完畢的時候能夠獲取執行的結果,回調的最後一個參數是future對象,經過該對象能夠獲取協程返回值。若是回調須要多個參數,能夠經過偏函數導入

import time import asyncio now = lambda : time.time() async def do_some_work(x): print('Waiting: ', x) return 'Done after {}s'.format(x) def callback(future): # 回調函數 print('Callback: ', future.result()) start = now() coroutine = do_some_work(2) loop = asyncio.get_event_loop() get_future = asyncio.ensure_future(coroutine) task.add_done_callback(callback) # 添加回調函數 loop.run_until_complete(get_future) print('TIME: ', now() - start)

回調函數須要多個參數時,future參數要放最後。執行完成,咱們能夠經過參數future獲取協程的執行結果:future.result()

import functools # functools.partial:偏函數,能將帶參數的函數包裝成一個新的函數 def callback(t, future): # 回調函數 ,future放最後 print('Callback:', t, future.result()) task.add_done_callback(functools.partial(callback, 2)

asyncio.iscoroutine(obj)

Return True if obj is a coroutine object.
判斷是否爲coroutine對象,若是是返回True

asyncio.iscoroutinefunction(func)

判斷是否爲coroutine函數,若是是返回True

參考資料

https://docs.python.org/3.7/library/asyncio-task.html
https://www.jianshu.com/p/b5e347b3a17c

微信公衆號:python學習開發 加微信italocxa 入羣。

原文地址:https://www.cnblogs.com/c-x-a/p/10220398.html

相關文章
相關標籤/搜索