Go併發模式:管道和取消

原地址:http://air.googol.im/2014/03/15/go-concurrency-patterns-pipelines-and-cancellation.htmlhtml

譯自http://blog.golang.org/pipelinesgolang

這是Go官方blog的一篇文章,介紹瞭如何使用Go來編寫併發程序,並按照程序的演化順序,介紹了不一樣模式遇到的問題以及解決的問題。主要解釋了用管道模式連接不一樣的線程,以及如何在某個線程取消工做時,保證全部線程以及管道資源的正常回收。算法

Go併發模式:管道和取消c#

做者:Sameer Ajmani,blog.golang.org,寫於2014年3月13日。併發

介紹

Go自己提供的併發特性,能夠輕鬆構建用於處理流數據的管道,從而高效利用I/O和多核CPU。這篇文章就展現了這種管道的例子,並關注當操做失敗時要處理的一些細節,並介紹瞭如何干淨的處理錯誤的技巧。app

什麼是管道?

Go語言裏沒有明肯定義管道,而只是把管道看成一類併發程序。簡單來講,管道是一系列由channel聯通的狀態(stage),而每一個狀態是一組運行相同函數的Goroutine。每一個狀態上,Goroutineide

  • 經過流入(inbound)channel接收上游的數值
  • 運行一些函數來處理接收的數據,通常會產生新的數值
  • 經過流出(outbound)channel將數值發給下游

每一個語態都會有任意個流入或者流出channel,除了第一個狀態(只有流出channel)和最後一個狀態(只有流入channel)。第一個狀態有時被稱做源或者生產者;最後一個狀態有時被稱做槽(sink)或者消費者。函數

咱們先從一個簡單的管道例子開始解釋這些想法和技術。以後,咱們再來看一些更真實的例子。ui

求平方數

考慮一個管道和三個狀態。命令行

第一個狀態,gen,是一個將一系列整數一一傳入channel的函數。gen函數啓動一個Goroutine,將整數數列發送給channel,若是全部數都發送完成,關閉這個channel:

func gen(nums ...int) <-chan int {
    out := make(chan int)
    go func() {
        for _, n := range nums {
            out <- n
        }
        close(out)
    }()
    return out
}

第二個狀態,sq,從一個channel接收整數,並求整數的平方,發送給另外一個channel。當流入channel被關閉,並且狀態已經把全部數值都發送給了下游,關閉流出channel:

func sq(in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
            out <- n * n
        }
        close(out)
    }()
    return out
}

主函數創建起管道,並執行最終的狀態:從第二個狀態接收全部的數值並打印,直到channel被關閉:

func main() {
    // 創建管道
    c := gen(2, 3)
    out := sq(c)

    // 產生輸出
    fmt.Println(<-out) // 4
    fmt.Println(<-out) // 9
}

由於sq有相同類型的流入和流出channel,咱們能夠將其組合任意次。咱們也能夠將main函數寫成和其餘狀態相似的範圍循環的形式:

func main() {
    // 創建管道併產生輸出
    for n := range sq(sq(gen(2, 3))) {
        fmt.Println(n) // 16 和 81
    }
}

扇出,扇入

多個函數能夠同時從一個channel接收數據,直到channel關閉,這種狀況被稱做扇出。這是一種將工做分佈給一組工做者的方法,目的是並行使用CPU和I/O。

一個函數同時接收並處理多個channel輸入並轉化爲一個輸出channel,直到全部的輸入channel都關閉後,關閉輸出channel。這種狀況稱做扇入

咱們能夠將咱們的管道改成同時執行兩個sq實例,每一個都從一樣的輸入channel讀取數據。咱們還引入新函數,merge,來扇入全部的結果:

func main() {
    in := gen(2, 3)

    // 在兩個從in裏讀取數據的Goroutine間分配sq的工做
    c1 := sq(in)
    c2 := sq(in)

    // 輸出從c1和c2合併的數據
    for n := range merge(c1, c2) {
        fmt.Println(n) // 4 和 9, 或者 9 和 4
    }
}

merge對每一個流入channel啓動一個Goroutine,並將流入的數值複製到流出channel,由此將一組channel轉換到一個channel。一旦啓動了全部的output Goroutine,merge函數會多啓動一個Goroutine,這個Goroutine在全部的輸入channel輸入完畢後,關閉流出channel。

往一個已經關閉的channel輸出會產生異常(panic),因此必定要保證全部數據發送完成後再執行關閉。sync.WaitGroup類型提供了方便的方法,來保證這種同步:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 爲cs中每一個輸入channel啓動輸出Goroutine。output從c中複製數值,直到c被關閉
    // 以後調用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            out <- n
        }
        wg.Done()
    }
    wg.Add(len(cs))
    for _, c := range cs {
        go output(c)
    }

    // 啓動一個Goroutine,當全部output Goroutine都工做完後(wg.Done),關閉out,
    // 保證只關閉一次。這個Goroutine必須在wg.Add以後啓動
    go func() {
        wg.Wait()
        close(out)
    }()
    return out
}

忽然關閉

咱們的管道函數裏有個模式:

  • 狀態會在全部發送操做作完後,關閉它們的流出channel
  • 狀態會持續接收從流入channel輸入的數值,直到channel關閉

這個模式使得每一個接收狀態能夠寫爲一個range循環,並保證全部的Goroutine在將全部的數值發送成功給下游後馬上退出。

可是實際的管道,狀態不能老是接收全部的流入數值。有時這是設計決定的:接收者可能只須要一部分數值作進一步處理。更常見的狀況是,一個狀態會因爲從早先的狀態流入的數值有誤而退出。無論哪一種狀況,接收者都不該該繼續等待剩下的數值,並且咱們但願早先的狀態能夠中止生產後續狀態不須要的數據。

在咱們的管道例子裏,若是一個狀態沒法處理全部的流入數值,試圖發送那些數值的Goroutine會被永遠阻塞住:

// 處理輸出的第一個數值
    out := merge(c1, c2)
    fmt.Println(<-out) // 4 或者 9
    return
    // 因爲咱們再也不接收從out輸出的第二個數值,其中一個輸出Goroutine會因爲試圖發送數值而掛起
}

這是資源泄漏:Goroutine會佔用內存和運行時資源,並且Goroutine棧裏的堆引用會一直持有數據,這些數據沒法被垃圾回收。Goroutine自己也沒法被垃圾回收,它們必須靠本身退出(而不是被其餘人殺死)。

即使下游的狀態沒法接收全部的流入數值,咱們依然須要讓管道里的上游狀態正常退出。一種方法是修改流出channel,使其含有緩衝區。緩衝區能夠持有固定數量的數值,當緩衝區有空間時,發送操做會馬上完成(不會產生阻塞)。

在建立channel時,若是已經知道要發送數值的數量,緩衝區能夠簡化代碼。好比,咱們可讓gen把整數列表裏的數複製進channel緩衝區,而不需使用新的Goroutine:

func gen(nums ...int) <-chan int {
    out := make(chan int, len(nums))
    for _, n := range nums {
        out <- n
    }
    close(out)
    return out
}

回到咱們管道的阻塞問題上來,咱們能夠考慮給merge的流出channel加上緩衝區:

func merge(cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int, 1) // 1個空間足夠應付未讀的輸入
    // ... 其他未變 ...

這個改動固然修正了程序中阻塞Goroutine的問題,但這不是好的代碼。緩衝區的大小爲1,依賴於咱們已經知道咱們將要merge的數值總數和下游狀態要處理的數值總數。這太脆弱了:若是咱們從gen傳入額外的數值,或者下游狀態再多讀一些數值,咱們仍將看到Goroutine被阻塞住了。

不使用緩衝區的話,咱們須要提供一種方法,讓下游狀態通知發送者,下游狀態將中止接收輸入。

明確的取消

main要在不接收全部來自out的數值前退出,就須要告訴全部上游狀態的Goroutine,放棄嘗試發送數值的行爲。這能夠經過發送數值到一個叫作done的channel來完成。例子裏有兩個潛在的會被阻塞的發送者,因此給done發送了兩個數值:

func main() {
    in := gen(2, 3)

    // 發佈sq的工做到兩個都從in裏讀取數據的Goroutine
    c1 := sq(in)
    c2 := sq(in)

    // 處理來自output的第一個數值
    done := make(chan struct{}, 2)
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 或者 9

    // 通知其餘發送者,該退出了
    done <- struct{}{}
    done <- struct{}{}
}

發送Goroutine將發送操做替換爲一個select語句,要麼把數據發送給out,要麼處理來自done的數值。done的類型是個空結構,由於具體數值並不重要:接收事件自己就指明瞭應當放棄繼續發送給out的動做。而output Goroutine會繼續循環處理流入的channel,c,而不會阻塞上游狀態:

func merge(done <-chan struct{}, cs ...<-chan int) <-chan int {
    var wg sync.WaitGroup
    out := make(chan int)

    // 爲每一個cs中的輸入channel啓動一個output Goroutine。outpu從c裏複製數值直到c被關閉
    // 或者從done裏接收到數值,以後output調用wg.Done
    output := func(c <-chan int) {
        for n := range c {
            select {
            case out <- n:
            case <-done:
            }
        }
        wg.Done()
    }
    // ... 其他的不變 ...

可是這種方法有個問題:下游的接收者須要知道潛在會被阻塞的上游發送者的數量。追蹤這些數量不只枯燥,還容易出錯。

咱們須要一種方法,讓不知道也不限制數量的Goroutine,中止往它們下游發送數據的行爲。在Go裏,咱們能夠經過關閉channel來實現這個工做,由於channel被關閉時,接收工做會馬上執行,併產生一個符合類型的0值

這就是說,main能夠容易的經過關閉donechannel來釋放全部的發送者。關閉是個高效的發送給全部發送者的廣播信號。咱們擴展管道里的每一個函數,讓其以參數方式接收done,並經過defer語句在函數退出時執行關閉操做,這樣main裏全部的退出路徑都會觸發管道里的全部狀態退出。

func main() {
    // 構建done channel,整個管道里分享done,並在管道退出時關閉這個channel
    // 以此通知全部Goroutine該推出了。
    done := make(chan struct{})
    defer close(done)

    in := gen(done, 2, 3)

    // 發佈sq的工做到兩個都從in裏讀取數據的Goroutine
    c1 := sq(done, in)
    c2 := sq(done, in)

    // 處理來自output的第一個數值
    out := merge(done, c1, c2)
    fmt.Println(<-out) // 4 或者 9

    // done會經過defer調用而關閉
}

管道里的每一個狀態如今均可以隨意的提前退出了:sq能夠在它的循環中退出,由於咱們知道若是done已經被關閉了,也會關閉上游的gen狀態。sq經過defer語句,保證無論從哪一個返回路徑,它的out channel都會被關閉。

func sq(done <-chan struct{}, in <-chan int) <-chan int {
    out := make(chan int)
    go func() {
        defer close(out)
        for n := range in {
            select {
            case out <- n * n:
            case <-done:
                return
            }
        }
    }()
    return out
}

下面列出了構建管道的指南:

  • 狀態會在全部發送操做作完後,關閉它們的流出channel
  • 狀態會持續接收從流入channel輸入的數值,直到channel關閉或者其發送者被釋放。

管道要麼保證足夠能存下全部發送數據的緩衝區,要麼接收來自接收者明確的要放棄channel的信號,來保證釋放發送者。

對目錄作摘要

來考慮一個更現實的管道。

MD5是一個摘要算法,常常在對文件的校驗的時候使用。命令行上使用md5sum來打印出一系列文件的摘要數值。

咱們的程序相似md5sum,可是參數是一個目錄,以後會打印出這個目錄下全部常規文件的摘要值,以文件路徑名排序。

咱們的主函數包含一個MD5All的輔助函數,返回一個路徑名到摘要值的映射,以後排序並打印結果:

func main() {
    // 計算指定目錄下全部文件的MD5值,以後按照目錄名排序並打印結果
    m, err := MD5All(os.Args[1])
    if err != nil {
        fmt.Println(err)
        return
    }
    var paths []string
    for path := range m {
        paths = append(paths, path)
    }
    sort.Strings(paths)
    for _, path := range paths {
        fmt.Printf("%x  %s\n", m[path], path)
    }
}

MD5All函數是咱們討論的焦點。在serial.go文件裏,是非併發的函數實現,再掃描目錄樹時簡單讀取並計算每一個文件。

// MD5All讀取文件目錄root下全部文件,並返回從文件路徑到文件內容MD5值的映射。若是掃描目錄
// 出錯或者任何操做失敗,MD5All返回失敗。
func MD5All(root string) (map[string][md5.Size]byte, error) {
    m := make(map[string][md5.Size]byte)
    err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
        if err != nil {
            return err
        }
        if info.IsDir() {
            return nil
        }
        data, err := ioutil.ReadFile(path)
        if err != nil {
            return err
        }
        m[path] = md5.Sum(data)
        return nil
    })
    if err != nil {
        return nil, err
    }
    return m, nil
}

並行摘要

parallel.go裏,咱們把MD5All分解爲兩個狀態的管道。第一個狀態,sumFiles,遍歷目錄,在一個新的Goroutine裏對每一個文件作摘要,並把結果發送到類型爲result的channel:

type result struct {
    path string
    sum  [md5.Size]byte
    err  error
}

sumFiles返回兩個channel:一個用來傳遞result,另外一個用來返回filepath.Walk的錯誤。遍歷函數啓動一個新的Goroutine來處理每一個常規文件,以後檢查done。若是done已經被關閉了,遍歷就馬上中止:

func sumFiles(done <-chan struct{}, root string) (<-chan result, <-chan error) {
    // 對每一個常規文件,啓動一個Goroutine計算文件內容併發送結果到c。發送walk的結果到errc
    c := make(chan result)
    errc := make(chan error, 1)
    go func() {
        var wg sync.WaitGroup
        err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            wg.Add(1)
            go func() {
                data, err := ioutil.ReadFile(path)
                select {
                case c <- result{path, md5.Sum(data), err}:
                case <-done:
                }
                wg.Done()
            }()
            // 若是done被關閉了,中止walk
            select {
            case <-done:
                return errors.New("walk canceled")
            default:
                return nil
            }
        })
        // walk已經返回,全部wg.Add的工做都作完了。開啓新進程,在全部發送完成後
        // 關閉c。
        go func() {
            wg.Wait()
            close(c)
        }()
        // 由於errc有緩衝區,因此這裏不須要select。
        errc <- err
    }()
    return c, errc
}

MD5Allc接收全部的摘要值。MD5All返回早先的錯誤,經過defer關閉done

func MD5All(root string) (map[string][md5.Size]byte, error) {
    // MD5All在返回時關閉done channel;這個可能在從c和errc收到全部的值以前被調用
    done := make(chan struct{})
    defer close(done)

    c, errc := sumFiles(done, root)

    m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

受限的併發

parallel.go裏實現的MD5All對每一個文件啓動一個新的Goroutine。若是目錄裏含有不少大文件,這可能會致使申請大量內存,超出機器上的可用內存。

咱們能夠經過控制並行讀取的文件數量來限制內存的申請。在bounded.go,咱們建立固定數量的用於讀取文件的Goroutine,來限制內存使用。如今整個管道有三個狀態:遍歷樹,讀取並對文件作摘要,收集摘要值。

第一個狀態,walkFiles,發送樹裏的每一個常規文件的路徑:

func walkFiles(done <-chan struct{}, root string) (<-chan string, <-chan error) {
    paths := make(chan string)
    errc := make(chan error, 1)
    go func() {
        // 在Walk以後關閉paths channel
        defer close(paths)
        // 由於errc有緩衝區,因此這裏不須要select。
        errc <- filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
            if err != nil {
                return err
            }
            if info.IsDir() {
                return nil
            }
            select {
            case paths <- path:
            case <-done:
                return errors.New("walk canceled")
            }
            return nil
        })
    }()
    return paths, errc
}

中間的狀態啓動固定數量的digester Goroutine,從paths接收文件名,並將結果result發送到channel c

func digester(done <-chan struct{}, paths <-chan string, c chan<- result) {
    for path := range paths {
        data, err := ioutil.ReadFile(path)
        select {
        case c <- result{path, md5.Sum(data), err}:
        case <-done:
            return
        }
    }
}

不象以前的例子,digester並不關閉輸出channel,由於多個Goroutine會發送到共享的channel。另外一邊,MD5All中的代碼會在全部digester完成後關閉channel:

// 啓動固定數量的Goroutine來讀取並對文件作摘要。
    c := make(chan result)
    var wg sync.WaitGroup
    const numDigesters = 20
    wg.Add(numDigesters)
    for i := 0; i < numDigesters; i++ {
        go func() {
            digester(done, paths, c)
            wg.Done()
        }()
    }
    go func() {
        wg.Wait()
        close(c)
    }()

咱們也可讓每一個digester建立並返回本身的輸出channel,可是這就須要一個單獨的Goroutine來扇入全部結果。

最終從c收集到全部結果result,並檢查從errc傳入的錯誤。這個錯誤的檢查不能提前,由於在這個時間點以前,walkFiles可能會由於正在發送消息給下游而阻塞:

m := make(map[string][md5.Size]byte)
    for r := range c {
        if r.err != nil {
            return nil, r.err
        }
        m[r.path] = r.sum
    }
    // 檢查Walk是否失敗
    if err := <-errc; err != nil {
        return nil, err
    }
    return m, nil
}

結論

這篇文章展現了使用Go構建流數據管道的技術。要慎重處理這種管道產生的錯誤,由於管道里的每一個狀態均可能由於向下遊發送數值而阻塞,而下游的狀態卻再也不關心輸入的數據。咱們展現瞭如何將關閉channel做爲「完成」信號廣播給全部由管道啓動的Goroutine,而且定義了正確構建管道的指南。

進一步閱讀:

Go併發模式視頻)展現了Go的併發特性的基礎知識,並演示了應用這些知識的方法。
高級Go併發模式視頻)覆蓋了關於Go特性更復雜的使用場景,尤爲是select。
Douglas McIlroy的論文《一窺級數數列》展現了Go使用的這類併發技術是如何優雅地支持複雜計算。

相關文章
相關標籤/搜索