Python 協程模塊 asyncio 使用指南

Python 協程模塊 asyncio 使用指南python

前面咱們經過5 分鐘入門 Python 協程瞭解了什麼是協程,協程的優勢和缺點和如何在 Python 中實現一個協程。沒有看過的同窗建議去看看。這篇文章,將再也不對理論性的東西作過多的解說。而是傾向於 asyncio 的使用上,另外爲了保證文章時效性這裏咱們使用 Python3.8 來進行對後面內容的操做。git

協程的演變

其實早在 Python3.4 的時候就有協程,當時的協程是經過 @asyncio.coroutine 和 yeild from 實現的。在一些很老教程中你可能看到的是下面這種形式:編程

import asyncio

@asyncio.coroutine
def print_hello():
    print("Hello world!")
    r = yield from asyncio.sleep(1)
    print("Hello again!")

# 建立並獲取EventLoop:
loop = asyncio.get_event_loop()
# 執行協程
loop.run_until_complete(print_hello())
loop.close()

由於如今幾乎沒有人這樣寫了因此僅做爲了解便可。
而後到了 Python3.5 引入了 async/await 語法糖,一直到如今Python3.8 都是用這種形式來表示協程,示例以下。api

import asyncio

async def print_hello():
     print("Hello world!")
     await asyncio.sleep(1)
     print("Hello again!")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    try:
        print("開始運行協程")
        coro = print_hello()
        print("進入事件循環")
        loop.run_until_complete(coro)
    finally:
        print("關閉事件循環")
        loop.close()

這種是目前應用範圍最廣的,能夠看到比以前的代碼舒服了很多,不用再使用裝飾器的形式了。而後就到了 Python3.7 和 Python3.8 ,協程發生了不少細小的變化,可是最大的一個變化就是,啓動協程的方法變簡單了,一句就能夠搞定,不用再像上面那樣建立循環而後再仍到事件循環去執行了。安全

import asyncio

async def print_hello():
     print("Hello world!")
     await asyncio.sleep(1)
     print("Hello again!")

if __name__ == '__main__':
        print("開始運行協程")
        asyncio.run(print_hello())
        print("進入事件循環")

怎麼樣是否是代碼更少了,啓動協程更簡單了。因此這也正是咱們使用 3.8 做爲本教程的 Python 版,與時俱進嘛。網絡

Asyncio 的組成部分

根據目前的官方文檔,總的來講分爲了兩部分:高層級 API 和低層級API。
首先看高層級 API 也是接下來重點要講的。數據結構

高層級API

  • 協程對象和 Tasks 對象
  • 數據流
  • 同步源語
  • 子進程
  • 隊列
  • 異常

低層級API

  • 事件循環
  • Futures 對象
  • 傳輸和協議
  • 策略
  • 平臺支持

上面列出了這麼多的項目咱們怎麼去選擇本身所須要的呢,總的來講對於剛入門的新手或者只是寫一個本身用的程序通常都只會用到高級 API 的部分,這部分就屬於開箱即用的那種,對於高級用戶好比框架開發者,每每能夠須要去適應各類須要,須要從新改寫一些內部的結構,這個時候就須要用到低層級的 API,可是這兩個層級呢只能是一個大概方向吧,主要是方便 API 的查看,下面呢我將圍繞者高層級API和低層級API在平常實際工做中常常用到的內容作一些講解。多線程

瞭解幾個概念

在學習 asyncio 以前須要知道這樣的幾個概念。架構

事件循環

事件循環是一種處理多併發量的有效方式,在維基百科中它被描述爲「一種等待程序分配事件或消息的編程架構」,咱們能夠定義事件循環來簡化使用輪詢方法來監控事件,通俗的說法就是「當A發生時,執行B」。所謂的事件,其實就是函數。事件循環,就是有一個隊列,裏面存放着一堆函數,從第一個函數開始執行,在函數執行的過程當中,可能會有新的函數繼續加入到這個隊列中。一直到隊列中全部的函數被執行完畢,而且不再會有新的函數被添加到這個隊列中,程序就結束了。併發

Future

Future 是一個數據結構,表示還未完成的工做結果。事件循環能夠監視Future 對象是否完成。從而容許應用的一部分等待另外一部分完成一些工做。
簡答說,Future 就是一個類,用生成器實現了回調。

Task

Task 是 Future 的一個子類,它知道如何包裝和管理一個協程的執行。任務所需的資源可用時,事件循環會調度任務容許,並生成一個結果,從而能夠由其餘協程消費。通常操做最多的仍是 Task。用Task來封裝協程,給本來沒有狀態的協程增長一些狀態。

awaitable objects(可等待對象)

若是一個對象能夠用在 wait 表達式中,那麼它就是一個可等待的對象。在 asyncio 模塊中會一直提到這個概念,其中協程函數,Task,Future 都是 awaitable 對象。
用於 await 表達式中的對象。能夠是的 coroutine 也能夠是實現了 __await__() 方法的對象,參見 PEP 492。類比於 Iterable 對象是 Generator 或實現了__iter__() 方法的對象。

**object.__await__(self)**

必須返回生成器,asyncio.Future 類也實現了該方法,用於兼容 await 表達式。

而 Task 繼承自 Future,所以 awaitable 對象有三種:coroutines、Tasks 和 Futures。

await 的目的:

  • 獲取協程的結果
  • 掛起當前協程,將控制交由事件循環,切換到其餘協程,而後等待結果,最後恢復協程繼續執行

啓動一個協程

如今咱們使用 async/await 語法來聲明一個協程。 代碼以下

import asyncio

async def main():
     print('hello')
     await asyncio.sleep(1)
     print('world')

if __name__ == '__main__':
    asyncio.run(main())

asyncio.run 只能用來啓動程序入口協程,反過來你在程序中若是使用asyncio.run 就會出錯,直接咱們提到對於其餘的協程經過await鏈來實現,這裏也是同樣的。下面說下代碼的含義,首先啓動 main 這個協程,main 方法就是先打印 hello,而後在打印過程當中經過使用 asyncio.sleep 來等待1秒,以後再打印 world。前面咱們提到用協程就意味着咱們要一直使用非阻塞的代碼,才能達到速度提高,因此這裏咱們用了非阻塞版的 time.sleep 即 asyncio.sleep 。

協程中調用協程

以前咱們提到了在協程中,可使用 await 來調用一個協程。
就像下面的代碼:

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    #使用f-string拼接字符串
    print(f"開始運行 {time.strftime('%X')}")

    child1=await say_after(1, 'hello') #經過await調用協程,而後接收一下返回值
    child2=await say_after(2, 'world')
    print("child1",child1)
    print("child2",child2)

    print(f"結束運行 {time.strftime('%X')}")
if __name__ == '__main__':
      asyncio.run(main())

運行結果:

開始運行 11:17:26
hello
world
child1 None
child2 None
結束運行 11:17:29
[Finished in 3.1s]

代碼是沒什麼問題,正常運行。可是通常狀況下咱們用到更多的是下面的方式。將協程封裝爲 Task 讓本來沒有狀態標示的協程添加上狀態 。
咱們能夠經過 asyncio.create_task 方法來實現。

asyncio.create_task

create_task(在3.6版本中須要使用低層級的API asyncio.ensure_future。)是 3.7之後加入的語法,做用是將協程包裝爲一個任務(Task),相比3.6版本的ensure_future可讀性提升。
將上面的代碼作以下修改。

import asyncio
import time

async def say_after(delay, what):
    await asyncio.sleep(delay)
    print(what)

async def main():
    print(f"開始運行 {time.strftime('%X')}")

    child1=asyncio.create_task(say_after(1, 'hello')) #經過await調用協程,而後接收一下返回值
    child2=asyncio.create_task(say_after(2, 'world'))
    print("調用任務child1前",child1)
    print("調用任務child2前",child2)
    await child1
    await child2
    print("調用任務child1後",child1)
    print("調用任務child2前",child2)

    print(f"結束運行 {time.strftime('%X')}")
if __name__ == '__main__':
      asyncio.run(main())

運行結果以下:

開始運行 11:37:54
調用任務child1前 <Task pending name='Task-2' coro=<say_after() running at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4>>
調用任務child2前 <Task pending name='Task-3' coro=<say_after() running at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4>>
hello
world
調用任務child1後 <Task finished name='Task-2' coro=<say_after() done, defined at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4> result=None>
調用任務child2前 <Task finished name='Task-3' coro=<say_after() done, defined at /Users/chennan/Desktop/2019/aiochatuse/hello.py:4> result=None>
結束運行 11:37:56

能夠發現,咱們的結果中多了"<Task pending ..."和"<Task finised ..."幾行語句。這就是 Task 的一個狀態變化,知道狀態的好處就是咱們能夠根據任務的狀態作進一步操做,不像協程函數那樣沒有狀態標示,固然 Task 的狀態不僅有這些。
前面說到 Task 是 Future 的子類,因此 Tas k擁有 Future 的一些狀態。

Future的狀態

大概有以下幾種:

  • Pending
  • Running
  • Done
  • Cancelled
    建立 future 的時候,task 爲 pending,事件循環調用執行的時候固然就是 running,調用完畢天然就是 done,若是須要中止事件循環,就須要先把 task 取消,狀態爲 cancel。這裏先作了解知道 Task 是有狀態的就夠了。

    併發運行任務

    一系列的協程能夠經過 await 鏈式的調用,可是有的時候咱們須要在一個協程裏等待多個協程,好比咱們在一個協程裏等待 1000 個異步網絡請求,對於訪問次序有沒有要求的時候,就可使用另外的關鍵字asyncio.wait 或 asyncio.gather 來解決了。

    asyncio.gather

    使用方法
asyncio.gather(*aws, loop=None, return_exceptions=False)¶

也就是說使用 gather 語句併發協程,就得用 await 去執行它。這個方法能夠接收三個參數,第一個 aws,
aws 通常是一個列表,若是裏面的元素是 awaitable 類型,在運行的時候它將自動被包裝成 Task,gather 會根據 aws 中元素添加的順序。順序執行並返回結果列表。
第二個 loop 能夠傳入一個事件循環對象,通常不用管,最後一個return_exceptions 默認是 False,若是 return_exceptions 爲 True,異常將被視爲成功結果,而後添加到結果列表中。
下面是一個10個數字並輸出例子。

import asyncio

async def foo(num):
    return num
async def main():
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done= await asyncio.gather(*coro)
    for i in done:
        print(i)
    

if __name__ == '__main__':
    asyncio.run(main())

運行以後結果以下

0
1
2
3
4
5
6
7
8
9

gather 返回的結果是一個列表,迭代這個列表能夠看到任務依次輸出。
gather 一般被用來階段性的一個操做,作完第一步才能作第二步,好比下面這樣

import asyncio

import time


async def step1(n, start):
    await asyncio.sleep(n)
    print("第一階段完成")
    print("此時用時", time.time() - start)
    return n


async def step2(n, start):
    await asyncio.sleep(n)
    print("第二階段完成")
    print("此時用時", time.time() - start)
    return n


async def main():
    now = time.time()
    result = await asyncio.gather(step1(5, now), step2(2, now))
    for i in result:
        print(i)
    print("總用時", time.time() - now)


if __name__ == '__main__':
   asyncio.run(main())

輸出內容

第二階段完成
此時用時 2.0041821002960205
第一階段完成
此時用時 5.0009942054748535
5
2
總用時 5.001508951187134

能夠經過上面結果獲得以下結論:
1.step1 和 step2 是並行運行的。
2.gather 會等待最耗時的那個完成以後才返回結果,耗時總時間取決於其中任務最長時間的那個。

asyncio.task

咱們先看一下 wait 的語法結構:

asyncio.wait(aws, *, loop=None, timeout=None, return_when=ALL_COMPLETED)¶

wait 一共有 4 個參數,第一個參數 aws,通常是一個任務列表。
第二個*以後的都是強制關鍵字參數,即loop,timeout,return_when。
loop通gather的參數是一個事件循環,該參數計劃在Python 3.10中刪除。
timeout能夠指定這組任務的超時時間,請注意,此函數不會引起asyncio.TimeoutErro, 超時的時候會返回已完成的任務。

return_when能夠指定什麼條件下返回結果,默認是因此任務完成就返回結果列表。return_when的具體參數看下面的表格:
|參數名|含義|
|----|----|
|FIRST_COMPLETED|任何一個future完成或取消時返回|
| FIRST_EXCEPTION |任何一個future出現錯誤將返回,若是出現異常等價於ALL_COMPLETED|
|ALL_COMPLETED|當全部任務完成或者被取消時返回結果,默認值。|

wait返回的結果是一個元組,第一部分是完成的任務,第二部分是準備中的任務。

done, pending = await asyncio.wait(aws)

其中done表示是完成的任務,能夠經過迭代獲取每一個任務。
pending表示的是還沒執行的任務。
下面看一個例子來進一步瞭解

import asyncio

async def foo(num):
    await asyncio.sleep(0.99991)
    return num
async def main():
    #coro = foo()
    coro = [asyncio.create_task(foo(i)) for i in range(10) ]
    done, pending = await asyncio.wait(coro,timeout=1,return_when="ALL_COMPLETED")
    
    for coro in done:
        print(coro.result())
    print("pending",pending)
    for item in pending:
         print(item)    

if __name__ == '__main__':
    asyncio.run(main())

運行結果以下:

2
5
3
0
6
4
1
7
pending {<Task pending name='Task-10' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>, <Task pending name='Task-11' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>}
<Task pending name='Task-10' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>
<Task pending name='Task-11' coro=<foo() running at /Users/chennan/Desktop/2019/aiochatuse/waitdemo.py:4> wait_for=<Future finished result=None>>

首先說代碼,使用wait實現併發的程序是無序的因此咱們看到數字不是一次出現的。這個是和gather的不一樣之處,另外在返回的參數上也有差異,wait返回兩個參數done和pending。

上面的代碼指定了一個timeout,由於任務沒在指定時間完成因此就致使,只有完成的任務輸出告終果,沒有完成的部分能夠看到它們的狀態是pending。

總結

最後咱們,總結一下wait和gather的相同之處和不一樣之處:
相同之處:均可以完成多個任務的併發操做。
不一樣以外:gather適合按照順序去作的任務,或者按照階段去作的任務,返回的是結果列表,而wait不講究任務的順序,這個在作爬蟲中常用到,而後wait能夠返回2個結果,done和pending。

任務完成時處理

asyncio.as_completed

as_complete是一個生成器,會管理指定的一個任務列表,並生成他們的結果。每一個協程結束運行時一次生成一個結果。與wait同樣,as_complete不能保證順序,不過執行其餘動做以前沒有必要等待因此後臺操做完成。

咱們看下這個函數都有哪些參數

asyncio.as_completed(aws, *, loop=None, timeout=None)

和前面的wait相似,第一個參數awas,而後loop,最後timeout,須要注意的是timeout若是指定了,那麼在指定時間沒完成的話會拋出asyncio.exceptions.TimeoutError異常。
下面看一個例子:

import asyncio
import time


async def foo(n):
    print(f'等待{n}秒')
    await asyncio.sleep(n)
    return n


async def main():
    coroutine1 = foo(1)
    coroutine2 = foo(2)
    coroutine3 = foo(4)

    tasks = [asyncio.create_task(coroutine1),asyncio.create_task(coroutine2),asyncio.create_task(coroutine3)]
    for task in asyncio.as_completed(tasks):
        result = await task
        print(f'獲取返回結果: {result}')


if __name__ == '__main__':
    now = lambda : time.time()
    start = now()
    asyncio.run(main())
    print(now() - start)

輸出結果

等待1秒
等待2秒
等待4秒
獲取返回結果: 1
獲取返回結果: 2
獲取返回結果: 4
4.002715826034546

能夠看出整個執行過程總用時取決於 等待時間最長的那個即4秒。
接下來,對上面的代碼稍做修改,

for task in asyncio.as_completed(tasks):

改成

for task in asyncio.as_completed(tasks,timeout=2):

其餘地方不變,改完運行以後會看到上面提到的錯誤。

等待1秒
等待2秒
等待4秒
獲取返回結果: 1
Traceback (most recent call last):
  File "/Users/chennan/Desktop/2019/aiochatuse/ascomplete.py", line 25, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 589, in run_until_complete
    return future.result()
  File "/Users/chennan/Desktop/2019/aiochatuse/ascomplete.py", line 18, in main
    result = await task
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 570, in _wait_for_one
    raise exceptions.TimeoutError
asyncio.exceptions.TimeoutError

取消任務的時候保證其餘協程運行完畢

在取消任務的時候存在一個問題,首先先看一段代碼:

import asyncio

async def coro():
    print('開始休眠')
    await asyncio.sleep(2)
    print('結束休眠')

async def cancel_it(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('取消some_task任務')

async def main():
    real_task = asyncio.create_task(coro())
    await cancel_it(real_task)
    await real_task

if __name__ == '__main__':
    asyncio.run(main())

運行以後你會看到以下結果

開始休眠
取消some_task任務
Traceback (most recent call last):
  File "/Users/chennan/Desktop/2019/aiochatuse/shielddemo.py", line 24, in <module>
    asyncio.run(main())      
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 589, in run_until_complete
    return future.result()
asyncio.exceptions.CancelledError

下面說一下代碼中的邏輯,在main協程中將coro協程封裝爲任務real_task,而後cancel_it方法作了一個取消任務的邏輯some_task.cancel()。並打印一句話。而後經過await去運行real_task方法,執行代碼以後看到上面的結果。出現了asyncio.exceptions.CancelledError錯誤,同時看到coro只打印了一個開始休眠,後面的結束休眠沒有打印。也就是說咱們在取消一個任務的時候,裏面對於的協程也被取消了。若是咱們想在取消任務以後協程還能順利執行完,就須要用到另一個函數shield.

asyncio.shield

該方法的做用是,在執行cancel取消一個task以後,task裏面的協程仍然能夠執行結束,不會像上面的coro那樣出現錯誤。

asyncio.shield(aw, *, loop=None)

aw表示須要傳入一個 Task。

接下來咱們就使用這個方法對上面的例子作一個修改。
從代碼中體會它的做用

import asyncio

async def coro():
    print('開始休眠')
    await asyncio.sleep(2)
    print('結束休眠')

async def cancel_it(some_task):
    await asyncio.sleep(0.5)
    some_task.cancel()
    print('取消some_task任務')

async def main():
    real_task = asyncio.create_task(coro())
    shield = asyncio.shield(real_task)
    await cancel_it(shield)
    await real_task
if __name__ == '__main__':
    asyncio.run(main())

運行以後的結果

開始休眠
取消some_task任務
結束休眠

能夠看到儘管some_task任務被取消,可是coro仍然成功的打印了最好的「結束休眠」。經過上面的例子我想你們應該知道shield的做用了。

超時等待

有時候須要等待一個任務完成以後再進行下一個,可是有的時候並不須要運行完就返回。
這個時候可使用wait_for

asyncio.wait_for

該方法的語法以下:

asyncio.wait_for(aw, timeout, *, loop=None)

aw是一個任務,timeout能夠指定超時時間。若是發生超時,它將取消該任務並引起asyncio.TimeoutError,此時爲了保證任務中協程完成可使用上面說的 shield。

import asyncio

async def foo():
     await asyncio.sleep(1)
     print("in foo")
     
async def eternity():
    # Sleep for one hour
    await foo()
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(asyncio.shield(eternity()), timeout=1.0)
    except asyncio.TimeoutError:
        print('timeout!')
if __name__ == '__main__':
    asyncio.run(main())

輸出

in foo
timeout!

按照上面的經驗可知道,若是咱們把asyncio.shield去掉以後,「in foo」就沒法輸出了。

協程配合線程

asyncio.run_coroutine_threadsafe

該方法的語法以下:

asyncio.run_coroutine_threadsafe(coro, loop)

其實在協程中也可使用多線程,有時候咱們須要在主線程中啓動一個子線程去作別的任務,這個時候咱們就要用到下面的方法了,先上一個流暢的Python中的代碼。

import time
import asyncio
from  threading import Thread

now = lambda: time.time()


def start_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


async def do_some_work(x):
    print(f'Waiting {x}')
    await asyncio.sleep(x)
    print(f'Done after {x}s')


def more_work(x):
    print(f'More work {x}')
    time.sleep(x)
    print('Finished more work {x}')


start = now()
# 主線程中建立一個 new_loop
new_loop = asyncio.get_event_loop()
# 建立子線程 在其中開啓無限事件循環
t = Thread(target=start_loop, args=(new_loop,))
t.start()
print(f'TIME: {time.time() - start}')

# 在主線程中新註冊協程對象
# 這樣便可在子線程中進行事件循環的併發操做 同時主線程又不會被 block 
# 一共執行的時間大概在 6 s 左右 
asyncio.run_coroutine_threadsafe(do_some_work(6), new_loop)
asyncio.run_coroutine_threadsafe(do_some_work(4), new_loop)

上述的例子,主線程中建立一個new_loop,而後在另外的子線程中開啓一個無限事件循環。主線程經過run_coroutine_threadsafe新註冊協程對象。這樣就能在子線程中進行事件循環的併發操做,同時主線程又不會被block。一共執行的時間大概在6s左右。

同步原語

儘管asyncio應用一般做爲單線程運行,不過仍被構建爲併發應用。因爲I/O以及其餘外部事件的延遲和中斷,每一個協程或任務可能按一種不可預知的順序執行。爲了支持安全的併發執行,asyncio包含了threading和multiprocessing模塊中的一些底層原語的實現。
這裏介紹兩個常常用到的例子

隊列(Queue)

asyncio.Queue爲協程提供了一個先進先出的數據結構,這與線程的queue.Queue或進程的multiprocessing.Queue很相似,下面先看一個簡單的例子,它是一個非阻塞的隊列。

import asyncio
from asyncio import Queue

queue=Queue()
async def start():
        [queue.put_nowait(i) for i in range(1, 10)]
        await asyncio.create_task(work()) #put_nowait表示放入元素

async def work():
    try:
        while not queue.empty():#判斷隊列的元素是否爲空
            num = queue.get_nowait()#獲取元素
            print(f"獲取數字:{num}")
            queue.task_done()#告訴隊列該任務處理完。
    except asyncio.CancelledError:
        pass

if __name__ == '__main__':
    asyncio.run(start())

輸出結果:

獲取數字:1
獲取數字:2
獲取數字:3
獲取數字:4
獲取數字:5
獲取數字:6
獲取數字:7
獲取數字:8
獲取數字:9

在作爬蟲的時候對於url的處理,常常會用到隊列的操做。另一個要說的同步原語就是信號量。

信號量(Semaphore)

簡單說下什麼是信號量,咱們用停車場和車進行比喻。一個停車場一共就5個車位,因此咱們知道能夠同時容納最多5輛車,這五個車位就是信號量。
而後說信號量的行爲,當有車離開停車場的時候外面的車就會進來補,好比有2輛車離開,那麼就能夠再進來2輛車,依次類推,上面這個過程就是描述了信號量這個東西。下面咱們看如何在程序中使用。
asyncio.Semaphore模塊就是一個維持併發量的模塊,咱們用它起到一個限流的效果。首先來一段代碼。

import asyncio

sem=asyncio.Semaphore(3) #信號量指定爲3

async def branch(num):
    async with sem:  #經過異步上下文關鍵子控制併發量
        print(f"獲取當前數字:{num}")
        await asyncio.sleep(0.5)


async def main():
     
     tasks=[asyncio.create_task(branch(i)) for i in range(10)] #將協程封裝成任務共10個
     await asyncio.wait(tasks) #執行這些任務
      
if __name__ == '__main__':
    asyncio.run(main())

執行以後你會發現

獲取當前數字:0
獲取當前數字:1
獲取當前數字:2
Task exception was never retrieved
future: <Task finished name='Task-11' coro=<branch() done, defined at /Users/chennan/Desktop/2019/aiochatuse/semaphoredemo.py:26> exception=RuntimeError("Task <Task pending name='Task-11' coro=<branch() running at /Users/chennan/Desktop/2019/aiochatuse/semaphoredemo.py:27> cb=[_wait.<locals>._on_completion() at /Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py:478]> got Future <Future pending> attached to a different loop")>

關鍵就是 attached to a different loop,這個地方說是當前的事件循環發生了改變,這個問題在Python3.6的時候是不會出現的。
爲何3.8出錯了,這是由於
個人信號量沒有在循環內建立。也就是在asyncio.run()建立的循環以外建立了它們,所以它們使用events.get_event_loop()這就致使了新的事件循環產生。 asyncio.run()建立一個新循環,而後在一個循環中建立的future不能在另外一個循環中使用。因此問題就明確了咱們須要在循環以內建立。也就是咱們須要定義一個全局變量,而後在主循環內部給其賦值,看到這,可能你們想到了global,Python 3.7 增長了上下文變量 Context Variables,至於爲何不用全局變量,由於可能會被其餘協程修改,不安全,在這裏也可使用。
因此咱們的代碼變成了下面這個樣子

import asyncio
from contextvars import ContextVar

concurrent=ContextVar("concurrent")#定義全局上下文管理器

async def branch(num):
    sem=concurrent.get()#獲取上下文關鍵字
    async with sem:
        print(f"獲取當前數字:{num}")
        await asyncio.sleep(0.5) #爲了看到明顯的效果

async def main():
     concurrent.set(asyncio.Semaphore(3)) #上下文管理器賦值
     tasks=[asyncio.create_task(branch(i)) for i in range(10)]
     await asyncio.wait(tasks)
      
if __name__ == '__main__':
    asyncio.run(main())

而後咱們再次輸出

獲取當前數字:0
獲取當前數字:1
獲取當前數字:2
獲取當前數字:3
獲取當前數字:4
獲取當前數字:5
獲取當前數字:6
獲取當前數字:7
獲取當前數字:8
獲取當前數字:9

能夠看到程序每隔3組輸出一次,這就達到了咱們想要的效果了。

後記

到目前爲止,asyncio 經常使用的操做就是上面這些了,關於更多的asyncio用法你們能夠參考官網的api文檔學習,同時學習這些內容,也是爲了個人後續文章aiohttp異步爬蟲模塊打下基礎,但願你們可以多動手實踐,加深對代碼的影響。後續我將和你們一塊兒走入異步爬蟲aiohttp的實踐部分,敬請期待!

相關文章
相關標籤/搜索