在前一篇《一文完全搞懂Python可迭代(Iterable)、迭代器(Iterator)和生成器(Generator)的概念》 的文中,知道生成器(Generator
)可由如下兩種方式定義:html
yield
定義的函數在Python
早期的版本中協程也是經過生成器來實現的,也就是基於生成器的協程(Generator-based Coroutines)。在前一篇介紹生成器的文章末尾舉了一個生產者-消費者的例子,就是基於生成器的協程來實現的。python
def producer(c):
n = 0
while n < 5:
n += 1
print('producer {}'.format(n))
r = c.send(n)
print('consumer return {}'.format(r))
def consumer():
r = ''
while True:
n = yield r
if not n:
return
print('consumer {} '.format(n))
r = 'ok'
if __name__ == '__main__':
c = consumer()
next(c) # 啓動consumer
producer(c)
複製代碼
看了這段代碼,相信不少初學者和我同樣對基於生成器的協程實現其實很難立刻就可以根據業務寫出本身的協程代碼。Python
實現者們也注意到這個問題,由於它太不Pythonic
了。而基於生成器的協程也將被廢棄,所以本文將重點介紹asyncio
包的使用,以及涉及到的一些相關類概念。
注:我使用的Python
環境是3.7。程序員
協程(Coroutine
)是在線程中執行的,可理解爲微線程,但協程的切換沒有上下文的消耗,它比線程更加輕量些。一個協程能夠隨時中斷本身讓另外一個協程開始執行,也能夠從中斷處恢復並繼續執行,它們之間的調度是由程序員來控制的(能夠看本文開篇處生產者-消費者的代碼)。併發
在Python
3.5+版本新增了aysnc
和await
關鍵字,這兩個語法糖讓咱們很是方便地定義和使用協程。
在函數定義時用async
聲明就定義了一個協程。異步
import asyncio
# 定義了一個簡單的協程
async def simple_async():
print('hello')
await asyncio.sleep(1) # 休眠1秒
print('python')
# 使用asynio中run方法運行一個協程
asyncio.run(simple_async())
# 執行結果爲
# hello
# python
複製代碼
在協程中若是要調用另外一個協程就使用await
。要注意await
關鍵字要在async
定義的函數中使用,而反過來async
函數能夠不出現await
async
# 定義了一個簡單的協程
async def simple_async():
print('hello')
asyncio.run(simple_async())
# 執行結果
# hello
複製代碼
asyncio.run()
將運行傳入的協程,負責管理asyncio
事件循環。
除了run()
方法可直接執行協程外,還可使用事件循環loop
函數
async def do_something(index):
print(f'start {time.strftime("%X")}', index)
await asyncio.sleep(1)
print(f'finished at {time.strftime("%X")}', index)
def test_do_something():
# 生成器產生多個協程對象
task = [do_something(i) for i in range(5)]
# 獲取一個事件循環對象
loop = asyncio.get_event_loop()
# 在事件循環中執行task列表
loop.run_until_complete(asyncio.wait(task))
loop.close()
test_do_something()
# 運行結果
# start 00:04:03 3
# start 00:04:03 4
# start 00:04:03 1
# start 00:04:03 2
# start 00:04:03 0
# finished at 00:04:04 3
# finished at 00:04:04 4
# finished at 00:04:04 1
# finished at 00:04:04 2
# finished at 00:04:04 0
複製代碼
能夠看出幾乎同時啓動了全部的協程。
其實翻閱源碼可知asyncio.run()
的實現也是封裝了loop
對象及其調用。而asyncio.run()
每次都會建立一個新的事件循環對象用於執行協程。oop
在Python
中可等待(Awaitable
)對象有:協程(corountine
)、任務(Task
)、Future
。即這些對象可使用await
關鍵字進行調用post
await awaitable_object
複製代碼
協程由async def
聲明定義,一個協程可由另外一個協程使用await
進行調用spa
async def nested():
print('in nested func')
return 13
async def outer():
# 要使用await 關鍵字 纔會執行一個協程函數返回的協程對象
print(await nested())
asyncio.run(outer())
# 執行結果
# in nested func
# 13
複製代碼
若是在outer()
方法中直接調用nested()
而不使用await
,將拋出一個RuntimeWarning
async def outer():
# 直接調用協程函數不會發生執行,只是返回一個 coroutine 對象
nested()
asyncio.run(outer())
複製代碼
運行程序,控制檯將輸出如下信息
RuntimeWarning: coroutine 'nested' was never awaited
nested()
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
複製代碼
任務(Task
)是能夠用來併發地執行協程。可使用asyncio.create_task()
將一個協程對象封裝成任務,該任務將很快被排入調度隊列並執行。
async def nested():
print('in nested func')
return 13
async def create_task():
# create_task 將一個協程對象打包成一個 任務時,該協程就會被自動調度運行
task = asyncio.create_task(nested())
# 若是要看到task的執行結果
# 可使用await等待協程執行完成,並返回結果
ret = await task
print(f'nested return {ret}')
asyncio.run(create_task())
# 運行結果
# in nested func
# nested return 13
複製代碼
注:關於併發下文還會詳細說明。
Future
是一種特殊的低層級(low-level
)對象,它是異步操做的最終結果(eventual result
)。 當一個 Future 對象 被等待,這意味着協程將保持等待直到該 Future 對象在其餘地方操做完畢。
一般在應用層代碼不會直接建立Future
對象。在某些庫和asyncio
模塊中的會使用到該對象。
async def used_future_func():
await function_that_returns_a_future_object()
複製代碼
前面咱們知道Task
能夠併發地執行。 asyncio.create_task()
就是一個把協程封裝成Task
的方法。
async def do_after(what, delay):
await asyncio.sleep(delay)
print(what)
# 利用asyncio.create_task建立並行任務
async def corun():
task1 = asyncio.create_task(do_after('hello', 1)) # 模擬執行1秒的任務
task2 = asyncio.create_task(do_after('python', 2)) # 模擬執行2秒的任務
print(f'started at {time.strftime("%X")}')
# 等待兩個任務都完成,兩個任務是並行的,因此總時間兩個任務中最大的執行時間
await task1
await task2
print(f'finished at {time.strftime("%X")}')
asyncio.run(corun())
# 運行結果
# started at 23:41:08
# hello
# python
# finished at 23:41:10
複製代碼
task1
是一個執行1秒的任務,task2
是一個執行2秒的任務,兩個任務併發的執行,總共消耗2秒。
除了使用asyncio.create_task()
外還可使用asyncio.gather()
,這個方法接收協程參數列表
async def do_after(what, delay):
await asyncio.sleep(delay)
print(what)
async def gather():
print(f'started at {time.strftime("%X")}')
# 使用gather可將多個協程傳入
await asyncio.gather(
do_after('hello', 1),
do_after('python', 2),
)
print(f'finished at {time.strftime("%X")}')
asyncio.run(gather())
# 運行結果
# started at 23:47:50
# hello
# python
# finished at 23:47:52
複製代碼
兩個任務消耗的時間爲其中消耗時間最長的任務。