Python併發編程之學習異步IO框架:asyncio 中篇(十)

你們好,併發編程 進入第十章。
好了,今天的內容其實還挺多的,我準備了三天,到今天才整理完畢。但願你們看完,有所收穫的,能給小明一個贊。這就是對小明最大的鼓勵了。
爲了更好地銜接這一節,咱們先來回顧一下上一節的內容。python

上一節「」,咱們首先介紹了,如何建立一個協程對象.
主要有兩種方法編程

  • 經過async關鍵字,
  • 經過@asyncio.coroutine 裝飾函數。

而後有了協程對象,就須要一個事件循環容器來運行咱們的協程。其主要的步驟有以下幾點:併發

  • 將協程對象轉爲task任務對象
  • 定義一個事件循環對象容器用來存放task
  • 將task任務扔進事件循環對象中並觸發

爲了讓你們,對生成器和協程有一個更加清晰的認識,我還介紹了yieldasync/await的區別。app

最後,咱們還講了,如何給一個協程添加回調函數。dom

好了,用個形象的比喻,上一節,其實就只是講了協程中的單任務。哈哈,是否是還挺難的?但願你們必定要多看幾遍,多敲代碼,不要光看哦。async

那麼這一節,咱們就來看下,協程中的多任務函數

. 本文目錄

  • 協程中的併發
  • 協程中的嵌套
  • 協程中的狀態
  • gather與wait

. 協程中的併發

協程的併發,和線程同樣。舉個例子來講,就好像 一我的同時吃三個饅頭,咬了第一個饅頭一口,就得等這口嚥下去,才能去啃第其餘兩個饅頭。就這樣交替換着吃。oop

asyncio實現併發,就須要多個協程來完成任務,每當有任務阻塞的時候就await,而後其餘協程繼續工做。spa

第一步,固然是建立多個協程的列表。線程

# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

第二步,如何將這些協程註冊到事件循環中呢。

有兩種方法,至於這兩種方法什麼區別,稍後會介紹。

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千萬注意,這裏的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

最後,return的結果,能夠用task.result()查看。

for task in tasks:
print('Task ret: ', task.result())

完整代碼以下

import asyncio

# 協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉成task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))

for task in tasks:
print('Task ret: ', task.result())

輸出結果

Waiting:  1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s

. 協程中的嵌套

使用async能夠定義協程,協程用於耗時的io操做,咱們也能夠封裝更多的io操做過程,這樣就實現了嵌套的協程,即一個協程中await了另一個協程,如此鏈接起來。

來看個例子。

import asyncio

# 用於內部的協程函數
async def do_some_work(x):
print('Waiting: ', x)
await asyncio.sleep(x)
return 'Done after {}s'.format(x)

# 外部的協程函數
async def main():
# 建立三個協程對象
coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# 將協程轉爲task,並組成list
tasks = [
asyncio.ensure_future(coroutine1),
asyncio.ensure_future(coroutine2),
asyncio.ensure_future(coroutine3)
]

# 【重點】:await 一個task列表(協程)
# dones:表示已經完成的任務
# pendings:表示未完成的任務
dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

loop = asyncio.get_event_loop()
loop.run_until_complete(main())

若是這邊,使用的是asyncio.gather(),是這麼用的

# 注意這邊返回結果,與await不同

results = await asyncio.gather(*tasks)
for result in results:
print('Task ret: ', result)

輸出仍是同樣的。

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s

仔細查看,能夠發現這個例子徹底是由 上面「協程中的併發」例子改編而來。結果徹底同樣。只是把建立協程對象,轉換task任務,封裝成在一個協程函數裏而已。外部的協程,嵌套了一個內部的協程。

其實你若是去看下asyncio.await()的源碼的話,你會發現下面這種寫法

loop.run_until_complete(asyncio.wait(tasks))

看似沒有嵌套,實際上內部也是嵌套的。

這裏也把源碼,貼出來,有興趣能夠看下,沒興趣,能夠直接跳過。

# 內部協程函數
async def _wait(fs, timeout, return_when, loop):
assert fs, 'Set of Futures is empty.'
waiter = loop.create_future()
timeout_handle = None
if timeout is not None:
timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
counter = len(fs)

def _on_completion(f):
nonlocal counter
counter -= 1
if (counter <= 0 or
return_when == FIRST_COMPLETED or
return_when == FIRST_EXCEPTION and (not f.cancelled() and
f.exception() is not None)):
if timeout_handle is not None:
timeout_handle.cancel()
if not waiter.done():
waiter.set_result(None)

for f in fs:
f.add_done_callback(_on_completion)

try:
await waiter
finally:
if timeout_handle is not None:
timeout_handle.cancel()

done, pending = set(), set()
for f in fs:
f.remove_done_callback(_on_completion)
if f.done():
done.add(f)
else:
pending.add(f)
return done, pending

# 外部協程函數
async def wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED):
if futures.isfuture(fs) or coroutines.iscoroutine(fs):
raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
if not fs:
raise ValueError('Set of coroutines/Futures is empty.')
if return_when not in (FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED):
raise ValueError(f'Invalid return_when value: {return_when}')

if loop is None:
loop = events.get_event_loop()

fs = {ensure_future(f, loop=loop) for f in set(fs)}
# 【重點】:await一個內部協程
return await _wait(fs, timeout, return_when, loop)

. 協程中的狀態

還記得咱們在講生成器的時候,有說起過生成器的狀態。一樣,在協程這裏,咱們也瞭解一下協程(準確的說,應該是Future對象,或者Task任務)有哪些狀態。

Pending:建立future,還未執行
Running:事件循環正在調用執行任務
Done:任務執行完畢
Cancelled:Task被取消後的狀態

可手工 python3 xx.py 執行這段代碼,

import asyncio
import threading
import time

async def hello():
print("Running in the loop...")
flag = 0
while flag < 1000:
with open("F:\\test.txt", "a") as f:
f.write("------")
flag += 1
print("Stop the loop")

if __name__ == '__main__':
coroutine = hello()
loop = asyncio.get_event_loop()
task = loop.create_task(coroutine)

# Pending:未執行狀態
print(task)
try:
t1 = threading.Thread(target=loop.run_until_complete, args=(task,))
# t1.daemon = True
t1.start()

# Running:運行中狀態
time.sleep(1)
print(task)
t1.join()
except KeyboardInterrupt as e:
# 取消任務
task.cancel()
# Cacelled:取消任務
print(task)
finally:
print(task)

順利執行的話,將會打印 Pending -> Pending:Runing -> Finished 的狀態變化

假如,執行後 立馬按下 Ctrl+C,則會觸發task取消,就會打印 Pending -> Cancelling -> Cancelling 的狀態變化。

. gather與wait

還記得上面我說,把多個協程註冊進一個事件循環中有兩種方法嗎?

  • 使用asyncio.wait()
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks))
  • 使用asyncio.gather()
# 千萬注意,這裏的 「*」 不能省略
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(*tasks))

asyncio.gatherasyncio.wait 在asyncio中用得的比較普遍,這裏有必要好好研究下這兩貨。

仍是照例用例子來講明,先定義一個協程函數

import asyncio

async def factorial(name, number):
f = 1
for i in range(2, number+1):
print("Task %s: Compute factorial(%s)..." % (name, i))
await asyncio.sleep(1)
f *= i
print("Task %s: factorial(%s) = %s" % (name, number, f))

接收參數方式

asyncio.wait

接收的tasks,必須是一個list對象,這個list對象裏,存放多個的task。

它能夠這樣,用asyncio.ensure_future轉爲task對象

tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait(tasks))

也能夠這樣,不轉爲task對象。

loop = asyncio.get_event_loop()

tasks=[
factorial("A", 2),
factorial("B", 3),
factorial("C", 4)
]

loop.run_until_complete(asyncio.wait(tasks))

asyncio.gather

接收的就比較普遍了,他能夠接收list對象,可是 * 不能省略

tasks=[
asyncio.ensure_future(factorial("A", 2)),
asyncio.ensure_future(factorial("B", 3)),
asyncio.ensure_future(factorial("C", 4))
]

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(*tasks))

還能夠這樣,和上面的 * 做用一致,這是由於asyncio.gather()的第一個參數是 *coros_or_futures,它叫 非命名鍵值可變長參數列表,能夠集合全部沒有命名的變量。

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.gather(
factorial("A", 2),
factorial("B", 3),
factorial("C", 4),
))

甚至還能夠這樣

loop = asyncio.get_event_loop()

group1 = asyncio.gather(*[factorial("A" ,i) for i in range(1, 3)])
group2 = asyncio.gather(*[factorial("B", i) for i in range(1, 5)])
group3 = asyncio.gather(*[factorial("B", i) for i in range(1, 7)])

loop.run_until_complete(asyncio.gather(group1, group2, group3))

返回結果不一樣

asyncio.wait

asyncio.wait 返回donespendings

  • dones:表示已經完成的任務
  • pendings:表示未完成的任務

若是咱們須要獲取,運行結果,須要手工去收集獲取。

dones, pendings = await asyncio.wait(tasks)

for task in dones:
print('Task ret: ', task.result())

asyncio.gather

asyncio.gather 它會把值直接返回給咱們,不須要手工去收集。

results = await asyncio.gather(*tasks)

for result in results:
print('Task ret: ', result)

wait有控制功能

import asyncio
import random


async def coro(tag):
await asyncio.sleep(random.uniform(0.5, 5))

loop = asyncio.get_event_loop()

tasks = [coro(i) for i in range(1, 11)]


# 【控制運行任務數】:運行第一個任務就返回
# FIRST_COMPLETED :第一個任務徹底返回
# FIRST_EXCEPTION:產生第一個異常返回
# ALL_COMPLETED:全部任務完成返回 (默認選項)
dones, pendings = loop.run_until_complete(
asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED))
print("第一次完成的任務數:", len(dones))


# 【控制時間】:運行一秒後,就返回
dones2, pendings2 = loop.run_until_complete(
asyncio.wait(pendings, timeout=1))
print("第二次完成的任務數:", len(dones2))


# 【默認】:全部任務完成後返回
dones3, pendings3 = loop.run_until_complete(asyncio.wait(pendings2))

print("第三次完成的任務數:", len(dones3))

loop.close()

輸出結果

第一次完成的任務數: 1
第二次完成的任務數: 4
第三次完成的任務數: 5

快關注一下,成爲Python高手
快關注一下,成爲Python高手
相關文章
相關標籤/搜索