asyncio模塊實現單線程-多任務的異步協程

本篇介紹基於asyncio模塊,實現單線程-多任務的異步協程html

基本概念

協程函數

  • 協程函數: 定義形式爲 async def 的函數;

aysnc

  • Python3.5+版本新增了aysncawait關鍵字,這兩個語法糖讓咱們很是方便地定義和使用協程。python

  • 若是一個函數的定義被async修飾後,則該函數就是一個特殊的函數(協程函數)express

1
2
3
4
5
6
7
# 使用 async 關鍵字修飾函數後,調用該函數,但不會執行函數,而是返回一個coroutine協程對象
async def get_request(url):
print("正在請求: ", url)
sleep(1)
print('請求結束:', url)

get_request('www.b.com')

運行分析:編程

  • 直接調用這個函數的話並不會被執行,也會出現一條警告 RuntimeWarning: coroutine 'get_request' was never awaited網絡

  • 對於它的解釋 官方文檔 裏提到,當協程程序被調用而不是被等待時(即執行 get_request('www.b.com') 而不是 await get_request('www.b.com') )或者協程沒有經過 asyncio.create_task() 被排入計劃日程(建立任務對象),asyncio 將會發出一條 RuntimeWarning併發

  • 固然 asyncio.create_task( get_request) 是py3.7中的,在以前的版本中是用到的 asyncio.ensure_future( get_request )app

await

  • 在協程中若是要調用另外一個協程就使用await要注意await關鍵字要在async定義的函數中使用,而反過來async函數能夠不出現await
  • 若是一個對象能夠在 await 語句中使用,那麼它就是 可等待 對象。許多 asyncio API 都被設計爲接受可等待對象。
  • 可等待 對象有三種主要類型: 協程, 任務Future.
    • 經過 ensure_futurecreate_task 函數打包協程對象便可獲得任務。
    • Future 是一種特殊的 低層級 可等待對象,表示一個異步操做的 最終結果
      • 不用回調方法編寫異步代碼後,爲了獲取異步調用的結果,引入一個 Future 將來對象。Future 封裝了與 loop 的交互行爲,add_done_callback 方法向 epoll 註冊回調函數,當 result 屬性獲得返回值後,會運行以前註冊的回調函數,向上傳遞給 coroutine。
      • 一般狀況下 沒有必要 在應用層級的代碼中建立 Future 對象
1
2
3
4
5
6
7
8
9
10
11
12
import asyncio

async def producer():
for i in range(1, 6):
print(f'生產:{i}')
await consumer(i)

async def consumer(i):
print(f'消費:{i}')

asyncio.run(producer())
# asyncio.run() 是py3.7更新出來的,在py3.7中,使用這個能夠簡單直接的運行 asyncio 程序。
  • asyncio.run() 函數用來運行最高層級的入口點 「main()」 函數,更多解釋詳見 官方文檔異步

  • 此函數老是會建立一個新的事件循環並在結束時關閉之。它應當被用做 asyncio 程序的主入口點,理想狀況下應當只被調用一次。async

協程對象

  • 協程對象*:調用 *協程函數 所返回的對象。ide

    • 特殊函數被調用後,函數內部的實現語句不會被當即執行,而後該函數調用會返回一個協程對象。
  • 結論:協程對象 == 特殊的函數調用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
async def get_request(url):
print("正在請求: ", url)
sleep(1)
print('請求結束:', url)

# 函數調用:返回的就是一個協程對象
c = get_request('www.b.com')
print(c)
# <coroutine object get_request at 0x000002A6DFA026D0>

# 建立3個協程對象
urls = [
'1.com', '2.com', '3.com'
]
coroutine_list = []
for url in urls:
c = get_request(url)
coroutine_list.append(c)
print(coroutine_list)
# [<coroutine object get_request at 0x0000022FE5313F10>, <coroutine object get_request at 0x0000022FE52426D0>, <coroutine object get_request at 0x0000022FE5313EB8>]

任務對象

  • 任務對象其實就是對協程對象的進一步封裝
  • 任務 被用來設置日程以便 併發 執行協程。

結論:任務對象 == 高級的協程對象 == 特殊的函數調用

特性:能夠綁定回調(爬蟲中回調函數經常使用來作數據解析)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import asyncio
from time import sleep

# 協程函數的定義
async def get_request(url):
print("正在請求: ", url)
sleep(1)
print('請求結束:', url)


# 函數調用:返回的就是一個協程對象
c = get_request('www.b.com')

# 建立一個任務對象:基於協程對象建立的
task = asyncio.ensure_future(c) # ensure_future 是py3.7以前的

# 建立3個任務對象
urls = [
'1.com', '2.com', '3.com'
]
task_list = [] # 存放多個任務對象的列表
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
task_list.append(task)

綁定回調

回調函數何時被執行?

  • 任務對象執行結束後執行

task.add_done_callback(func)

  • func必需要有一個參數,該參數表示的是該回調函數對應的任務對象
  • 回調函數的參數.result() : 任務對象對應的特殊函數執行結束的返回值。

事件循環對象

  • 做用:將其內部註冊的任務對象進行異步執行。
  • 事件循環是異步編程的底層基石
  • 在py3.6中咱們須要手動建立事件循環對象。
  • 在py3.7中,有了高層級的 asyncio 函數,例如 asyncio.run(),就不多有必要使用 低層級函數 來手動建立和關閉事件循環。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
import asyncio
import time


# 函數的定義
# 使用 async 關鍵字修飾函數後,調用該函數,但不會執行函數,而是返回一個coroutine協程對象
async def get_request(url):
print("正在請求: ", url)
# asyncio.sleep(1) # 阻塞1s沒有成功
await asyncio.sleep(1) # 加上await關鍵字便可,這裏的 await 表示等待
print('請求結束:', url)


# 建立3個協程對象
urls = [
'1.com', '2.com', '3.com'
]
start = time.time()

# 任務列表:存儲多個任務對象

# py3.6
tasks = []
for url in urls:
c = get_request(url)
task = asyncio.ensure_future(c)
tasks.append(task)
# 獲取當前事件循環,若是當前os線程沒有設置而且 set_event_loop() 尚未被調用,asyncio建立一個新的事件循環
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(tasks)) # 直接列表會報錯,須要修飾如下,這裏的 wait 表示掛起

print('總耗時:', time.time() - start)


# py3.7
# 異步實現
# async def main():
# tasks = []
# for url in urls:
# c = get_request(url)
# task = asyncio.create_task (c)
# tasks.append(task)
# await asyncio.gather(*tasks)
# print('總耗時:', time.time() - start)
#
# asyncio.run(main())

# 固然這樣的寫法還是同步
# async def main():
# for url in urls:
# c = get_request(url)
# task = asyncio.create_task(c)
# await task
# print('總耗時:', time.time() - start)
#
# asyncio.run(main())
  • 與py3.6相比,都是先作一個任務列表,而後py3.6須要手動建立事件循環對象get_event_loop 並使用 run_until_complete 來達到異步執行,而在py3.7中,gather會併發的執行傳入的可等待對象並在run的調用下完成異步執行。因此在新版py3.7中,咱們無需手動建立和關閉事件循環了。
  • py3.7用 create_task 代替 ensure_future。

編碼流程

  • 定義協程函數

  • 建立協程對象

  • 封裝任務對象

    • 綁定回調函數
  • 建立事件循環對象

  • 將任務對象註冊到事件循環對象中,而且開啓事件循環。

按照流程完整的py3.6代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time


# 定義協程函數
async def get_request(url):
print("正在請求: ", url)
# asyncio.sleep(1) # 阻塞1s沒有成功
await asyncio.sleep(1) # 加上await關鍵字便可,這裏的 await 表示等待
print('請求結束:', url)
return '我去回調啦'


def parse(task): # task 表示與回調函數綁定的任務對象 / 給回調函數傳入任務對象
print('i am task callback() !!!', task.result())


urls = [
'1.com', '2.com', '3.com'
]
start = time.time()

# 任務列表:存儲多個任務對象
tasks = []
for url in urls:
# 建立協程對象
c = get_request(url)
# 封裝任務對象
task = asyncio.ensure_future(c)
# 綁定回調
task.add_done_callback(parse)
tasks.append(task)
# 建立事件循環對象
loop = asyncio.get_event_loop()
# 將任務對象註冊到事件循環對象中,而且開啓事件循環
loop.run_until_complete(asyncio.wait(tasks)) # 直接列表會報錯,須要修飾如下,這裏的 wait 表示掛起

print('總耗時:', time.time() - start)

note:在特殊函數內部的實現語句中不能夠出現不支持異步的模塊對應的代碼,不然就會終止多任務異步協程的異步效果。

在py3.7中,則爲

  • 定義協程函數

  • 定義 asyncio 程序的主入口

    • 建立協程對象
    • 封裝任務對象
    • 綁定回調函數
  • asyncio.run(main())

按照流程完整的py3.7代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
import asyncio
import time


# 定義協程函數
async def get_request(url):
print("正在請求: ", url)
await asyncio.sleep(1)
print('請求結束:', url)
return '我去回調啦'


def parse(task): # task 表示與回調函數綁定的任務對象 / 給回調函數傳入任務對象
print('i am task callback() !!!', task.result())


urls = [
'1.com', '2.com', '3.com'
]
start = time.time()


# 定義 asyncio 程序的入口點
async def main():
tasks = []
for url in urls:
# 建立協程對象
c = get_request(url)
# 封裝任務對象
task = asyncio.create_task(c)
# 綁定回調函數
task.add_done_callback(parse)
tasks.append(task)
await asyncio.gather(*tasks)
print('總耗時:', time.time() - start)


asyncio.run(main())

異步的本質

  • 按照註冊順序執行,遇到阻塞就會掛起,執行下一個任務。

  • 當上一個任務的阻塞結束後,就會繼續執行該任務。

  • 真正的掛起是由 asyncio.wait(tasks) 作到的

事件循環

圖片來自: 談談Python協程技術的演進

底層流程

圖片來自 理解 Python asyncio

底層尚未理解,先把大佬的圖粘過來慢慢研究😄

待補充!!!

總結

執行協程的三種機制

asyncio提供了三種執行協程的機制:

  1. 使用asyncio.run()執行協程。通常用於執行最頂層的入口函數,如main()

  2. await一個協程。通常用於在一個協程中調用另外一協程。

  3. asyncio.create_task()asyncio.ensure_futuer() 方法將Coroutine(協程)封裝爲Task(任務)。通常用於實現異步併發操做。 須要注意的是,只有在當前線程存在事件循環的時候才能建立任務(Task)。

在下一篇中將結合 aiohttp 網絡請求模塊,加速爬取!!!

相關文章
相關標籤/搜索