原創做品,轉載請註明出處:點我
在前兩篇文章中,咱們介紹了什麼是Generator和coroutine,在這一篇文章中,咱們會介紹coroutine在模擬pipeline(管道 )和控制Dataflow(數據流)方面的運用。html
coroutine能夠用來模擬pipeline行爲。經過把多個coroutine串聯在一塊兒來實現pipe,在這個管道中,數據是經過send()函數在各個coroutine之間傳遞的:python
可是這些在pipe中傳遞的數據哪裏來的呢?這就須要一個數據源,或者說producer.這個producer驅動整個pipe的運行:數組
一般狀況下,source只是提供數據,驅動整個pipe的運行,其自己並非一個coroutine,一般行爲相似於下面這個模式:函數
其中,target就是一個coroutine,當調用target.send()函數的時候,數據將會傳入整個pipe。spa
既然pipeline有一個起點,一樣的也就必需要有一個sink(end-point,也就是終點)操作系統
sink是收集接受coroutine傳送過來的數據並對這些數據進行處理。sink一般的模式爲:3d
在前面講述Generetor的文章中,咱們用Generator實現了unix中的tail -f命令和tail -f | grep 命令,在這裏,咱們也用coroutine來實現這兩個命令。unix
先來看看做爲source的代碼unix_tail_f_co()函數code
1 # A source that mimics Unix "tail -f" 2 def unix_tail_f_co(thefile, target): 3 ''' 4 target是一個coroutine 5 ''' 6 thefile.seek(0, 2) # 跳到文件末尾 7 while True: 8 line = thefile.readline() 9 if not line: 10 time.sleep(0.1) 11 continue 12 # 把數據發送給coroutine進行處理 13 target.send(line)
在上面的代碼中,能夠看到,target是一個coroutine,函數每次讀取一行數據,讀取到以後,就調用target.send()函數,把數據發送給了target,由target接收進行下一步的處理。orm
如今來看看做爲sink的printer_co()函數,這個sink很簡單,就是簡單地打印它收到的數據。
1 # A sink just prints the lines 2 @coroutine 3 def printer_co(): 4 while True: 5 # 在這個地方掛起,等待接收數據 6 line = (yield) 7 print line,
其中coroutine函數裝飾器使咱們在上一篇介紹coroutine的文章中定義的。從代碼中能夠看到,做爲sink,print_co()函數有一個死循環,從第6行能夠看到,在這個死循環中,
函數會一直掛起,等到數據的到來,而後每次接收到數據後,打印輸出,而後再次掛起等待數據的到來。
如今能夠把上面兩個函數結合起來實現tail -f命令:
1 f = open("access-log") 2 unix_tail_f_co(f,printer_co())
代碼首先打開一個文件f,f做爲數據源,把f和printer_co()傳遞給unix_tail_f_co(),這就實現了一個pipeline,只不過在這個pipeline中,數據是直接發送給做爲sink的printer_co()函數的,中間沒有通過其餘的coroutine。
在sink和source之間,能夠根據須要,添加任何的coroutine,好比數據變換(transformation)、過濾(filter)和路由(routing)等
如今,咱們添加一個coroutine,grep_filter_co(pattern,target),其中,target是一個coroutine
1 @coroutine 2 def grep_filter_co(pattern,target): 3 while True: 4 # 掛起,等待接收數據 5 line = (yield) 6 if pattern in line: 7 # 接收到的數據若是符合要求, 8 # 則發送給下一個coroutine進行處理 9 target.send(line)
從代碼中能夠看到,grep_filter_co()有一個死循環,在循環中掛起等待接收數據,一旦接收到了數據,若是數據中存在pattern,則把接收到的數據發送給target,讓target對數據進行下一步處理,而後再次等待接收數據並掛起。
一樣的,如今把這三個函數組合起來,實現tail -f | grep命令,組成一個新的pipeline:
f = open("access-log") unix_tail_f_co(f,grep_filter_co("python",printer_co()))
unix_tail_f_co()做爲source,從f文件處每次讀取一行數據,發送給grep_filter_co()這個coroutine,grep_filer_co()對接收到的數據進行過濾(filter):若是接收到的數據包含了"python"這個單詞,就把數據發送給printer_co()進行處理,而後source再把下一行數據發送到pipeline中進行處理。
在前面也用Generator實現了tail -f | grep 命令,如今能夠把二者作一個比較:
Generator實現的流程爲:

而coroutine實現的流程爲:

能夠看出,Generator 在最後的的迭代過程當中從pipe中獲取數據,而coroutine經過send()函數把數據發送到pipeline中去。
經過coroutine,能夠把數據發送到不一樣的目的地,以下圖:

下面咱們來實現消息廣播(Broadcasting)機制,首先要先定義一個函數broadcast_co(targets)
1 # 把數據發送給多個不一樣的coroutine 2 @coroutine 3 def broadcast_co(targets): 4 while True: 5 # 掛起,等待接收數據 6 item = (yield) 7 for target in targets: 8 # 接收到了數據,而後分別發送給不一樣的coroutine 9 target.send(item)
broadcats_co()函數接受一個參數targets,這個參數是一個列表(list),其中的每個成員都是coroutine,在一個死循環中,函數接收到數據以後,把數據依次發送給不一樣的coroutine進行處理,而後會掛起等待接收數據。
f = open("access-log") unix_tail_f_co(f, broadcast_co([grep_filter_co('python',printer_co()), grep_filter_c0('ply',printer_co()), grep_filter_co('swig',printer_co())]) )
unix_tail_f_co每次從f讀取一行數據,發送給broadcast_co(),broadcast_co()會把接收到的數據依次發送給gerp_filter_co(),每一個grep_filter_co()再會把數據發送給相應的printer_co()進行處理。
|---------------> grep_filter_co("python") ------> printer_co() unix_tail_f_co()--->broadcast_co() ----> grep_filter_co("ply") ---------> printer_co() |---------------> grep_filter_co("swig")---------> printer_co()
須要注意的是:broadcast_co()會先把數據發送給grep_filter_co("python"),grep_filter_co("python")會把數據發送給printer_co(),當printer_co()執行後掛起再次等待接受數據時,執行權返回到grep_filter_co("python")函數,此時grep_filter_co("python")也會掛起等待接收數據,執行權回到broadcast_co()函數,此時broadcast_co()纔會把消息發送給grep_filter_co("ply"),也只有當grep_filter_co("ply")執行完畢掛起以後,broadcast_co()纔會接着把數據發送給下一個coroutine。
若是把上面的代碼改爲這樣,就會有另一種broadcast的模式:
f = open("access-log") p = printer_co() unix_tail_f_co(f, broadcast_co([grep_filter_co('python',p)), grep_filter_co('ply',p), grep_filter_co('swig',p)]) )
此時,broadcast的模式爲
|---------------> grep_filter_co("python") ---------->| unix_tail_f_co()--->broadcast_co() ----> grep_filter_co("ply") ---------> printer_co() |---------------> grep_filter_co("swig")------------->|
最後數據都會傳送到同一個print_co()函數,也就是說最後數據的目的地爲同一個。
好了,這篇講解coroutine在模擬pipeline和控制dataflow上的應用已經完畢了,能夠看出coroutine在數據路由方面有很強大的控制能力,能夠把多個不一樣的處理方式組合在一塊兒使用。
下一篇文章會講解如何用coroutine是下班一個簡單的多任務(Multitask)的操做系統,盡請期待O(∩_∩)O。