在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要在多個併發發送端中關閉channel。換句話說,若是sender(發送者)只是惟一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值能夠讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
(咱們將會稱上面的原則爲channel closing principle)golang
channel closing principle要求咱們只能在發送端進行channel的關閉,對於平常遇到的能夠歸結爲三類api
一、m個receivers,一個sender.安全
二、一個receiver,n個sender併發
三、m個receivers,n個senderdom
一、m個receivers,一個senderide
M個receivers,一個sender,sender經過關閉data channel說「再也不發送」函數
這是最簡單的場景了,就只是當sender不想再發送的時候讓sender關閉data 來關閉channel:性能
1 package main 2 3 import ( 4 "time" 5 "math/rand" 6 "sync" 7 "log" 8 ) 9 10 func main() { 11 rand.Seed(time.Now().UnixNano()) 12 log.SetFlags(0) 13 14 // ... 15 const MaxRandomNumber = 100000 16 const NumReceivers = 100 17 18 wgReceivers := sync.WaitGroup{} 19 wgReceivers.Add(NumReceivers) 20 21 // ... 22 dataCh := make(chan int, 100) 23 24 // the sender 25 go func() { 26 for { 27 if value := rand.Intn(MaxRandomNumber); value == 0 { 28 // the only sender can close the channel safely. 29 close(dataCh) 30 return 31 } else { 32 dataCh <- value 33 } 34 } 35 }() 36 37 // receivers 38 for i := 0; i < NumReceivers; i++ { 39 go func() { 40 defer wgReceivers.Done() 41 42 // receive values until dataCh is closed and 43 // the value buffer queue of dataCh is empty. 44 for value := range dataCh { 45 log.Println(value) 46 } 47 }() 48 } 49 50 wgReceivers.Wait() 51 }
二、一個receiver,n個sendersspa
一個receiver,N個sender,receiver經過關閉一個額外的signal channel說「請中止發送」
這種場景比上一個要複雜一點。咱們不能讓receiver關閉data channel,由於這麼作將會打破channel closing principle。可是咱們可讓receiver關閉一個額外的signal channel來通知sender中止發送值:code
1 package main 2 3 import ( 4 "time" 5 "math/rand" 6 "sync" 7 "log" 8 ) 9 10 func main() { 11 rand.Seed(time.Now().UnixNano()) 12 log.SetFlags(0) 13 14 // ... 15 const MaxRandomNumber = 100000 16 const NumSenders = 1000 17 18 wgReceivers := sync.WaitGroup{} 19 wgReceivers.Add(1) 20 21 // ... 22 dataCh := make(chan int, 100) 23 stopCh := make(chan struct{}) 24 // stopCh is an additional signal channel. 25 // Its sender is the receiver of channel dataCh. 26 // Its reveivers are the senders of channel dataCh. 27 28 // senders 29 for i := 0; i < NumSenders; i++ { 30 go func() { 31 for { 32 value := rand.Intn(MaxRandomNumber) 33 34 select { 35 case <- stopCh: 36 return 37 case dataCh <- value: 38 } 39 } 40 }() 41 } 42 43 // the receiver 44 go func() { 45 defer wgReceivers.Done() 46 47 for value := range dataCh { 48 if value == MaxRandomNumber-1 { 49 // the receiver of the dataCh channel is 50 // also the sender of the stopCh cahnnel. 51 // It is safe to close the stop channel here. 52 close(stopCh) 53 return 54 } 55 56 log.Println(value) 57 } 58 }() 59 60 // ... 61 wgReceivers.Wait() 62 }
三、m個receivers,n個sender
M個receiver,N個sender,它們當中任意一個經過通知一個moderator(仲裁者)關閉額外的signal channel來講「讓咱們結束遊戲吧」
這是最複雜的場景了。咱們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers經過關閉一個額外的signal channel來通知全部的senders和receivers退出遊戲。這麼作的話會打破channel closing principle。可是,咱們能夠引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎麼通知moderator去關閉額外的signal channel:
1 package main 2 3 import ( 4 "time" 5 "math/rand" 6 "sync" 7 "log" 8 "strconv" 9 ) 10 11 func main() { 12 rand.Seed(time.Now().UnixNano()) 13 log.SetFlags(0) 14 15 // ... 16 const MaxRandomNumber = 100000 17 const NumReceivers = 10 18 const NumSenders = 1000 19 20 wgReceivers := sync.WaitGroup{} 21 wgReceivers.Add(NumReceivers) 22 23 // ... 24 dataCh := make(chan int, 100) 25 stopCh := make(chan struct{}) 26 // stopCh is an additional signal channel. 27 // Its sender is the moderator goroutine shown below. 28 // Its reveivers are all senders and receivers of dataCh. 29 toStop := make(chan string, 1) 30 // the channel toStop is used to notify the moderator 31 // to close the additional signal channel (stopCh). 32 // Its senders are any senders and receivers of dataCh. 33 // Its reveiver is the moderator goroutine shown below. 34 35 var stoppedBy string 36 37 // moderator 38 go func() { 39 stoppedBy = <- toStop // part of the trick used to notify the moderator 40 // to close the additional signal channel. 41 close(stopCh) 42 }() 43 44 // senders 45 for i := 0; i < NumSenders; i++ { 46 go func(id string) { 47 for { 48 value := rand.Intn(MaxRandomNumber) 49 if value == 0 { 50 // here, a trick is used to notify the moderator 51 // to close the additional signal channel. 52 select { 53 case toStop <- "sender#" + id: 54 default: 55 } 56 return 57 } 58 59 // the first select here is to try to exit the 60 // goroutine as early as possible. 61 select { 62 case <- stopCh: 63 return 64 default: 65 } 66 67 select { 68 case <- stopCh: 69 return 70 case dataCh <- value: 71 } 72 } 73 }(strconv.Itoa(i)) 74 } 75 76 // receivers 77 for i := 0; i < NumReceivers; i++ { 78 go func(id string) { 79 defer wgReceivers.Done() 80 81 for { 82 // same as senders, the first select here is to 83 // try to exit the goroutine as early as possible. 84 select { 85 case <- stopCh: 86 return 87 default: 88 } 89 90 select { 91 case <- stopCh: 92 return 93 case value := <-dataCh: 94 if value == MaxRandomNumber-1 { 95 // the same trick is used to notify the moderator 96 // to close the additional signal channel. 97 select { 98 case toStop <- "receiver#" + id: 99 default: 100 } 101 return 102 } 103 104 log.Println(value) 105 } 106 } 107 }(strconv.Itoa(i)) 108 } 109 110 // ... 111 wgReceivers.Wait() 112 log.Println("stopped by", stoppedBy) 113 }
有沒有一個內置函數能夠檢查一個channel是否已經關閉。若是你能肯定不會向channel發送任何值,那麼也確實須要一個簡單的方法來檢查channel是否已經關閉:
1 package main 2 3 import "fmt" 4 5 type T int 6 7 func IsClosed(ch <-chan T) bool { 8 select { 9 case <-ch: 10 return true 11 default: 12 } 13 14 return false 15 } 16 17 func main() { 18 c := make(chan T) 19 fmt.Println(IsClosed(c)) // false 20 close(c) 21 fmt.Println(IsClosed(c)) // true 22 }
上面已經提到了,沒有一種適用的方式來檢查channel是否已經關閉了。可是,就算有一個簡單的 closed(chan T) bool
函數來檢查channel是否已經關閉,它的用處仍是頗有限的,就像內置的len
函數用來檢查緩衝channel中元素數量同樣。緣由就在於,已經檢查過的channel的狀態有可能在調用了相似的方法返回以後就修改了,所以返回來的值已經不可以反映剛纔檢查的channel的當前狀態了。
儘管在調用closed(ch)
返回true
的狀況下中止向channel發送值是能夠的,可是若是調用closed(ch)
返回false
,那麼關閉channel或者繼續向channel發送值就不安全了(會panic)。
在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要在多個併發發送端中關閉channel。換句話說,若是sender(發送者)只是惟一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值能夠讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
(下面,咱們將會稱上面的原則爲channel closing principle
若是你由於某種緣由從接收端(receiver side)關閉channel或者在多個發送者中的一個關閉channel,那麼你應該使用列在Golang panic/recover Use Cases的函數來安全地發送值到channel中(假設channel的元素類型是T)
1 func SafeSend(ch chan T, value T) (closed bool) { 2 defer func() { 3 if recover() != nil { 4 // the return result can be altered 5 // in a defer function call 6 closed = true 7 } 8 }() 9 10 ch <- value // panic if ch is closed 11 return false // <=> closed = false; return 12 }
若是channel ch
沒有被關閉的話,那麼這個函數的性能將和ch <- value
接近。對於channel關閉的時候,SafeSend
函數只會在每一個sender goroutine中調用一次,所以程序不會有太大的性能損失。
一樣的想法也能夠用在從多個goroutine關閉channel中:
1 func SafeClose(ch chan T) (justClosed bool) { 2 defer func() { 3 if recover() != nil { 4 justClosed = false 5 } 6 }() 7 8 // assume ch != nil here. 9 close(ch) // panic if ch is closed 10 return true 11 }
不少人喜歡用sync.Once
來關閉channel:
1 type MyChannel struct { 2 C chan T 3 once sync.Once 4 } 5 6 func NewMyChannel() *MyChannel { 7 return &MyChannel{C: make(chan T)} 8 } 9 10 func (mc *MyChannel) SafeClose() { 11 mc.once.Do(func(){ 12 close(mc.C) 13 }) 14 }
固然了,咱們也能夠用sync.Mutex
來避免屢次關閉channel:
1 type MyChannel struct { 2 C chan T 3 closed bool 4 mutex sync.Mutex 5 } 6 7 func NewMyChannel() *MyChannel { 8 return &MyChannel{C: make(chan T)} 9 } 10 11 func (mc *MyChannel) SafeClose() { 12 mc.mutex.Lock() 13 if !mc.closed { 14 close(mc.C) 15 mc.closed = true 16 } 17 mc.mutex.Unlock() 18 } 19 20 func (mc *MyChannel) IsClosed() bool { 21 mc.mutex.Lock() 22 defer mc.mutex.Unlock() 23 return mc.closed 24 }
咱們應該要理解爲何Go不支持內置SafeSend
和SafeClose
函數,緣由就在於並不推薦從接收端或者多個併發發送端關閉channel。Golang甚至禁止關閉只接收(receive-only)的channel。