golang CSP 模型中的
C
, 主要用於goroutine之間消息的傳遞,咱們知道在寫代碼的過程當中,解偶是很是重要的一環,而使用channel則能夠很好的隔離goroutine,使得goroutne之間的交互,只須要將重心關注在如何從channel中消費或者生產消息。
使用make聲明一個channel
ch := make(chan int) ch <- 1 // write ch位於 <- 的左側(表明數據流入 <- ch // read ch位於 <- 的右側(表明數據流出
在真正使用channel前,咱們須要瞭解channel可能會產生 阻塞場景的全部可能,以防止在代碼中編寫出不符合咱們預期的代碼。
下面咱們羅列出可能的四種情形
channel中無數據,可是執行 <- channel (讀
ch := make(chan interface{}) <-ch fmt.Println("read buf succ")
channel中無數據,往 channel <- (寫 ,可是沒有goroutine讀取。
ch := make(chan interface{}) ch <- 1 fmt.Println("read buf succ")
channel中無數據,可是執行 <- channel
ch := make(chan interface{}, 1) <-ch fmt.Println("read buf succ")
channel中已滿, 繼續執行 channel <- 動做,可是沒有goroutine讀取。
ch := make(chan interface{}, 1) ch <- 1 ch <- 2 fmt.Println("read buf succ")
使用 close關閉channel
ch := make(chan interface{}) close(ch)
關閉channel須要注意golang
- 重複關閉會 panic
- 向關閉的channel發送數據會panic
- 從關閉的channel讀取數據,會讀取到值的初始值,好比interface類型,讀取到的就是nil
range 字段會阻塞監聽 channel, 直到channel 被close。
func recv(ch chan int) { for msg := range ch { // 使用 range 能夠自動等待 ch 的行爲, 直到ch 被close。 fmt.Println(msg) } fmt.Println("channel closed") } func send(ch chan int, msg int) { ch <- msg } func main() { ch := make(chan int, 2) go recv(ch) ch <- 1 ch <- 2 ch <- 3 time.AfterFunc(time.Second*2, func() { close(ch) }) }
select 的大體工做原理app
- 檢查全部的
case
- 當檢查的
case
已經能夠發送|接收,則執行當前代碼塊- 當有多個
case
能夠執行,則隨機
選擇一個執行- 當沒有
case
能夠執行,則阻塞- 若是存在
default
,當沒有可執行代碼塊時,則執行default
代碼塊使用select來管理channel的讀取, 經過default防止阻塞.性能
func readCh(ch chan interface{}) error { select { case v := <-ch: fmt.Println(v) default: return errors.New("no data") } return nil }
使用 timer 或者 context 來進行到期退出斷定. 另外咱們也可使用sync.Once()這種形式設定一個開關,
來控制select的退出邏輯,可參照grpc/internal/grpcsync/event.go
func readCh(ch chan interface{}) error { select { case v := <-ch: fmt.Println(v) case <-time.After(time.Second): return errors.New("time arrived") } return nil }
下面代碼的 Unbounded 實現摘自grpc/internal/buffer/unbounded.go,
它沒有選擇使用帶容量的channel,而是另外使用了一個list來備份積壓的消息,這裏我猜有兩個緣由code
- 使用這種方式channel變成了一個任意長度的channel,不用考慮channel被寫滿致使的問題。
- 這裏爲何不直接使用list + mutex,由於須要channel的特性來隔離goroutine。
type Unbounded struct { c chan interface{} backlog []interface{} sync.Mutex } func NewUnbounded() *Unbounded { return &Unbounded{c: make(chan interface{}, 1)} } func (b *Unbounded) Put(t interface{}) { b.Lock() if len(b.backlog) == 0 { select { case b.c <- t: b.Unlock() return default: } } b.backlog = append(b.backlog, t) b.Unlock() } func (b *Unbounded) Load() { b.Lock() if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: b.backlog[0] = nil b.backlog = b.backlog[1:] default: } } b.Unlock() } func (b *Unbounded) Get() <-chan interface{} { return b.c } var q *Queue type Queue struct { buf *Unbounded } type QueueInterface interface { consume() produce(info int) } func (q *Queue) consume() { for { select { case t := <-q.buf.Get(): q.buf.Load() fmt.Println(t) case <-time.After(time.Second * 10): fmt.Println(errors.New("the end")) } } } func (q *Queue) produce(info int) { q.buf.Put(info) } func main() { q := &Queue{ buf: NewUnbounded(), } go q.consume() q.produce(1) q.produce(3) time.AfterFunc(time.Second*2, func() { for i := 0; i < 3; i++ { q.produce(4) } //q.produce(4) }) select {} }
注: 這裏的實現使用了interface做爲channel的消息體,凡是在有性能瓶頸的地方應該使用具體的類型獨立實現一版,相似grpc/internal/transport.go中的recvBuffer