Go 譯文之如何構建併發 Pipeline

本文首發於個人博客,若是以爲有用,歡迎點贊收藏,讓更多的朋友看到。golang

做者:Sameer Ajmani | 原文:blog.golang.org/pipelines算法

譯者前言

這篇文章來自 Go 官網,不愧是官方的博客,寫的很是詳細。在開始翻譯這篇文章前,先簡單說明兩點。編程

首先,這篇文章我以前已經翻譯過一遍,但最近再讀,發現以前的翻譯真是有點爛。因而,決定在徹底不參考以前譯文的狀況下,把這篇文章從新翻譯一遍。bash

其二,文章中有一些專有名字,計劃仍是用英文來表達,以保證原汁原味,好比 pipeline(管道)、stage (階段)、goroutine (協程)、channel (通道)。markdown

關於它們之間的關係,按本身的理解簡單畫了張草圖,但願能幫助更好地理解它們之間的關係。以下:併發

強調一點,若是你們在閱讀這篇文章時,感到了迷糊,建議能夠回頭再看一下這張圖。app

翻譯的正文部分以下。分佈式


Go 的併發原語使咱們很是輕鬆地就構建出能夠高效利用 IO 和多核 CPU 的流式數據 pipeline。這篇文章將會此爲基礎進行介紹。在這個過程當中,咱們將會遇到一些異常狀況,關於它們的處理方法,文中也會詳細介紹。函數

什麼是管道(pipleline)

關於什麼是 pipeline, Go 中並無給出明確的定義,它只是衆多併發編程方式中的一種。非正式的解釋,咱們理解爲,它是由一系列經過 chanel 鏈接起來的 stage 組成,而每一個 stage 都是由一組運行着相同函數的 goroutine 組成。每一個 stage 的 goroutine 一般會執行以下的一些工做:工具

  • 從上游的輸入 channel 中接收數據;
  • 對接收到的數據進行一些處理,(一般)併產生新的數據;
  • 將數據經過輸出 channel 發送給下游;

除了第一個 stage 和最後一個 stage ,每一個 stage 都包含必定數量的輸入和輸出 channel。第一個 stage 只有輸出,一般會把它稱爲 "生產者",最後一個 stage 只有輸入,一般咱們會把它稱爲 "消費者"。

咱們先來看一個很簡單例子,經過它來解釋上面提到那些與 pipeline 相關的概念和技術。瞭解了這些後,咱們再看其它的更實際的例子。

計算平方數

一個涉及三個 stage 的 pipeline。

第一個 stage,gen 函數。它負責將把從參數中拿到的一系列整數發送給指定 channel。它啓動了一個 goroutine 來發送數據,當數據所有發送結束,channel 會被關閉。

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}
複製代碼

第二個 stage,sq 函數。它負責從輸入 channel 中接收數據,並會返回一個新的 channel,即輸出 channel,它負責將通過平方處理過的數據傳輸給下游。當輸入 channel 關閉,而且全部數據都已發送到下游,就能夠關閉這個輸出 channel 了。

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}
複製代碼

main 函數負責建立 pipeline 並執行最後一個 stage 的任務。它將從第二個 stage 接收數據,並將它們打印出來,直到 channel 關閉。

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}
複製代碼

既然,sq 的輸入和輸出的 channel 類型相同,那麼咱們就能夠把它進行組合,從而造成多個 stage。好比,咱們能夠把 main 函數重寫爲以下的形式:

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}
複製代碼

扇出和扇入(Fan-out and Fan-in)

當多個函數從一個 channel 中讀取數據,直到 channel 關閉,這稱爲扇出 fan-out。利用它,咱們能夠實現了一種分佈式的工做方式,經過一組 workers 實現並行的 CPU 和 IO。

當一個函數從多個 channel 中讀取數據,直到全部 channel 關閉,這稱爲扇入 fan-in。扇入是經過將多個輸入 channel 的數據合併到同一個輸出 channel 實現的,當全部的輸入 channel 關閉,輸出的 channel 也將關閉。

咱們來改變一下上面例子中的 pipeline,在它上面運行兩個 sq 函數試試。它們將都從同一個輸入 channel 中讀取數據。咱們引入了一個新的函數,merge,負責 fan-in 處理結果,即 merge 兩個 sq 的處理結果。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    // 分佈式處理來自 in channel 的數據
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    // 從 channel c1 和 c2 的合併後的 channel 中接收數據
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}
複製代碼

merge 函數負責將從一系列輸入 channel 中接收的數據合併到一個 channel 中。它爲每一個輸入 channel 都啓動了一個 goroutine,並將它們中接收到的值發送到唯一的輸出 channel 中。在全部的 goroutines 啓動後,還會再另外啓動一個 goroutine,它的做用是,當全部的輸入 channel 關閉後,負責關閉惟一的輸出 channel 。

在已關閉的 channel 發送數據將致使 panic,所以要保證在關閉 channel 前,全部數據都發送完成,是很是重要的。sync.WaitGroup 提供了一種很是簡單的方式來完成這樣的同步。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed, then calls wg.Done.
    // 爲每一個輸入 channel 啓動一個 goroutine
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done. This must start after the wg.Add call.
    // 啓動一個 goroutine 負責在全部的輸入 channel 關閉後,關閉這個惟一的輸出 channel
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}
複製代碼

中途中止

pipeline 中的函數包含一個模式:

  • 當數據發送完成,每一個 stage 都應該關閉它們的輸入 channel;
  • 只要輸入 channel 沒有關閉,每一個 stage 就要持續從中接收數據;

咱們能夠經過編寫 range loop 來保證全部 goroutine 是在全部數據都已經發送到下游的時候退出。

但在一個真實的場景下,每一個 stage 都接收完 channel 中的全部數據,是不可能的。有時,咱們的設計是:接收方只須要接收數據的部分子集便可。更常見的,若是 channel 在上游的 stage 出現了錯誤,那麼,當前 stage 就應該提前退出。不管如何,接收方都不應再繼續等待接收 channel 中的剩餘數據,並且,此時上游應該中止生產數據,畢竟下游已經不須要了。

咱們的例子中,即便 stage 沒有成功消費完全部的數據,上游 stage 依然會嘗試給下游發送數據,這將會致使程序永久阻塞。

// Consume the first value from the output.
    // 從 output 中接收了第一個數據
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
    // 咱們並無從 out channel 中接收第二個數據,
    // 因此上游的其中一個 goroutine 在嘗試向下遊發送數據時將會被掛起。
}
複製代碼

這是一種資源泄露,goroutine 是須要消耗內存和運行時資源的,goroutine 棧中的堆引用信息也是不會被 gc。

咱們須要提供一種措施,即便當下游從上游接收數據時發生異常,上游也能成功退出。一種方式是,把 channel 改成帶緩衝的 channel,這樣,它就能夠承載指定數量的數據,若是 buffer channel 還有空間,數據的發送將會馬上完成。

// 緩衝大小 2 buffer size 2 
c := make(chan int, 2)
// 發送馬上成功 succeeds immediately 
c <- 1
// 發送馬上成功 succeeds immediately
c <- 2 
//blocks until another goroutine does <-c and receives 1
// 阻塞,直到另外一個 goroutine 從 c 中接收數據
c <- 3
複製代碼

若是咱們在建立 channel 時已經知道將發送的數據量,就能夠把前面的代碼簡化一下。好比,重寫 gen 函數,將數據都發送至一個 buffer channel,這還能避免建立新的 goroutine。

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}
複製代碼

譯者按:channel 關閉後,不可再寫入數據,不然會 panic,可是仍可讀取已發送數據,並且能夠一直讀取 0 值。

繼續往下游 stage,將又會返回到阻塞的 goroutine 中,咱們也能夠考慮給 merge 的輸出 channel 加點緩衝。

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup

    // enough space for the unread inputs
    // 給未讀的輸入 channel 預留足夠的空間
    out := make(chan int, 1)    
    // ... the rest is unchanged ...
複製代碼

雖然經過這個方法,咱們能解決了 goroutine 阻塞的問題,可是這並不是一個優秀的設計。好比 merge 中的 buffer 的大小 1 是基於咱們已經知道了接下來接收數據的大小,以及下游將能消費的數量。很明顯,這種設計很是脆弱,若是上游多發送了一些數據,或下游並沒接收那麼多的數據,goroutine 將又會被阻塞。

於是,當下遊再也不準備接收上游的數據時,須要有一種方式,能夠通知到上游。

明確的取消

若是 main 函數在沒把 out 中全部數據接收完就退出,它必需要通知上游中止繼續發送數據。如何作到?咱們能夠在上下游之間引入一個新的 channel,一般稱爲 done。

示例中有兩個可能阻塞的 goroutine,因此, done 須要發送兩個值來通知它們。

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    // 通知發送方,咱們已經中止接收數據了
    done <- struct{}{}
    done <- struct{}{}
}
複製代碼

發送方 merge 用 select 語句替換了以前的發送操做,它負責經過 out channel 發送數據或者從 done 接收數據。done 接收的值是沒有實際意義的,只是表示 out 應該中止繼續發送數據了,用空 struct 便可。output 函數將會不停循環,由於上游,即 sq ,並無阻塞。咱們過會再討論如何退出這個循環。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...
複製代碼

這種方法有個問題,下游只有知道了上游可能阻塞的 goroutine 數量,才能向每一個 goroutine 都發送了一個 done 信號,從而確保它們都能成功退出。但多維護一個 count 是很使人討厭的,並且很容易出錯。

咱們須要一種方式,能夠告訴上游的全部 goroutine 中止向下遊繼續發送信息。在 Go 中,其實可經過關閉 channel 實現,由於在一個已關閉的 channel 接收數據會馬上返回,而且會獲得一個零值。

這也就意味着,main 僅需經過關閉 done channel,就可讓全部的發送方解除阻塞。關閉操做至關於一個廣播信號。爲確保任意返回路徑下都成功調用,咱們能夠經過 defer 語句關閉 done。

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs. output
    // copies values from c to out until c or done is closed, then calls
    // wg.Done.
    // 爲每一個輸入 channel 啓動一個 goroutine,將輸入 channel 中的數據拷貝到
    // out channel 中,直到輸入 channel,即 c,或 done 關閉。
    // 接着,退出循環並執行 wg.Done()
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...
複製代碼

一樣地,一旦 done 關閉,sq 也將退出。sq 也是經過 defer 語句來確保本身的輸出 channel,即 out,必定被成功關閉釋放。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}
複製代碼

都這裏,Go 中如何構建一個 pipeline,已經介紹的差很少了。

簡單總結下如何正確構建一個 pipeline。

  • 當全部的發送已經完成,stage 應該關閉輸出 channel;
  • stage 應該持續從輸入 channel 中接收數據,除非 channel 關閉或主動通知到發送方中止發送。

Pipeline 中有量方式能夠解除發送方的阻塞,一是發送方建立充足空間的 channel 來發送數據,二是當接收方中止接收數據時,明確通知發送方。

摘要樹

一個真實的案例。

MD5,消息摘要算法,可用於文件校驗和的計算。下面的輸出是命令行工具 md5sum 輸出的文件摘要信息。

$ md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
複製代碼

咱們的例子和 md5sum 相似,不一樣的是,傳遞給這個程序的參數是一個目錄。程序的輸出是目錄下每一個文件的摘要值,輸出的順序按文件名排序。

$ go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go
複製代碼

主函數,第一步調用 MD5All,它返回的是一個以文件名爲 key,摘要值爲 value 的 map,而後對返回結果進行排序和打印。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x %s\n", m[path], path)
    }
}
複製代碼

MD5All 函數將是咱們接下來討論的重點。串行版的實現沒有併發,僅僅是從文件中讀取數據再計算。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents. If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if !info.Mode().IsRegular() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}
複製代碼

並行計算

並行版 中,咱們會把 MD5All 的計算拆分開含有兩個 stage 的 pipeline。第一個 stage,sumFiles,負責遍歷目錄和計算文件摘要值,摘要的計算會啓動一個 goroutine 來執行,計算結果將經過一個類型 result 的 channel 發出。

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}
複製代碼

sumFiles 返回了 2 個 channel,一個用於接收計算的結果,一個用於接收 filepath.Walk 的 err 返回。walk 會爲每一個文件啓動一個 goroutine 執行摘要計算和檢查 done。若是 done 關閉,walk 將馬上中止。

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c. Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done. Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        // 不須要使用 select,由於 errc 是帶有 buffer 的 channel
        errc <- err
    }()
    return c, errc
}
複製代碼

MD5All 將從 c channel 中接收計算的結果,若是發生錯誤,將經過 defer 關閉 done。

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
複製代碼

並行限制

並行版本 中,MD5All 爲每一個文件啓動了一個 goroutine。但若是一個目錄中文件太多,這可能會致使分配的內存過大以致於超過了當前機器的限制。

咱們能夠經過限制並行讀取的文件數,限制內存分配。在 併發限制版本中,咱們建立了固定數量的 goroutine 讀取文件。如今,咱們的 pipeline 涉及 3 個 stage:遍歷目錄、文件讀取與摘要計算、結果收集。

第一個 stage,遍歷目錄並經過 paths channel 發出文件。

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if !info.Mode().IsRegular() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}
複製代碼

第二個 stage,啓動固定數量的 goroutine,從 paths channel 中讀取文件名稱,處理結果發送到 c channel。

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}
複製代碼

和以前的例子不一樣,digester 將不會關閉 c channel,由於多個 goroutine 共享這個 channel,計算結果都將發給這個 channel 上。

相應地,MD5All 會負責在全部摘要完成後關閉這個 c channel。

// Start a fixed number of goroutines to read and digest files.
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()
複製代碼

咱們也能夠爲每一個 digester 建立一個單獨的 channel,經過本身的 channel 傳輸結果。但這種方式,咱們還要再啓動一個新的 goroutine 合併結果。

最後一個 stage,負責從 c 中接收處理結果,經過 errc 檢查是否有錯誤發生。該檢查沒法提早進行,由於提早執行將會阻塞 walkFile 往下游發送數據。

m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}
複製代碼

總結

這篇文章介紹了,在 Go 中如何正確地構建流式數據 pipeline。它的異常處理很是複雜,pipeline 中的每一個 stage 均可能致使上游阻塞,而下游可能再也不關心接下來的數據。關閉 channel 能夠給全部運行中的 goroutine 發送 done 信號,這能幫助咱們成功解除阻塞。如何正確地構建一條流式數據 pipeline,文中也總結了一些指導建議。

相關文章
相關標籤/搜索