一份詳細的asyncio入門教程

asyncio模塊提供了使用協程構建併發應用的工具。它使用一種單線程單進程的的方式實現併發,應用的各個部分彼此合做, 能夠顯示的切換任務,通常會在程序阻塞I/O操做的時候發生上下文切換如等待讀寫文件,或者請求網絡。同時asyncio也支持調度代碼在未來的某個特定事件運行,從而支持一個協程等待另外一個協程完成,以處理系統信號和識別其餘一些事件。html

異步併發的概念

對於其餘的併發模型大多數採起的都是線性的方式編寫。而且依賴於語言運行時系統或操做系統的底層線程或進程來適當地改變上下文,而基於asyncio的應用要求應用代碼顯示的處理上下文切換。
asyncio提供的框架以事件循環(event loop)爲中心,程序開啓一個無限的循環,程序會把一些函數註冊到事件循環上。當知足事件發生的時候,調用相應的協程函數。python

事件循環

事件循環是一種處理多併發量的有效方式,在維基百科中它被描述爲「一種等待程序分配事件或消息的編程架構」,咱們能夠定義事件循環來簡化使用輪詢方法來監控事件,通俗的說法就是「當A發生時,執行B」。事件循環利用poller對象,使得程序員不用控制任務的添加、刪除和事件的控制。事件循環使用回調方法來知道事件的發生。它是asyncio提供的「中央處理設備」,支持以下操做:程序員

  • 註冊、執行和取消延遲調用(超時)
  • 建立可用於多種類型的通訊的服務端和客戶端的Transports
  • 啓動進程以及相關的和外部通訊程序的Transports
  • 將耗時函數調用委託給一個線程池
  • 單線程(進程)的架構也避免的多線程(進程)修改可變狀態的鎖的問題。

與事件循環交互的應用要顯示地註冊將運行的代碼,讓事件循環在資源可用時嚮應用代碼發出必要的調用。如:一個套接字再沒有更多的數據能夠讀取,那麼服務器會把控制全交給事件循環。數據庫

Future

future是一個數據結構,表示還未完成的工做結果。事件循環能夠監視Future對象是否完成。從而容許應用的一部分等待另外一部分完成一些工做。編程

Task

task是Future的一個子類,它知道如何包裝和管理一個協程的執行。任務所需的資源可用時,事件循環會調度任務容許,並生成一個結果,從而能夠由其餘協程消費。服務器

異步方法

使用asyncio也就意味着你須要一直寫異步方法。
一個標準方法是這樣的:微信

def regular_double(x):
    return 2 * x

而一個異步方法:網絡

async def async_double(x):
    return 2 * x

從外觀上看異步方法和標準方法沒什麼區別只是前面多了個async。
「Async」 是「asynchronous」的簡寫,爲了區別於異步函數,咱們稱標準函數爲同步函數,
從用戶角度異步函數和同步函數有如下區別:session

要調用異步函數,必須使用await關鍵字。 所以,不要寫regular_double(3),而是寫await async_double(3).
不能在同步函數裏使用await,不然會出錯。
句法錯誤:數據結構

def print_double(x):
    print(await async_double(x))   # <-- SyntaxError here

可是在異步函數中,await是被容許的:

async def print_double(x):
    print(await async_double(x))   # <-- OK!

協程

啓動一個協程

通常異步方法被稱之爲協程(Coroutine)。asyncio事件循環能夠經過多種不一樣的方法啓動一個協程。通常對於入口函數,最簡答的方法就是使用run_until_complete(),並將協程直接傳入這個方法。

import asyncio


async def foo():
    print("這是一個協程")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("開始運行協程")
        coro = foo()
        print("進入事件循環")
        loop.run_until_complete(coro)
    finally:
        print("關閉事件循環")
        loop.close()

輸出

開始運行協程
進入事件循環
這是一個協程
關閉事件循環

這就是最簡單的一個協程的例子,下面讓咱們瞭解一下上面的代碼.
第一步首先獲得一個事件循環的應用也就是定義的對象loop。可使用默認的事件循環,也能夠實例化一個特定的循環類(好比uvloop),這裏使用了默認循環run_until_complete(coro)方法用這個協程啓動循環,協程返回時這個方法將中止循環。
run_until_complete的參數是一個futrue對象。當傳入一個協程,其內部會自動封裝成task,其中task是Future的子類。關於task和future後面會提到。

從協程中返回值

將上面的代碼,改寫成下面代碼

import asyncio


async def foo():
    print("這是一個協程")
    return "返回值"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("開始運行協程")
        coro = foo()
        print("進入事件循環")
        result = loop.run_until_complete(coro)
        print(f"run_until_complete能夠獲取協程的{result},默認輸出None")
    finally:
        print("關閉事件循環")
        loop.close()

run_until_complete能夠獲取協程的返回值,若是沒有給定返回值,則像函數同樣,默認返回None。

協程調用協程

一個協程能夠啓動另外一個協程,從而能夠任務根據工做內容,封裝到不一樣的協程中。咱們能夠在協程中使用await關鍵字,鏈式的調度協程,來造成一個協程任務流。向下面的例子同樣。

import asyncio


async def main():
    print("主協程")
    print("等待result1協程運行")
    res1 = await result1()
    print("等待result2協程運行")
    res2 = await result2(res1)
    return (res1,res2)


async def result1():
    print("這是result1協程")
    return "result1"


async def result2(arg):
    print("這是result2協程")
    return f"result2接收了一個參數,{arg}"


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        result = loop.run_until_complete(main())
        print(f"獲取返回值:{result}")
    finally:
        print("關閉事件循環")
        loop.close()

輸出

主協程
等待result1協程運行
這是result1協程
等待result2協程運行
這是result2協程
獲取返回值:('result1', 'result2接收了一個參數,result1')
關閉事件循環

協程中調用普通函數

在協程中能夠經過一些方法去調用普通的函數。可使用的關鍵字有call_soon,call_later,call_at。

call_soon

能夠經過字面意思理解調用當即返回。

loop.call_soon(callback, *args, context=None)

在下一個迭代的時間循環中馬上調用回調函數,大部分的回調函數支持位置參數,而不支持」關鍵字參數」,若是是想要使用關鍵字參數,則推薦使用functools.aprtial()對方法進一步包裝.可選關鍵字context容許指定要運行的回調的自定義contextvars.Context。當沒有提供上下文時使用當前上下文。在Python 3.7中, asyncio
協程加入了對上下文的支持。使用上下文就能夠在一些場景下隱式地傳遞變量,好比數據庫鏈接session等,而不須要在全部方法調用顯示地傳遞這些變量。
下面來看一下具體的使用例子。

import asyncio
import functools


def callback(args, *, kwargs="defalut"):
    print(f"普通函數作爲回調函數,獲取參數:{args},{kwargs}")


async def main(loop):
    print("註冊callback")
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwargs="not defalut")
    loop.call_soon(wrapped, 2)
    await asyncio.sleep(0.2)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main(loop))
finally:
    loop.close()

輸出結果

註冊callback
普通函數作爲回調函數,獲取參數:1,defalut
普通函數作爲回調函數,獲取參數:2,not defalut

經過輸出結果咱們能夠發現咱們在協程中成功調用了一個普通函數,順序的打印了1和2。

有時候咱們不想當即調用一個函數,此時咱們就能夠call_later延時去調用一個函數了。

call_later

loop.call_later(delay, callback, *args, context=None)

首先簡單的說一下它的含義,就是事件循環在delay多長時間以後才執行callback函數.
配合上面的call_soon讓咱們看一個小例子

import asyncio


def callback(n):
    print(f"callback {n} invoked")


async def main(loop):
    print("註冊callbacks")
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)
    await asyncio.sleep(0.4)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

輸出

註冊callbacks
callback 3 invoked
callback 2 invoked
callback 1 invoked

經過上面的輸出能夠獲得以下結果:
1.call_soon會在call_later以前執行,和它的位置在哪無關
2.call_later的第一個參數越小,越先執行。

call_at

loop.call_at(when, callback, *args, context=None)

call_at第一個參數的含義表明的是一個單調時間,它和咱們平時說的系統時間有點差別,
這裏的時間指的是事件循環內部時間,能夠經過loop.time()獲取,而後能夠在此基礎上進行操做。後面的參數和前面的兩個方法同樣。實際上call_later內部就是調用的call_at。

import asyncio


def call_back(n, loop):
    print(f"callback {n} 運行時間點{loop.time()}")


async def main(loop):
    now = loop.time()
    print("當前的內部時間", now)
    print("循環時間", now)
    print("註冊callback")
    loop.call_at(now + 0.1, call_back, 1, loop)
    loop.call_at(now + 0.2, call_back, 2, loop)
    loop.call_soon(call_back, 3, loop)
    await asyncio.sleep(1)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("進入事件循環")
        loop.run_until_complete(main(loop))
    finally:
        print("關閉循環")
        loop.close()

輸出

進入事件循環
當前的內部時間 4412.152849525
循環時間 4412.152849525
註冊callback
callback 3 運行時間點4412.152942526
callback 1 運行時間點4412.253202825
callback 2 運行時間點4412.354262512
關閉循環

由於call_later內部實現就是經過call_at因此這裏就很少說了。

Future

獲取Futrue裏的結果

future表示尚未完成的工做結果。事件循環能夠經過監視一個future對象的狀態來指示它已經完成。future對象有幾個狀態:

  • Pending
  • Running
  • Done
  • Cancelled
    建立future的時候,task爲pending,事件循環調用執行的時候固然就是running,調用完畢天然就是done,若是須要中止事件循環,就須要先把task取消,狀態爲cancel。
import asyncio


def foo(future, result):
    print(f"此時future的狀態:{future}")
    print(f"設置future的結果:{result}")
    future.set_result(result)
    print(f"此時future的狀態:{future}")


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        loop.call_soon(foo, all_done, "Future is done!")
        print("進入事件循環")
        result = loop.run_until_complete(all_done)
        print("返回結果", result)
    finally:
        print("關閉事件循環")
        loop.close()
    print("獲取future的結果", all_done.result())

輸出

進入事件循環
此時future的狀態:<Future pending cb=[_run_until_complete_cb() at /Library/Frameworks/Python.framework/Versions/3.6/lib/python3.6/asyncio/base_events.py:176]>
設置future的結果:Future is done!
此時future的狀態:<Future finished result='Future is done!'>
返回結果 Future is done!
關閉事件循環
獲取future的結果 Future is done!

能夠經過輸出結果發現,調用set_result以後future對象的狀態由pending變爲finished
,Future的實例all_done會保留提供給方法的結果,能夠在後續使用。

Future對象使用await

future和協程同樣可使用await關鍵字獲取其結果。

import asyncio


def foo(future, result):
    print("設置結果到future", result)
    future.set_result(result)


async def main(loop):
    all_done = asyncio.Future()
    print("調用函數獲取future對象")
    loop.call_soon(foo, all_done, "the result")

    result = await all_done
    print("獲取future裏的結果", result)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

Future回調

Future 在完成的時候能夠執行一些回調函數,回調函數按註冊時的順序進行調用:

import asyncio
import functools


def callback(future, n):
    print('{}: future done: {}'.format(n, future.result()))


async def register_callbacks(all_done):
    print('註冊callback到future對象')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))


async def main(all_done):
    await register_callbacks(all_done)
    print('設置future的結果')
    all_done.set_result('the result')

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        all_done = asyncio.Future()
        loop.run_until_complete(main(all_done))
    finally:
        loop.close()

經過add_done_callback方法給funtrue任務添加回調函數,當funture執行完成的時候,就會調用回調函數。並經過參數future獲取協程執行的結果。
到此爲止,咱們就學會了如何在協程中調用一個普通函數並獲取其結果。

併發的執行任務

任務(Task)是與事件循環交互的主要途徑之一。任務能夠包裝協程,能夠跟蹤協程什麼時候完成。任務是Future的子類,因此使用方法和future同樣。協程能夠等待任務,每一個任務都有一個結果,在它完成以後能夠獲取這個結果。
由於協程是沒有狀態的,咱們經過使用create_task方法能夠將協程包裝成有狀態的任務。還能夠在任務運行的過程當中取消任務。

import asyncio


async def child():
    print("進入子協程")
    return "the result"


async def main(loop):
    print("將協程child包裝成任務")
    task = loop.create_task(child())
    print("經過cancel方法能夠取消任務")
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("取消任務拋出CancelledError異常")
    else:
        print("獲取任務的結果", task.result())


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main(loop))
    finally:
        loop.close()

輸出

將協程child包裝成任務
經過cancel方法能夠取消任務
取消任務拋出CancelledError異常

若是把上面的task.cancel()註釋了咱們能夠獲得正常狀況下的結果,以下。

將協程child包裝成任務
經過cancel方法能夠取消任務
進入子協程
獲取任務的結果 the result

另外出了使用loop.create_task將協程包裝爲任務外還可使用asyncio.ensure_future(coroutine)建一個task。在python3.7中可使用asyncio.create_task建立任務。

組合協程

一系列的協程能夠經過await鏈式的調用,可是有的時候咱們須要在一個協程裏等待多個協程,好比咱們在一個協程裏等待1000個異步網絡請求,對於訪問次序有沒有要求的時候,就可使用另外的關鍵字wait或gather來解決了。wait能夠暫停一個協程,直到後臺操做完成。

等待多個協程

Task的使用

import asyncio


async def num(n):
    try:
        await asyncio.sleep(n*0.1)
        return n
    except asyncio.CancelledError:
        print(f"數字{n}被取消")
        raise


async def main():
    tasks = [num(i) for i in range(10)]
    complete, pending = await asyncio.wait(tasks, timeout=0.5)
    for i in complete:
        print("當前數字",i.result())
    if pending:
        print("取消未完成的任務")
        for p in pending:
            p.cancel()


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

輸出

當前數字 1
當前數字 2
當前數字 0
當前數字 4
當前數字 3
取消未完成的任務
數字5被取消
數字9被取消
數字6被取消
數字8被取消
數字7被取消

能夠發現咱們的結果並無按照數字的順序顯示,在內部wait()使用一個set保存它建立的Task實例。由於set是無序的因此這也就是咱們的任務不是順序執行的緣由。wait的返回值是一個元組,包括兩個集合,分別表示已完成和未完成的任務。wait第二個參數爲一個超時值
達到這個超時時間後,未完成的任務狀態變爲pending,當程序退出時還有任務沒有完成此時就會看到以下的錯誤提示。

Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0558>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e06d8>()]>>
Task was destroyed but it is pending!
task: <Task pending coro=<num() done, defined at 11.py:12> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x1093e0738>()]>>

此時咱們能夠經過迭代調用cancel方法取消任務。也就是這段代碼

if pending:
        print("取消未完成的任務")
        for p in pending:
            p.cancel()

gather的使用

gather的做用和wait相似不一樣的是。
1.gather任務沒法取消。
2.返回值是一個結果列表
3.能夠按照傳入參數的順序,順序輸出。
咱們將上面的代碼改成gather的方式

import asyncio


async def num(n):
    try:
        await asyncio.sleep(n * 0.1)
        return n
    except asyncio.CancelledError:
        print(f"數字{n}被取消")
        raise


async def main():
    tasks = [num(i) for i in range(10)]
    complete = await asyncio.gather(*tasks)
    for i in complete:
        print("當前數字", i)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

輸出

當前數字 0
當前數字 1
....中間部分省略
當前數字 9

gather一般被用來階段性的一個操做,作完第一步才能作第二步,好比下面這樣

import asyncio

import time


async def step1(n, start):
    await asyncio.sleep(n)
    print("第一階段完成")
    print("此時用時", time.time() - start)
    return n


async def step2(n, start):
    await asyncio.sleep(n)
    print("第二階段完成")
    print("此時用時", time.time() - start)
    return n


async def main():
    now = time.time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("總用時", time.time() - now)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        loop.run_until_complete(main())
    finally:
        loop.close()

輸出

第二階段完成
此時用時 2.0014898777008057
第一階段完成
此時用時 5.002960920333862
5
2
總用時 5.003103017807007

能夠經過上面結果獲得以下結論:
1.step1和step2是並行運行的。
2.gather會等待最耗時的那個完成以後才返回結果,耗時總時間取決於其中任務最長時間的那個。

任務完成時進行處理

as_complete是一個生成器,會管理指定的一個任務列表,並生成他們的結果。每一個協程結束運行時一次生成一個結果。與wait同樣,as_complete不能保證順序,不過執行其餘動做以前沒有必要等待因此後臺操做完成。

import asyncio
import time


async def foo(n):
    print('Waiting: ', n)
    await asyncio.sleep(n)
    return n


async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [
        asyncio.ensure_future(coroutine1),
        asyncio.ensure_future(coroutine2),
        asyncio.ensure_future(coroutine3)
    ]
    for task in asyncio.as_completed(tasks):
        result = await task
        print('Task ret: {}'.format(result))


now = lambda : time.time()
start = now()

loop = asyncio.get_event_loop()
done = loop.run_until_complete(main())
print(now() - start)

輸出

Waiting:  1
Waiting:  2
Waiting:  4
Task ret: 1
Task ret: 2
Task ret: 4
4.004292249679565

能夠發現結果逐個輸出。

到此爲止第一部分就結束了,對於asyncio入門級學習來講這些內容就夠了。若是想繼續跟進asyncio的內容,敬請期待後面的內容。

參考資料

更多異步內容請關注微信公衆號:python學習開發

相關文章
相關標籤/搜索