golang 管道

2.管道

簡介

Golang的原子併發特性使得它很容易構造流數據管道,這使得Golang可有效的使用I/O和多CPU特性。本文提出一些關於管道的示

例,在這個過程當中突出了操做失敗的微妙之處和介紹處理失敗的具體技術。

什麼是管道

在Golang對於管道沒有明確的定義;它只是許多種併發程序中的一種。管道是通道鏈接的一系列階段, 每一個階段是一組

goroutine運行相同的功能。在每一個階段,goroutine運行步驟爲:

    從上游通過入境通道接受值

    對數據執行一些功能操做,一般會產生新的值

    從下游通過出境通道發送值

除了開始和最後階段只有一個入境通道或者一個出境通道外,其餘每一個階段有任意數量的入境通道和出境通道,。開始階段有時

又稱爲源或者生產者;最後一個階段又稱爲sink或者消費者。

咱們將開始一個簡單的示例來解釋管道的思想和技術。稍後,咱們將展現更多相關的例子。


平方數

一個通道有三個階段。

第一階段:gen,以從列表讀出整數的方式轉換整數列表到一個通道。gen函數開始goroutine後, 在通道上發送整數而且在在所

有的值被髮送完後將通道關閉:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二階段:sq,從通道接受整數,而後將接受到的每一個整數值的平方後返回到一個通道 。在入境通道關閉和發送全部下行值的階

段結束後,關閉出口通道:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

main函數創建了通道並運行最後一個階段:它接受來自第二階段的值並打印出每一個值,直到通道關閉:

func main() {
    // Set up the pipeline.
    c := gen(2, 3)
    out := sq(c)

    // Consume the output.
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

因爲sq有相同類型的入境和出境通道,咱們能夠寫任意次。咱們也能夠重寫main函數,像其餘階段同樣作一系列循環 :

func main() {
    // Set up the pipeline and consume the output.
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 then 81
    }
}


扇出,扇入

扇出(fan-out):多個函數能從相同的通道中讀數據,直到通道關閉;這提供了一種在一組「人員」中分發任務的方式,使得

CPU和I/O的並行處理.

扇入(fan-in):一個函數能從多個輸入中讀取並處理數據,而這多個輸入通道映射到一個單通道,該單通道隨着全部輸入的結

束而關閉。

咱們能夠改變通道去運行兩個sq實例,每一個實例從相同的輸入通道讀取數據。咱們引入了一個新函數merge去扇入結果:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the merged output from c1 and c2.
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 then 9, or 9 then 4
    }
}

merge函數經過爲每個入境通道開啓一個goroutine去複製數值到惟一的出境通道,從而實現了轉換通道列表到一個單通道 。一

旦全部的output goroutine啓動,全部在通道上的發送完成後merge函數開啓一個以上的goroutine用於關閉出境通道。

在一個關閉的通道上發送沒有意義,因此在關閉以前確保全部的發送完成是重要的。sync.WaitGroup類型提供了一個簡單的方法

去組織同步:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed, then calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // Start a goroutine to close out once all the output goroutines are
    // done.  This must start after the wg.Add call.
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}


短暫中止

管道函數模型:

    當全部的發送操做結束後, 階段關閉他們的出境通道。

    階段持續接收來自入境通道的值,直到那些通道關閉。

這個模型容許每個接收階段經過range循環的寫數據,確保一旦全部向下遊發送的值發送成功,全部的goroutine退出。

但在一個真實的管道上,階段並不老是接收全部的入境值。有時設計是這樣的:接收者可能只須要一個子集值就能取得進展。更

多時候是一個階段早早的退出,由於一個入境值表明一個早期階段的錯誤。 在這兩種狀況下接收者不該該等待剩餘的值到達,我

們想要早期階段中止產生後續階段不須要的值。


在咱們的示例中,若是一個階段不能處理全部的入境值,那麼試圖發送這些值得goroutine將無限期的阻塞:

    // Consume the first value from output.
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 or 9
    return
    // Since we didn't receive the second value from out,
    // one of the output goroutines is hung attempting to send it.
}

這是一個資源鎖:goroutine消耗內存和運行資源,而且在goroutine棧中的堆引用防止數據被回收。Goroutine不能垃圾回收;它

們必須本身退出。

當下遊階段在接收全部的入境值失敗後,咱們須要安排管道的上游階段退出。一種實現方法是將出境通道改成一個緩衝區。該緩

衝區能保存固定數量的值;若是緩衝區有空閒就當即發送操做完成信號:

c := make(chan int, 2) // buffer size 2
c <- 1  // succeeds immediately
c <- 2  // succeeds immediately
c <- 3  // blocks until another goroutine does <-c and receives 1



當在通道建立就預先知道待發送的數值個數時,經過使用緩衝區能夠簡化代碼。例如,咱們能夠重寫  gen 來將整數列表複製到

帶有緩衝區的通道中,也能夠避免建立一個新的 goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到管道中處於阻塞狀態的 goroutine,咱們能夠考慮爲 merge 返回的出境通道增長一個緩衝區:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // enough space for the unread inputs
    // ... the rest is unchanged ...

儘管它修正了程序中 goroutine 的阻塞問題,但卻不能稱爲好代碼。在這裏,緩衝區大小選取爲 1,取決於預知 merge 將會接

收的數值個數及下游各階段將會消費的數值個數。這很脆弱:若是咱們給 gen 多傳了一個數值,或者下游階段少讀了一些數值,

goroute 的阻塞問題會再次出現。

做爲代替,咱們須要爲下游各階段提供一種手段,來向發送方代表指明它們將中止接收數據的輸入。

顯式取消

當main沒有接受完out全部的值就決定退出時,它必須告知上游狀態(upstream stage)的goroutines,讓它丟棄正在發送中的數據

。經過在一個叫作done的channel上發送數據,便可實現。例子裏有兩個受阻的發送方,因此發送的值有兩組:

func main() {
    in := gen(2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(in)
    c2 := sq(in)

    // Consume the first value from output.
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // Tell the remaining senders we're leaving.
    done <- struct{}{}
    done <- struct{}{}
}

使用select語句,讓發送中的goroutines取代了發送操做。這條語句既能夠處理在發送out的情形,也能夠處理從done中接受一個

值的狀況。done的值類型是空結構,由於它的數值並不重要:它是一個接受事件,代表out的發送應該被丟棄。output goroutines

繼續在channel c內循環運行,而不會阻塞上游狀態(upstream stage):

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // Start an output goroutine for each input channel in cs.  output
    // copies values from c to out until c is closed or it receives a value
    // from done, then output calls wg.Done.
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... the rest is unchanged ...

可是這種方法有個問題:下游的接收者須要知道潛在會被阻塞的上游發送者的數量。追蹤這些數量不只枯燥,還容易出錯。


咱們要有一個方法告知一個未知的、無限數量的go程序向下遊發送它們的值。在GO裏面咱們經過關閉一個通道來實現,由於一個

在已關閉通道上的接收操做總能當即執行,並返回該元素類型的零值。

這意味着main函數只需關閉「done」通道就能開啓全部發送者。close其實是傳給發送者的一個廣播信號。咱們擴展每個管道

函數接收「done」參數並經過一個「defer」語句觸發「close」,這樣全部來自main的返回路徑都會以信號通知管道退出。

func main() {
    // Set up a done channel that's shared by the whole pipeline,
    // and close that channel when this pipeline exits, as a signal
    // for all the goroutines we started to exit.
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // Distribute the sq work across two goroutines that both read from in.
    c1 := sq(done, in)
    c2 := sq(done, in)

    // Consume the first value from output.
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 or 9

    // done will be closed by the deferred call.
}


管道里的每一個狀態如今均可以隨意的提前退出了:sq能夠在它的循環中退出,由於咱們知道若是done已經被關閉了,也會關閉上

遊的gen狀態。sq經過defer語句,保證無論從哪一個返回路徑,它的outchannel 都會被關閉。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

下面列出了構建管道的指南:

    狀態會在全部發送操做作完後,關閉它們的流出 channel

    狀態會持續接收從流入 channel 輸入的數值,直到 channel 關閉或者其發送者被釋放。

管道要麼保證足夠能存下全部發送數據的緩衝區,要麼接收來自接收者明確的要放棄 channel 的信號,來保證釋放發送者。



讓咱們考慮一個更真實的管道狀況。

MD5摘要生成算法在校驗文件時很是有用。命令行工具md5sum會生成文件的摘要的列表。

% md5sum *.go
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

咱們的樣例程序就像md5sum差很少,不同的是它會以文件夾做爲參數,並打印每一個文件夾內文件的索引值,按路徑名排序。

% go run serial.go .
d47c2bbc28298ca9befdfbc5d3aa4e65  bounded.go
ee869afd31f83cbb2d10ee81b2b831dc  parallel.go
b88175e65fdcbc01ac08aaf1fd9b5e96  serial.go

程序的主函數調用輔助函數MD5ALL,它會返回一個map,將路徑名對應到索引值,以後排序並打印結果。

func main() {
    // Calculate the MD5 sum of all files under the specified directory,
    // then print the results sorted by path name.
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5ALL函數是咱們要討論的。在serial.go中,它的實現沒有使用併發,只是簡單地遍歷文件樹,讀取文件並生成摘要。

// MD5All reads all the files in the file tree rooted at root and returns a map
// from file path to the MD5 sum of the file's contents.  If the directory walk
// fails or any read operation fails, MD5All returns an error.
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}



I在parallel.go裏,咱們把MD5All分解爲兩個狀態的管道。第一個狀態,sumFiles,遍歷目錄,在一個新的 Goroutine 裏對每一個

文件作摘要,並把結果發送到類型爲result的 channel:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles返回兩個 channel:一個用來傳遞result,另外一個用來返回filepath.Walk的錯誤。遍歷函數啓動一個新的 Goroutine

來處理每一個常規文件,以後檢查done。若是done已經被關閉了,遍歷就馬上中止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // For each regular file, start a goroutine that sums the file and sends
    // the result on c.  Send the result of the walk on errc.
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // Abort the walk if done is closed.
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // Walk has returned, so all calls to wg.Add are done.  Start a
        // goroutine to close c once all the sends are done.
        go func() {
            wg.Wait()
            close(c)
        }()
        // No select needed here, since errc is buffered.
        errc <- err
    }()
    return c, errc
}

MD5All從c接收全部的摘要值。MD5All返回早先的錯誤,經過defer關閉done:

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All closes the done channel when it returns; it may do so before
    // receiving all the values from c and errc.
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}


受限的併發

在parallel.go裏實現的MD5All對每一個文件啓動一個新的 Goroutine。若是目錄裏含有不少大文件,這可能會致使申請大量內存,

超出機器上的可用內存。

咱們能夠經過控制並行讀取的文件數量來限制內存的申請。在bounded.go,咱們建立固定數量的用於讀取文件的 Goroutine,來

限制內存使用。如今整個管道有三個狀態:遍歷樹,讀取並對文件作摘要,收集摘要值。

第一個狀態,walkFiles,發送樹裏的每一個常規文件的路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // Close the paths channel after Walk returns.
        defer close(paths)
        // No select needed for this send, since errc is buffered.
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中間的狀態啓動固定數量的digesterGoroutine,從paths接收文件名,並將結果result發送到 channelc:

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}


不像以前的例子,digester並不關閉輸出 channel,由於多個 Goroutine 會發送到共享的 channel。另外一邊,MD5All中的代碼會

在全部digester完成後關閉 channel:

// Start a fixed number of goroutines to read and digest files.
c := make(chan result)
var wg sync.WaitGroup
const numDigesters = 20
wg.Add(numDigesters)
for i := 0; i < numDigesters; i++ {
    go func() {
        digester(done, paths, c)
        wg.Done()
    }()
}
go func() {
    wg.Wait()
    close(c)
}()

咱們也可讓每一個digester建立並返回本身的輸出 channel,可是這就須要一個單獨的 Goroutine 來扇入全部結果。

最終從c收集到全部結果result,並檢查從errc傳入的錯誤。這個錯誤的檢查不能提前,由於在這個時間點以前,walkFiles可能

會由於正在發送消息給下游而阻塞:

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // Check whether the Walk failed.
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}



這篇文章展現了使用Go構建流數據管道的技術。要慎重處理這種管道產生的錯誤,由於管道里的每一個狀態均可能由於向下遊發送

數值而阻塞,而下游的狀態卻再也不關心輸入的數據。咱們展現瞭如何將關閉channel做爲「完成」信號廣播給全部由管道啓動的

Goroutine,而且定義了正確構建管道的指南。

進一步閱讀:

Go併發模式(視頻)展現了Go的併發特性的基礎知識,並演示了應用這些知識的方法。
高級Go併發模式(視頻)覆蓋了關於Go特性更復雜的使用場景,尤爲是select。
Douglas McIlroy的論文《一窺級數數列》展現了Go使用的這類併發技術是如何優雅地支持複雜計算。算法

相關文章
相關標籤/搜索