asyncio之Coroutines,Tasks and Future

asyncio之Coroutines,Tasks and Future

Coroutines and Tasks屬於High-level APIs,也就是高級層的api。html

本節概述用於協程和任務的高級異步api。python

Coroutines

Coroutines翻譯過來意思是協程,
使用async/await語法聲明的協程是編寫asyncio應用程序的首選方法。api

import asyncio


async def main():
    print("hello")
    await asyncio.sleep(1)
    print("world")


if __name__ == '__main__':
    # asyncio.run(main())  # 3.7的用法
    # 阻塞直到hello world()協程結束時返回
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

第一個異步函數是經過建立loop循環去調用,其餘異步函數之間經過await進行調用。
像下面的一個例子安全

import asyncio
import time


async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)


async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # 阻塞直到hello world()協程結束時返回
    loop.run_until_complete(main())
    loop.close()

或者咱們能夠經過asyncio.create_task()將協程say_after封裝任務去調用就像下面這樣。微信

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))

    task2 = asyncio.create_task(
        say_after(2, 'world'))

    print(f"started at {time.strftime('%X')}")

    # 等待兩個子任務完成
    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

Awaitables

咱們說,若是一個對象能夠用在await表達式中,那麼它就是Awaitables的對象。
可等待對象主要有三種類型:coroutines, Tasks, and Futures.併發

Coroutines

 前面的代碼中演示了協程的運做方式,這裏主要強調兩點。app

  • 協程函數:asyc def定義的函數;
  • 協程對象:經過調用協程函數返回的對象。異步

    Tasks

    任務對協程進一步封裝,其中包含任務的各類狀態。
    協程對象不能直接運行,在註冊事件循環的時候,實際上是run_until_complete方法將協程包裝成爲了一個任務(task)對象。
import asyncio


async def nested():
    await asyncio.sleep(2)
    print("等待2s")


async def main():
    # 將協程包裝成任務含有狀態
    # task = asyncio.create_task(nested())
    task = asyncio.ensure_future(nested())
    print(task)
    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task
    print(task)
    print(task.done())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

能夠看到async

<Task pending coro=<nested() running at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9>>
等待2s
<Task finished coro=<nested() done, defined at /Users/chennan/pythonproject/asyncproject/asyncio-cn/1-2-1.py:9> result=None>
True

建立task後,task在加入事件循環以前是pending狀態而後調用nested函數等待2s以後打印task爲finished狀態。asyncio.ensure_future(coroutine) 和 loop.create_task(coroutine)均可以建立一個task,python3.7增長了asyncio.create_task(coro)。其中task是Future的一個子類函數

Future

future:表明未來執行或沒有執行的任務的結果。它和task上沒有本質的區別
一般不須要在應用程序級別代碼中建立Future對象。
future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cancelled

經過上面的代碼能夠知道建立future的時候,task爲pending,事件循環調用執行的時候是running,調用完畢天然就是done因而調用task.done()打印了true。

若是在命令行中運行上述代碼,ctrl+c後會發現
輸出如下內容

<Task pending coro=<nested() running at 1-2-1.py:9>>
^C<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x10d342978>()]> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<main() running at 1-2-1.py:21> wait_for=<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>>
<Task pending coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>
<Task cancelling coro=<nested() running at 1-2-1.py:10> wait_for=<Future cancelled> cb=[<TaskWakeupMethWrapper object at 0x10d342918>()]>

由於咱們調用了task.cancel() 因此能夠看到此時的任務狀態爲取消狀態。

併發的執行任務

經過使用await+asyncio.gather能夠完成併發的操做。
asyncio.gather用法以下。
**asyncio.gather(*aws, loop=None, return_exceptions=False)
aws是一系列協程,協程都成功完成,就返回值一個結果列表。結果值的順序與aws中添加協程的順序相對應。
return_exceptions=False,其實就是若是有一個任務失敗了,就直接拋出異常。若是等於True就把錯誤信息做爲結果返回回來。
首先來一個正常狀況不出錯的例子:

import asyncio


async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        if number == 2:
            1 / 0
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")


async def main():
    # Schedule three calls *concurrently*:
    res = await asyncio.gather(
        *[factorial("A", 2),
          factorial("B", 3),
          factorial("C", 4)]
        , return_exceptions=True)
    for item in res:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    except KeyboardInterrupt as e:
        for task in asyncio.Task.all_tasks():
            print(task)
            task.cancel()
            print(task)
        loop.run_forever()  # restart loop
    finally:
        loop.close()

輸入如下內容:

Task A: Compute factorial(2)...
Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24
division by zero
None
None

能夠發現async.gather最後會返回一系列的結果,若是出現了錯誤就把錯誤信息做爲返回結果,這裏我當數字爲2時人爲加了異常操做1/0,因而返回告終果division by zero,對於其餘的任務由於沒有返回值因此是None。這裏return_exceptions=True來保證了若是其中一個任務出現異常,其餘任務不會受其影響會執行到結束。

asyncio.wait

coroutine asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

asyncio.wait和async.gather用法差很少只是async.wait接收的是個列表。
第三個參數和async.gather有點區別.

參數名 含義
FIRST_COMPLETED 任何一個future完成或取消時返回
FIRST_EXCEPTION 任何一個future出現錯誤將返回,若是出現異常等價於ALL_COMPLETED
ALL_COMPLETED 當全部任務完成或者被取消時返回結果,默認值。

Timeouts

經過使用asyncio.wait_for來完成一個超時函數回調操做,若是函數規定時間內未完成則報錯。
**asyncio.wait_for(aw, timeout, *, loop=None)**
aw表明一個協程,timeout單位秒。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

1秒內eternity沒有完成就報錯了。
python3.7中發生更改:當aw因爲超時而被取消時,再也不顯示異常而是等待aw被取消。
說到timeout的,若是僅僅是對一個代碼塊作timeout操做而不是等待某個協程此時推薦第三方模塊async_timeout

async_timeout

安裝

pip installa async_timeout

使用方法很簡單以下

async with async_timeout.timeout(1.5) as cm:
    await inner()
print(cm.expired)

若是1.5s能夠運行完打印true,不然打印false,表示超時。

asyncio.as_completed

**asyncio.as_completed(aws, *, loop=None, timeout=None)**
使用as_completed會返回一個能夠迭代的future對象,一樣能夠獲取協程的運行結果,使用方法以下:

async def main():
    coroutine1 = do_some_work(1)
    coroutine2 = do_some_work(2)
    coroutine3 = do_some_work(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))

start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print('TIME: ', now() - start)

協程嵌套

使用async能夠定義協程,協程用於耗時的io操做,咱們也能夠封裝更多的io操做過程,這樣就實現了嵌套的協程,即一個協程中await了另一個協程,如此鏈接起來
官網實例:

圖解:

 一、run_until_complete運行,會註冊task(協程:print_sum)並開啓事件循環 →

 二、print_sum協程中嵌套了子協程,此時print_sum協程暫停(相似委託生成器),轉到子協程(協程:compute)中運行代碼,期間子協程需sleep1秒鐘,直接將結果反饋到event loop中,即將控制權轉回調用方,而中間的print_sum暫停不操做 →

 三、1秒後,調用方將控制權給到子協程(調用方與子協程直接通訊),子協程執行接下來的代碼,直到再遇到wait(此實例沒有)→

 四、 最後執行到return語句,子協程向上級協程(print_sum拋出異常:StopIteration),同時將return返回的值返回給上級協程(print_sum中的result接收值),print_sum繼續執行暫時時後續的代碼,直到遇到return語句 →

 五、向 event loop 拋出StopIteration異常,此時協程任務都已經執行完畢,事件循環執行完成(event loop :the loop is stopped),close事件循環。

調度線程

asyncio.run_coroutine_threadsafe(coro, loop)
等待其餘線程返回一個concurrent.futures.Future對象,這是一個線程安全的方法。
這個函數應該從不一樣的OS線程調用,而不是從事件循環所在的線程調用。

def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()

async def do_some_work(x):
    print('Waiting {}'.format(x))
    await asyncio.sleep(x)
    print('Done after {}s'.format(x))

def more_work(x):
    print('More work {}'.format(x))
    time.sleep(x)
    print('Finished more work {}'.format(x))

start = now()
new_loop = asyncio.new_event_loop()
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print('TIME: {}'.format(time.time() - start))

asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中建立一個new_loop,而後在另外的子線程中開啓一個無限事件循環。主線程經過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的併發操做,同時主線程又不會被block。一共執行的時間大概在6s左右。
run_in_executor

import time
import asyncio


async def main():
    print(f'{time.ctime()} Hello')
    await asyncio.sleep(1.0)
    print(f'{time.ctime()} Goodbye')
    loop.stop()


def blocking():  # 1
    time.sleep(0.5)  # 2
    print(f'{time.ctime()} Hello from a thread!')


loop = asyncio.get_event_loop()
loop.create_task(main())
loop.run_in_executor(None, blocking)  # 3

loop.run_forever()
pending = asyncio.Task.all_tasks(loop=loop)  # 4
group = asyncio.gather(*pending)
loop.run_until_complete(group)
loop.close()

輸出

Fri Jan  4 15:32:03 2019 Hello
Fri Jan  4 15:32:04 2019 Hello from a thread!
Fri Jan  4 15:32:04 2019 Goodbye

下面對上面的函數的序號進行講解:

1 這個函數調用了常規的sleep(),這會阻塞主線程並阻止loop運行,咱們不能使這個函數變成協程,更糟糕的是不能在主線程運行loop時調用它,解決辦法是用一個executor來運行它;
2 注意一點,這個sleep運行時間比協程中的sleep運行時間要短,後文再討論若是長的話會發生什麼;
3 該方法幫助咱們在事件loop裏用額外的線程或進程執行函數,這個方法的返回值是一個Future對象,意味着能夠用await來切換它;
4 掛起的task中不包含前面的阻塞函數,而且這個方法只返回task對象,絕對不會返回Future對象。

綁定回調

綁定回調,在task執行完畢的時候能夠獲取執行的結果,回調的最後一個參數是future對象,經過該對象能夠獲取協程返回值。若是回調須要多個參數,能夠經過偏函數導入

import time
import asyncio
 
now = lambda : time.time()
 
async def do_some_work(x):
    print('Waiting: ', x)
    return 'Done after {}s'.format(x)
 
def callback(future):  # 回調函數
    print('Callback: ', future.result())
 
start = now()
 
coroutine = do_some_work(2)
loop = asyncio.get_event_loop()
get_future = asyncio.ensure_future(coroutine)
task.add_done_callback(callback)  # 添加回調函數
loop.run_until_complete(get_future)
 
print('TIME: ', now() - start)

回調函數須要多個參數時,future參數要放最後。執行完成,咱們能夠經過參數future獲取協程的執行結果:future.result()

import functools   # functools.partial:偏函數,能將帶參數的函數包裝成一個新的函數
def callback(t, future): # 回調函數 ,future放最後
    print('Callback:', t, future.result())
 
task.add_done_callback(functools.partial(callback, 2)

asyncio.iscoroutine(obj)

Return True if obj is a coroutine object.
判斷是否爲coroutine對象,若是是返回True

asyncio.iscoroutinefunction(func)

判斷是否爲coroutine函數,若是是返回True

參考資料

https://docs.python.org/3.7/library/asyncio-task.html
https://www.jianshu.com/p/b5e347b3a17c

微信公衆號:python學習開發 加微信italocxa 入羣。

相關文章
相關標籤/搜索