Go channel實現源碼分析

go通道基於go的併發調度實現,自己並不複雜,go併發調度請看個人這篇文章:go併發調度原理學習html

1.channel數據結構
type hchan struct {
   qcount   uint               // 緩衝區中已有元素個數
   dataqsiz uint               //循環隊列容量大小
   buf      unsafe.Pointer     // 緩衝區指針
   elemsize uint16             //元素大小
   closed   uint32             //關閉標記,0沒關閉,1關閉
   elemtype *_type             //數據項類型
   sendx    uint               //發送索引
   recvx    uint               //接收索引
   recvq    waitq              //等待接收排隊鏈表
   sendq    waitq              //等待發送排隊鏈表
   lock mutex                  //
}
type waitq struct {
   first *sudog
   last  *sudog
}
 
2.建立channel實現
建立channel實例:
ch := make(chan int, 4)
實現函數:
func makechan(t *chantype, size int64) *hchan
大體實現:
執行上面這行代碼會new一個hchan結構,同時建立一個dataqsiz=4的int類型的循環隊列,其實就是一個容納4個元素的數組,就是按順序往裏面寫數據,寫滿以後又從0開始寫,這個順序索引就是hchan.sendx
 
3.發送數據
發送數據實例:
ch <- 100
發送數據實現函數:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
ep指向要發送數據的首地址
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
   lock(&c.lock)
   if c.closed != 0 {
      unlock(&c.lock)
      panic(plainError("send on closed channel"))
   }
 
   if sg := c.recvq.dequeue(); sg != nil {
      //緩衝區就是一個固定長度的循環列表
      //發送隊列是一個雙向鏈表,接在緩衝區的後面,總體是一個隊列,保證先進先出
      //有接收者,並非將當前要發送的數據直接發出,而是將緩衝區的第一個元素髮送給接收者,同時將發送隊列的第一個元素加入緩衝區剛出隊列的位置
      send(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true
   }
 
   if c.qcount < c.dataqsiz {
      //緩衝區沒有滿,直接將要發送的數據複製到緩衝區,直接返回,
      qp := chanbuf(c, c.sendx)
      typedmemmove(c.elemtype, qp, ep)
      c.sendx++
      if c.sendx == c.dataqsiz {
         c.sendx = 0
      }
      c.qcount++
      unlock(&c.lock)
      return true
   }
 
   if !block {
      unlock(&c.lock)
      return false
   }
   //以上都是同步非阻塞的,ch <- 100直接返回
   
   //如下是同步阻塞
   //緩衝區滿了,也沒有接收者,通道將被阻塞,其實就是不執行當前G了,將狀態改爲等待狀態
   gp := getg()
   mysg := acquireSudog()
   c.sendq.enqueue(mysg)
   goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)
 
   //當G被喚醒,狀態改爲可執行狀態,從這裏開始繼續執行
   releaseSudog(mysg)
   return true
}
大體實現:
1:接收隊列不爲空,從接收隊列中取出第一個接收者*sudog,將數據複製到sudog.elem,複製函數爲memmove用匯編實現,通知接收方數據給你了,將接收方協程由等待狀態改爲可運行狀態,將當前協程加入協程隊列,等待被調度。
2:沒有接收者,有緩衝區且沒有滿,直接將數據複製到緩衝中,寫入緩衝區的位置爲hchan.buf[sendx++],若是緩衝區已滿sendx=0,就是循環隊列的實現,往sendx指定的位置寫數據,hchan.qcount++
3:沒有接收者,沒有緩衝區或是滿了,則從當前協程對應的P的sudog隊列中取一個struct sudog,將數據複製到sudog.elem,將sudog加入sendq隊列中,通知接收方,當前流程阻塞,等待被喚醒,接收方收到通知後(被喚醒),繼續往下執行,接收數據完成後會通知發送方,即將發送方協程狀態由等待狀態改爲可運行狀態,加入協程可運行隊列,等着被執行不會阻塞的狀況:
1:通道緩衝區沒有滿以前,由於只是將要發送的數據複製到緩衝區就返回了
2:有接收者的狀況,有數據複製到接收方的數據結構中(不是最終接收數據的變量,在執行接收函數的時候會拷貝到最終接收數據的變量),喚醒接收協程會阻塞的狀況:天然就是緩衝區滿了,也沒有接收方,這個時候會將數據打包放到發送隊列,當前協程被設置成等待狀態,這個狀態不會被調度,當有接收方收到數據後,纔會被喚醒
 
4.接收數據
接收數據實例:
val := <- ch
接收數據實現函數:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool)
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
   lock(&c.lock)
   if sg := c.sendq.dequeue(); sg != nil {
      // Found a waiting sender. If buffer is size 0, receive value
      // directly from sender. Otherwise, receive from head of queue
      // and add sender's value to the tail of the queue (both map to
      // the same buffer slot because the queue is full).
      recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
      return true, true
   }
 
   if c.qcount > 0 {
      // Receive directly from queue
      qp := chanbuf(c, c.recvx)
      if ep != nil {
         typedmemmove(c.elemtype, ep, qp)
      }
      typedmemclr(c.elemtype, qp)
      c.recvx++
      if c.recvx == c.dataqsiz {
         c.recvx = 0
      }
      c.qcount--
      unlock(&c.lock)
      return true, true
   }
 
   if !block {
      unlock(&c.lock)
      return false, false
   }
   //以上同步非阻塞
 
   //如下同步阻塞
   gp := getg()
   mysg := acquireSudog()
   c.recvq.enqueue(mysg)
   //將當前G狀態改爲等待狀態,中止調度
   goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3)
 
   //當前G被喚醒從這裏繼續執行 
   mysg.c = nil
   releaseSudog(mysg)
   return true, !closed
}
大體實現:
1.發送隊列不爲空(說明緩衝區已滿),從發送隊列中取出第一個發送者*sudog
1.1.沒有緩衝區,直接將發送隊列中的數據sudog.elem複製出來,存到接收數據的變量val中,通知發送方我處理完了,你能夠繼續執行
1.2.有緩衝區,複製出緩衝區hchan.buf[recvx]對應的元素到val,在將發送方sudog.elem複製到hchan.buf[recvx],發送方按順序寫,接收方按順序讀,典型的FIFO,爲了保證是先進先出,因此先複製出,再將隊列首元素複製到對應的緩衝區中,其實就是發送隊列鏈接在緩衝區後面,緩衝區滿了,就寫隊列,接收的時候先從緩衝區中拿數據,拿掉以後空出來的位置從發送隊列中取第一個填滿,並喚醒對應的G,只要發送隊列不爲空,緩衝區確定會被填滿
2.發送隊列爲空,緩衝區不爲空,複製出緩衝區hchan.buf[recvx]對應的元素到val,hchan.qcount--
3.發送隊列爲空,緩衝區也爲空,那就是沒有任何待接收的數據,接收流程就只能等了,將接收信息打包成sudog,加入接收隊列recvq,當前執行流程阻塞,等有發送數據後會被喚醒繼續
 
5.channel FIFO在解釋一次
5.1:緩衝區沒滿,發送數據就是進緩衝隊列,接收數據就是出緩衝隊列,比較好理解
5.2:緩衝區已滿,發送數據就是進等待隊列,接收數據先出緩衝隊列,即爲要接收的數據,等待隊列出列,將數據存在緩衝隊列剛出列的位置,剛出列的位置至關於緩衝隊列的末尾,也就是說等待隊列的列頭連在緩衝隊列的末尾,將等待隊列的列頭加入緩存隊列的列尾,保證了緩衝隊列是滿的,減小的是緩衝隊列中的數據,保證先進先出
5.3:接收數據,緩衝隊列或等待隊列有數據,拿走第一個,保證等待隊列是接在緩衝區末尾,即緩衝區末尾有空缺,就讓等待隊列出列,並填充至緩衝區末尾,不然將本身打包加入接收隊列,當前G進入等待狀態,有數據發送天然會通知你
 
總結:Go channel基於go的併發調度實現阻塞和非阻塞兩種通信方式
相關文章
相關標籤/搜索