Beyond safely passing data between goroutines, channels support a surprising number of patterns, as I discovered reading Concurrency in Go. Below are the patterns I have found most useful in my own study.
Take a web crawler as an example. A crawler generally works in the following steps:

fetch page -> parse page -> aggregate and analyze data -> write analysis results to storage

If you cram all of these steps into one function, the result is ugly and hard to maintain. For better decoupling, we can start four goroutines, each playing a different role: goroutine 1 fetches pages, goroutine 2 parses them, and so on. When a stage finishes with a piece of data, it hands it to the next stage. That is the basic idea of a pipeline: each role cares only about its own job.
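The steps above can be sketched as stages connected by channels. This is a minimal illustration, not a real crawler: the stage names (`fetchPages`, `parsePages`) and the fake page contents are made up for the example.

```go
package main

import "fmt"

// fetchPages is a stand-in for the "fetch page" stage: it emits one
// pretend page per URL on its output channel, then closes it.
func fetchPages(urls ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, u := range urls {
			out <- "<html>" + u + "</html>" // pretend we downloaded the page
		}
	}()
	return out
}

// parsePages is a stand-in for the "parse page" stage: it consumes pages
// from the previous stage and hands parsed results to the next one.
func parsePages(pages <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for p := range pages {
			out <- "parsed:" + p // pretend we extracted the data
		}
	}()
	return out
}

func main() {
	// Each stage runs in its own goroutine; data flows stage to stage.
	for result := range parsePages(fetchPages("a.com", "b.com")) {
		fmt.Println(result)
	}
}
```

The remaining stages (analyze, store) would be wired up the same way, each taking the previous stage's output channel as input.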
For example, given a number n, the pipeline below computes (n * 2 + 1) * 2:
```go
func pipeline() {
	generator := func(done chan interface{}, integers ...int) <-chan int {
		inStream := make(chan int)
		go func() {
			defer close(inStream)
			for _, i := range integers {
				select {
				case <-done:
					return
				case inStream <- i:
				}
			}
		}()
		return inStream
	}
	add := func(done <-chan interface{}, inStream <-chan int, increment int) <-chan int {
		addStream := make(chan int)
		go func() {
			defer close(addStream)
			for i := range inStream {
				select {
				case <-done:
					return
				case addStream <- i + increment:
				}
			}
		}()
		return addStream
	}
	multiply := func(done <-chan interface{}, inStream <-chan int, factor int) <-chan int {
		multiplyStream := make(chan int)
		go func() {
			defer close(multiplyStream)
			for i := range inStream {
				select {
				case <-done:
					return
				case multiplyStream <- i * factor:
				}
			}
		}()
		return multiplyStream
	}
	done := make(chan interface{})
	defer close(done)
	inStream := generator(done, []int{1, 2, 3, 4, 5, 6, 7}...)
	pipeline := multiply(done, add(done, multiply(done, inStream, 2), 1), 2)
	for v := range pipeline {
		fmt.Println(v)
	}
}
```
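One design point worth calling out is the done channel threaded through every stage: closing it lets the consumer cancel the whole pipeline early instead of leaking goroutines. A minimal sketch of just that behavior (the generator below mirrors the one above, trimmed for illustration):

```go
package main

import "fmt"

// generator emits the given numbers until either they run out or the
// consumer closes done; the select makes every send cancellable.
func generator(done <-chan interface{}, nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			select {
			case <-done:
				return // consumer gave up; stop producing
			case out <- n:
			}
		}
	}()
	return out
}

func main() {
	done := make(chan interface{})
	stream := generator(done, 1, 2, 3, 4, 5)
	fmt.Println(<-stream) // take just one value...
	close(done)           // ...then cancel; the goroutine exits instead of leaking
}
```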
The pipeline is an efficient streaming model, but suppose it has three stages a, b, and c, and stage b is especially slow; that slowness backs up into stage c. If we run more workers for stage b, we weaken the impact of the slow stage on the whole pipeline. Distributing a's output across multiple b workers is fan-out; merging the output of those b workers back into a single stream for c is fan-in.
```go
func FanInFanOut() {
	producer := func(integers ...int) <-chan interface{} {
		inStream := make(chan interface{})
		go func() {
			defer close(inStream)
			for _, v := range integers {
				time.Sleep(5 * time.Second) // simulate a slow stage
				inStream <- v
			}
		}()
		return inStream
	}
	fanIn := func(channels ...<-chan interface{}) <-chan interface{} {
		var wg sync.WaitGroup
		multiplexedStream := make(chan interface{})
		multiplex := func(c <-chan interface{}) {
			defer wg.Done()
			for i := range c {
				multiplexedStream <- i
			}
		}
		wg.Add(len(channels))
		for _, c := range channels {
			go multiplex(c)
		}
		go func() {
			wg.Wait()
			close(multiplexedStream) // close only after every producer is drained
		}()
		return multiplexedStream
	}
	consumer := func(inStream <-chan interface{}) {
		for v := range inStream {
			fmt.Println(v)
		}
	}
	num := runtime.NumCPU()
	producerStreams := make([]<-chan interface{}, num)
	for i := 0; i < num; i++ {
		producerStreams[i] = producer(i)
	}
	consumer(fanIn(producerStreams...))
}
```
Suppose you pull a sql statement off a channel and want to record it, analyze it, and execute it. You need to forward that statement to the channel belonging to each of those three tasks; tee-channel does exactly that.
```go
func teeChannel() {
	producer := func(integers ...int) <-chan interface{} {
		inStream := make(chan interface{})
		go func() {
			defer close(inStream)
			for _, v := range integers {
				inStream <- v
			}
		}()
		return inStream
	}
	tee := func(in <-chan interface{}) (_, _ <-chan interface{}) {
		out1 := make(chan interface{})
		out2 := make(chan interface{})
		go func() {
			defer close(out1)
			defer close(out2)
			for val := range in {
				// Shadow the outputs so each can be nilled out once it
				// has received this value; a nil channel blocks forever,
				// so the select delivers to each output exactly once.
				out1, out2 := out1, out2
				for i := 0; i < 2; i++ {
					select {
					case out1 <- val:
						out1 = nil
					case out2 <- val:
						out2 = nil
					}
				}
			}
		}()
		return out1, out2
	}
	out1, out2 := tee(producer(1, 2, 3, 4, 5))
	for val1 := range out1 {
		fmt.Printf("out1: %v, out2: %v\n", val1, <-out2)
	}
}
```
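The tee above only splits into two outputs, while the sql scenario needs three consumers. One way to get three is to chain two tees; this is a sketch, and the consumer names (record, analyze, execute) are just labels for the example. Note that all outputs must be read in lockstep, since tee blocks until every branch has taken the current value.

```go
package main

import "fmt"

// tee duplicates every value from in onto two output channels.
func tee(in <-chan interface{}) (<-chan interface{}, <-chan interface{}) {
	out1 := make(chan interface{})
	out2 := make(chan interface{})
	go func() {
		defer close(out1)
		defer close(out2)
		for val := range in {
			o1, o2 := out1, out2 // local copies we can nil out once delivered
			for i := 0; i < 2; i++ {
				select {
				case o1 <- val:
					o1 = nil
				case o2 <- val:
					o2 = nil
				}
			}
		}
	}()
	return out1, out2
}

func main() {
	in := make(chan interface{}, 1)
	in <- "SELECT 1"
	close(in)

	// Chain two tees: in -> (record, rest), rest -> (analyze, execute).
	record, rest := tee(in)
	analyze, execute := tee(rest)

	// Read all three branches per value so no tee blocks.
	for v := range record {
		fmt.Println("record:", v, "analyze:", <-analyze, "execute:", <-execute)
	}
}
```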
Whether in the pipeline or in fan-in/fan-out above, each goroutine consumes from a single channel. In real scenarios, though, we may be handed many channels to consume, and as consumers we don't care which channel a value came from. In that situation, dealing with a channel full of channels can be unwieldy. If we define a helper that flattens a channel of channels into one plain channel, consumers can focus on the work at hand; that is the idea behind bridging channels.
```go
func bridge() {
	gen := func() <-chan (<-chan interface{}) {
		in := make(chan (<-chan interface{}))
		go func() {
			defer close(in)
			for i := 0; i < 10; i++ {
				stream := make(chan interface{}, 1)
				stream <- i
				close(stream)
				in <- stream
			}
		}()
		return in
	}
	bridge := func(in <-chan (<-chan interface{})) <-chan interface{} {
		valStream := make(chan interface{})
		go func() {
			defer close(valStream)
			for {
				// Note: make(<-chan interface{}) does not compile, since a
				// receive-only channel cannot be created; declare it instead.
				var stream <-chan interface{}
				maybeStream, ok := <-in
				if !ok {
					return
				}
				stream = maybeStream
				for val := range stream {
					valStream <- val
				}
			}
		}()
		return valStream
	}
	for val := range bridge(gen()) {
		fmt.Println(val)
	}
}
```
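A property of this flattening worth noting: bridge drains each inner channel completely, in the order the outer channel delivers them, so per-stream ordering is preserved. A minimal sketch demonstrating that (using buffered channels so it needs no extra goroutines):

```go
package main

import "fmt"

// bridge flattens a channel of channels into a single channel, draining
// each inner channel to completion before moving to the next.
func bridge(in <-chan (<-chan int)) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for stream := range in {
			for v := range stream {
				out <- v
			}
		}
	}()
	return out
}

func main() {
	outer := make(chan (<-chan int), 2)
	for _, pair := range [][]int{{1, 2}, {3, 4}} {
		inner := make(chan int, len(pair))
		for _, v := range pair {
			inner <- v
		}
		close(inner)
		outer <- inner
	}
	close(outer)
	// The two inner streams come out flattened, in order: 1 2 3 4
	for v := range bridge(outer) {
		fmt.Print(v, " ")
	}
	fmt.Println()
}
```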