若是 Python 書籍有必定的指導做用,那麼(協程就是)文檔最匱乏、最不爲人知的 Python 特性,所以表面上看是最無用的特性。python
——David Beazley算法
Python 圖書做者編程
字典爲動詞「to yield」給出了兩個釋義:產出和讓步。對於 Python 生成器中的 yield 來講,這兩個含義都成立。yield item 這行代碼會產出一個值,提供給 next(...) 的調用方;此外,還會做出讓步,暫停執行生成器,讓調用方繼續工做,直到須要使用另外一個值時再調用next()。調用方會從生成器中拉取值。多線程
從句法上看,協程與生成器相似,都是定義體中包含 yield 關鍵字的函數。但是,在協程中,yield 一般出如今表達式的右邊(例如,datum = yield),能夠產出值,也能夠不產出——若是 yield關鍵字後面沒有表達式,那麼生成器產出 None。協程可能會從調用方接收數據,不過調用方把數據提供給協程使用的是 .send(datum) 方法,而不是 next(...) 函數。一般,調用方會把值推送給協程。閉包
yield 關鍵字甚至還能夠不接收或傳出數據。無論數據如何流動,yield 都是一種流程控制工具,使用它能夠實現協做式多任務:協程能夠把控制器讓步給中心調度程序,從而激活其餘的協程。架構
從根本上把 yield 視做控制流程的方式,這樣就好理解協程了。異步
生成器如何進化成協程異步編程
協程的底層架構在「PEP 342—Coroutines via EnhancedGenerators」(https://www.python.org/dev/peps/pep-0342/)中定義,並在Python 2.5(2006 年)實現了。自此以後,yield 關鍵字能夠在表達式中使用,並且生成器 API 中增長了 .send(value) 方法。生成器的調用方可使用 .send(...) 方法發送數據,發送的數據會成爲生成器函數中 yield 表達式的值。所以,生成器能夠做爲協程使用。協程是指一個過程,這個過程與調用方協做,產出由調用方提供的值。函數
除了 .send(...) 方法,PEP 342 還添加了 .throw(...) 和 .close()方法:前者的做用是讓調用方拋出異常,在生成器中處理;後者的做用是終止生成器。工具
用做協程的生成器的基本行爲
舉個 🌰 演示協程的用法
1 def simple_coroutine(): # 攜程使用生成器函數定義:定義題中有yield關鍵字 2 print('-> coroutine started') # 若是攜程只從客戶那裏接受數據,那麼產出的值是None,這個值是隱式的,由於yield關鍵字右邊沒有表達式 3 x = yield 4 print('-> coroutine received:', x) 5 6 my_coro = simple_coroutine() 7 print(my_coro) # 與建立生成器的方式同樣,調用函數獲得生成器對象 8 next(my_coro) # 首先要調用next(..)函數,由於生成器尚未啓動,沒在yield語句初暫停,因此一開始沒法發送數據 9 10 my_coro.send(10) # 調用這個方法後,攜程定義中的yield表可是會出現10,直到下一個yield出現或者終止
以上代碼執行的結果爲:
<generator object simple_coroutine at 0x102a463b8> -> coroutine started -> coroutine received: 10 Traceback (most recent call last): ........... my_coro.send(10) # 調用這個方法後,攜程定義中的yield表可是會出現10,直到下一個yield出現或者終止 StopIteration
協程能夠身處四個狀態中的一個。當前狀態可使用inspect.getgeneratorstate(...) 函數肯定,該函數會返回下述字符串中的一個。
'GEN_CREATED'
等待開始執行。
'GEN_RUNNING'
解釋器正在執行。
'GEN_SUSPENDED'
在 yield 表達式處暫停。
'GEN_CLOSED'
執行結束。
由於 send 方法的參數會成爲暫停的 yield 表達式的值,因此,僅當協程處於暫停狀態時才能調用 send 方法,例如 my_coro.send(10)。不過,若是協程還沒激活(即,狀態是 'GEN_CREATED'),狀況就不一樣了。所以,始終要調用 next(my_coro) 激活協程——也能夠調用my_coro.send(None),效果同樣。
若是建立協程對象後當即把 None 以外的值發給它,會出現下述錯誤:
>>> my_coro = simple_coroutine() >>> my_coro.send(1729) Traceback (most recent call last): File "<stdin>", line 1, in <module> TypeError: can't send non-None value to a just-started generator
注意錯誤消息,它表述得至關清楚。最早調用 next(my_coro) 函數這一步一般稱爲「預激」(prime)協程(即,讓協程向前執行到第一個 yield 表達式,準備好做爲活躍的協程使用)。
下面舉個產出多個值的例子,以便更好地理解協程的行爲,🌰 以下
>>> def simple_coro2(a): ... print('-> Started: a =', a) ... b = yield a ... print('-> Received: b =', b) ... c = yield a + b ... print('-> Received: c =', c) ... >>> my_coro2 = simple_coro2(14) >>> from inspect import getgeneratorstate >>> getgeneratorstate(my_coro2) # 指明狀態,處於GEN_CREATED狀態,也就是等着next一下 'GEN_CREATED' >>> next(my_coro2) # 向前執行協程到第一個yield表達式,打印-> Started 這個信息之後,而後產出的a的值,而且中止,等待爲b賦值 -> Started: a = 14 14 >>> getgeneratorstate(my_coro2) # 查看協程的狀態,如今處於GEN_SUSPENDED狀態(即協程在yield表達式處暫停) 'GEN_SUSPENDED' >>> my_coro2.send(28) # 把數字28發給暫停的協程,計算yield表達式,獲得28,而後綁定給b,產出a + b的值(42),而後協程暫停,等待爲c賦值 -> Received: b = 28 42 >>> my_coro2.send(99) # 把數字99發送給暫停的協程,計算yield表達式,獲得99,而後把獲得的數字綁定給c,而後協程終止。致使生成器拋出StopIteration -> Received: c = 99 Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration >>> getgeneratorstate(my_coro2) # 協程的狀態處於GEN_CLOSED狀態 'GEN_CLOSED'
關鍵的一點是,協程在 yield 關鍵字所在的位置暫停執行。前面說過,在賦值語句中,= 右邊的代碼在賦值以前執行。所以,對於 b =yield a 這行代碼來講,等到客戶端代碼再激活協程時纔會設定 b 的值。這種行爲要花點時間才能習慣,不過必定要理解,這樣才能弄懂異步編程中 yield 的做用。
simple_coro2 協程的執行過程分爲 3 個階段,如圖下圖所示。
(1) 調用 next(my_coro2),打印第一個消息,而後執行 yield a,產出數字 14。
(2) 調用 my_coro2.send(28),把 28 賦值給 b,打印第二個消息,然後執行 yield a + b,產出數字 42。
(3) 調用 my_coro2.send(99),把 99 賦值給 c,打印第三個消息,協程終止。
執行 simple_coro2 協程的 3 個階段(注意,各個階段都在yield 表達式中結束,並且下一個階段都從那一行代碼開始,而後再把 yield 表達式的值賦給變量)
示例:使用協程計算移動平均值
使用協程的好處是,total 和 count 聲明爲局部變量便可,無需使用實例屬性或閉包在屢次調用之間保持上下文。下面的 🌰 是使用averager 協程的 doctest。
coroaverager0.py:定義一個計算移動平均值的協程
1 def averager(): 2 total = 0 3 count = 0 4 average = None 5 while True: # 無限循環一直會不斷的把值發送給這個協程,它就會一直接受,而後生成結果 6 # 僅當調用方在協程上調用.close()方法,或者沒有對協程引用的時候纔會終止 7 term = yield average # 這裏的yield表達式用於暫停執行協程,把結果發送給調用方,還用於接受調後面發給協程的值 8 total += term 9 count += 1 10 average = total/count
以上代碼執行的結果爲:
>>> coro_avg = averager() # 建立協程對象 >>> next(coro_avg) # 調用next函數,預激協程 >>> coro_avg.send(10) # 計算平均值:屢次調用send(...)方法,產出當前平均值 10.0 >>> coro_avg.send(30) 20.0 >>> coro_avg.send(5) 15.0
在上述 doctest 中,調用 next(coro_avg) 函數後,協程會向前執行到 yield 表達式,產出 average 變量的初始值——None,所以不會出如今控制檯中。此時,協程在 yield 表達式處暫停,等到調用方發送值。coro_avg.send(10) 那一行發送一個值,激活協程,把發送的值賦給 term,並更新 total、count 和 average 三個變量的值,而後開始 while 循環的下一次迭代,產出 average 變量的值,等待下一次爲 term 變量賦值。
預激協程的裝飾器
若是不預激,那麼協程沒什麼用。調用 my_coro.send(x) 以前,記住必定要調用 next(my_coro)。爲了簡化協程的用法,有時會使用一個預激裝飾器。
1 from functools import wraps 2 from inspect import getgeneratorstate 3 4 5 def coroutine(func): 6 @wraps(func) 7 def primer(*args, **kwargs): # 把被裝飾的生成器函數天換成這裏的primer函數,調用peimer函數時,返回預激後的生成器 8 gen = func(*args, **kwargs) # 獲取生成器對象 9 next(gen) # 預激活 10 return gen # 返回生成器 11 return primer 12 13 14 @coroutine # 預激活裝飾器 15 def averager(): 16 total = 0 17 count = 0 18 average = None 19 while True: # 無限循環一直會不斷的把值發送給這個協程,它就會一直接受,而後生成結果 20 # 僅當調用方在協程上調用.close()方法,或者沒有對協程引用的時候纔會終止 21 term = yield average # 這裏的yield表達式用於暫停執行協程,把結果發送給調用方,還用於接受調後面發給協程的值 22 total += term 23 count += 1 24 average = total/count 25 26 coro_avg = averager() # 調用averager()函數建立一個生成器對象,在coroutine裝飾器的primer函數中已預激活 27 print(getgeneratorstate(coro_avg)) # 查看協程的狀態,已是能夠接收值得狀態咯 28 print(coro_avg.send(10)) # 給協程發送數據 29 print(coro_avg.send(30)) 30 print(coro_avg.send(5))
以上代碼的執行結果爲:
GEN_SUSPENDED
10.0
20.0
15.0
終止協程和異常處理
協程中未處理的異常會向上冒泡,傳給 next 函數或 send 方法的調用方(即觸發協程的對象)
>>> from coroaverager1 import averager >>> coro_avg = averager() >>> coro_avg.send(40) # 使用@corotine裝飾器裝飾的averager協程,能夠當即開始發送值 40.0 >>> coro_avg.send(50) 45.0 >>> coro_avg.send('spam') # 發送的值不是數字,致使協程內部有異常拋出 Traceback (most recent call last): ... TypeError: unsupported operand type(s) for +=: 'float' and 'str' >>> coro_avg.send(60) # 因爲異常沒有處理,so...協程會終止。若是試圖從新激活協程,會拋出StopIteration異常 Traceback (most recent call last): File "<stdin>", line 1, in <module> StopIteration
出錯的緣由是,發送給協程的 'spam' 值不能加到 total 變量上
暗示了終止協程的一種方式:發送某個哨符值,讓協程退出。內置的 None 和 Ellipsis 等常量常常用做哨符值。Ellipsis 的優勢是,數據流中不太常有這個值。我還見過有人把 StopIteration類(類自己,而不是實例,也不拋出)做爲哨符值;也就是說,是像這樣使用的:my_coro.send(StopIteration)。
從 Python 2.5 開始,客戶代碼能夠在生成器對象上調用兩個方法,顯式地把異常發給協程。
這兩個方法是 throw 和 close:
generator.throw(exc_type[, exc_value[, traceback]])
導致生成器在暫停的 yield 表達式處拋出指定的異常。若是生成器處理了拋出的異常,代碼會向前執行到下一個 yield 表達式,而產出的值會成爲調用 generator.throw 方法獲得的返回值。若是生成器沒有處理拋出的異常,異常會向上冒泡,傳到調用方的上下文中。
generator.close()
導致生成器在暫停的 yield 表達式處拋出 GeneratorExit 異常。若是生成器沒有處理這個異常,或者拋出了 StopIteration 異常(一般是指運行到結尾),調用方不會報錯。若是收到 GeneratorExit 異常,生成器必定不能產出值,不然解釋器會拋出 RuntimeError 異常。生成器拋出的其餘異常會向上冒泡,傳給調用方。
coro_exc_demo.py:學習在協程中處理異常的測試代碼
1 class DemoException(Exception): 2 """爲了掩飾定義的異常類型""" 3 4 def demo_exc_handling(): 5 print('-> coroutine startedd') 6 while True: 7 try: 8 x = yield 9 except DemoException: # 特別處理 DemoException 異常 10 print('*** DemoException handled. Continuing...') 11 else: # 沒有異常就接收值 12 print('-> coroutine received: {!r}'.format(x)) 13 14 raise RuntimeError('This line should never run.') # while True會不中止的循環,這同樣會一直執行
激活和關閉 demo_exc_handling,沒有異常
>>> exc_coro = demo_exc_handling() >>> next(exc_coro) -> coroutine started >>> exc_coro.send(11) -> coroutine received: 11 >>> exc_coro.send(22) -> coroutine received: 22 >>> exc_coro.close() >>> from inspect import getgeneratorstate >>> getgeneratorstate(exc_coro) 'GEN_CLOSED'
若是把 DemoException 異常傳入 demo_exc_handling 協程,它會處理,而後繼續運行,如 🌰 所示
>>> exc_coro = demo_exc_handling() >>> next(exc_coro) -> coroutine started >>> exc_coro.send(11) -> coroutine received: 11 >>> exc_coro.throw(DemoException) *** DemoException handled. Continuing... >>> getgeneratorstate(exc_coro) 'GEN_SUSPENDED'
可是,若是傳入協程的異常沒有處理,協程會中止,即狀態變成'GEN_CLOSED'
>>> exc_coro = demo_exc_handling() >>> next(exc_coro) -> coroutine started >>> exc_coro.send(11) -> coroutine received: 11 >>> exc_coro.throw(ZeroDivisionError) Traceback (most recent call last): ... ZeroDivisionError >>> getgeneratorstate(exc_coro) 'GEN_CLOSED'
若是無論協程如何結束都想作些清理工做,要把協程定義體中相關的代碼放入 try/finally 塊中
🌰 coro_finally_demo.py:使用 try/finally 塊在協程終止時執行操做
1 class DemoException(Exception): 2 pass 3 4 def demo_finall(): 5 print('-> coroutine started') 6 try: 7 while True: 8 try: 9 x = yield 10 except DemoException: 11 print('*** DemoException handled. Continuing...') 12 else: 13 print('-> coroutine received: {!r}'.format(x)) 14 finally: 15 print('-> coroutine ending')
讓協程返回值
下面的 🌰 是 averager 協程的不一樣版本,這一版會返回結果。爲了說明如何返回值,每次激活協程時不會產出移動平均值。這麼作是爲了強調某些協程不會產出值,而是在最後返回一個值(一般是某種累計值)。
1 from collections import namedtuple 2 3 Result = namedtuple('Result', 'count average') 4 5 def averager(): 6 total = 0.0 7 count = 0 8 average = None 9 while True: 10 term = yield 11 if term is None: # 當send(None)的時候,終止協程 12 break 13 total += term 14 count += 1 15 average = total/count 16 return Result(count, average) # 返回一個namedtuple,包含兩個字段count, average
注意:
return 表達式的值會偷偷傳給調用方,賦值給 StopIteration異常的一個屬性。這樣作有點不合常理,可是能保留生成器對象的常規行爲——耗盡時拋出 StopIteration 異常。
演示🌰 捕獲 StopIteration 異常,獲取 averager 返回的值
>>> coro_avg = averager() >>> next(coro_avg) >>> coro_avg.send(10) >>> coro_avg.send(30) >>> coro_avg.send(6.5) >>> try: ... coro_avg.send(None) ... except StopIteration as exc: ... result = exc.value ... >>> result Result(count=3, average=15.5)
使用yield from
首先要知道,yield from 是全新的語言結構。它的做用比 yield 多不少,所以人們認爲繼續使用那個關鍵字多少會引發誤解。在其餘語言中,相似的結構使用 await 關鍵字,這個名稱好多了,由於它傳達了相當重要的一點:在生成器 gen 中使用 yield from subgen()時,subgen 會得到控制權,把產出的值傳給 gen 的調用方,即調用方能夠直接控制 subgen。與此同時,gen 會阻塞,等待 subgen 終止。
舉個🌰 yield from 可用於簡化 for 循環中的 yield 表達式
1 def gen(): 2 for i in 'AB': 3 yield i 4 5 for i in range(1, 3): 6 yield i 7 8 print(list(gen())) 9 10 11 ''' 12 yield from 版本,能夠簡化內部的for循環 13 ''' 14 def gen(): 15 yield from 'AB' 16 yield from range(1, 3) 17 18 print(list(gen()))
🌰 使用 yield from 連接可迭代的對象
1 def chain(*iterable): 2 for i in iterable: 3 yield from i 4 5 s = 'ABC' 6 t = tuple(range(1, 5)) 7 r = list(chain(s, t)) 8 print(r)
以上代碼執行的結果爲:
['A', 'B', 'C', 1, 2, 3, 4]
yield from x 表達式對 x 對象所作的第一件事是,調用 iter(x),從中獲取迭代器。所以,x 能夠是任何可迭代的對象。
yield from 的主要功能是打開雙向通道,把最外層的調用方與最內層的子生成器鏈接起來,這樣兩者能夠直接發送和產出值,還能夠直接傳入異常,而不用在位於中間的協程中添加大量處理異常的樣板代碼。有了這個結構,協程能夠經過之前不可能的方式委託職責。
委派生成器
包含 yield from <iterable> 表達式的生成器函數。
子生成器
從 yield from 表達式中 <iterable> 部分獲取的生成器。
調用方
PEP 380 使用「調用方」這個術語指代調用委派生成器的客戶端代碼。在不一樣的語境中,我會使用「客戶端」代替「調用方」,以此與委派生成器(也是調用方,由於它調用了子生成器)區分開。
下圖能更好地說明 yield from 結構的用法。圖中把該示例中各個相關的部分標識出來了
委派生成器在 yield from 表達式處暫停時,調用方能夠直接把數據發給子生成器,子生成器再把產出的值發給調用方。子生成器返回以後,解釋器會拋出 StopIteration 異常,並把返回值附加到異常對象上,此時委派生成器會恢復
coroaverager3.py 腳本從一個字典中讀取虛構的七年級男女學生的體重和身高。例如, 'boys;m' 鍵對應於 9 個男學生的身高(單位是米),'girls;kg' 鍵對應於 10 個女學生的體重(單位是千克)。這個腳本把各組數據傳給前面定義的 averager 協程,而後生成一個報告,以下所示:
$ python3 coroaverager3.py 9 boys averaging 40.42kg 9 boys averaging 1.39m 10 girls averaging 42.04kg 10 girls averaging 1.43m
🌰 coroaverager3.py:使用 yield from 計算平均值並輸出統計報告
1 from collections import namedtuple 2 3 4 Result = namedtuple('Result', 'count average') 5 6 7 # 子生成器 8 def averager(): # 子生成器 9 total = 0.0 10 count = 0 11 average = None 12 while True: 13 term = yield # 經過main函數中的gourp.send()接收到term的值 14 if term is None: # 相當重要的終止條件,告訴協程全部的數據已經結束,結束協程 15 break 16 total += term 17 count += 1 18 average = total/count 19 return Result(count, average) # 返回 grouper 中yield from的值 20 21 22 # 委派生成器 23 def grouper(results, key): # 委派生成器 24 while True: # 每次循環都會建立一個averager的實例 25 results[key] = yield from averager() # grouper發送的每一個值都會讓yield from處理,把產出的值綁定給resuluts[key] 26 27 28 # 客戶端代碼,即調用方 29 def main(data): # main函數是客戶端代碼 30 results = {} 31 for key, values in data.items(): 32 group = grouper(results, key) # group是調用grouper的生成器 33 next(group) # 預激group協程 34 for value in values: 35 group.send(value) # 把各個value的值傳遞給grouper,經過grouper傳入averager中term 36 group.send(None) # 全部值傳遞結束之後,終止averager 37 #print(results) # 若是要調試,去掉註釋 38 report(results) 39 40 #輸出報告 41 def report(results): 42 for key, result in sorted(results.items()): 43 group, unit = key.split(';') 44 print('{:2} {:5} averaging {:.2f}{}'.format( 45 result.count, group, result.average, unit)) 46 47 48 data = { 49 'girls;kg': 50 [40.9, 38.5, 44.3, 42.2, 45.2, 41.7, 44.5, 38.0, 40.6, 44.5], 51 'girls;m': 52 [1.6, 1.51, 1.4, 1.3, 1.41, 1.39, 1.33, 1.46, 1.45, 1.43], 53 'boys;kg': 54 [39.0, 40.8, 43.2, 40.8, 43.1, 38.6, 41.4, 40.6, 36.3], 55 'boys;m': 56 [1.38, 1.5, 1.32, 1.25, 1.37, 1.48, 1.25, 1.49, 1.46], 57 } 58 59 if __name__ == '__main__': 60 main(data)
以上代碼執行的結果爲:
9 boys averaging 40.42kg 9 boys averaging 1.39m 10 girls averaging 42.04kg 10 girls averaging 1.43m
下面簡要說明上面🌰的運做方式,還會說明把 main 函數中調用group.send(None) 那一行代碼(帶有「重要!」註釋的那一行)去掉會發生什麼事。
yield from的意義
把迭代器看成生成器使用,至關於把子生成器的定義體內聯在yield from 表達式中。此外,子生成器能夠執行 return 語句,返回一個值,而返回的值會成爲 yield from 表達式的值
批准後的 PEP 380 在「Proposal」一節(https://www.python.org/dev/peps/pep-0380/#proposal)分六點說明了yield from 的行爲。這裏,我幾乎原封不動地引述,不過把有歧義的「迭代器」一詞都換成了「子生成器」,還作了進一步說明。示例闡明瞭下述四點。
yield from 結構的另外兩個特性與異常和終止有關
使用案例:使用協程作離散事件仿真
協程能天然地表述不少算法,例如仿真、遊戲、異步 I/O,以及其餘事件驅動型編程形式或協做式多任務。
離散事件仿真簡介
離散事件仿真(Discrete Event Simulation,DES)是一種把系統建模成一系列事件的仿真類型。在離散事件仿真中,仿真「鍾」向前推動的量不是固定的,而是直接推動到下一個事件模型的模擬時間。假如咱們抽象模擬出租車的運營過程,其中一個事件是乘客上車,下一個事件則是乘客下車。無論乘客坐了 5 分鐘仍是 50 分鐘,一旦乘客下車,仿真鍾就會更新,指向這次運營的結束時間。使用離散事件仿真能夠在不到一秒鐘的時間內模擬一年的出租車運營過程。這與連續仿真不一樣,連續仿真的仿真鍾以固定的量(一般很小)不斷向前推動。
顯然,回合制遊戲就是離散事件仿真的例子:遊戲的狀態只在玩家操做時變化,並且一旦玩家決定下一步怎麼走了,仿真鍾就會凍結。而實時遊戲則是連續仿真,仿真鍾一直在運行,遊戲的狀態在一秒鐘以內更新不少次,所以反應慢的玩家特別吃虧。
這兩種仿真類型都能使用多線程或在單個線程中使用面向事件的編程技術(例如事件循環驅動的回調或協程)實現。能夠說,爲了實現連續仿真,在多個線程中處理實時並行的操做更天然。而協程剛好爲實現離散事件仿真提供了合理的抽象。SimPy 是一個實現離散事件仿真的Python 包,經過一個協程表示離散事件仿真系統中的各個進程。