轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.comgit
本文使用的go的源碼時14.4github
package main import "fmt" func main() { c := make(chan int) go func() { c <- 1 // send to channel }() x := <-c // recv from channel fmt.Println(x) }
咱們能夠這樣查看彙編結果:golang
go tool compile -N -l -S hello.go -N表示禁用優化 -l禁用內聯 -S打印結果
經過上面這樣的方式,咱們能夠直到chan是調用的哪些函數:數組
type hchan struct { qcount uint // 循環列表元素個數 dataqsiz uint // 循環隊列的大小 buf unsafe.Pointer // 循環隊列的指針 elemsize uint16 // chan中元素的大小 closed uint32 // 是否已close elemtype *_type // chan中元素類型 sendx uint // send在buffer中的索引 recvx uint // recv在buffer中的索引 recvq waitq // receiver的等待隊列 sendq waitq // sender的等待隊列 // 互拆鎖 lock mutex }
qcount表明chan 中已經接收但還沒被取走的元素的個數,函數 len 能夠返回這個字段的值;緩存
dataqsiz和buf分別表明隊列buffer的大小,cap函數能夠返回這個字段的值以及隊列buffer的指針,是一個定長的環形數組;數據結構
elemtype 和 elemsiz表示chan 中元素的類型和 元素的大小;app
sendx:發送數據的指針在 buffer中的位置;函數
recvx:接收請求時的指針在 buffer 中的位置;工具
recvq和sendq分別表示等待接收數據的 goroutine 與等待發送數據的 goroutine;源碼分析
sendq和recvq的類型是waitq的結構體:
type waitq struct { first *sudog last *sudog }
waitq裏面鏈接的是一個sudog雙向鏈表,保存的是等待的goroutine 。整個chan的圖例大概是這樣:
下面看一下建立chan,咱們經過彙編結果也能夠查看到make(chan int)
這句代碼會調用到runtime的makechan函數中:
const ( maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1)) ) func makechan(t *chantype, size int) *hchan { elem := t.elem // 略去檢查代碼 ... //計算須要分配的buf空間 mem, overflow := math.MulUintptr(elem.size, uintptr(size)) if overflow || mem > maxAlloc-hchanSize || size < 0 { panic(plainError("makechan: size out of range")) } var c *hchan switch { case mem == 0: // chan的size或者元素的size是0,沒必要建立buf c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector c.buf = c.raceaddr() case elem.ptrdata == 0: // 元素不是指針,分配一塊連續的內存給hchan數據結構和buf c = (*hchan)(mallocgc(hchanSize+mem, nil, true)) // 表示hchan後面在內存裏緊跟着就是buf c.buf = add(unsafe.Pointer(c), hchanSize) default: // 元素包含指針,那麼單獨分配buf c = new(hchan) c.buf = mallocgc(mem, elem, true) } c.elemsize = uint16(elem.size) c.elemtype = elem c.dataqsiz = uint(size) return c }
首先咱們能夠看到計算hchanSize:
maxAlign = 8 hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
maxAlign是8,那麼maxAlign-1的二進制就是111,而後和int(unsafe.Sizeof(hchan{}))取與就是取它的低三位,hchanSize就獲得的是8的整數倍,作對齊使用。
這裏switch有三種狀況,第一種狀況是緩衝區所需大小爲 0,那麼在爲 hchan 分配內存時,只須要分配 sizeof(hchan) 大小的內存;
第二種狀況是緩衝區所需大小不爲 0,並且數據類型不包含指針,那麼就分配連續的內存。注意的是,咱們在建立channel的時候能夠指定類型爲指針類型:
//chan裏存入的是int的指針 c := make(chan *int) //chan裏存入的是int的值 c := make(chan int)
第三種狀況是緩衝區所需大小不爲 0,並且數據類型包含指針,那麼就不使用add的方式讓hchan和buf放在一塊兒了,而是單獨的爲buf申請一塊內存。
在看發送數據的代碼以前,咱們先看一下什麼是channel的阻塞和非阻塞。
通常狀況下,傳入的參數都是 block=true
,即阻塞調用,一個往 channel 中插入數據的 goroutine 會阻塞到插入成功爲止。
非阻塞是隻這種狀況:
select { case c <- v: ... foo default: ... bar }
編譯器會將其改成:
if selectnbsend(c, v) { ... foo } else { ... bar }
selectnbsend方法傳入的block就是false:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(c, elem, false, getcallerpc()) }
向通道發送數據咱們經過彙編結果能夠發現是在runtime 中經過 chansend 實現的,方法比較長下面咱們分段來進行理解:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { if c == nil { // 對於非阻塞的發送,直接返回 if !block { return false } // 對於阻塞的通道,將 goroutine 掛起 gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") } ... }
這裏會對chan作一個判斷,若是它是空的,那麼對於非阻塞的發送,直接返回 false;對於阻塞的通道,將 goroutine 掛起,而且永遠不會返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 非阻塞的狀況下,若是通道沒有關閉,知足如下一條: // 1.沒有緩衝區而且當前沒有接收者 // 2.緩衝區不爲0,而且已滿 if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) || (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) { return false } ... }
須要注意的是這裏是沒有加鎖的,go雖然在使用指針讀取單個值的時候原子性的,可是讀取多個值並不能保證,因此在判斷完closed雖然是沒有關閉的,那麼在讀取完以後依然可能在這一瞬間從未關閉狀態轉變成關閉狀態。那麼就有兩種可能:
有關go的一致性原語,能夠看這篇:The Go Memory Model。
上面的這些判斷被稱爲 fast path,由於加鎖的操做是一個很重的操做,因此可以在加鎖以前返回的判斷就在加鎖以前作好是最好的。
下面接着看看加鎖部分的代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... //加鎖 lock(&c.lock) // 是否關閉的判斷 if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } // 從 recvq 中取出一個接收者 if sg := c.recvq.dequeue(); sg != nil { // 若是接收者存在,直接向該接收者發送數據,繞過buffer send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } ... }
進入了lock區域以後還須要再判斷如下close的狀態,而後從recvq 中取出一個接收者,若是已經有接收者,那麼就向第一個接收者發送當前enqueue的消息。這裏須要注意的是若是有接收者在隊列中等待,則說明此時的緩衝區是空的。
既然是一行行分析代碼,那麼咱們再進入到send看一下實現:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ... if sg.elem != nil { // 直接把要發送的數據copy到reciever的棧空間 sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 喚醒對應的 goroutine goready(gp, skip+1) }
在send方法裏,sg就是goroutine打包好的對象,ep是對應要發送數據的指針,sendDirect方法會調用memmove進行數據的內存拷貝。而後goready函數會喚醒對應的 goroutine進行調度。
回到chansend方法,繼續往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 若是緩衝區沒有滿,直接將要發送的數據複製到緩衝區 if c.qcount < c.dataqsiz { // 找到buf要填充數據的索引位置 qp := chanbuf(c, c.sendx) ... // 將數據拷貝到 buffer 中 typedmemmove(c.elemtype, qp, ep) // 數據索引前移,若是到了末尾,又從0開始 c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } // 元素個數加1,釋放鎖並返回 c.qcount++ unlock(&c.lock) return true } ... }
這裏會判斷buf緩衝區有沒有滿,若是沒有滿,那麼就找到buf要填充數據的索引位置,調用typedmemmove方法將數據拷貝到buf中,而後從新設值sendx偏移量。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { ... // 緩衝區沒有空間了,因此對於非阻塞調用直接返回 if !block { unlock(&c.lock) return false } // 建立 sudog 對象 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil // 將sudog 對象入隊 c.sendq.enqueue(mysg) // 進入等待狀態 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2) ... }
這裏會作兩部分的操做,對於非阻塞的調用會直接返回;對於阻塞的調用會建立sudog 對象,而後將sudog對象入隊以後gopark將 goroutine 轉入 waiting 狀態,並解鎖。調用gopark以後,在使用者看來該向 channel 發送數據的代碼語句會進行阻塞。
這裏也須要注意一下,若是緩衝區爲0,那麼也會進入到這裏,會調用到gopark立馬阻塞,因此在使用的時候須要記得接收數據,防止向chan發送數據的那一端永遠阻塞,如:
func process(timeout time.Duration) bool { ch := make(chan bool) go func() { // 模擬處理耗時的業務 time.Sleep((timeout + time.Second)) ch <- true // block fmt.Println("exit goroutine") }() select { case result := <-ch: return result case <-time.After(timeout): return false } }
若是這裏在select的時候直接timeout返回了,而沒有調用 result := <-ch
,那麼goroutine 就會永遠阻塞。
到這裏發送的代碼就講解完了,整個流程大體以下:
好比我要執行:ch<-10
從chan獲取數據實現函數爲 chanrecv。下面咱們看一下代碼實現:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... if c == nil { // 若是 c 爲空且是非阻塞調用,那麼直接返回 (false,false) if !block { return } // 阻塞調用直接等待 gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") } // 對於非阻塞的狀況,而且沒有關閉的狀況 // 若是是無緩衝chan或者是chan中沒有數據,那麼直接返回 (false,false) if !block && (c.dataqsiz == 0 && c.sendq.first == nil || c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) && atomic.Load(&c.closed) == 0 { return } // 上鎖 lock(&c.lock) // 若是已經關閉,而且chan中沒有數據,返回 (true,false) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false } ... }
chanrecv方法和chansend方法是同樣的,首先也是作非空判斷,若是chan沒有初始化,那麼若是是非阻塞調用,那麼直接返回 (false,false),阻塞調用會直接等待;
下面的兩個if判斷我放在一塊兒來進行講解,由於這裏和chansend是不同的,chanrecv要根據不一樣條件須要返回不一樣的結果。
在上鎖以前的判斷是邊界條件的判斷:若是是非阻塞調用會判斷chan沒有發送方(dataqsiz爲空且發送隊列爲空),或chan的緩衝爲空(dataqsiz>0 而且qcount==0)而且chan是沒有close,那麼須要返回 (false,false);而chan已經關閉了,而且buf中沒有數據,須要返回 (true,false);
爲了實現這個需求,因此在chanrecv方法裏面邊界條件的判斷都使用atomic方法進行了獲取。
由於須要正確的獲得chan已關閉,而且 buf 空會返回 (true, false),而不是 (false,false),因此在lock上鎖以前須要使用atomic來獲取參數防止重排序(Happens Before),所以必須使此處的 qcount 和 closed 的讀取操做的順序經過原子操做獲得順序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 從發送者隊列獲取數據 if sg := c.sendq.dequeue(); sg != nil { // 發送者隊列不爲空,直接從發送者那裏提取數據 recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } ... } func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { // 若是是無緩衝區chan if c.dataqsiz == 0 { ... if ep != nil { // 直接從發送者拷貝數據 recvDirect(c.elemtype, sg, ep) } // 有緩衝區chan } else { // 獲取buf的存放數據指針 qp := chanbuf(c, c.recvx) ... // 直接從緩衝區拷貝數據給接收者 if ep != nil { typedmemmove(c.elemtype, ep, qp) } // 從發送者拷貝數據到緩衝區 typedmemmove(c.elemtype, qp, sg.elem) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz } sg.elem = nil gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } // 將發送者喚醒 goready(gp, skip+1) }
在這裏若是有發送者在隊列等待,那麼直接從發送者那裏提取數據,而且喚醒這個發送者。須要注意的是因爲有發送者在等待,因此若是有緩衝區,那麼緩衝區必定是滿的。
在喚醒發送者以前須要對緩衝區作判斷,若是是無緩衝區,那麼直接從發送者那裏提取數據;若是有緩衝區首先會獲取recvx的指針,而後將從緩衝區拷貝數據給接收者,再將發送者數據拷貝到緩衝區。
而後將recvx加1,至關於將新的數據移到了隊尾,再將recvx的值賦值給sendx,最後調用goready將發送者喚醒,這裏有些繞,咱們經過圖片來展現:
這裏展現的是在chansend中將數據拷貝到緩衝區中,當數據滿的時候會將sendx的指針置爲0,因此當buf環形隊列是滿的時候sendx等於recvx。
而後再來看看chanrecv中發送者隊列有數據的時候移交緩衝區的數據是怎麼作的:
這裏會將recvx爲0處的數據直接從緩存區拷貝數據給接收者,而後將發送者拷貝數據到緩衝區recvx指針處,而後將recvx指針加1並將recvx賦值給sendx,因爲是滿的因此用recvx加1的效果實現了將新加入的數據入庫到隊尾的操做。
接着往下看:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 若是緩衝區中有數據 if c.qcount > 0 { qp := chanbuf(c, c.recvx) ... // 從緩衝區複製數據到 ep if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) // 接收數據的指針前移 c.recvx++ // 環形隊列,若是到了末尾,再從0開始 if c.recvx == c.dataqsiz { c.recvx = 0 } // 緩衝區中現存數據減一 c.qcount-- unlock(&c.lock) return true, true } ... }
到了這裏,說明緩衝區中有數據,可是發送者隊列沒有數據,那麼將數據拷貝到接收數據的協程,而後將接收數據的指針前移,若是已經到了隊尾,那麼就從0開始,最後將緩衝區中現存數據減一併解鎖。
下面就是緩衝區中沒有數據的狀況:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { ... // 非阻塞,直接返回 if !block { unlock(&c.lock) return false, false } // 建立sudog gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil // 將sudog添加到接收隊列中 c.recvq.enqueue(mysg) // 阻塞住goroutine,等待被喚醒 gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2) ... }
若是是非阻塞調用,直接返回;阻塞調用會將當前goroutine 封裝成sudog,而後將sudog添加到接收隊列中,調用gopark阻塞住goroutine,等待被喚醒。
關閉通道會調用到closechan方法:
func closechan(c *hchan) { // 1. 校驗chan是否已初始化 if c == nil { panic(plainError("close of nil channel")) } // 加鎖 lock(&c.lock) // 若是已關閉了,那麼不能被再次關閉 if c.closed != 0 { unlock(&c.lock) panic(plainError("close of closed channel")) } ... // 設置chan已關閉 c.closed = 1 // 申明一個存放g的list,用於存放在等待隊列中的groutine var glist gList // 2. 獲取全部接收者 for { sg := c.recvq.dequeue() if sg == nil { break } if sg.elem != nil { typedmemclr(c.elemtype, sg.elem) sg.elem = nil } if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入隊列中 glist.push(gp) } // 獲取全部發送者 for { sg := c.sendq.dequeue() if sg == nil { break } sg.elem = nil if sg.releasetime != 0 { sg.releasetime = cputicks() } gp := sg.g gp.param = nil if raceenabled { raceacquireg(gp, c.raceaddr()) } // 加入隊列中 glist.push(gp) } unlock(&c.lock) // 3.喚醒全部的glist中的goroutine for !glist.empty() { gp := glist.pop() gp.schedlink = 0 goready(gp, 3) } }
chan在go中是一個很是強大的工具,使用它能夠實現不少功能,可是爲了可以高效的使用它咱們也應該去了解裏面是如何實現的。這篇文章經過一步步分析從零開始瞭解go的chan是如何實現的,以及在使用過程當中有什麼須要注意的事項,chan的buf環形隊列是怎樣維護的,但願能對你有所幫助~
https://speakerdeck.com/kavya719/understanding-channels
https://github.com/talkgo/night/issues/450
https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8