Python高級編程之生成器(Generator)與coroutine(三):coroutine與pipeline(管道)和Dataflow(數據流_

原創做品,轉載請註明出處:點我

 

在前兩篇文章中,咱們介紹了什麼是Generator和coroutine,在這一篇文章中,咱們會介紹coroutine在模擬pipeline(管道 )和控制Dataflow(數據流)方面的運用。html

coroutine能夠用來模擬pipeline行爲。經過把多個coroutine串聯在一塊兒來實現pipe,在這個管道中,數據是經過send()函數在各個coroutine之間傳遞的:python

可是這些在pipe中傳遞的數據哪裏來的呢?這就須要一個數據源,或者說producer.這個producer驅動整個pipe的運行:數組

一般狀況下,source只是提供數據,驅動整個pipe的運行,其自己並非一個coroutine,一般行爲相似於下面這個模式:函數

其中,target就是一個coroutine,當調用target.send()函數的時候,數據將會傳入整個pipe。spa

既然pipeline有一個起點,一樣的也就必需要有一個sink(end-point,也就是終點)操作系統

sink是收集接受coroutine傳送過來的數據並對這些數據進行處理。sink一般的模式爲:3d

在前面講述Generetor的文章中,咱們用Generator實現了unix中的tail -f命令和tail -f  | grep 命令,在這裏,咱們也用coroutine來實現這兩個命令。unix

先來看看做爲source的代碼unix_tail_f_co()函數code

 1 # A source that mimics Unix "tail -f"
 2 def unix_tail_f_co(thefile, target):
 3     '''
 4     target是一個coroutine
 5     '''
 6     thefile.seek(0, 2)  # 跳到文件末尾
 7     while True:
 8         line = thefile.readline()
 9         if not line:
10             time.sleep(0.1)
11             continue
12         # 把數據發送給coroutine進行處理
13         target.send(line)

在上面的代碼中,能夠看到,target是一個coroutine,函數每次讀取一行數據,讀取到以後,就調用target.send()函數,把數據發送給了target,由target接收進行下一步的處理。orm

如今來看看做爲sink的printer_co()函數,這個sink很簡單,就是簡單地打印它收到的數據。

1 # A sink just prints the lines
2 @coroutine
3 def printer_co():
4     while True:
5         # 在這個地方掛起,等待接收數據
6         line = (yield)
7         print line,

其中coroutine函數裝飾器使咱們在上一篇介紹coroutine的文章中定義的。從代碼中能夠看到,做爲sink,print_co()函數有一個死循環,從第6行能夠看到,在這個死循環中,

函數會一直掛起,等到數據的到來,而後每次接收到數據後,打印輸出,而後再次掛起等待數據的到來。

如今能夠把上面兩個函數結合起來實現tail -f命令:

1 f = open("access-log")
2 unix_tail_f_co(f,printer_co())

代碼首先打開一個文件f,f做爲數據源,把f和printer_co()傳遞給unix_tail_f_co(),這就實現了一個pipeline,只不過在這個pipeline中,數據是直接發送給做爲sink的printer_co()函數的,中間沒有通過其餘的coroutine。

在sink和source之間,能夠根據須要,添加任何的coroutine,好比數據變換(transformation)、過濾(filter)和路由(routing)等

如今,咱們添加一個coroutine,grep_filter_co(pattern,target),其中,target是一個coroutine

1 @coroutine
2 def grep_filter_co(pattern,target):
3     while True:
4         # 掛起,等待接收數據
5         line = (yield)
6         if pattern in line:
7             # 接收到的數據若是符合要求,
8             # 則發送給下一個coroutine進行處理
9             target.send(line)
從代碼中能夠看到,grep_filter_co()有一個死循環,在循環中掛起等待接收數據,一旦接收到了數據,若是數據中存在pattern,則把接收到的數據發送給target,讓target對數據進行下一步處理,而後再次等待接收數據並掛起。
一樣的,如今把這三個函數組合起來,實現tail -f | grep命令,組成一個新的pipeline:
f = open("access-log")
unix_tail_f_co(f,grep_filter_co("python",printer_co()))

unix_tail_f_co()做爲source,從f文件處每次讀取一行數據,發送給grep_filter_co()這個coroutine,grep_filer_co()對接收到的數據進行過濾(filter):若是接收到的數據包含了"python"這個單詞,就把數據發送給printer_co()進行處理,而後source再把下一行數據發送到pipeline中進行處理。

在前面也用Generator實現了tail -f | grep 命令,如今能夠把二者作一個比較:
Generator實現的流程爲:

而coroutine實現的流程爲:

能夠看出,Generator 在最後的的迭代過程當中從pipe中獲取數據,而coroutine經過send()函數把數據發送到pipeline中去。
經過coroutine,能夠把數據發送到不一樣的目的地,以下圖:

下面咱們來實現消息廣播(Broadcasting)機制,首先要先定義一個函數broadcast_co(targets)
1 # 把數據發送給多個不一樣的coroutine
2 @coroutine
3 def broadcast_co(targets):
4     while True:
5         # 掛起,等待接收數據
6         item = (yield)
7         for target in targets:
8             # 接收到了數據,而後分別發送給不一樣的coroutine
9             target.send(item)

broadcats_co()函數接受一個參數targets,這個參數是一個列表(list),其中的每個成員都是coroutine,在一個死循環中,函數接收到數據以後,把數據依次發送給不一樣的coroutine進行處理,而後會掛起等待接收數據。

f = open("access-log")
unix_tail_f_co(f,
            broadcast_co([grep_filter_co('python',printer_co()),
                         grep_filter_c0('ply',printer_co()),
                         grep_filter_co('swig',printer_co())])
            )            

unix_tail_f_co每次從f讀取一行數據,發送給broadcast_co(),broadcast_co()會把接收到的數據依次發送給gerp_filter_co(),每一個grep_filter_co()再會把數據發送給相應的printer_co()進行處理。

                        |---------------> grep_filter_co("python") ------> printer_co()
 unix_tail_f_co()--->broadcast_co() ----> grep_filter_co("ply") ---------> printer_co()
                        |---------------> grep_filter_co("swig")---------> printer_co()

須要注意的是:broadcast_co()會先把數據發送給grep_filter_co("python"),grep_filter_co("python")會把數據發送給printer_co(),當printer_co()執行後掛起再次等待接受數據時,執行權返回到grep_filter_co("python")函數,此時grep_filter_co("python")也會掛起等待接收數據,執行權回到broadcast_co()函數,此時broadcast_co()纔會把消息發送給grep_filter_co("ply"),也只有當grep_filter_co("ply")執行完畢掛起以後,broadcast_co()纔會接着把數據發送給下一個coroutine。

若是把上面的代碼改爲這樣,就會有另一種broadcast的模式:

f = open("access-log")
p = printer_co()
unix_tail_f_co(f,
              broadcast_co([grep_filter_co('python',p)),
                            grep_filter_co('ply',p),
                            grep_filter_co('swig',p)])
              )

此時,broadcast的模式爲

                       |---------------> grep_filter_co("python") ---------->|
unix_tail_f_co()--->broadcast_co() ----> grep_filter_co("ply") ---------> printer_co()
                       |---------------> grep_filter_co("swig")------------->|

最後數據都會傳送到同一個print_co()函數,也就是說最後數據的目的地爲同一個。

 

好了,這篇講解coroutine在模擬pipeline和控制dataflow上的應用已經完畢了,能夠看出coroutine在數據路由方面有很強大的控制能力,能夠把多個不一樣的處理方式組合在一塊兒使用。

下一篇文章會講解如何用coroutine是下班一個簡單的多任務(Multitask)的操做系統,盡請期待O(∩_∩)O。

相關文章
相關標籤/搜索