Python 中的協程 (4) asyncio模塊

asyncio的核心概念與基本架構python

  本文針對的是python3.4之後的版本的,由於從3.4開始才引入asyncio,後面的3.5 3.6 3.7版本是向前兼容的,只不過語法上面有稍微的改變。好比在3.4版本中使用@asyncio.coroutine裝飾器和yield from語句,可是在3.5之後的版本中使用async、await兩個關鍵字代替,雖然語法上稍微有所差別,可是原理是同樣的。express

 

1 asyncio 組成的基本概念

1 協程函數的做用

(1)result = yield from future,返回future的結果。編程

(2)result = yield from coroutine,等候另外一個協程函數返回結果或者是觸發異常 安全

(3)result= yield from task,返回一個task的結果架構

(4)return expression,做爲一個函數拋出返回值併發

(5)raise exception異步

2 事件循環 event_loop

如何理解事件循環:async

線程一直在各個協程方法之間永不停歇的遊走,遇到一個yield from 或者await就懸掛起來,而後又走到另一個方法,依次進行下去,知道事件循環全部的方法執行完畢。實際上loop是BaseEventLoop的一個實例,咱們能夠查看定義,它到底有哪些方法可調用異步編程

協程函數,不是像普通函數那樣直接調用運行的,必須添加到事件循環中,而後由事件循環去運行,單獨運行協程函數是不會有結果的。函數

import time
import asyncio
async def say_after_time(delay,what):
        await asyncio.sleep(delay)
        print(what)

async def main():
        print(f"開始時間爲: {time.time()}")
        await say_after_time(1,"hello")
        await say_after_time(2,"world")
        print(f"結束時間爲: {time.time()}")
        
''' 直接運行 '''        
# >>> main()
# <coroutine object main at 0x1053bb7c8>       

'''  須要經過事件循環來調用'''
loop=asyncio.get_event_loop()    #建立事件循環對象
#loop=asyncio.new_event_loop()   #與上面等價,建立新的事件循環
loop.run_until_complete(main())  #經過事件循環對象運行協程函數
loop.close()

(1)獲取事件循環對象的幾種方式:

  1. loop=asyncio.get_running_loop(),返回(獲取)在當前線程中正在運行的事件循環,若是沒有正在運行的事件循環,則會顯示錯誤

  2. loop=asyncio.get_event_loop() ,得到一個事件循環,若是當前線程尚未事件循環,則建立一個新的事件循環loop

  3. loop=asyncio.set_event_loop(loop), 設置一個事件循環爲當前線程的事件循環;

  4. loop=asyncio.new_event_loop() ,建立一個新的事件循環

(2)經過事件循環運行協程函數的兩種方式:

  1. 建立事件循環對象loop,即 asyncio.get_event_loop(),經過事件循環運行協程函數

  2. 直接經過 asyncio.run(function_name) 運行協程函數。

  可是須要注意的是,首先run函數是python3.7版本新添加的,前面的版本是沒有的;其次,這個run函數老是會建立一個新的事件循環並在run結束以後關閉事件循環,因此,若是在同一個線程中已經有了一個事件循環,則不能再使用這個函數了,由於同一個線程不能有兩個事件循環,並且這個run函數不能同時運行兩次,由於他已經建立一個了。即同一個線程中是不容許有多個事件循環loop的。 asyncio.run()是python3.7 新添加的內容,也是後面推薦的運行任務的方式,由於它是高層API,後面會講到它與asyncio.run_until_complete()的差別性,run_until_complete()是相對較低層的API。

3 什麼是awaitable對象

有三類對象是可等待的,即 coroutines , Tasks , and Futures .

coroutine :本質上就是一個函數,一前面的生成器yield和yield from爲基礎,再也不贅述;

Tasks : 任務,顧名思義,就是要完成某件事情,其實就是對協程函數進一步的封裝;

Future :它是一個「更底層」的概念,他表明一個異步操做的最終結果,由於異步操做通常用於耗時操做,結果不會當即獲得,會在「未來」獲得異步運行的結果,故而命名爲 Future。

三者的關係,coroutine 能夠自動封裝成 task ,而Task是 Future 的子類。

4 什麼是task任務

Task用來 併發調度的協程, 單純的協程函數僅僅是一個函數而已,將其包裝成任務,任務是能夠包含各類狀態的,異步編程最重要的就是對異步操做狀態的把控了。

(1)建立任務(兩種方法):

方法一:task = asyncio.create_task(coro()) # 這是3.7版本新添加的

方法二:task = asyncio.ensure_future(coro()) ,也可使用loop.create_future()loop.create_task(coro) 也是能夠的。

(2)獲取某一個任務的方法:

方法一:task=asyncio.current_task(loop=None);返回在某一個指定的loop中,當前正在運行的任務,若是沒有任務正在運行,則返回None;若是loop爲None,則默認爲在當前的事件循環中獲取,

方法二:asyncio.all_tasks(loop=None);返回某一個loop中尚未結束的任務;

5 什麼是future?

  Future是一個較低層的可等待(awaitable)對象,他表示的是異步操做的最終結果,當一個Future對象被等待的時候,協程會一直等待,直到Future已經運算完畢。 Future是Task的父類,通常狀況下,已不用去管它們二者的詳細區別,也沒有必要去用Future,用Task就能夠了,返回 future 對象的低級函數的一個很好的例子是 loop.run_in_executor().

  

2 asyncio的基本架構

asyncio分爲高層API和低層API。咱們前面所講的Coroutine和Tasks屬於高層API,而Event Loop 和Future屬於低層API。所謂的高層API主要是指那些asyncio.xxx()的方法。

High-level APIs

●Coroutines and Tasks(本文要寫的) ​ ●Streams ​ ●Synchronization Primitives ​ ●Subprocesses ​ ●Queues ​ ●Exceptions

Low-level APIs

●Event Loop(下一篇要寫的) ​ ●Futures ​ ●Transports and Protocols ​ ●Policies ​ ●Platform Support

1 常見的一些高層API方法
1)運行異步協程
asyncio.run(coro, *, debug=False)  #運行一個一步程序,參見上面

2)建立任務
task=asyncio.create_task(coro)  #python3.7  ,參見上面
task = asyncio.ensure_future(coro()) 

3)睡眠
await asyncio.sleep(delay, result=None, *, loop=None)
這個函數表示的是:當前的那個任務(協程函數)睡眠多長時間,而容許其餘任務執行。這是它與time.sleep()的區別,time.sleep()是當前線程休息

4)併發運行多個任務
await asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)
它自己也是awaitable的。當全部的任務都完成以後,返回的結果是一個列表的形式、

5)防止任務取消
await asyncio.shield(*arg, *, loop=None)

6)設置timeout
await asyncio.wait_for(aw, timeout, *, loop=None)
當異步操做須要執行的時間超過waitfor設置的timeout,就會觸發異常,因此在編寫程序的時候,若是要給異步操做設置timeout,必定要選擇合適,
若是異步操做自己的耗時較長,而你設置的timeout過短,會涉及到她還沒作完,就拋出異常了。
7)多個協程函數時候的等候 await asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED) 與上面的區別是,第一個參數aws是一個集合,要寫成集合set的形式,好比: {func(),func(),func3()} 表示的是一系列的協程函數或者是任務,其中協程會自動包裝成任務。事實上,寫成列表的形式也是能夠的。 該函數的返回值是兩個Tasks/Futures的集合: (done, pending) 其中done是一個集合,表示已經完成的任務tasks;pending也是一個集合,表示尚未完成的任務。 常見的使用方法爲:done, pending = await asyncio.wait(aws)
2 Task 類詳解

(1)他是做爲一個python協程對象,和Future對象很像的這麼一個對象,但不是線程安全的;他繼承了Future全部的API,,除了Future.set_result()和Future.set_Exception();

(2)使用高層API asyncio.create_task()建立任務,或者是使用低層API loop.create_task()或者是loop.ensure_future()建立任務對象;

(3)相比於協程函數,任務時有狀態的,可使用Task.cancel()進行取消,這會觸發CancelledError異常,使用cancelled()檢查是否取消。

cancel函數:

import asyncio

async def cancel_me():
    print('cancel_me(): before sleep')
    try:
        await asyncio.sleep(3600) #模擬一個耗時任務
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    #經過協程建立一個任務,須要注意的是,在建立任務的時候,就會跳入到異步開始執行
    #由於是3.7版本,建立一個任務就至關因而運行了異步函數cancel_me
    task = asyncio.create_task(cancel_me()) 
    #等待一秒鐘
    await asyncio.sleep(1)
    print('main函數休息完了')
    #發出取消任務的請求
    task.cancel()  
    try:
        await task  #由於任務被取消,觸發了異常
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

'''運行結果爲:
cancel_me(): before sleep
main函數休息完了
cancel_me(): cancel sleep
cancel_me(): after sleep
main(): cancel_me is cancelled now
'''
3 異步函數的結果獲取

兩種方法:第一種是直接經過Task.result()來獲取;第二種是綁定一個回調函數來獲取,即函數執行完畢後調用一個函數來獲取異步函數的返回值。

1,經過result函數

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模擬耗時任務3秒
    print("Hello again 01 end")
    return a+b

coroutine=hello1(10,5)
loop = asyncio.get_event_loop()                #第一步:建立事件循環
task=asyncio.ensure_future(coroutine)         #第二步:將多個協程函數包裝成任務列表
loop.run_until_complete(task)                  #第三步:經過事件循環運行
print('-------------------------------------')
print(task.result())
loop.close() 

'''運行結果爲
Hello world 01 begin
Hello again 01 end
-------------------------------------
15
'''

2, 經過定義回調函數

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模擬耗時任務3秒
    print("Hello again 01 end")
    return a+b

def callback(future):   #定義的回調函數
    print(future.result())

loop = asyncio.get_event_loop()                #第一步:建立事件循環
task=asyncio.ensure_future(hello1(10,5))       #第二步:將多個協程函數包裝成任務
task.add_done_callback(callback)                      #並被任務綁定一個回調函數

loop.run_until_complete(task)                  #第三步:經過事件循環運行
loop.close()                                   #第四步:關閉事件循環


'''運行結果爲:
Hello world 01 begin
Hello again 01 end
15
'''

  所謂的回調函數,就是指協程函數coroutine執行結束時候會調用回調函數。並經過參數future獲取協程執行的結果。咱們建立的task和回調裏的future對象,其實是同一個對象,由於task是future的子類。

3 asyncio 的基本模版

針對3.7以前的版本

import asyncio
import time
from functools import partial

async def get_url():
    print('start get url')
    await asyncio.sleep(2)          # await 後面跟的必須是一個 await 對象
    print('end get url')
    return 'stack'

def test(url,future):
    print(url,'hello, stack')

if __name__ == '__main__':
    start = time.time()
    
    loop = asyncio.get_event_loop()

    # loop.run_until_complete(get_url())      # 只是提交了一個請求,時間2s

    tasks = [get_url() for i in range(10)]

    # get_future = asyncio.ensure_future(get_url())
    # 得到返回值用法1,源碼上依然是先判斷loop,而後調用create_task

    # get_future = loop.create_task(get_url())
    # 方法2,還能夠繼續添加函數,執行邏輯
    # get_future.add_done_callback(partial(test, 'Stack'))
    # 函數自己在得到調用時須要一個任意形數,參數便是 get_future 自己,不然報錯
    # 若是函數須要傳遞參數,須要經過 偏函數 partial 模塊來解決,以及函數的形參須要放在前面


    loop.run_until_complete(asyncio.wait(tasks))  # 提交了10次,時間也是2s
     # loop.run_until_complete(asyncio.gather(*tasks)) 效果同上
     # gather 和 wait 的區別
     # gather是更高一級的抽象,且使用更加靈活,可使用分組,以及取消任務
    print(time.time() - start)
    # print(get_future.result())          # 接收返回值

針對3.7的版本

import asyncio
import time


async def hello1(a,b):
    print("Hello world 01 begin")
    await asyncio.sleep(3)  #模擬耗時任務3秒
    print("Hello again 01 end")
    return a+b

async def hello2(a,b):
    print("Hello world 02 begin")
    await asyncio.sleep(2)   #模擬耗時任務2秒
    print("Hello again 02 end")
    return a-b

async def hello3(a,b):
    print("Hello world 03 begin")
    await asyncio.sleep(4)   #模擬耗時任務4秒
    print("Hello again 03 end")
    return a*b

async def main():
    results=await asyncio.gather(hello1(10,5),hello2(10,5),hello3(10,5))
    for result in results:
        print(result)

asyncio.run(main())

'''運行結果爲:
Hello world 01 begin
Hello world 02 begin
Hello world 03 begin
Hello again 02 end
Hello again 01 end
Hello again 03 end
15
5
50
'''

總結:

第一步:構建一個入口函數main 它也是一個異步協程函數,即經過async定義,而且要在main函數裏面await一個或者是多個協程,同前面同樣,我能夠經過gather或者是wait進行組合,對於有返回值的協程函數,通常就在main裏面進行結果的獲取。

第二步:啓動主函數main 這是python3.7新添加的函數,就一句話,即 asyncio.run(main())

相關文章
相關標籤/搜索