做者:Sameer Ajmani | 原文:blog.golang.org/pipelines算法
這篇文章來自 Go 官網,不愧是官方的博客,寫的很是詳細。在開始翻譯這篇文章前,先簡單說明兩點。編程
首先,這篇文章我以前已經翻譯過一遍,但最近再讀,發現以前的翻譯真是有點爛。因而,決定在徹底不參考以前譯文的狀況下,把這篇文章從新翻譯一遍。bash
其二,文章中有一些專有名字,計劃仍是用英文來表達,以保證原汁原味,好比 pipeline(管道)、stage (階段)、goroutine (協程)、channel (通道)。markdown
關於它們之間的關係,按本身的理解簡單畫了張草圖,但願能幫助更好地理解它們之間的關係。以下:併發
強調一點,若是你們在閱讀這篇文章時,感到了迷糊,建議能夠回頭再看一下這張圖。app
翻譯的正文部分以下。分佈式
Go 的併發原語使咱們很是輕鬆地就構建出能夠高效利用 IO 和多核 CPU 的流式數據 pipeline。這篇文章將會此爲基礎進行介紹。在這個過程當中,咱們將會遇到一些異常狀況,關於它們的處理方法,文中也會詳細介紹。函數
關於什麼是 pipeline, Go 中並無給出明確的定義,它只是衆多併發編程方式中的一種。非正式的解釋,咱們理解爲,它是由一系列經過 chanel 鏈接起來的 stage 組成,而每一個 stage 都是由一組運行着相同函數的 goroutine 組成。每一個 stage 的 goroutine 一般會執行以下的一些工做:工具
除了第一個 stage 和最後一個 stage ,每一個 stage 都包含必定數量的輸入和輸出 channel。第一個 stage 只有輸出,一般會把它稱爲 "生產者",最後一個 stage 只有輸入,一般咱們會把它稱爲 "消費者"。
咱們先來看一個很簡單例子,經過它來解釋上面提到那些與 pipeline 相關的概念和技術。瞭解了這些後,咱們再看其它的更實際的例子。
一個涉及三個 stage 的 pipeline。
第一個 stage,gen 函數。它負責將把從參數中拿到的一系列整數發送給指定 channel。它啓動了一個 goroutine 來發送數據,當數據所有發送結束,channel 會被關閉。
func gen(nums ...int) <-chan int { out := make(chan int) go func() { for _, n := range nums { out <- n } close(out) }() return out } 複製代碼
第二個 stage,sq 函數。它負責從輸入 channel 中接收數據,並會返回一個新的 channel,即輸出 channel,它負責將通過平方處理過的數據傳輸給下游。當輸入 channel 關閉,而且全部數據都已發送到下游,就能夠關閉這個輸出 channel 了。
func sq(in <-chan int) <-chan int { out := make(chan int) go func() { for n := range in { out <- n * n } close(out) }() return out } 複製代碼
main 函數負責建立 pipeline 並執行最後一個 stage 的任務。它將從第二個 stage 接收數據,並將它們打印出來,直到 channel 關閉。
func main() { // Set up the pipeline. c := gen(2, 3) out := sq(c) // Consume the output. fmt.Println(<-out) // 4 fmt.Println(<-out) // 9 } 複製代碼
既然,sq 的輸入和輸出的 channel 類型相同,那麼咱們就能夠把它進行組合,從而造成多個 stage。好比,咱們能夠把 main 函數重寫爲以下的形式:
func main() { // Set up the pipeline and consume the output. for n := range sq(sq(gen(2, 3))) { fmt.Println(n) // 16 then 81 } } 複製代碼
當多個函數從一個 channel 中讀取數據,直到 channel 關閉,這稱爲扇出 fan-out。利用它,咱們能夠實現了一種分佈式的工做方式,經過一組 workers 實現並行的 CPU 和 IO。
當一個函數從多個 channel 中讀取數據,直到全部 channel 關閉,這稱爲扇入 fan-in。扇入是經過將多個輸入 channel 的數據合併到同一個輸出 channel 實現的,當全部的輸入 channel 關閉,輸出的 channel 也將關閉。
咱們來改變一下上面例子中的 pipeline,在它上面運行兩個 sq 函數試試。它們將都從同一個輸入 channel 中讀取數據。咱們引入了一個新的函數,merge,負責 fan-in 處理結果,即 merge 兩個 sq 的處理結果。
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. // 分佈式處理來自 in channel 的數據 c1 := sq(in) c2 := sq(in) // Consume the merged output from c1 and c2. // 從 channel c1 和 c2 的合併後的 channel 中接收數據 for n := range merge(c1, c2) { fmt.Println(n) // 4 then 9, or 9 then 4 } } 複製代碼
merge 函數負責將從一系列輸入 channel 中接收的數據合併到一個 channel 中。它爲每一個輸入 channel 都啓動了一個 goroutine,並將它們中接收到的值發送到唯一的輸出 channel 中。在全部的 goroutines 啓動後,還會再另外啓動一個 goroutine,它的做用是,當全部的輸入 channel 關閉後,負責關閉惟一的輸出 channel 。
在已關閉的 channel 發送數據將致使 panic,所以要保證在關閉 channel 前,全部數據都發送完成,是很是重要的。sync.WaitGroup 提供了一種很是簡單的方式來完成這樣的同步。
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed, then calls wg.Done. // 爲每一個輸入 channel 啓動一個 goroutine output := func(c <-chan int) { for n := range c { out <- n } wg.Done() } wg.Add(len(cs)) for _, c := range cs { go output(c) } // Start a goroutine to close out once all the output goroutines are // done. This must start after the wg.Add call. // 啓動一個 goroutine 負責在全部的輸入 channel 關閉後,關閉這個惟一的輸出 channel go func() { wg.Wait() close(out) }() return out } 複製代碼
pipeline 中的函數包含一個模式:
咱們能夠經過編寫 range loop 來保證全部 goroutine 是在全部數據都已經發送到下游的時候退出。
但在一個真實的場景下,每一個 stage 都接收完 channel 中的全部數據,是不可能的。有時,咱們的設計是:接收方只須要接收數據的部分子集便可。更常見的,若是 channel 在上游的 stage 出現了錯誤,那麼,當前 stage 就應該提前退出。不管如何,接收方都不應再繼續等待接收 channel 中的剩餘數據,並且,此時上游應該中止生產數據,畢竟下游已經不須要了。
咱們的例子中,即便 stage 沒有成功消費完全部的數據,上游 stage 依然會嘗試給下游發送數據,這將會致使程序永久阻塞。
// Consume the first value from the output. // 從 output 中接收了第一個數據 out := merge(c1, c2) fmt.Println(<-out) // 4 or 9 return // Since we didn't receive the second value from out, // one of the output goroutines is hung attempting to send it. // 咱們並無從 out channel 中接收第二個數據, // 因此上游的其中一個 goroutine 在嘗試向下遊發送數據時將會被掛起。 } 複製代碼
這是一種資源泄露,goroutine 是須要消耗內存和運行時資源的,goroutine 棧中的堆引用信息也是不會被 gc。
咱們須要提供一種措施,即便當下游從上游接收數據時發生異常,上游也能成功退出。一種方式是,把 channel 改成帶緩衝的 channel,這樣,它就能夠承載指定數量的數據,若是 buffer channel 還有空間,數據的發送將會馬上完成。
// 緩衝大小 2 buffer size 2 c := make(chan int, 2) // 發送馬上成功 succeeds immediately c <- 1 // 發送馬上成功 succeeds immediately c <- 2 //blocks until another goroutine does <-c and receives 1 // 阻塞,直到另外一個 goroutine 從 c 中接收數據 c <- 3 複製代碼
若是咱們在建立 channel 時已經知道將發送的數據量,就能夠把前面的代碼簡化一下。好比,重寫 gen 函數,將數據都發送至一個 buffer channel,這還能避免建立新的 goroutine。
func gen(nums ...int) <-chan int { out := make(chan int, len(nums)) for _, n := range nums { out <- n } close(out) return out } 複製代碼
譯者按:channel 關閉後,不可再寫入數據,不然會 panic,可是仍可讀取已發送數據,並且能夠一直讀取 0 值。
繼續往下游 stage,將又會返回到阻塞的 goroutine 中,咱們也能夠考慮給 merge 的輸出 channel 加點緩衝。
func merge(cs ...<-chan int) <-chan int { var wg sync.WaitGroup // enough space for the unread inputs // 給未讀的輸入 channel 預留足夠的空間 out := make(chan int, 1) // ... the rest is unchanged ... 複製代碼
雖然經過這個方法,咱們能解決了 goroutine 阻塞的問題,可是這並不是一個優秀的設計。好比 merge 中的 buffer 的大小 1 是基於咱們已經知道了接下來接收數據的大小,以及下游將能消費的數量。很明顯,這種設計很是脆弱,若是上游多發送了一些數據,或下游並沒接收那麼多的數據,goroutine 將又會被阻塞。
於是,當下遊再也不準備接收上游的數據時,須要有一種方式,能夠通知到上游。
若是 main 函數在沒把 out 中全部數據接收完就退出,它必需要通知上游中止繼續發送數據。如何作到?咱們能夠在上下游之間引入一個新的 channel,一般稱爲 done。
示例中有兩個可能阻塞的 goroutine,因此, done 須要發送兩個值來通知它們。
func main() { in := gen(2, 3) // Distribute the sq work across two goroutines that both read from in. c1 := sq(in) c2 := sq(in) // Consume the first value from output. done := make(chan struct{}, 2) out := merge(done, c1, c2) fmt.Println(<-out) // 4 or 9 // Tell the remaining senders we're leaving. // 通知發送方,咱們已經中止接收數據了 done <- struct{}{} done <- struct{}{} } 複製代碼
發送方 merge 用 select 語句替換了以前的發送操做,它負責經過 out channel 發送數據或者從 done 接收數據。done 接收的值是沒有實際意義的,只是表示 out 應該中止繼續發送數據了,用空 struct 便可。output 函數將會不停循環,由於上游,即 sq ,並無阻塞。咱們過會再討論如何退出這個循環。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c is closed or it receives a value // from done, then output calls wg.Done. output := func(c <-chan int) { for n := range c { select { case out <- n: case <-done: } } wg.Done() } // ... the rest is unchanged ... 複製代碼
這種方法有個問題,下游只有知道了上游可能阻塞的 goroutine 數量,才能向每一個 goroutine 都發送了一個 done 信號,從而確保它們都能成功退出。但多維護一個 count 是很使人討厭的,並且很容易出錯。
咱們須要一種方式,能夠告訴上游的全部 goroutine 中止向下遊繼續發送信息。在 Go 中,其實可經過關閉 channel 實現,由於在一個已關閉的 channel 接收數據會馬上返回,而且會獲得一個零值。
這也就意味着,main 僅需經過關閉 done channel,就可讓全部的發送方解除阻塞。關閉操做至關於一個廣播信號。爲確保任意返回路徑下都成功調用,咱們能夠經過 defer 語句關閉 done。
func merge(done <-chan struct{}, cs ...<-chan int) <-chan int { var wg sync.WaitGroup out := make(chan int) // Start an output goroutine for each input channel in cs. output // copies values from c to out until c or done is closed, then calls // wg.Done. // 爲每一個輸入 channel 啓動一個 goroutine,將輸入 channel 中的數據拷貝到 // out channel 中,直到輸入 channel,即 c,或 done 關閉。 // 接着,退出循環並執行 wg.Done() output := func(c <-chan int) { defer wg.Done() for n := range c { select { case out <- n: case <-done: return } } } // ... the rest is unchanged ... 複製代碼
一樣地,一旦 done 關閉,sq 也將退出。sq 也是經過 defer 語句來確保本身的輸出 channel,即 out,必定被成功關閉釋放。
func sq(done <-chan struct{}, in <-chan int) <-chan int { out := make(chan int) go func() { defer close(out) for n := range in { select { case out <- n * n: case <-done: return } } }() return out } 複製代碼
都這裏,Go 中如何構建一個 pipeline,已經介紹的差很少了。
簡單總結下如何正確構建一個 pipeline。
Pipeline 中有量方式能夠解除發送方的阻塞,一是發送方建立充足空間的 channel 來發送數據,二是當接收方中止接收數據時,明確通知發送方。
一個真實的案例。
MD5,消息摘要算法,可用於文件校驗和的計算。下面的輸出是命令行工具 md5sum 輸出的文件摘要信息。
$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
複製代碼
咱們的例子和 md5sum 相似,不一樣的是,傳遞給這個程序的參數是一個目錄。程序的輸出是目錄下每一個文件的摘要值,輸出的順序按文件名排序。
$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65 bounded.go
ee869afd31f83cbb2d10ee81b2b831dc parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96 serial.go
複製代碼
主函數,第一步調用 MD5All,它返回的是一個以文件名爲 key,摘要值爲 value 的 map,而後對返回結果進行排序和打印。
func main() { // Calculate the MD5 sum of all files under the specified directory, // then print the results sorted by path name. m, err := MD5All(os.Args[1]) if err != nil { fmt.Println(err) return } var paths []string for path := range m { paths = append(paths, path) } sort.Strings(paths) for _, path := range paths { fmt.Printf("%x %s\n", m[path], path) } } 複製代碼
MD5All 函數將是咱們接下來討論的重點。串行版的實現沒有併發,僅僅是從文件中讀取數據再計算。
// MD5All reads all the files in the file tree rooted at root and returns a map // from file path to the MD5 sum of the file's contents. If the directory walk // fails or any read operation fails, MD5All returns an error. func MD5All(root string) (map[string][md5.Size]byte, error) { m := make(map[string][md5.Size]byte) err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } data, err := ioutil.ReadFile(path) if err != nil { return err } m[path] = md5.Sum(data) return nil }) if err != nil { return nil, err } return m, nil } 複製代碼
在 並行版 中,咱們會把 MD5All 的計算拆分開含有兩個 stage 的 pipeline。第一個 stage,sumFiles,負責遍歷目錄和計算文件摘要值,摘要的計算會啓動一個 goroutine 來執行,計算結果將經過一個類型 result 的 channel 發出。
type result struct { path string sum [md5.Size]byte err error } 複製代碼
sumFiles 返回了 2 個 channel,一個用於接收計算的結果,一個用於接收 filepath.Walk 的 err 返回。walk 會爲每一個文件啓動一個 goroutine 執行摘要計算和檢查 done。若是 done 關閉,walk 將馬上中止。
func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) { // For each regular file, start a goroutine that sums the file and sends // the result on c. Send the result of the walk on errc. c := make(chan result) errc := make(chan error, 1) go func() { var wg sync.WaitGroup err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } wg.Add(1) go func() { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: } wg.Done() }() // Abort the walk if done is closed. select { case <-done: return errors.New("walk canceled") default: return nil } }) // Walk has returned, so all calls to wg.Add are done. Start a // goroutine to close c once all the sends are done. go func() { wg.Wait() close(c) }() // No select needed here, since errc is buffered. // 不須要使用 select,由於 errc 是帶有 buffer 的 channel errc <- err }() return c, errc } 複製代碼
MD5All 將從 c channel 中接收計算的結果,若是發生錯誤,將經過 defer 關閉 done。
func MD5All(root string) (map[string][md5.Size]byte, error) { // MD5All closes the done channel when it returns; it may do so before // receiving all the values from c and errc. done := make(chan struct{}) defer close(done) c, errc := sumFiles(done, root) m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } if err := <-errc; err != nil { return nil, err } return m, nil } 複製代碼
在 並行版本 中,MD5All 爲每一個文件啓動了一個 goroutine。但若是一個目錄中文件太多,這可能會致使分配的內存過大以致於超過了當前機器的限制。
咱們能夠經過限制並行讀取的文件數,限制內存分配。在 併發限制版本中,咱們建立了固定數量的 goroutine 讀取文件。如今,咱們的 pipeline 涉及 3 個 stage:遍歷目錄、文件讀取與摘要計算、結果收集。
第一個 stage,遍歷目錄並經過 paths channel 發出文件。
func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) { paths := make(chan string) errc := make(chan error, 1) go func() { // Close the paths channel after Walk returns. defer close(paths) // No select needed for this send, since errc is buffered. errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.Mode().IsRegular() { return nil } select { case paths <- path: case <-done: return errors.New("walk canceled") } return nil }) }() return paths, errc } 複製代碼
第二個 stage,啓動固定數量的 goroutine,從 paths channel 中讀取文件名稱,處理結果發送到 c channel。
func digester(done <-chan struct{}, paths <-chan string, c chan<- result) { for path := range paths { data, err := ioutil.ReadFile(path) select { case c <- result{path, md5.Sum(data), err}: case <-done: return } } } 複製代碼
和以前的例子不一樣,digester 將不會關閉 c channel,由於多個 goroutine 共享這個 channel,計算結果都將發給這個 channel 上。
相應地,MD5All 會負責在全部摘要完成後關閉這個 c channel。
// Start a fixed number of goroutines to read and digest files. c := make(chan result) var wg sync.WaitGroup const numDigesters = 20 wg.Add(numDigesters) for i := 0; i < numDigesters; i++ { go func() { digester(done, paths, c) wg.Done() }() } go func() { wg.Wait() close(c) }() 複製代碼
咱們也能夠爲每一個 digester 建立一個單獨的 channel,經過本身的 channel 傳輸結果。但這種方式,咱們還要再啓動一個新的 goroutine 合併結果。
最後一個 stage,負責從 c 中接收處理結果,經過 errc 檢查是否有錯誤發生。該檢查沒法提早進行,由於提早執行將會阻塞 walkFile 往下游發送數據。
m := make(map[string][md5.Size]byte) for r := range c { if r.err != nil { return nil, r.err } m[r.path] = r.sum } // Check whether the Walk failed. if err := <-errc; err != nil { return nil, err } return m, nil } 複製代碼
這篇文章介紹了,在 Go 中如何正確地構建流式數據 pipeline。它的異常處理很是複雜,pipeline 中的每一個 stage 均可能致使上游阻塞,而下游可能再也不關心接下來的數據。關閉 channel 能夠給全部運行中的 goroutine 發送 done 信號,這能幫助咱們成功解除阻塞。如何正確地構建一條流式數據 pipeline,文中也總結了一些指導建議。