Channel是Go中的一種類型,和goroutine一塊兒爲Go提供了併發技術, 它在開發中獲得了普遍的應用。Go鼓勵人們經過Channel在goroutine之間傳遞數據的引用(就像把數據的owner從一個goroutine傳遞給另一個goroutine), Effective Go總結了這麼一句話:html
Do not communicate by sharing memory; instead, share memory by communicating.git
在 Go內存模型指出了channel做爲併發控制的一個特性:github
A send on a channel happens before the corresponding receive from that channel completes. (Golang Spec)golang
除了正常的在goroutine之間安全地傳遞共享數據, Channel還能夠玩出不少的花樣(模式), 本文列舉了一些channel的應用模式。apache
促成本文誕生的因素主要包括:編程
下面就讓咱們以實例的方式看看這麼模式吧。數組
咱們知道, Go的標準庫sync
有Mutex
,能夠用來做爲鎖,可是Mutex
卻沒有實現TryLock
方法。緩存
咱們對於TryLock
的定義是當前goroutine嘗試得到鎖, 若是成功,則得到了鎖,返回true, 不然返回false。咱們可使用這個方法避免在獲取鎖的時候當前goroutine被阻塞住。安全
原本,這是一個經常使用的功能,在一些其它編程語言中都有實現,爲何Go中沒有實現的?issue#6123有詳細的討論,在我看來,Go核心組成員自己對這個特性沒有積極性,而且認爲經過channel能夠實現相同的方式。併發
其實,對於標準庫的sync.Mutex
要增長這個功能很簡單,下面的方式就是經過hack
的方式爲Mutex
實現了TryLock
的功能。
const mutexLocked = 1 << iota type Mutex struct { mu sync.Mutex } func (m *Mutex) Lock() { m.mu.Lock() } func (m *Mutex) Unlock() { m.mu.Unlock() } func (m *Mutex) TryLock() bool { return atomic.CompareAndSwapInt32((*int32)(unsafe.Pointer(&m.mu)), 0, mutexLocked) } func (m *Mutex) IsLocked() bool { return atomic.LoadInt32((*int32)(unsafe.Pointer(&m.mu))) == mutexLocked }
上面的代碼還額外增長了一個IsLocked
方法,不過這個方法通常不經常使用,由於查詢和加鎖這兩個方法執行的時候不是一個原子的操做,素以這個方法通常在調試和打日誌的時候可能有用。若是你看一下Mutex
實現的源代碼,就很容易理解上面的這段代碼了,由於mutex
實現鎖主要利用CAS
對它的一個int32字段作操做。
既然標準庫中不許備在Mutex
上增長這個方法,而是推薦使用channel來實現,那麼就讓咱們看看如何使用 channel來實現。
type Mutex struct { ch chan struct{} } func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu } func (m *Mutex) Lock() { <-m.ch } func (m *Mutex) Unlock() { select { case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") } } func (m *Mutex) TryLock() bool { select { case <-m.ch: return true default: } return false } func (m *Mutex) IsLocked() bool { return len(m.ch) > 0 }
你還能夠將緩存的大小從1改成n,用來處理n個鎖(資源)。主要是利用channel邊界狀況下的阻塞特性實現的。
有時候,咱們在獲取一把鎖的時候,因爲有競爭的關係,在鎖被別的goroutine擁有的時候,當前goroutine沒有辦法當即得到鎖,只能阻塞等待。標準庫並無提供等待超時的功能,咱們嘗試實現它。
type Mutex struct { ch chan struct{} } func NewMutex() *Mutex { mu := &Mutex{make(chan struct{}, 1)} mu.ch <- struct{}{} return mu } func (m *Mutex) Lock() { <-m.ch } func (m *Mutex) Unlock() { select { case m.ch <- struct{}{}: default: panic("unlock of unlocked mutex") } } func (m *Mutex) TryLock(timeout time.Duration) bool { timer := time.NewTimer(timeout) select { case <-m.ch: timer.Stop() return true case <-time.After(timeout): } return false } func (m *Mutex) IsLocked() bool { return len(m.ch) > 0 }
Or Channel 模式
你也能夠把它用Context
來改造,不是利用超時,而是利用Context
來取消/超時得到鎖的操做,這個做業留給讀者來實現。
當你等待多個信號的時候,若是收到任意一個信號, 就執行業務邏輯,忽略其它的還未收到的信號。
舉個例子, 咱們往提供相同服務的n個節點發送請求,只要任意一個服務節點返回結果,咱們就能夠執行下面的業務邏輯,其它n-1的節點的請求能夠被取消或者忽略。當n=2的時候,這就是back request
模式。 這樣能夠用資源來換取latency的提高。
須要注意的是,當收到任意一個信號的時候,其它信號都被忽略。若是用channel來實現,只要從任意一個channel中接收到一個數據,那麼全部的channel均可以被關閉了(依照你的實現,可是輸出的channel確定會被關閉)。
有三種實現的方式: goroutine、reflect和遞歸。
func or(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { var once sync.Once for _, c := range chans { go func(c <-chan interface{}) { select { case <-c: once.Do(func() { close(out) }) case <-out: } }(c) } }() return out }
爲了不併發關閉輸出channel的問題,關閉操做只執行一次。or
函數能夠處理n個channel,它爲每一個channel啓動一個goroutine,只要任意一個goroutine從channel讀取到數據,輸出的channel就被關閉掉了。
Go的反射庫針對select語句有專門的數據(reflect.SelectCase
)和函數(reflect.Select
)處理。
因此咱們能夠利用反射「隨機」地從一組可選的channel中接收數據,並關閉輸出channel。
這種方式看起來更簡潔。
func or(channels ...<-chan interface{}) <-chan interface{} { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan interface{}) go func() { defer close(orDone) var cases []reflect.SelectCase for _, c := range channels { cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } reflect.Select(cases) }() return orDone }
func or(channels ...<-chan interface{}) <-chan interface{} { switch len(channels) { case 0: return nil case 1: return channels[0] } orDone := make(chan interface{}) go func() { defer close(orDone) switch len(channels) { case 2: select { case <-channels[0]: case <-channels[1]: } default: m := len(channels) / 2 select { case <-or(channels[:m]...): case <-or(channels[m:]...): } } }() return orDone }
Or-Done-Channel模式
在後面的扇入(合併)模式中,咱們仍是會使用相一樣的遞歸模式來合併多個輸入channel,根據 justforfun 的測試結果,這種遞歸的方式要比goroutine、Reflect更有效。
這種模式是咱們常用的一種模式,經過一個信號channel(done)來控制(取消)輸入channel的處理。
一旦從done channel中讀取到一個信號,或者done channel被關閉, 輸入channel的處理則被取消。
這個模式提供一個簡便的方法,把done channel 和 輸入 channel 融合成一個輸出channel。
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream }
每一個channel起一個goroutine。
func fanIn(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { var wg sync.WaitGroup wg.Add(len(chans)) for _, c := range chans { go func(c <-chan interface{}) { for v := range c { out <- v } wg.Done() }(c) } wg.Wait() close(out) }() return out
下面這種實現方式其實仍是有些問題的, 在輸入channel讀取比較均勻的時候比較有效,不然性能比較低下。
func fanInReflect(chans ...<-chan interface{}) <-chan interface{} { out := make(chan interface{}) go func() { defer close(out) var cases []reflect.SelectCase for _, c := range chans { cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(c), }) } for len(cases) > 0 { i, v, ok := reflect.Select(cases) if !ok { //remove this case cases = append(cases[:i], cases[i+1:]...) continue } out <- v.Interface() } }() return out }
func fanInRec(chans ...<-chan interface{}) <-chan interface{} { switch len(chans) { case 0: c := make(chan interface{}) close(c) return c case 1: return chans[0] case 2: return mergeTwo(chans[0], chans[1]) default: m := len(chans) / 2 return mergeTwo( fanInRec(chans[:m]...), fanInRec(chans[m:]...)) } } func mergeTwo(a, b <-chan interface{}) <-chan interface{} { c := make(chan interface{}) go func() { defer close(c) for a != nil || b != nil { select { case v, ok := <-a: if !ok { a = nil continue } c <- v case v, ok := <-b: if !ok { b = nil continue } c <- v } } }() return c }
扇出行爲至少能夠分爲兩種:
本節只介紹第一種狀況,下一節介紹第二種狀況
將讀取的值發送給每一個輸出channel, 異步模式可能會產生不少的goroutine。
func fanOut(ch <-chan interface{}, out []chan interface{}, async bool) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() for v := range ch { v := v for i := 0; i < len(out); i++ { i := i if async { go func() { out[i] <- v }() } else { out[i] <- v } } } }() }
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() cases := make([]reflect.SelectCase, len(out)) for i := range cases { cases[i].Dir = reflect.SelectSend } for v := range ch { v := v for i := range cases { cases[i].Chan = reflect.ValueOf(out[i]) cases[i].Send = reflect.ValueOf(v) } for _ = range cases { // for each channel chosen, _, _ := reflect.Select(cases) cases[chosen].Chan = reflect.ValueOf(nil) } } }() }
roundrobin的方式選擇輸出channel。
func fanOut(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() // roundrobin var i = 0 var n = len(out) for v := range ch { v := v out[i] <- v i = (i + 1) % n } }() }
func fanOutReflect(ch <-chan interface{}, out []chan interface{}) { go func() { defer func() { for i := 0; i < len(out); i++ { close(out[i]) } }() cases := make([]reflect.SelectCase, len(out)) for i := range cases { cases[i].Dir = reflect.SelectSend cases[i].Chan = reflect.ValueOf(out[i]) } for v := range ch { v := v for i := range cases { cases[i].Send = reflect.ValueOf(v) } _, _, _ = reflect.Select(cases) } }() }
由於go自己的channel沒法再進行擴展, eapache/channels
庫定義了本身的channel接口,並提供了與channel方便的轉換。
eapache/channels
提供了四個方法:
同時對上面的四個函數還提供了WeakXXX
的函數,輸入關閉後不會關閉輸出。
下面看看對應的函數的例子。
func testDist() { fmt.Println("dist:") a := channels.NewNativeChannel(channels.None) outputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Distribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) //channels.WeakDistribute(a, outputs[0], outputs[1], outputs[2], outputs[3]) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for i := 0; i < 6; i++ { var v interface{} var j int select { case v = <-outputs[0].Out(): j = 0 case v = <-outputs[1].Out(): j = 1 case v = <-outputs[2].Out(): j = 2 case v = <-outputs[3].Out(): j = 3 } fmt.Printf("channel#%d: %d\n", j, v) } }
func testTee() { fmt.Println("tee:") a := channels.NewNativeChannel(channels.None) outputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Tee(a, outputs[0], outputs[1], outputs[2], outputs[3]) //channels.WeakTee(a, outputs[0], outputs[1], outputs[2], outputs[3]) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for i := 0; i < 20; i++ { var v interface{} var j int select { case v = <-outputs[0].Out(): j = 0 case v = <-outputs[1].Out(): j = 1 case v = <-outputs[2].Out(): j = 2 case v = <-outputs[3].Out(): j = 3 } fmt.Printf("channel#%d: %d\n", j, v) } }
func testMulti() { fmt.Println("multi:") a := channels.NewNativeChannel(channels.None) inputs := []channels.Channel{ channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), channels.NewNativeChannel(channels.None), } channels.Multiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) //channels.WeakMultiplex(a, inputs[0], inputs[1], inputs[2], inputs[3]) go func() { for i := 0; i < 5; i++ { for j := range inputs { inputs[j].In() <- i } } for i := range inputs { inputs[i].Close() } }() for v := range a.Out() { fmt.Printf("%d ", v) } }
func testPipe() { fmt.Println("pipe:") a := channels.NewNativeChannel(channels.None) b := channels.NewNativeChannel(channels.None) channels.Pipe(a, b) // channels.WeakPipe(a, b) go func() { for i := 0; i < 5; i++ { a.In() <- i } a.Close() }() for v := range b.Out() { fmt.Printf("%d ", v) } }
從channel的行爲來看,它看起來很像一個數據流,因此咱們能夠實現一些相似Scala 集合的操做。
Scala的集合類提供了豐富的操做(方法), 固然其它的一些編程語言或者框架也提供了相似的方法, 好比Apache Spark、Java Stream、ReactiveX等。
下面列出了一些方法的實現,我相信通過一些人的挖掘,相關的方法能夠變成一個很好的類庫,可是目前咱們先看一些例子。
skip函數是從一個channel中跳過開一些數據,而後纔開始讀取。
skipN跳過開始的N個數據。
func skipN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream }
func skipFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if !fn(v) { takeStream <- v } } } }() return takeStream }
func skipWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) take := false for { select { case <-done: return case v := <-valueStream: if !take { take = !fn(v) if !take { continue } } takeStream <- v } } }() return takeStream }
takeN 讀取開頭N個數據。
func takeN(done <-chan struct{}, valueStream <-chan interface{}, num int) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for i := 0; i < num; i++ { select { case <-done: return case takeStream <- <-valueStream: } } }() return takeStream }
func takeFn(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if fn(v) { takeStream <- v } } } }() return takeStream }
func takeWhile(done <-chan struct{}, valueStream <-chan interface{}, fn func(interface{}) bool) <-chan interface{} { takeStream := make(chan interface{}) go func() { defer close(takeStream) for { select { case <-done: return case v := <-valueStream: if !fn(v) { return } takeStream <- v } } }() return takeStream }
若是輸入是一個channel,channel中的數據仍是相同類型的channel, 那麼flat將返回一個輸出channel,輸出channel中的數據是輸入的各個channel中的數據。
它與扇入不一樣,扇入的輸入channel在調用的時候就是固定的,而且以數組的方式提供,而flat的輸入是一個channel,能夠運行時隨時的加入channel。
func orDone(done <-chan struct{}, c <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { select { case <-done: return case v, ok := <-c: if ok == false { return } select { case valStream <- v: case <-done: } } } }() return valStream } func flat(done <-chan struct{}, chanStream <-chan <-chan interface{}) <-chan interface{} { valStream := make(chan interface{}) go func() { defer close(valStream) for { var stream <-chan interface{} select { case maybeStream, ok := <-chanStream: if ok == false { return } stream = maybeStream case <-done: return } for val := range orDone(done, stream) { select { case valStream <- val: case <-done: } } } }() return valStream }
map將一個channel映射成另一個channel, channel的類型能夠不一樣。
func mapChan(in <-chan interface{}, fn func(interface{}) interface{}) <-chan interface{} { out := make(chan interface{}) if in == nil { close(out) return out } go func() { defer close(out) for v := range in { out <- fn(v) } }() return out }
好比你能夠處理一個公司員工工資的channel, 輸出一個扣稅以後的員工工資的channel。由於map
是go的關鍵字,因此咱們不能命名函數類型爲map
,這裏用mapChan
代替。
func reduce(in <-chan interface{}, fn func(r, v interface{}) interface{}) interface{} { if in == nil { return nil } out := <-in for v := range in { out = fn(out, v) } return out } 你能夠用`reduce`實現`sum`、`max`、`min`等聚合操做。
全部的代碼能夠在github上找到: smallnest/channels。