Python開發【異步】:asyncio

異步asyncio

asyncio是一個使用async / await語法編寫併發代碼的庫html

asyncio用做多個Python異步框架的基礎,這些框架提供高性能的網絡和Web服務器,數據庫鏈接庫,分佈式任務隊列等。python

asyncio一般很是適合IO綁定和高級 結構化網絡代碼。數據庫

 

asyncio提供了一組高級 API:express

此外,還有一些用於庫和框架開發人員的低級 API api

 

Conroutines

使用async / await語法聲明的協同程序是編寫asyncio應用程序的首選方法。例如,如下代碼片斷(須要Python 3.7+)打印「hello」,等待1秒,而後打印「world」:安全

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

asyncio.run(main())

# hello
# world

上面代碼等同於下面(不推薦使用基於生成器的協同程序的支持,並計劃在Python 3.10中刪除。服務器

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

asyncio.run(main())

asyncio實際等同於下面的工做(參數爲An asyncio.Future, a coroutine or an awaitable is required)網絡

import asyncio

@asyncio.coroutine
def main():
     print('hello')
     yield  from asyncio.sleep(1)
     print('world')

# asyncio.run(main())
loop = asyncio.events.new_event_loop()
asyncio.events.set_event_loop(loop)
loop.run_until_complete(main())

# hello
# world
 1     This function runs the passed coroutine, taking care of
 2     managing the asyncio event loop and finalizing asynchronous
 3     generators.
 4 
 5     This function cannot be called when another asyncio event loop is
 6     running in the same thread.
 7 
 8     If debug is True, the event loop will be run in debug mode.
 9 
10     This function always creates a new event loop and closes it at the end.
11     It should be used as a main entry point for asyncio programs, and should
12     ideally only be called once.
asyncio.run功能介紹

實際運行協程asyncio提供了三種主要機制:多線程

一、The asyncio.run()函數來運行頂層入口點「main()」函數(見上面的例子)併發

二、Awaiting on a coroutine 如下代碼片斷將在等待1秒後打印「hello」,而後在等待另外 2秒後打印「world」 

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"started at {time.strftime('%X')}")

    await say_after(1, 'hello')
    await say_after(2, 'world')

    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 11:54:48
# hello
# world
# finished at 11:54:51

三、asyncio.create_task()與asyncio同時運行協同程序功能Tasks讓咱們修改上面的例子並同時運行兩個say_after協同程序 :

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)

    await task1
    await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:27:22
# hello at 14:27:23
# world at 14:27:24
# finished at 14:27:24

 稍微改變一下形式,能夠理解的更多

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(f"{what} at {time.strftime('%X')}")

async def main():
    task1 = asyncio.create_task(
        say_after(1, 'hello'))
    task2 = asyncio.create_task(
        say_after(2, 'world'))
    print(f"started at {time.strftime('%X')}")
    # Wait until both tasks are completed (should take
    # around 2 seconds.)
    await asyncio.sleep(3)

    # await task1
    # await task2
    print(f"finished at {time.strftime('%X')}")

asyncio.run(main())

# started at 14:29:41
# hello at 14:29:42
# world at 14:29:43
# finished at 14:29:44

  

 Awaitables

咱們說若是一個對象能夠在表達式中使用,那麼它就是一個等待對象await許多asyncio API旨在接受等待。

有三種主要類型的等待對象:coroutinesTasks, and Futures.

Coroutines

Python coroutines are awaitables and therefore can be awaited from other coroutines:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

# 42

重要

在本文檔中,術語「coroutine」可用於兩個密切相關的概念:

  • 一個協程功能:一個功能;async def
  • 一個協程對象:經過調用協同程序函數返回的對象 

Tasks

任務用於調度協同程序併發。

當一個協程被包裝到一個Task中時,會像asyncio.create_task()同樣  conroutine自動安排很快運行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future是一個特殊的低級別等待對象,它表示異步操做最終結果

等待 Future對象時,它意味着協程將等到Future在其餘地方解析。

須要asyncio中的將來對象以容許基於回調的代碼與async / await一塊兒使用。

一般,不須要在應用程序級代碼中建立Future對象。

能夠等待有時經過庫和一些asyncio API公開的將來對象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

返回Future對象的低級函數的一個很好的例子是loop.run_in_executor()

 

Asyncio方法

一、運行asyncio程序 

 asyncio.runcoro*debug = False 

  此函數運行傳遞的協同程序,負責管理asyncio事件循環並最終肯定異步生成器

  當另外一個asyncio事件循環在同一個線程中運行時,沒法調用此函數。

  若是debugTrue,則事件循環將以調試模式運行。

  此函數始終建立一個新的事件循環並在結束時將其關閉。它應該用做asyncio程序的主要入口點,理想狀況下應該只調用一次。

  版本3.7中的新功能重要:此功能已臨時添加到Python 3.7中的asyncio中

 

二、建立任務 

asyncio.create_task(coro)

  將coro coroutine包裝成a Task 並安排執行。返回Task對象。

  任務在返回的循環中執行,若是當前線程中沒有運行循環get_running_loop(), RuntimeError則引起該任務

  Python 3.7中添加了此功能在Python 3.7以前,asyncio.ensure_future()可使用低級函數:

async def coro():
    ...

# In Python 3.7+
task = asyncio.create_task(coro())
...

# This works in all Python versions but is less readable
task = asyncio.ensure_future(coro())

 

三、sleeping 

coroutine asyncio.sleep(delayresult=None*loop=None)

  阻止 delay seconds.

  若是提供了result ,則在協程完成時將其返回給調用者。

  leep() 始終掛起當前任務,容許其餘任務運行。

  該loop 參數已被棄用,並定於去除在Python 3.10。

  協程示例每秒顯示當前日期5秒:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

  

四、同時運行任務

awaitable asyncio.gather(*awsloop=Nonereturn_exceptions=False)

  同時aws 序列中運行 awaitable objects

  若是在aws中任何awaitable 是協程,它將自動安排爲任務

  若是全部等待成功完成,則結果是返回值的彙總列表。結果值的順序對應於aws中的等待順序

  若是return_exceptionsFalse(默認),則第一個引起的異常會當即傳播到等待的任務gather()

  若是return_exceptionsTrue,異常的處理方式同樣成功的結果,並在結果列表彙總。

  若是gather()取消,全部提交的awaitables(還沒有完成)也被取消

  若是aws序列中的Task or Future取消,則將其視爲已引起CancelledError在這種狀況下不會取消gather() 呼叫這是爲了防止取消一個提交的Tasks/Futures 以致使其餘任務/期貨被取消。

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({i})...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")

async def main():
    # Schedule three calls *concurrently*:
    await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2)...
#     Task B: Compute factorial(2)...
#     Task C: Compute factorial(2)...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3)...
#     Task C: Compute factorial(3)...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4)...
#     Task C: factorial(4) = 24

 獲取返回結果,異常狀況

import asyncio

async def factorial(name, number):

    print(name)
    if name == 'A':
        return name
    elif name == 'B':
        raise SyntaxError(name)
    await asyncio.sleep(number)


async def main():
    # Schedule three calls *concurrently*:
    result = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
        return_exceptions=True
    )
    print(result)

asyncio.run(main())

# A
# B
# C
# ['A', SyntaxError('B'), None]

版本3.7中已更改:若是取消彙集自己,則不管return_exceptions如何,都會傳播取消

 

五、Shielding From Cancellation

awaitable asyncio.shield(aw*loop=None)

  Protect an awaitable object from being cancelled.

  If aw is a coroutine it is automatically scheduled as a Task.

  The statement:

res = await shield(something())

  等同於

res = await something()

  除非取消包含它的協程,不然something()不會取消運行的任務從觀點來看something(),取消沒有發生。雖然它的來電者仍然被取消,因此「等待」表達仍然提出了一個CancelledError

  若是something()經過其餘方式取消(即從內部取消)也會取消shield()

  若是但願徹底忽略取消(不推薦),則該shield()函數應與try / except子句結合使用,以下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

 

六、超時

coroutine asyncio.wait_for(aw, timeout, *, loop=None)

  Wait for the aw awaitable to complete with a timeout.

  If aw is a coroutine it is automatically scheduled as a Task.

  timeout能夠是None或等待的float或int秒數。若是超時None,將等到完成

  若是發生超時,它將取消任務並加註 asyncio.TimeoutError

  要避免該任務cancellation,請將其包裝shield()

  該函數將一直等到未來實際取消,所以總等待時間可能會超過超時

  若是等待被取消,則將來的aw也會被取消。

  該循環參數已被棄用,並定於去除在Python 3.10。

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

改變在3.7版本:AW被取消,因爲超時,wait_for等待AW被取消。之前,它asyncio.TimeoutError當即提出 

 

七、超時原語

coroutine asyncio.wait(aws*loop=Nonetimeout=Nonereturn_when=ALL_COMPLETED)

  同時運行aws中的等待對象 並阻塞,直到return_when指定的條件爲止

  若是在aws中任何等待的是協程,它將自動安排爲任務。wait()直接傳遞協同程序對象 已被棄用,由於它會致使 混亂的行爲

  返回兩組任務/期貨:(done, pending)

  用法:

done, pending = await asyncio.wait(aws)

  該循環參數已被棄用,並定於去除在Python 3.10。

  timeout(浮點數或整數),若是指定,可用於控制返回前等待的最大秒數。

  請注意,此功能不會引起asyncio.TimeoutError超時發生時未完成的期貨或任務僅在第二組中返回。

  return_when表示此函數什麼時候返回。它必須是如下常量之一:

不變 描述
FIRST_COMPLETED 當任何將來完成或取消時,該函數將返回。
FIRST_EXCEPTION 當任何將來經過引起異常完成時,函數將返回。若是沒有將來引起異常則等同於 ALL_COMPLETED
ALL_COMPLETED 全部期貨結束或取消時,該功能將返回。

  不像wait_for()wait()當發生超時不會取消期貨。   

  注意 wait()將協同程序自動調度爲任務,而後在 集合中返回隱式建立的任務對象。所以,如下代碼將沒法按預期方式工做:(done, pending)

async def foo():
    return 42

coro = foo()
done, pending = await asyncio.wait({coro})

if coro in done:
    # This branch will never be run!

  如下是如何修復上述代碼段:

async def foo():
    return 42

task = asyncio.create_task(foo())
done, pending = await asyncio.wait({task})

if task in done:
    # Everything will work as expected now.

  wait()不推薦直接傳遞協程對象。

 

八、 Scheduling From Other Threads

asyncio.run_coroutine_threadsafe(coroloop)

  將協程提交給給定的事件循環。線程安全的。

  返回a concurrent.futures.Future以等待另外一個OS線程的結果。

  此函數旨在從與運行事件循環的OS線程不一樣的OS線程調用。例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

  若是在協程中引起異常,則會通知返回的Future。它還能夠用於取消事件循環中的任務:

try:
    result = future.result(timeout)
except asyncio.TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

  請參閱 文檔併發和多線程部分。

  與其餘asyncio函數不一樣,此函數須要 顯式傳遞循環參數。

  3.5.1版中的新功能。

 

九、自省

asyncio.current_taskloop = None 

  返回當前正在運行的Task實例,或者None沒有正在運行的任務。

  若是loopNone get_running_loop()用來獲取loop。

  版本3.7中的新功能。

asyncio.all_tasksloop = None 

  返回Task循環運行的一組還沒有完成的對象。

  若是loopNoneget_running_loop()則用於獲取當前循環。

  版本3.7中的新功能。

 

任務對象

class asyncio.Task(coro,*,loop = None 

Future-like object that runs a Python coroutine. Not thread-safe.

任務用於在事件循環中運行協同程序。若是一個協程在Future上等待,則Task暫停執行協程並等待Future的完成。當Future 完成後,包裝協程的執行將恢復。

事件循環使用協做調度:事件循環一次運行一個任務。當一個Task等待完成Future時,事件循環運行其餘任務,回調或執行IO操做。

使用高級asyncio.create_task()功能建立任務,或低級別loop.create_task()或 ensure_future()功能。不鼓勵手動實例化任務。

要取消正在運行的任務,請使用該cancel()方法。調用它將致使Task將CancelledError異常拋出到包裝的協同程序中。若是在取消期間協程正在等待Future對象,則Future對象將被取消。

cancelled()可用於檢查任務是否被取消。True若是包裝的協程沒有抑制CancelledError異常而且實際上被取消,則該方法返回

asyncio.Task繼承自Future其全部API,除了Future.set_result()和 Future.set_exception()

任務支持該contextvars模塊。建立任務時,它會複製當前上下文,而後在複製的上下文中運行其協程。

版本3.7中已更改:添加了對contextvars模塊的支持

cancel

  請求取消任務。

  這會安排CancelledError在事件循環的下一個循環中將異常拋入包裝的協程。

  協程則有機會經過抑制異常與清理,甚至拒絕請求try... ... ... ... ... ... 塊。所以,不一樣於不保證任務將被取消,儘管徹底抑制取消並不常見,而且積極地不鼓勵。exceptCancelledErrorfinallyFuture.cancel()Task.cancel()

  如下示例說明了協同程序如何攔截取消請求:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled
   True若是任務被 取消,則返回。
  請求 取消時取消任務,  cancel()而且包裝的協同程序將 CancelledError異常傳播 到其中。
done
   True若是任務 完成則返回。
  一個任務 完成時,包裹協程要麼返回的值,引起異常,或者任務被取消。
result
  返回任務的結果。
  若是任務 完成,則返回包裝協程的結果(或者若是協程引起異常,則從新引起該異常。)
  若是已 取消任務,則此方法會引起 CancelledError異常。
  若是Task的結果尚不可用,則此方法會引起 InvalidStateError異常。
exception
  返回Task的例外。
  若是包裝的協同程序引起異常,則返回異常。若是包裝的協程正常返回,則此方法返回 None
  若是已 取消任務,則此方法會引起  CancelledError異常。
  若是還沒有 完成任務,則此方法會引起  InvalidStateError異常。
add_done_callback回調*上下文=無
  添加要在任務 完成時運行的回調。
  此方法僅應在基於低級回調的代碼中使用。 
  有關 Future.add_done_callback() 詳細信息,請參閱文檔。
remove_done_callback回調
  從回調列表中刪除 回調
  此方法僅應在基於低級回調的代碼中使用。
  有關 Future.remove_done_callback() 詳細信息,請參閱文檔。
get_stack*limit = None 
  返回此任務的堆棧幀列表。
  若是未完成包裝的協同程序,則會返回掛起它的堆棧。若是協程已成功完成或被取消,則返回一個空列表。若是協程被異常終止,則返回回溯幀列表。
  幀始終從最舊到最新排序。
  對於掛起的協程,只返回一個堆棧幀。
  可選的 limit參數設置要返回的最大幀數; 默認狀況下,返回全部可用的幀。返回列表的排序取決因而返回堆棧仍是返回:返回堆棧的最新幀,但返回最舊的回溯幀。(這與回溯模塊的行爲相匹配。)
print_stack*limit = Nonefile = None 
  打印此任務的堆棧或回溯。
  這會爲檢索到的幀生成相似於回溯模塊的輸出 get_stack()
  該 極限參數傳遞給 get_stack()直接。
  的 文件參數是其中輸出被寫入的I / O流; 默認輸出寫入 sys.stderr
classmethod all_tasks(loop = None 
  返回一組事件循環的全部任務。
  默認狀況下,返回當前事件循環的全部任務。若是是 loop None,則該 get_event_loop()函數用於獲取當前循環。
   不推薦使用此方法,將在Python 3.9中刪除。請改用此 asyncio.all_tasks()功能。
classmethod current_task(loop = None 
  返回當前正在運行的任務或 None
  若是是 loop None,則該 get_event_loop()函數用於獲取當前循環。
   不推薦使用此方法,將在Python 3.9中刪除。請改用此 asyncio.current_task()功能。

 其餘

一、async for 運用

import asyncio


class AsyncIter:
    def __init__(self, items):
        self.items = items

    async def __aiter__(self):
        for item in self.items:
            await asyncio.sleep(1)
            yield item


async def print_iter(things):
    async for item in things:
        print(item)


if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    things = AsyncIter([1, 2, 3])
    loop.run_until_complete(print_iter(things))
    loop.close()

  

 

 

資料

Python異步IO實現全過程1 https://mp.weixin.qq.com/s/fJaXmfHfYEk6XL2y8NmKmQ

Python異步IO實現全過程2 https://mp.weixin.qq.com/s/RjDh7AITty92jxC8jIOiPA

Python異步IO實現全過程3 https://mp.weixin.qq.com/s/vlH_2S2JIJpf3N0WRNcIJQ

相關文章
相關標籤/搜索