2、深刻asyncio協程(任務對象,協程調用原理,協程併發)

  因爲纔開始寫博客,以前都是寫筆記本身看,因此可能會存在表述不清,過於囉嗦等各類各樣的問題,有什麼疑問或者批評歡迎在評論區留言。html

若是你初次接觸協程,請先閱讀上一篇文章初識asyncio協程對asyncio有個初步的認識。python

1、任務對象(task任務)

參照上一篇初識asyncio協程咱們瞭解到了任務對象是對協程的一種封裝,其中包含各類狀態,如阻塞狀態(suspended),運行狀態(running),完成狀態(done);併發

一、建立任務對象的三種方式框架

  • 第一種:loop.create_task(xxx)異步

  • 第二種:asyncio.ensure_future(xxx)async

  • 第三種:asyncio.create_task(xxx)ide

    其實第三種方法內部也是用的第一種方法,不過須要注意的是此方法在建立任務前須要一個已運行的事件循環,否則會拋出RuntimeError:no running event loop函數

二、添加回調oop

import asyncio
async def get_url(url):
    print('start get_url')
    await asyncio.sleep(2)
    print('end get_url')
    return 'Joshua'

def callback(future):  # 回調函數
    print('Hello {}'.format(future.result()))

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    task = loop.create(get_url('https://www.baidu.com'))
    task.add_done_callback(callback)  # 添加回調
    loop.run_until_complete(task)

三、向回調函數傳遞參數url

from functools import partial
# partial(偏函數)能夠把函數包裝成另一個函數
import asyncio
async def get_url(url):
    print('start get_url')
    await asyncio.sleep(2)
    print('end get_url')
    return 'Joshua'

def callback(url,future):
    # 注意:要想向回調函數傳遞參數須要將參數放在future前面
    print('Hello {}'.format(future.result()))

if __name__=="__main__":
    loop = asyncio.get_event_loop()
    task = loop.create(get_url('https://www.baidu.com'))
    # 在傳遞前須要先用partial將函數封裝成另一個函數
    task.add_done_callback(partical(callback,'https://www.baidu.com'))

四、task取消()

用於請求取消Task對象,這將會再下一輪事件循環中拋出CanceledError;

  • 請求取消:task.cancel()

  • 判斷是否取消:cancelled()

  • 判斷是否結束:`done()`

  • 舉個例子,以下

    import asyncio
    
    async def a():
        print('執行a()')
        await asyncio.sleep(3)
        print('執行a()完成')
    
    async def b():
        print('執行b()')
        await asyncio.sleep(2)
        print('執行b()完成')
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        task1 = loop.create_task(a())
        task2 = loop.create_task(b())
        task1.cancel()  # 取消task1
        loop.run_until_complete(asyncio.gather(task1, task2))
    
    # 運行結果以下
    >>> 執行b()
    >>> Traceback (most recent call last):
      ... # 省略Traceback中的部分
    >>> concurrent.futures._base.CancelledError
    # 若是不想由於拋出異常而中斷運行,能夠在`gather`中設置`return_exception=True`
    # 若是設置`return_exception=True`,這樣異常會和成功的結果同樣處理,並聚合至結果列表,那麼運行結果將以下:
    >>> 執行b()
    >>> 執行b()完成

    能夠看到a()並無執行。

五、shield屏蔽取消

asyncio.shield(aw, **, loop=None),用於保護一個可等待對象被取消

  • aw:可等待對象,若是爲協程,將會在內部將其封裝爲Task

咱們先將上面取消的例子修改一下,以下:

import asyncio

async def a():
    print('執行a()')
    await asyncio.sleep(3)
    print('執行a()完成')

async def b():
    print('執行b()')
    await asyncio.sleep(2)
    print('執行b()完成')

def c():
    loop = asyncio.get_event_loop()
    task1 = loop.create_task(a())
    task1 = asyncio.shield(task1)  # 設置屏蔽取消操做
    task2 = loop.create_task(b())
    task1.cancel()
    loop.run_until_complete(asyncio.gather(task1, task2, return_exceptions=True))

c()
# 運行結果以下
>>> 執行a()
>>> 執行b()
>>> 執行b()完成

看到這裏依然被取消了,小夥伴們確定都是一(沙)臉(雕)懵(博)逼(主),可是這真不怪我啊,它確實是這樣輸出的,那麼爲何會這樣呢?咱們先看下shield()的源碼,以下:

def shield(arg, *, loop=None):
    inner = ensure_future(arg, loop=loop)  # 首先建立一個內部的Task 賦給inner
    if inner.done():  # 若是內部的Task執行完畢則返回inner
        # Shortcut.
        return inner
    loop = futures._get_loop(inner)
    outer = loop.create_future()  # 在事件循環上建立一個外部的Task

    def _done_callback(inner):  # 回調函數,在inner執行完畢後回調
        if outer.cancelled():  # 若是外部Task被取消
            if not inner.cancelled():  # 若是內部Task沒被取消
                # Mark inner's result as retrieved.
                inner.exception()  # 根據註釋的意思,內部Task的結果會被標記爲已檢索
            return

        if inner.cancelled():  # 若是內部的Task取消了,則外部的Task也將取消
            outer.cancel()  
        else:
            exc = inner.exception()  # 返回內部Task的異常
            if exc is not None:  # 若是存在異常,則外部Task的異常被設置爲內部Task的異常
                outer.set_exception(exc)
            else:  # 不然將外部Task結果設置爲內部Task的結果
                outer.set_result(inner.result())

    inner.add_done_callback(_done_callback)
    return outer  # 返回外部Task

由上面源碼能夠看到,真正運行的是內部的Task(inner),事件循環上的外部Task的一切都來自於內部Task,而咱們取消的則是事件循環上的外部Task,對內部的Task不產生任何影響,因此當事件循環上全部Task執行完成就退出了,而內部Task仍然在運行着,這就致使雖然咱們設置了屏蔽取消然而仍是被取消了,那怎麼解決呢?

有以下兩種方法:

  • 讓被取消的Task耗時小於正常Task的耗時,也就是把要被取消的a()協程模擬阻塞延時設置得比b()協程小,這樣使被屏蔽取消的協程的內部Task比事件循環中其餘正常Task執行完成得早;以下:

    async def a():
        print('執行a()')
        await asyncio.sleep(1)  # 由原來的3改成1,使其小於b()的延時
        print('執行a()完成')
    
    async def b():
        print('執行b()')
        await asyncio.sleep(2)
        print('執行b()完成')

    執行結果以下:

    >>> 執行a()
    >>> 執行b()
    >>> 執行a()完成
    >>> 執行b()完成

    能夠看到這下a()協程執行完成了。

  • 讓事件循環持續運行,也就不會由於事件循環上的Task執行完成,程序退出致使被屏蔽取消的協程的內部Task未能執行完成;以下:

    import asyncio
    import time
    async def a():
        print('執行a()')
        await asyncio.sleep(3)
        print('執行a()完成')
    
    async def b():
        print('執行b()')
        await asyncio.sleep(2)
        print('執行b()完成')
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        task1 = loop.create_task(a())
        task2 = loop.create_task(b())
        loop.run_forever()
    # 輸出以下
    >>> 執行a()
    >>> 執行b()
    >>> 執行b()完成
    >>> 執行a()完成

六、任務對象的其餘方法(粘貼自官方文檔)

  • result()

    返回 Task 的結果。若是 Task 對象 已完成,其封包的協程的結果會被返回 (或者當協程引起異常時,該異常會被從新引起。)若是 Task 對象 被取消,此方法會引起一個 CancelledError 異常。若是 Task 對象的結果還不可用,此方法會引起一個 InvalidStateError 異常。

  • exception()

    返回 Task 對象的異常。若是所封包的協程引起了一個異常,該異常將被返回。若是所封包的協程正常返回則該方法將返回 None。若是 Task 對象 被取消,此方法會引起一個 CancelledError 異常。若是 Task 對象還沒有 完成,此方法將引起一個 InvalidStateError 異常。

  • remove_done_callback(callback)

    從回調列表中移除 callback 指定的回調。此方法應該僅在低層級的基於回調的代碼中使用。要了解更多細節請查看 Future.remove_done_callback() 的文檔。

  • get_stack(**, limit=None*)

    返回此 Task 對象的棧框架列表。若是所封包的協程未完成,這將返回其掛起所在的棧。若是協程已成功完成或被取消,這將返回一個空列表。若是協程被一個異常終止,這將返回回溯框架列表。框架老是從按從舊到新排序。每一個被掛起的協程只返回一個棧框架。可選的 limit 參數指定返回框架的數量上限;默認返回全部框架。返回列表的順序要看是返回一個棧仍是一個回溯:棧返回最新的框架,回溯返回最舊的框架。(這與 traceback 模塊的行爲保持一致。)

  • print_stack(**, limit=None, file=None*)

    打印此 Task 對象的棧或回溯。此方法產生的輸出相似於 traceback 模塊經過 get_stack() 所獲取的框架。limit 參數會直接傳遞給 get_stack()file 參數是輸出所寫入的 I/O 流;默認狀況下輸出會寫入 sys.stderr

  • classmethod all_tasks(loop=None)

    返回一個事件循環中全部任務的集合。默認狀況下將返回當前事件循環中全部任務。若是 loopNone,則會使用 get_event_loop() 函數來獲取當前事件循環。This method is deprecated and will be removed in Python 3.9. Use the asyncio.all_tasks() function instead.

  • classmethod current_task(loop=None)

    返回當前運行任務或 None。若是 loopNone,則會使用 get_event_loop() 函數來獲取當前事件循環。

2、協程調用原理

爲了更好的理解後面的併發執行,因此如今爲咱們先了解下協程的調用原理,先舉個協程嵌套的例子:

import asyncio
import time
async def compute(x,y):
    await asyncio.sleep(1)
    return x+y

async def print_sum(x,y):
    reuslt = await compute(x,y)
    print(result)

if __name__ == "__main__":
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1,2))
    loop.close()
    print('Cost:{}'.format(time.time()-start))
    
#輸出以下
>>> 3
>>> Cost:1.0007200241088867

下面是這個例子的時序圖:

  1. 建立一個Event Loop事件循環,調用run_until_complete啓動事件循環併爲print_sum建立一個任務對象Task,Event Loop進入running狀態,Task進入pending狀態;

  2. Task驅動運print_sum運行到await compute(x,y)轉向調用子協程computeprint_sum進入

    suspended狀態,子協程compute進入running狀態;

  3. compute運行到await asyncio.sleep(1)發生阻塞耗時,進入suspended狀態,此時直接與Task溝通,與Task打通一條通道(子協程與調用方Task),能夠粗略的理解爲相似子線程與進程溝通;

    這樣作的好處在於可以保證print_sum中是同步進行,而總體程序是異步運行,後面有例子解釋這個問題。

  4. Task與子協程溝通後轉向與Event Loop溝通,Event Loop中沒有其餘的Task便只能等待阻塞操做完成(睡眠1秒);

  5. 在等待阻塞耗時操做完成後,Event Loop經過Task與子協程compute的通道喚醒compute,此時compute進入running狀態;

  6. compute執行下一步的代碼return x+y,從running狀態轉爲done狀態,同時拋出StopIteration異常,而且攜帶return的結果;

  7. print_sum捕獲compute拋出的異常,並從其中獲取compute返回的結果,此時print_sum由suspended狀態轉爲running狀態,執行下一步的代碼print(result)

  8. print_sum執行完print後由running狀態轉爲done狀態,同時拋出StopIteration異常;

  9. Task捕獲到print_sum拋出的異常後由runing狀態轉爲done狀態;

  10. 此時Event Loop進入stopped狀態;

舉個例子解釋第3條:

import asyncio
import time
async def compute(x,y):
    await asyncio.sleep(1)
    return x+y

async def print_sum(x,y):
    result = await compute(x,y)
    print(result)

if __name__ == "__main__":
    start = time.time()
    loop = asyncio.get_event_loop()
    tasks = [print_sum(1,2),print_sum(1,2)]  # 建立兩個協程對象
    loop.run_until_complete(asyncio.gather(*tasks))  # 同時執行兩個協程對象
    loop.close()
    print('Cost:{}'.format(time.time()-start))
                            
#輸出以下
>>>3
>>>3
>>>Cost:1.0004866123199463

由上述例子能夠看出在執行多個協程對象的時候並無延長運行時間,實際上asyncio.gather的部分做用就是把協程對象轉爲Task並把它們註冊到事件循環上。此例子在上述時序圖解釋中的第3,4步並無等待阻塞,而是轉向調用另外一個print_sum的Task,是異步操做,因此理論上來講這種方法即便執行1000個協程對象依然只耗時1秒鐘。

再來看一個例子:

import asyncio
import time
async def compute(x,y):
    print('執行compute')
    await asyncio.sleep(1)
    return x+y

async def print_sum(x,y):
    result1 = await compute(x, y)  # 調用子協程
    print(result1)
    result2 = await compute(x, y)  # 調用子協程
    print(result2)

if __name__ == "__main__":
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1,2))  # 同時執行兩個協程對象
    loop.close()
    print('Cost:{}'.format(time.time()-start))
# 輸出以下
>>> 執行compute
>>> 3
>>> 執行compute
>>> 3
>>> Cost:2.0158169269561768

能夠看到此例整體運行時間爲2秒,驗證在print_sum中是同步運行的,因此兩次執行子協程耗時加倍,若是想讓其依舊是1秒完成怎麼修改?

天然是將同步操做轉爲異步操做,也就是讓協程對象轉爲任務對象(Task)了,以下:

import asyncio
import time
async def compute(x,y):
    print('執行compute')
    await asyncio.sleep(1)
    return x+y

async def print_sum(x,y):
    task1 = asyncio.create_task(compute(x, y))
    task2 = asyncio.create_task(compute(x, y))
    await task1
    print(task1.result())
    await task2
    print(task2.result())

if __name__ == "__main__":
    start = time.time()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(print_sum(1,2))  
    loop.close()
    print('Cost:{}'.format(time.time()-start))
# 輸出以下
>>> 執行compute
>>> 執行compute
>>> 3
>>> 3
>>> Cost:1.0004889965057373

在此有個坑,你們在寫的時候注意避開,以下:

async def print_sum(x,y):
    await asyncio.create_task(compute(x, y))
    await asyncio.create_task(compute(x, y))
    # 這樣寫依然是同步運行,要將生成的對象賦給一個變量再進行await,即上個例子那樣

3、asyncio併發

在上面協程調用原理的幾個實例中咱們其實已經接觸到了部分併發的知識,下面來詳細講解一下asyncio的併發。

asyncio併發的幾種實現方式:

  1. asyncio.gather(*aws, loop=None, return_exceptions=False)`

    • aws:可等待對象(協程,Task,Future),若是某個可等待對象是協程,那麼asynico.gather內部會自動將其轉換爲Task並加入事件循環。若是全部可等待對象都成功完成,結果將一個由全部返回值聚合的列表,順序與aws順序一致。
    • return_exceptions當其爲默認值False的時候,若是aws中的可等待對象被取消,那麼會拋出一個CancelledError結束程序運行;若是將其設爲True的時候則不拋異常,程序正常運行,在返回結果的列表中將此可等待對象的結果設置爲concurrent.futures._base.CancelledError()
  2. asyncio.wait((aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

    • aws:可等待對象,併發運行aws指定的可等待對象並阻塞線程直到知足 return_when 指定的條件;

    • loop:這個參數能夠不用管它,根據官方文檔描述其將於python3.10中被去除;

    • timeout:支持小數和整數,若是設置了則將被應用於控制返回以前等待的最長秒數,並不會引起TimeoutError,若是超時發生則返回未完成的Task或Future;

    • return_when:指定函數的返回時機條件:

      FIRST_COMPLETED: 第一個可等待對象完成時返回;

      FIRST_EXCEPTION:第一個異常發生時返回,若是無異常則在全部可等待對象完成時返回;

      ALL_COMPLETED:默認,當全部可等待對象完成時返回

asyncio.gatherasyncio.wait的區別:

  1. 二者都用來作協程併發,其中asyncio.gather返回協程的運行結果,asyncio.wait返回兩個Task/Future的列表(完成的協程和未完成的協程兩個列表);

  2. asyncio.gather至關於全程黑盒,只告訴你協程的結果;

  3. asyncio.gather的能夠將Task分組,示例以下:

    import asyncio
    async def get_url(url):
       print('start get_url')
       await asyncio.sleep(2)
       print('end get_url')
       return 'Joshua'
    
    if __name__=="__main__":
       loop = asyncio.get_event_loop()
       # 將url分組
       group1 = [get_url('https://www.baidu.com') for _ in range(10)]
       group2= [get_url('https://www.baidu.com') for _ in range(10)]
       group1.cancel()  # 將第一組任務取消
       loop.run_until_complete(asyncio.gather(*group1,*group2))
  4. asyncio.wait會返回執行完成的和未完成的任務,你能夠在其運行中作一些操做,如上面的return_when返回實際條件來控制何時返回,若是你想要執行的結果須要自行從完成的任務裏面用result方法取;

  5. 獲取返回結果的示例以下:

    tasks = [Task1,Task2,Task3]
    # gather的獲取結果方法
    results = await asyncio.gather(*tasks)  # 注意此處gather方法不支持可迭代對象作參數,須要加*脫掉[]
    
    # wait的獲取結果方法
    # 第一種
    done,pending = await asyncio.wait(tasks)  # 標準格式的wait方法
    results = [task.result() for task in done]
    # 第二種
    await asyncio.wait(tasks)
    results = [task.result() for task in tasks]
相關文章
相關標籤/搜索