Go語言併發模型:像Unix Pipe那樣使用channel

簡介

Go語言的併發原語容許開發者以相似於 Unix Pipe 的方式構建數據流水線 (data pipelines),數據流水線可以高效地利用 I/O和多核 CPU 的優點。golang

本文要講的就是一些使用流水線的一些例子,流水線的錯誤處理也是本文的重點。c#

閱讀建議

數據流水線充分利用了多核特性,代碼層面是基於 channel 類型 和 go 關鍵字。緩存

channel 和 go 貫穿本文的始終。若是你對這兩個概念不太瞭解,建議先閱讀以前公衆號發佈的兩篇文章:Go 語言內存模型(上/下)。微信

若是你對操做系統中"生產者"和"消費者"模型比較瞭解的話,也將有助於對本文中流水線的理解。併發

本文中絕大多數講解都是基於代碼進行的。換句話說,若是你看不太懂某些代碼片斷,建議補全之後,在機器或play.golang.org 上運行一下。對於某些不明白的細節,能夠手動添加一些語句以助於理解。ide

因爲 Go語言併發模型 的英文原文 Go Concurrency Patterns: Pipelines and cancellation 篇幅比較長,本文只包含 理論推導和簡單的例子。
下一篇文章咱們會對 "並行MD5" 這個現實生活的例子進行詳細地講解。函數

什麼是 "流水線" (pipeline)?

對於"流水線"這個概念,Go語言中並無正式的定義,它只是不少種併發方式的一種。這裏我給出一個非官方的定義:一條流水線是 是由多個階段組成的,相鄰的兩個階段由 channel 進行鏈接;
每一個階段是由一組在同一個函數中啓動的 goroutine 組成。在每一個階段,這些 goroutine 會執行下面三個操做:spa

  1. 經過 inbound channels 從上游接收數據操作系統

  2. 對接收到的數據執行一些操做,一般會生成新的數據翻譯

  3. 將新生成的數據經過 outbound channels 發送給下游

除了第一個和最後一個階段,每一個階段均可以有任意個 inbound 和 outbound channel。
顯然,第一個階段只有 outbound channel,而最後一個階段只有 inbound channel。
咱們一般稱第一個階段爲"生產者""源頭",稱最後一個階段爲"消費者""接收者"

首先,咱們經過一個簡單的例子來演示這個概念和其中的技巧。後面咱們會更出一個真實世界的例子。

流水線入門:求平方數

假設咱們有一個流水線,它由三個階段組成。

第一階段是 gen 函數,它可以將一組整數轉換爲channel,channel 能夠將數字發送出去。
gen 函數首先啓動一個 goroutine,該goroutine 發送數字到 channel,當數字發送完時關閉channel。
代碼以下:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二階段是 sq 函數,它從 channel 接收一個整數,而後返回 一個channel,返回的channel能夠發送 接收到整數的平方。
當它的 inbound channel 關閉,而且把全部數字均發送到下游時,會關閉 outbound channel。代碼以下:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main 函數 用於設置流水線並運行最後一個階段。最後一個階段會從第二階段接收數字,並逐個打印出來,直到來自於上游的 inbound channel關閉。代碼以下:

func main() {
    // 設置流水線
    c := gen(2, 3)
    out := sq(c)

    // 消費輸出結果
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

因爲 sq 函數的 inbound channel 和 outbound channel 類型同樣,因此組合任意個 sq 函數。好比像下面這樣使用:

func main() {
    // 設置流水線並消費輸出結果
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}

若是咱們稍微修改一下 gen 函數,即可以模擬 haskell的惰性求值。有興趣的讀者能夠本身折騰一下。

流水線進階:扇入和扇出

扇出:同一個 channel 能夠被多個函數讀取數據,直到channel關閉。
這種機制容許將工做負載分發到一組worker,以便更好地並行使用 CPU 和 I/O。

扇入:多個 channel 的數據能夠被同一個函數讀取和處理,而後合併到一個 channel,直到全部 channel都關閉。

下面這張圖對 扇入 有一個直觀的描述:

扇入

咱們修改一下上個例子中的流水線,這裏咱們運行兩個 sq 實例,它們從同一個 channel 讀取數據。
這裏咱們引入一個新函數 merge 對結果進行"扇入"操做:

func main() {
    in := gen(2, 3)

    // 啓動兩個 sq 實例,即兩個goroutines處理 channel "in" 的數據
    c1 := sq(in)
    c2 := sq(in)

    // merge 函數將 channel c1 和 c2 合併到一塊兒,這段代碼會消費 merge 的結果
    for n := range merge(c1, c2) {
        fmt.Println(n) // 打印 4 9, 或 9 4
    }
}

merge 函數 將多個 channel 轉換爲一個 channel,它爲每個 inbound channel 啓動一個 goroutine,用於將數據
拷貝到 outbound channel。
merge 函數的實現見下面代碼 (注意 wg 變量):

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 爲每個輸入channel cs 建立一個 goroutine output
    // output 將數據從 c 拷貝到 out,直到 c 關閉,而後 調用 wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 啓動一個 goroutine,用於全部 output goroutine結束時,關閉 out 
    // 該goroutine 必須在 wg.Add 以後啓動
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

在上面的代碼中,每一個 inbound channel 對應一個 output 函數。全部 output goroutine 被建立之後,merge 啓動一個額外的 goroutine,
這個goroutine會等待全部 inbound channel 上的發送操做結束之後,關閉 outbound channel。

對已經關閉的channel 執行發送操做(ch<-)會致使異常,因此咱們必須保證全部的發送操做都在關閉channel以前結束。
sync.WaitGroup 提供了一種組織同步的方式。
它保證 merge 中全部 inbound channel (cs ...<-chan int) 均被正常關閉, output goroutine 正常結束後,關閉 out channel。

停下來思考一下

在使用流水線函數時,有一個固定的模式:

  1. 在一個階段,當全部發送操做 (ch<-) 結束之後,關閉 outbound channel

  2. 在一個階段,goroutine 會持續從 inbount channel 接收數據,直到全部 inbound channel 所有關閉

在這種模式下,每個接收階段均可以寫成 range 循環的方式,
從而保證全部數據都被成功發送到下游後,goroutine可以當即退出。

在現實中,階段並不老是接收全部的 inbound 數據。有時候是設計如此:接收者可能只須要數據的一個子集就能夠繼續執行。
更常見的狀況是:因爲前一個階段返回一個錯誤,致使該階段提早退出。
這兩種狀況下,接收者都不該該繼續等待後面的值被傳送過來。

咱們指望的結果是:當後一個階段不須要數據時,上游階段可以中止生產。

在咱們的例子中,若是一個階段不能消費全部的 inbound 數據,試圖發送這些數據的 goroutine 會永久阻塞。看下面這段代碼片斷:

// 只消費 out 的第一個數據
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // 因爲咱們再也不接收 out 的第二個數據
    // 其中一個 goroutine output 將會在發送時被阻塞
}

顯然這裏存在資源泄漏。一方面goroutine 消耗內存和運行時資源,另外一方面goroutine 棧中的堆引用會阻止 gc 執行回收操做。
既然goroutine 不能被回收,那麼他們必須本身退出。

咱們從新整理一下流水線中的不一樣階段,保證在下游階段接收數據失敗時,上游階段也可以正常退出。
一個方式是使用帶有緩衝的管道做爲 outbound channel。緩存能夠存儲固定個數的數據。
若是緩存沒有用完,那麼發送操做會當即返回。看下面這段代碼示例:

c := make(chan int, 2) // 緩衝大小爲 2
c <- 1  // 當即返回
c <- 2  // 當即返回
c <- 3  // 該操做會被阻塞,直到有一個 goroutine 執行 <-c,並接收到數字 1

若是在建立 channel 時就知道要發送的值的個數,使用buffer就可以簡化代碼。
仍然使用求平方數的例子,咱們對 gen 函數進行重寫。咱們將這組整型數拷貝到一個
緩衝 channel中,從而避免建立一個新的 goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到 流水線中被阻塞的 goroutine,咱們考慮讓 merge 函數返回一個緩衝管道:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 在本例中存儲未讀的數據足夠了
    // ... 其餘部分代碼不變 ...

儘管這種方法解決了這個程序中阻塞 goroutine的問題,可是從長遠來看,它並非好辦法。
緩存大小選擇爲1 是創建在兩個前提之上:

  1. 咱們已經知道 merge 函數有兩個 inbound channel

  2. 咱們已經知道下游階段會消耗多少個值

這段代碼很脆弱。若是咱們在傳入一個值給 gen 函數,或者下游階段讀取的值變少,goroutine
會再次被阻塞。

爲了從根本上解決這個問題,咱們須要提供一種機制,讓下游階段可以告知上游發送者中止接收的消息。
下面咱們看下這種機制。

顯式取消 (Explicit cancellation)

當 main 函數決定退出,並中止接收 out 發送的任何數據時,它必須告訴上游階段的 goroutine 讓它們放棄
正在發送的數據。 main 函數經過發送數據到一個名爲 done 的channel實現這樣的機制。 因爲有兩個潛在的
發送者被阻塞,它發送兩個值。以下代碼所示:

func main() {
    in := gen(2, 3)

    // 啓動兩個運行 sq 的goroutine
    // 兩個goroutine的數據均來自於 in
    c1 := sq(in)
    c2 := sq(in)

    // 消耗 output 生產的第一個值
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // 告訴其餘發送者,咱們將要離開
    // 再也不接收它們的數據
    done <- struct{}{}
    done <- struct{}{}
}

發送數據的 goroutine 使用一個 select 表達式代替原來的操做,select 表達式只有在接收到 out 或 done
發送的數據後,纔會繼續進行下去。 done 的值類型爲 struct{} ,由於它發送什麼值不重要,重要的是它發送沒發送:
接收事件發生意味着 channel out 的發送操做被丟棄。 goroutine output 基於 inbound channel c 繼續執行
循環,因此上游階段不會被阻塞。(後面咱們會討論如何讓循環提早退出)。 使用 done channel 方式實現的merge 函數以下:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 爲 cs 的的每個 輸入channel
    // 建立一個goroutine。output函數將
    // 數據從 c 拷貝到 out,直到c關閉,
    // 或者接收到 done 信號;
    // 而後調用 wg.Done()
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

這種方法有一個問題:每個下游的接收者須要知道潛在被阻上游發送者的個數,而後向這些發送者發送信號讓它們提早退出。
時刻追蹤這些數目是一項繁瑣且易出錯的工做。

咱們須要一種方式可以讓未知數目、且個數不受限制的goroutine 中止向下遊發送數據。在Go語言中,咱們能夠經過關閉一個
channel 實現,由於在一個已關閉 channel 上執行接收操做(<-ch)老是可以當即返回,返回值是對應類型的零值。關於這點的細節,點擊這裏查看。

換句話說,咱們只要關閉 done channel,就可以讓解開對全部發送者的阻塞。對一個管道的關閉操做事實上是對全部接收者的廣播信號。

咱們把 done channel 做爲一個參數傳遞給每個 流水線上的函數,經過 defer 表達式聲明對 done channel的關閉操做。
所以,全部從 main 函數做爲源頭被調用的函數均可以收到 done 的信號,每一個階段都可以正常退出。 使用 done 對main函數重構之後,代碼以下:

func main() {
    // 設置一個 全局共享的 done channel,
    // 當流水線退出時,關閉 done channel
    // 全部 goroutine接收到 done 的信號後,
    // 都會正常退出。
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // 將 sq 的工做分發給兩個goroutine
    // 這兩個 goroutine 均從 in 讀取數據
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 消費 outtput 生產的第一個值
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // defer 調用時,done channel 會被關閉。
}

如今,流水線中的每一個階段都可以在 done channel 被關閉時返回。merge 函數中的 output 代碼也可以順利返回,由於它知道 done channel關閉時,上游發送者 sq 會中止發送數據。 在 defer 表達式執行結束時,全部調用鏈上的 output 都能保證 wg.Done() 被調用:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 爲 cs 的每個 channel 建立一個 goroutine
    // 這個 goroutine 運行 output,它將數據從 c
    // 拷貝到 out,直到 c 關閉,或者 接收到 done
    // 的關閉信號。人啊後調用 wg.Done()
    output := func(c <-chan int) {
        defer wg.Done()
        for n := range c {
            select {
            case out <- n:
            case <-done:
                return
            }
        }
    }
    // ... the rest is unchanged ...

一樣的原理, done channel 被關閉時,sq 也可以當即返回。在defer表達式執行結束時,全部調用鏈上的 sq 都能保證 out channel 被關閉。代碼以下:

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

這裏,咱們給出幾條構建流水線的指導:

  1. 當全部發送操做結束時,每一個階段都關閉本身的 outbound channels

  2. 每一個階段都會一直從 inbound channels 接收數據,直到這些 channels 被關閉,或發送者解除阻塞狀態。

流水線經過兩種方式解除發送者的阻塞:

  1. 提供足夠大的緩衝保存發送者發送的數據

  2. 接收者放棄 channel 時,顯式地通知發送者。

結論

本文介紹了Go 語言中構建數據流水線的一些技巧。流水線的錯誤處理比較複雜,流水線的每一個階段均可能阻塞向下遊發送數據,
下游階段也可能再也不關注上游發送的數據。上面咱們介紹了經過關閉一個channel,向流水線中的全部 goroutine 發送一個 "done" 信號;也定義了
構建流水線的正確方法。

下一篇文章,咱們將經過一個 並行 md5 的例子來講明本文所講的一些理念和技巧。

原做者 Sameer Ajmani,翻譯 Oscar

下期預告:Go語言併發模型:以並行md5計算爲例。英文原文連接

相關連接

  1. 原文連接:https://blog.golang.org/pipel...

  2. Go併發模型:http://talks.golang.org/2012/...

  3. Go高級併發模型:http://blog.golang.org/advanc...

掃碼關注微信公衆號「深刻Go語言」

在這裏

相關文章
相關標籤/搜索