源碼解讀channel

「本文已參與好文召集令活動,點擊查看:後端、大前端雙賽道投稿,2萬元獎池等你挑戰!前端

常見姿式

從一個nil的chan接收數據會deadlockweb

func main() {
   var a chan int
   fmt.Println(<-a)
}
fatal error: all goroutines are asleep - deadlock!
複製代碼

向一個nil的chan發送數據會deadlock後端

func main() {
   var a chan int
   a <- 1
}
fatal error: all goroutines are asleep - deadlock!
複製代碼

從一個已經關閉的chan獲取數據,獲得對應的零值數組

func main() {
   var a chan int
   a = make(chan int, 1)
   close(a)
   v, ok := <-a
   fmt.Println(v, ok) //0,false
}
複製代碼

向一個已經關閉的chan發送數據會panic安全

func main() {
   var a chan int
   a = make(chan int, 1)
   close(a)
   a <- 1
}
panic: send on closed channel
複製代碼

把一個已經關閉的chan再次關閉會panicmarkdown

func main() {
   var a chan int
   a = make(chan int, 1)
   close(a)
   close(a)
}
panic: close of closed channel
複製代碼

沒有buffer的chan,要提早作好接收的準備,不然會deadlocksvg

func main() {
   var a chan int
   a = make(chan int)
   a <- 1
}
fatal error: all goroutines are asleep - deadlock!
複製代碼

有buffer的chan,在buffer滿了以後,再發送會deadlock源碼分析

func main() {
   var a chan int
   a = make(chan int, 1)
   a <- 1 //不會報錯
   a <- 2 //報錯
}
fatal error: all goroutines are asleep - deadlock!
複製代碼

nil的chan,在select和default組合下,不會報錯post

func main() {
   var a chan int
   select {
   case <-a: //nil的chan並不會報錯
   default: //會走到default

   }
}
複製代碼

range一個chan,記得close。不然會deadlockui

func main() {
   var a chan int
   a = make(chan int)
   go func() {
      a <- 1
      close(a) //若是沒close,那麼就會deadlock
   }()
   for v := range a {
      fmt.Println(v)
   }
   time.Sleep(time.Second)
}
複製代碼

源碼分析

從make chan開始

func makechan(t *chantype, size int) *hchan {
   elem := t.elem
   省略...
   switch {
   case mem == 0:           //無緩衝的chan
      c = (*hchan)(mallocgc(hchanSize, nil, true))
      c.buf = c.raceaddr()
   case elem.ptrdata == 0:  //元素不含指針
      c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
      c.buf = add(unsafe.Pointer(c), hchanSize)
   default:                //默認
      c = new(hchan)
      c.buf = mallocgc(mem, elem, true)
   }

   c.elemsize = uint16(elem.size)
   c.elemtype = elem
   c.dataqsiz = uint(size)
   lockInit(&c.lock, lockRankHchan)

   if debugChan {
      print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
   }
   return c
}
複製代碼
  • 若是new的是無buffer的chan,那麼只須要new一個hchan便可
  • 若是new的是有buffer的且元素類型不是指針類型,那麼hchan和buffer能夠一塊兒申請一塊連續的內存
  • 若是new的是有buffer的且元素是指針類型,那麼就不能一塊兒申請,hchan單獨申請,buffer單獨申請。

hchan的結構

type hchan struct {
   qcount   uint           // 環形隊列的長度
   dataqsiz uint           // 環形隊列的長度、緩衝區的大小
   buf      unsafe.Pointer // 環形隊列指針
   elemsize uint16         // 每一個元素的大小
   closed   uint32         // 通道是否關閉,1關閉 0打開
   elemtype *_type         // 元素類型
   sendx    uint           // 已發送元素在循環數組中的索引
   recvx    uint           // 已接收元素在循環數組中的索引
   recvq    waitq          // 等待接收的goroutine隊列
   sendq    waitq          // 等待發送的goroutine隊列
   lock mutex              //互斥鎖,數據的出入都是要鎖保護的
}
複製代碼
  • buf自己是個環形隊列
  • sendx:每次推入數據的時候會加1
  • recvx:每次取出數據的時候會加1
  • recvq:每次取數據發生阻塞的時候,會把當前goroutine放入此隊列,待下次喚醒
  • sendq:每次推數據發生阻塞的時候,會把當前goroutine放入此隊列,待下次喚醒
  • lock:buf是一塊公共的內存空間,因此須要一把鎖

發一個數據

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
   chansend(c, elem, true, getcallerpc()) // block=true
}
複製代碼

像c<-x這樣的語句會被編譯成chansend1chansend1調用chansend

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   if c == nil {
      if !block { //block=false的狀況 通常在select的時候
         return false
      }
      gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) //向nil的chan發送會發生阻塞
      throw("unreachable")
   }
   if !block && c.closed == 0 && full(c) { //select的時候,chan沒關閉,且(buffer也滿了或接收方未準備好接收)
      return false
   }
   ...

   lock(&c.lock) //上鎖安全保護

   if c.closed != 0 { //已經關閉的chan
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }

   if sg := c.recvq.dequeue(); sg != nil { //正好有等待取的goroutine
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }

   if c.qcount < c.dataqsiz { //緩衝區沒滿
      qp := chanbuf(c, c.sendx) //獲取當前sendx的索引
      ...
     typedmemmove(c.elemtype, qp, ep) //新元素的copy進去
      c.sendx++ // 索引+1
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++ //數量加+1
      unlock(&c.lock)
      return true
   }

// 接下來都是緩衝區滿的狀況
   if !block {
      unlock(&c.lock)
      return false
   }
   gp := getg()
   mysg := acquireSudog() //打包suog
   c.sendq.enqueue(mysg) //推入發送隊列
   ...
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) //讓發送的goroutine進入睡眠,等待被喚醒
   //如下是恢復時乾的事
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if gp.param == nil {
      if c.closed == 0 {
         throw("chansend: spurious wakeup")
      }
      panic(plainError("send on closed channel"))
   }
   gp.param = nil
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   mysg.c = nil
   releaseSudog(mysg) //釋放sudog
   return true
}
複製代碼
  • chansend的block參數爲false的狀況,通常就是咱們用select的時候
  • 向一個nil的chan發送數據會阻塞(非select)
  • 向一個chan發數據的時候,其實底層也是會上鎖的。
  • 向一個chan發送的時候,若是正好有一個等待的取的goroutine,那麼直接發給它
  • 當此時沒有正好等待接收者的時候,會嘗試放到緩衝區中
  • sendx用於記錄發送的索引
  • 當緩衝區滿或者沒有緩衝區的時候,將當前goroutine打包成sudog結構
  • 將sudog推入sendq的隊列
  • 當前發送者的goroutine會嘗試進入休眠,等待下次被喚醒
  • 喚醒後,會繼續執行數據的發送

接收一個數據

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
   chanrecv(c, elem, true)
}
複製代碼

像<-c這樣的語句會被編譯成chanrecv1chanrecv1調用chanrecv

func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   ...
   if c == nil {
      if !block { //不阻塞的話,直接返回
         return
      }
      //從一個nil的chan接收數據會阻塞
      gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
      throw("unreachable")
   }
   ...
   lock(&c.lock) //上鎖

   if c.closed != 0 && c.qcount == 0 {
      if raceenabled {
         raceacquire(c.raceaddr())
      }
      unlock(&c.lock)
      if ep != nil {
         typedmemclr(c.elemtype, ep)
      }
      return true, false
   }

   if sg := c.sendq.dequeue(); sg != nil { //正好有發送者
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3) //(帶緩衝和不帶緩衝的接收)
      return true, true
   }

   if c.qcount > 0 { // buffer有數據
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
    if ep != nil {
       typedmemmove(c.elemtype, ep, qp) //直接取數據 copy方式
    }
      typedmemclr(c.elemtype, qp) //清理取掉的位置
      c.recvx++ //索引加1
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }
   //如下緩衝區無數據
   if !block { //不須要阻塞的話,直接返回
      unlock(&c.lock)
      return false, false
   }
   gp := getg()
   mysg := acquireSudog() //打包成sudog
   ...
   c.recvq.enqueue(mysg) //推入recvq隊列
   gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)  //讓出cpu,等待下次調度

   //被喚醒後
   if mysg != gp.waiting {
      throw("G waiting list is corrupted")
   }
   gp.waiting = nil
   gp.activeStackChans = false
   if mysg.releasetime > 0 {
      blockevent(mysg.releasetime-t0, 2)
   }
   closed := gp.param == nil
   gp.param = nil
   mysg.c = nil
   releaseSudog(mysg) //解包sudog
   return true, !closed
}
複製代碼
  • 從一個nil的chan接收數據的時候,會阻塞(非select 模式)
  • 從一個chan接收數據的時候,底層也是會上鎖的
  • 當接收的時候,正好有發送者,嘗試直接從發送者那裏取數據。(無緩衝的話,直接從發送者的copy到接收者,有緩衝的話,說明此時緩衝確定滿了,那麼從緩衝區取走數據後,同時也喚醒下發送方能夠繼續發送數據了)
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
   if c.dataqsiz == 0 { //無緩衝區
   ...
      if ep != nil {
         // 從發送者那裏copy數據
         recvDirect(c.elemtype, sg, ep)
      }
   } else {//緩衝區必定滿了
      ...
      // copy data
      typedmemmove(c.elemtype, qp, sg.elem)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
   }
   ...
   goready(gp, skip+1) //喚醒準備發送的goroutine
}
複製代碼
  • buffer有數據的時候,嘗試直接從buffer copy數據出來
  • buffer無數據的時候,若是不阻塞直接返回
  • 若是阻塞,那麼當前goroutine被打包成sudog
  • 而後推入等待接收的隊列中
  • 讓出cpu,等待下次被調度
  • 被喚醒後,繼續執行獲取數據

關閉一個通道

func closechan(c *hchan) {
   if c == nil {
      panic(plainError("close of nil channel")) // close 一個nil的chan panic
   }

   lock(&c.lock) //上鎖
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("close of closed channel")) //已經關閉的chan 再次close會panic
   }
   ...
   c.closed = 1 //關閉chan

   var glist gList

   // release all readers
   for {
      sg := c.recvq.dequeue()
      if sg == nil {
         break
      }
      if sg.elem != nil {
         typedmemclr(c.elemtype, sg.elem)
         sg.elem = nil
      }
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }

   // release all writers (they will panic)
   for {
      sg := c.sendq.dequeue()
      if sg == nil {
         break
      }
      sg.elem = nil
      if sg.releasetime != 0 {
         sg.releasetime = cputicks()
      }
      gp := sg.g
      gp.param = nil
      if raceenabled {
         raceacquireg(gp, c.raceaddr())
      }
      glist.push(gp)
   }
   unlock(&c.lock)

   // Ready all Gs now that we've dropped the channel lock.
   for !glist.empty() {
      gp := glist.pop()
      gp.schedlink = 0
      goready(gp, 3)
   }
}
複製代碼
  • 關閉一個nil的chan會panic
  • 關閉一個已經關閉的chan會panic
  • 把chan的close標識1
  • 釋放全部的recvq list
  • 釋放全部的sendq list
相關文章
相關標籤/搜索