go語言學習--channel的關閉

在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要在多個併發發送端中關閉channel。換句話說,若是sender(發送者)只是惟一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值能夠讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
(咱們將會稱上面的原則爲channel closing principle)golang

保持channel closing principle的優雅方案

channel closing principle要求咱們只能在發送端進行channel的關閉,對於平常遇到的能夠歸結爲三類api

一、m個receivers,一個sender.安全

二、一個receiver,n個sender併發

三、m個receivers,n個senderdom

一、m個receivers,一個senderide

M個receivers,一個sender,sender經過關閉data channel說「再也不發送」函數

這是最簡單的場景了,就只是當sender不想再發送的時候讓sender關閉data 來關閉channel:性能

 1 package main
 2 
 3 import (
 4     "time"
 5     "math/rand"
 6     "sync"
 7     "log"
 8 )
 9 
10 func main() {
11     rand.Seed(time.Now().UnixNano())
12     log.SetFlags(0)
13 
14     // ...
15     const MaxRandomNumber = 100000
16     const NumReceivers = 100
17 
18     wgReceivers := sync.WaitGroup{}
19     wgReceivers.Add(NumReceivers)
20 
21     // ...
22     dataCh := make(chan int, 100)
23 
24     // the sender
25     go func() {
26         for {
27             if value := rand.Intn(MaxRandomNumber); value == 0 {
28                 // the only sender can close the channel safely.
29                 close(dataCh)
30                 return
31             } else {            
32                 dataCh <- value
33             }
34         }
35     }()
36 
37     // receivers
38     for i := 0; i < NumReceivers; i++ {
39         go func() {
40             defer wgReceivers.Done()
41 
42             // receive values until dataCh is closed and
43             // the value buffer queue of dataCh is empty.
44             for value := range dataCh {
45                 log.Println(value)
46             }
47         }()
48     }
49 
50     wgReceivers.Wait()
51 }

二、一個receiver,n個sendersspa

      一個receiver,N個sender,receiver經過關閉一個額外的signal channel說「請中止發送」
這種場景比上一個要複雜一點。咱們不能讓receiver關閉data channel,由於這麼作將會打破channel closing principle。可是咱們可讓receiver關閉一個額外的signal channel來通知sender中止發送值:code

 1 package main
 2 
 3 import (
 4     "time"
 5     "math/rand"
 6     "sync"
 7     "log"
 8 )
 9 
10 func main() {
11     rand.Seed(time.Now().UnixNano())
12     log.SetFlags(0)
13 
14     // ...
15     const MaxRandomNumber = 100000
16     const NumSenders = 1000
17 
18     wgReceivers := sync.WaitGroup{}
19     wgReceivers.Add(1)
20 
21     // ...
22     dataCh := make(chan int, 100)
23     stopCh := make(chan struct{})
24         // stopCh is an additional signal channel.
25         // Its sender is the receiver of channel dataCh.
26         // Its reveivers are the senders of channel dataCh.
27 
28     // senders
29     for i := 0; i < NumSenders; i++ {
30         go func() {
31             for {
32                 value := rand.Intn(MaxRandomNumber)
33 
34                 select {
35                 case <- stopCh:
36                     return
37                 case dataCh <- value:
38                 }
39             }
40         }()
41     }
42 
43     // the receiver
44     go func() {
45         defer wgReceivers.Done()
46 
47         for value := range dataCh {
48             if value == MaxRandomNumber-1 {
49                 // the receiver of the dataCh channel is
50                 // also the sender of the stopCh cahnnel.
51                 // It is safe to close the stop channel here.
52                 close(stopCh)
53                 return
54             }
55 
56             log.Println(value)
57         }
58     }()
59 
60     // ...
61     wgReceivers.Wait()
62 }

三、m個receivers,n個sender

M個receiver,N個sender,它們當中任意一個經過通知一個moderator(仲裁者)關閉額外的signal channel來講「讓咱們結束遊戲吧」
這是最複雜的場景了。咱們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers經過關閉一個額外的signal channel來通知全部的senders和receivers退出遊戲。這麼作的話會打破channel closing principle。可是,咱們能夠引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎麼通知moderator去關閉額外的signal channel:

  1 package main
  2 
  3 import (
  4     "time"
  5     "math/rand"
  6     "sync"
  7     "log"
  8     "strconv"
  9 )
 10 
 11 func main() {
 12     rand.Seed(time.Now().UnixNano())
 13     log.SetFlags(0)
 14 
 15     // ...
 16     const MaxRandomNumber = 100000
 17     const NumReceivers = 10
 18     const NumSenders = 1000
 19 
 20     wgReceivers := sync.WaitGroup{}
 21     wgReceivers.Add(NumReceivers)
 22 
 23     // ...
 24     dataCh := make(chan int, 100)
 25     stopCh := make(chan struct{})
 26         // stopCh is an additional signal channel.
 27         // Its sender is the moderator goroutine shown below.
 28         // Its reveivers are all senders and receivers of dataCh.
 29     toStop := make(chan string, 1)
 30         // the channel toStop is used to notify the moderator
 31         // to close the additional signal channel (stopCh).
 32         // Its senders are any senders and receivers of dataCh.
 33         // Its reveiver is the moderator goroutine shown below.
 34 
 35     var stoppedBy string
 36 
 37     // moderator
 38     go func() {
 39         stoppedBy = <- toStop // part of the trick used to notify the moderator
 40                               // to close the additional signal channel.
 41         close(stopCh)
 42     }()
 43 
 44     // senders
 45     for i := 0; i < NumSenders; i++ {
 46         go func(id string) {
 47             for {
 48                 value := rand.Intn(MaxRandomNumber)
 49                 if value == 0 {
 50                     // here, a trick is used to notify the moderator
 51                     // to close the additional signal channel.
 52                     select {
 53                     case toStop <- "sender#" + id:
 54                     default:
 55                     }
 56                     return
 57                 }
 58 
 59                 // the first select here is to try to exit the
 60                 // goroutine as early as possible.
 61                 select {
 62                 case <- stopCh:
 63                     return
 64                 default:
 65                 }
 66 
 67                 select {
 68                 case <- stopCh:
 69                     return
 70                 case dataCh <- value:
 71                 }
 72             }
 73         }(strconv.Itoa(i))
 74     }
 75 
 76     // receivers
 77     for i := 0; i < NumReceivers; i++ {
 78         go func(id string) {
 79             defer wgReceivers.Done()
 80 
 81             for {
 82                 // same as senders, the first select here is to 
 83                 // try to exit the goroutine as early as possible.
 84                 select {
 85                 case <- stopCh:
 86                     return
 87                 default:
 88                 }
 89 
 90                 select {
 91                 case <- stopCh:
 92                     return
 93                 case value := <-dataCh:
 94                     if value == MaxRandomNumber-1 {
 95                         // the same trick is used to notify the moderator 
 96                         // to close the additional signal channel.
 97                         select {
 98                         case toStop <- "receiver#" + id:
 99                         default:
100                         }
101                         return
102                     }
103 
104                     log.Println(value)
105                 }
106             }
107         }(strconv.Itoa(i))
108     }
109 
110     // ...
111     wgReceivers.Wait()
112     log.Println("stopped by", stoppedBy)
113 }

打破channel closing principle

有沒有一個內置函數能夠檢查一個channel是否已經關閉。若是你能肯定不會向channel發送任何值,那麼也確實須要一個簡單的方法來檢查channel是否已經關閉:

 1 package main
 2 
 3 import "fmt"
 4 
 5 type T int
 6 
 7 func IsClosed(ch <-chan T) bool {
 8     select {
 9     case <-ch:
10         return true
11     default:
12     }
13 
14     return false
15 }
16 
17 func main() {
18     c := make(chan T)
19     fmt.Println(IsClosed(c)) // false
20     close(c)
21     fmt.Println(IsClosed(c)) // true
22 }

 

上面已經提到了,沒有一種適用的方式來檢查channel是否已經關閉了。可是,就算有一個簡單的 closed(chan T) bool函數來檢查channel是否已經關閉,它的用處仍是頗有限的,就像內置的len函數用來檢查緩衝channel中元素數量同樣。緣由就在於,已經檢查過的channel的狀態有可能在調用了相似的方法返回以後就修改了,所以返回來的值已經不可以反映剛纔檢查的channel的當前狀態了。
儘管在調用closed(ch)返回true的狀況下中止向channel發送值是能夠的,可是若是調用closed(ch)返回false,那麼關閉channel或者繼續向channel發送值就不安全了(會panic)。

The Channel Closing Principle

在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要在多個併發發送端中關閉channel。換句話說,若是sender(發送者)只是惟一的sender或者是channel最後一個活躍的sender,那麼你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值能夠讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
(下面,咱們將會稱上面的原則爲channel closing principle

打破channel closing principle的解決方案

若是你由於某種緣由從接收端(receiver side)關閉channel或者在多個發送者中的一個關閉channel,那麼你應該使用列在Golang panic/recover Use Cases的函數來安全地發送值到channel中(假設channel的元素類型是T)

 1 func SafeSend(ch chan T, value T) (closed bool) {
 2     defer func() {
 3         if recover() != nil {
 4             // the return result can be altered 
 5             // in a defer function call
 6             closed = true
 7         }
 8     }()
 9 
10     ch <- value // panic if ch is closed
11     return false // <=> closed = false; return
12 }

 

若是channel ch沒有被關閉的話,那麼這個函數的性能將和ch <- value接近。對於channel關閉的時候,SafeSend函數只會在每一個sender goroutine中調用一次,所以程序不會有太大的性能損失。
一樣的想法也能夠用在從多個goroutine關閉channel中:

 1 func SafeClose(ch chan T) (justClosed bool) {
 2     defer func() {
 3         if recover() != nil {
 4             justClosed = false
 5         }
 6     }()
 7 
 8     // assume ch != nil here.
 9     close(ch) // panic if ch is closed
10     return true
11 }

 

不少人喜歡用sync.Once來關閉channel:

 1 type MyChannel struct {
 2     C    chan T
 3     once sync.Once
 4 }
 5 
 6 func NewMyChannel() *MyChannel {
 7     return &MyChannel{C: make(chan T)}
 8 }
 9 
10 func (mc *MyChannel) SafeClose() {
11     mc.once.Do(func(){
12         close(mc.C)
13     })
14 }

 

固然了,咱們也能夠用sync.Mutex來避免屢次關閉channel:

 1 type MyChannel struct {
 2     C      chan T
 3     closed bool
 4     mutex  sync.Mutex
 5 }
 6 
 7 func NewMyChannel() *MyChannel {
 8     return &MyChannel{C: make(chan T)}
 9 }
10 
11 func (mc *MyChannel) SafeClose() {
12     mc.mutex.Lock()
13     if !mc.closed {
14         close(mc.C)
15         mc.closed = true
16     }
17     mc.mutex.Unlock()
18 }
19 
20 func (mc *MyChannel) IsClosed() bool {
21     mc.mutex.Lock()
22     defer mc.mutex.Unlock()
23     return mc.closed
24 }

 

咱們應該要理解爲何Go不支持內置SafeSendSafeClose函數,緣由就在於並不推薦從接收端或者多個併發發送端關閉channel。Golang甚至禁止關閉只接收(receive-only)的channel。

相關文章
相關標籤/搜索