go通道基於go的併發調度實現,自己並不複雜,go併發調度請看個人這篇文章:go併發調度原理學習html
type hchan struct { qcount uint // 緩衝區中已有元素個數 dataqsiz uint //循環隊列容量大小 buf unsafe.Pointer // 緩衝區指針 elemsize uint16 //元素大小 closed uint32 //關閉標記,0沒關閉,1關閉 elemtype *_type //數據項類型 sendx uint //發送索引 recvx uint //接收索引 recvq waitq //等待接收排隊鏈表 sendq waitq //等待發送排隊鏈表 lock mutex //鎖 } type waitq struct { first *sudog last *sudog }
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { //緩衝區就是一個固定長度的循環列表 //發送隊列是一個雙向鏈表,接在緩衝區的後面,總體是一個隊列,保證先進先出 //有接收者,並非將當前要發送的數據直接發出,而是將緩衝區的第一個元素髮送給接收者,同時將發送隊列的第一個元素加入緩衝區剛出隊列的位置 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { //緩衝區沒有滿,直接將要發送的數據複製到緩衝區,直接返回, qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } //以上都是同步非阻塞的,ch <- 100直接返回 //如下是同步阻塞 //緩衝區滿了,也沒有接收者,通道將被阻塞,其實就是不執行當前G了,將狀態改爲等待狀態 gp := getg() mysg := acquireSudog() c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) //當G被喚醒,狀態改爲可執行狀態,從這裏開始繼續執行 releaseSudog(mysg) return true }
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } //以上同步非阻塞 //如下同步阻塞 gp := getg() mysg := acquireSudog() c.recvq.enqueue(mysg) //將當前G狀態改爲等待狀態,中止調度 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) //當前G被喚醒從這裏繼續執行 mysg.c = nil releaseSudog(mysg) return true, !closed }