不要在消費端關閉channel,不要在有多個並行的生產者時對channel執行關閉操做。golang
也就是說應該只在[惟一的或者最後惟一剩下]的生產者協程中關閉channel,來通知消費者已經沒有值能夠繼續讀了。只要堅持這個原則,就能夠確保向一個已經關閉的channel發送數據的狀況不可能發生。併發
若是想要在消費端關閉channel,或者在多個生產者端關閉channel,可使用recover機制來上個保險,避免程序由於panic而崩潰。dom
func SafeClose(ch chan T) (justClosed bool) {
defer func() {
if recover() != nil {
justClosed = false
}
}()
// assume ch != nil here.
close(ch) // panic if ch is closed
return true // <=> justClosed = true; return
}
複製代碼
使用這種方法明顯違背了上面的channel關閉原則,而後性能還能夠,畢竟在每一個協程只會調用一次SafeClose,性能損失很小。oop
一樣也能夠在生產消息的時候使用recover方法。性能
func SafeSend(ch chan T, value T) (closed bool) {
defer func() {
if recover() != nil {
// The return result can be altered
// in a defer function call.
closed = true
}
}()
ch <- value // panic if ch is closed
return false // <=> closed = false; return
}
複製代碼
還有很多人常常使用用sync.Once來關閉channel,這樣能夠確保只會關閉一次ui
type MyChannel struct {
C chan T
once sync.Once
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.once.Do(func() {
close(mc.C)
})
}
複製代碼
一樣咱們也可使用sync.Mutex達到一樣的目的。this
type MyChannel struct {
C chan T
closed bool
mutex sync.Mutex
}
func NewMyChannel() *MyChannel {
return &MyChannel{C: make(chan T)}
}
func (mc *MyChannel) SafeClose() {
mc.mutex.Lock()
if !mc.closed {
close(mc.C)
mc.closed = true
}
mc.mutex.Unlock()
}
func (mc *MyChannel) IsClosed() bool {
mc.mutex.Lock()
defer mc.mutex.Unlock()
return mc.closed
}
複製代碼
要知道golang的設計者不提供SafeClose或者SafeSend方法是有緣由的,他們原本就不推薦在消費端或者在併發的多個生產端關閉channel,好比關閉只讀channel在語法上就完全被禁止使用了。spa
上文的SafeSend方法一個很大的劣勢在於它不能用在select塊的case語句中。而另外一個很重要的劣勢在於像我這樣對代碼有潔癖的人來講,使用panic/recover和sync/mutex來搞定不是那麼的優雅。下面咱們引入在不一樣的場景下可使用的純粹的優雅的解決方法。設計
多個消費者,單個生產者。這種狀況最簡單,直接讓生產者關閉channel好了。code
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// The only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// Receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
複製代碼
多個生產者,單個消費者。這種狀況要比上面的複雜一點。咱們不能在消費端關閉channel,由於這違背了channel關閉原則。可是咱們可讓消費端關閉一個附加的信號來通知發送端中止生產數據。
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
// The first select here is to try to exit the goroutine
// as early as possible. In fact, it is not essential
// for this example, so it can be omitted.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops if the send to dataCh is also unblocked.
// But this is acceptable, so the first select
// can be omitted.
select {
case <- stopCh:
return
case dataCh <- rand.Intn(MaxRandomNumber):
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// The receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
複製代碼
就上面這個例子,生產者同時也是退出信號channel的接受者,退出信號channel仍然是由它的生產端關閉的,因此這仍然沒有違背channel關閉原則。值得注意的是,這個例子中生產端和接受端都沒有關閉消息數據的channel,channel在沒有任何goroutine引用的時候會自行關閉,而不須要顯示進行關閉。
這是最複雜的一種狀況,咱們既不能讓接受端也不能讓發送端關閉channel。咱們甚至都不能讓接受者關閉一個退出信號來通知生產者中止生產。由於咱們不能違反channel關閉原則。可是咱們能夠引入一個額外的協調者來關閉附加的退出信號channel。
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// The channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// Here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// The first select here is to try to exit the goroutine
// as early as possible. This select blocks with one
// receive operation case and one default branches will
// be optimized as a try-receive operation by the
// official Go compiler.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops (and for ever in theory) if the send to
// dataCh is also unblocked.
// This is why the first select block is needed.
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// Same as the sender goroutine, the first select here
// is to try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
// Even if stopCh is closed, the first branch in the
// second select may be still not selected for some
// loops (and for ever in theory) if the receive from
// dataCh is also unblocked.
// This is why the first select block is needed.
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// The same trick is used to notify
// the moderator to close the
// additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
複製代碼
以上三種場景不能涵蓋所有,可是它們是最多見最通用的三種場景,基本上全部的場景均可以劃分爲以上三類。
沒有任何場景值得你去打破channel關閉原則,若是你遇到這樣的一種特殊場景,仍是建議你好好思考一下本身設計,是否是該重構一下了。
閱讀相關文章,關注公衆號「碼洞」