前年我曾寫過一篇《初探 Python 3 的異步 IO 編程》,當時只是初步接觸了一下 yield from 語法和 asyncio 標準庫。前些日子我在 V2EX 看到一篇《爲何只有基於生成器的協程能夠真正的暫停執行並強制性返回給事件循環?》,激起了我再探 Python 3 異步編程的興趣。然而看了不少文章和,才發現極少提到 async 和 await 實際意義的,絕大部分僅止步於對 asyncio 庫的使用,真正有所幫助的只有《How the heck does async/await work in Python 3.5?》和《A tale of event loops》這兩篇。python
在接着寫下去以前,我先列舉一些 PEPs 以供參考:git
從這些 PEPs 中能夠看出 Python 生成器 / 協程的發展歷程:先是 PEP 255 引入了簡單的生成器;接着 PEP 342 賦予了生成器 send() 方法,使其能夠傳遞數據,協程也就有了實際意義;接下來,PEP 380 增長了 yield from 語法,簡化了調用子生成器的語法;而後,PEP 492 將協程和生成器區分開,使得其更不易被用錯;最後,PEP 525 提供了異步生成器,使得編寫異步的數據產生器獲得簡化。github
本文將簡單介紹一下這些 PEPs,着重深刻的則是 PEP 492。編程
首先提一下生成器(generator)。服務器
Generator function 是函數體裏包含 yield 表達式的函數,它在調用時生成一個 generator 對象(如下將其命名爲 gen)。第一次調用 next(gen) 或 gen.send(None) 時,將進入它的函數體:在執行到 yield 表達式時,向調用者返回數據;當函數返回時,拋出 StopIteration 異常。在該函數未執行完以前,可再次調用 next(gen) 進入函數體,也可調用 gen.send(value) 向其傳遞參數,以供其使用(例如提供必要的數據,或者控制其行爲等)。多線程
因爲它主要的做用是產生一系列數據,因此通常使用 for … in gen 的語法來遍歷它,以簡化 next() 的調用和手動捕捉 StopIteration 異常。
即:併發
1
2
3
4
5
6
|
while True:
try:
value = next(gen)
process(value)
except StopIteration:
break
|
能夠簡化爲:app
1
2
|
for value in gen:
process(value)
|
因爲生成器提供了再次進入一個函數體的機制,其實它已經能夠當成協程來使用了。
寫個很簡單的例子:異步
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import select
import socket
def coroutine():
sock = socket.socket()
sock.setblocking(0)
address = yield sock
try:
sock.connect(address)
except BlockingIOError:
pass
data = yield
size = yield sock.send(data)
yield sock.recv(size)
def main():
coro = coroutine()
sock = coro.send(None)
wait_list = (sock.fileno(),)
coro.send(('www.baidu.com', 80))
select.select((), wait_list, ())
coro.send(b'GET / HTTP/1.1\r\nHost: www.baidu.com\r\nConnection: Close\r\n\r\n')
select.select(wait_list, (), ())
print(coro.send(1024))
|
這裏的 coroutine 函數用於處理鏈接和收發數據,而 main 函數則等待讀寫和傳遞參數。雖然看上去和同步的調用沒啥區別,但其實在 main 函數中能夠同時執行多個 coroutine,以實現併發執行。 廈門電動叉車socket
再說一下 yield from。
若是一個生成器內部須要遍歷另外一個生成器,並將數據返回給調用者,你須要遍歷它並處理所遇到的異常;而用了 yield from 後,則能夠一行代碼解決這些問題。具體例子就不列出了,PEP 380 裏有詳細的代碼。
這對於協程而言也是一個利好,這使得它的調用也獲得了簡化:
1
2
3
4
5
|
def coroutine():
...
def caller():
yield from coroutine()
|
接下來就該輪到協程(coroutine)登場了。
從上文也可看出,調用 yield from gen 時,我沒法斷定我是遍歷了一個生成器,仍是調用了一個協程,這種混淆使得接口的設計者和使用者須要花費額外的工夫來約定和檢查。
因而 Python 又前後添加了 asyncio.coroutine 和 types.coroutine 這兩個裝飾器來標註協程,這樣就使得須要使用協程時,不至於誤用了生成器。順帶一提,前者是 asyncio 庫的實現,須要保持向下兼容,本文暫不討論;後者則是 Python 3.5 的語言實現,其實是給函數的 __code__.co_flags 設置 CO_ITERABLE_COROUTINE 標誌。隨後,async def 也被引入以用於定義協程,它則是設置 CO_COROUTINE 標誌。
至此,協程和生成器就得以區分,其中以 types.coroutine 定義的協程稱爲基於生成器的協程(generator-based coroutine),而以 async def 定義的協程則稱爲原生協程(native coroutine)。
這兩種協程之間的區別其實並不大,非要追究的話,主要有這些:
實際使用時,若是不考慮向下兼容,能夠都用原生協程,除非這個協程裏用到了 yield 或 yield from 表達式。
定義了協程函數之後,就能夠調用它們了。
PEP 492 也引入了一個 await 表達式來調用協程,它的用法和 yield from 差很少,可是它只能在協程函數內部使用,且只能接 awaitable 的對象。
所謂 awaitable 的對象,就是其 __await__ 方法返回一個迭代器的對象。原生協程和基於生成器的協程都是 awaitable 的對象。
另外一種調用協程的方法則和生成器同樣,調用其 send 方法,並自行迭代。這種方式主要用於在非協程函數裏調用協程。
舉例來講,調用的代碼會相似這樣:
1
2
3
4
5
6
7
8
9
|
@types.coroutine
def generator_coroutine():
yield 1
async def native_coroutine():
await generator_coroutine()
def main():
native_coroutine().send(None)
|
其中 generator_coroutine 函數裏由於用到了 yield 表達式,因此只能定義成基於生成器的協程;native_coroutine 函數因爲自身是協程,能夠直接用 await 表達式調用其餘協程;main 函數因爲不是協程,於是須要用 native_coroutine().send(None) 這種方式來調用協程。
這個例子其實也解釋了 V2EX 裏提到的那個問題,即爲何原生協程不能「真正的暫停執行並強制性返回給事件循環」。
假設事件循環在 main 函數裏,原生協程是 native_coroutine 函數,那要怎麼才能讓它暫停並返回 main 函數呢?
很顯然 await generator_coroutine() 是不行的,這會進入 generator_coroutine 的函數體,而不是回到 main 函數;若是 yield 一個值,又會遇到以前提到的一個限制,即原生協程裏不能有 yield 表達式;最後僅剩 return 或 raise 這兩種選擇了,但它們雖然能回到 main 函數,卻也不是「暫停」,由於再也無法「繼續」了。
因此通常而言,若是要用 Python 3.5 來作異步編程的話,最外層的事件循環須要調用協程的 send 方法,裏面大部分的異步方法均可以用原生協程來實現,但最底層的異步方法則須要用基於生成器的協程。
爲了有個更直觀的認識,再來舉個例子,抓取 10 個百度搜索的頁面:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
|
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE
import socket
from types import coroutine
from urllib.parse import urlparse
@coroutine
def until_readable(fileobj):
yield fileobj, EVENT_READ
@coroutine
def until_writable(fileobj):
yield fileobj, EVENT_WRITE
async def connect(sock, address):
try:
sock.connect(address)
except BlockingIOError:
await until_writable(sock)
async def recv(fileobj):
result = b''
while True:
try:
data = fileobj.recv(4096)
if not data:
return result
result += data
except BlockingIOError:
await until_readable(fileobj)
async def send(fileobj, data):
while data:
try:
sent_bytes = fileobj.send(data)
data = data[sent_bytes:]
except BlockingIOError:
await until_writable(fileobj)
async def fetch_url(url):
parsed_url = urlparse(url)
if parsed_url.port is None:
port = 443 if parsed_url.scheme == 'https' else 80
else:
port = parsed_url.port
with socket.socket() as sock:
sock.setblocking(0)
await connect(sock, (parsed_url.hostname, port))
path = parsed_url.path if parsed_url.path else '/'
path_with_query = '{}?{}'.format(path, parsed_url.query) if parsed_url.query else path
await send(sock, 'GET {} HTTP/1.1\r\nHost: {}\r\nConnection: Close\r\n\r\n'.format(path_with_query, parsed_url.netloc).encode())
content = await recv(sock)
print('{}: {}'.format(url, content))
def main():
urls = ['http://www.baidu.com/s?wd={}'.format(i) for i in range(10)]
tasks = [fetch_url(url) for url in urls] # 將任務定義成協程對象
with DefaultSelector() as selector:
while tasks or selector.get_map(): # 有要作的任務,或者有等待的 IO 事件
events = selector.select(0 if tasks else 1) # 若是有要作的任務,馬上得到當前已就緒的 IO 事件,不然最多等待 1 秒
for key, event in events:
task = key.data
tasks.append(task) # IO 事件已就緒,能夠執行新 task 了
selector.unregister(key.fileobj) # 取消註冊,避免重複執行
for task in tasks:
try:
fileobj, event = task.send(None) # 開始或繼續執行 task
except StopIteration:
pass
else:
selector.register(fileobj, event, task) # task 還未執行完,須要等待 IO,將 task 註冊爲 key.data
tasks.clear()
main()
|
其餘的函數都沒什麼好說的,主要解釋下 until_readable、until_writable 和 main 函數。
其實 until_readable 和 until_writable 函數都是 yield 一個 (fileobj, event) 元組,用於告知事件循環,這個 fileobj 的 event 事件須要被監聽。
而在 main 函數中,事件循環遍歷並執行 tasks 裏包含的協程。這些協程在等待 IO 時返回事件循環,由事件循環註冊事件及其對應的協程。到下一個事件循環時,取出全部就緒的事件,繼續執行其對應的協程,就完成了整個的異步執行過程。
若是關注到 fetch_url 函數,就會發現業務邏輯用到的代碼其實挺簡單,只是 await 異步函數而已。這雖然簡化了大部分的開發工做,但其實也限制了它的表達能力,由於在一個協程內,不能同時 await 多個異步函數——它其實是順序執行的,只是不一樣協程之間能夠異步執行而已。
考慮一個 HTTP/2 的客戶端,它和服務器之間的鏈接是多路複用的,也就是能夠在一個鏈接裏同時發出和接收多份數據,而這些數據的傳輸是亂序的。若是一份 JavaScript 資源已經下載完畢,不必再等其餘的圖片資源下載完畢才能執行。要作到這點,就須要協程有併發執行多個子協程,共同完成任務的能力。這在使用多線程或回調函數時是很容易作到的,但使用 await 就顯得捉襟見肘了。倒也不是不能作,只是須要拿以前的代碼改下,yield 一些子協程,並在事件循環中判斷一下類型就好了。
雖然僅用上述提到的東西,已經能作異步編程了,但我仍是得補充 2 個漏掉的語法知識:
1.async with
先考慮普通的 with 語句,它的主要做用是在進入和退出一個區域時,作一些初始化和清理工做。
例如:
1
2
3
4
5
|
f = open('1.txt')
try:
content = f.read()
finally:
f.close()
|
就能夠改寫爲:
1
2
|
with open('1.txt') as f:
content = f.read()
|
這裏要求 open 函數返回的 f 對象帶有 __enter__ 和 __exit__ 方法。其中,__enter__ 方法只須要返回一個文件對象就好了,__exit__ 則須要調用這個文件對象的 close 方法。相似的,假設這個 open 函數和 close 方法變成了異步的,你的代碼多是這樣的:
1
2
3
4
5
|
f = await async_open('1.txt')
try:
content = await f.read()
finally:
await f.close()
|
你就能夠用 async with 來改寫它:
1
2
|
async with async_open('1.txt') as f:
content = await f.read()
|
相應的,async_open 函數返回的 f 對象須要實現 __aenter__ 和 __aexit__ 這 2 個異步方法。
2.async for
這裏也先考慮普通的 for 語句,它的主要做用是遍歷一個迭代器。
例如:
1
2
3
4
5
6
7
|
f = open('1.txt')
it = iter(f)
while True:
try:
print(next(it))
except StopIteration:
break
|
能夠改寫爲:
1
2
3
|
f = open('1.txt')
for line in f:
print(line)
|
這裏要求 open 函數返回的 f 對象返回一個迭代器對象,即實現了 __iter__ 方法,這個方法要返回一個實現了 __next__ 方法的對象。而 __next__ 方法在每次調用時,都返回下一行的文件內容,直到文件結束時拋出 StopIteration 異常。相似的,假如 __next__ 方法變成了異步的,你的代碼多是這樣的:
1
2
3
4
5
6
7
8
|
f = open('1.txt')
it = iter(f)
while True:
try:
line = await it.__anext__()
print(line)
except StopAsyncIteration:
break
|
你能夠用 async with 來改寫它:
1
2
3
|
f = open('1.txt')
async for line in f:
print(line)
|
相應的,所需實現的方法分別變成了 __aiter__ 和 __anext__。其中,後者是異步方法。順帶一提,PEP 525 引入的異步生成器(asynchronous generator)就實現了這兩個方法。在異步方法中使用 yield 表達式,會將它變成異步生成器函數(Python 3.6 之後可用,3.5 以前是語法錯誤)。值得注意的是,異步生成器沒有實現 __await__ 方法,所以它不是協程,也不能被 await。