本文會嘗試解釋 go runtime 中 channel 和 select 的具體實現,部份內容來自 gophercon2017。Go版本爲1.8.3數組
第一部分講述一下 channel 的用法。channel 能夠看作一個隊列,用於多個goroutine之間的通訊,例以下面的例子,一個goroutine發送msg,另外一個msg接受消息。channel 分爲帶緩衝和不帶緩衝,差異不是很大,具體請自行google。看一個簡單的例子,瞭解一下channel的使用。app
package main import "fmt" func main() { // Create a new channel with `make(chan val-type)`. // Channels are typed by the values they convey. messages := make(chan string) // Send a value into a channel using the `channel <-` // syntax. Here we send `"ping"` to the `messages` // channel we made above, from a new goroutine. go func() { messages <- "ping" }() // The `<-channel` syntax receives a value from the // channel. Here we'll receive the `"ping"` message // we sent above and print it out. msg := <-messages fmt.Println(msg) }
channel的功能點:oop
咱們圍繞這3點功能展開,講講具體的實現。ui
註釋標註了幾個重要的變量,從功能上大體能夠分爲兩個功能單元,一個是 ring buffer,用於存數據; 一個是存放 goroutine 的隊列。this
type hchan struct { qcount uint // 當前隊列中的元素個數 dataqsiz uint // 緩衝隊列的固定大小 buf unsafe.Pointer // 緩衝數組 elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // 下一次發送的 index recvx uint // 下一次接收的 index recvq waitq // 接受者隊列 sendq waitq // 發送者隊列 // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex }
主要是如下變量組成的功能, 一個 buf 存儲實際數據,兩個指針分別表明發送,接收的索引位置,配合 size, count 在數組大小範圍內來回滑動。google
qcount uint // 當前隊列中的元素個數 dataqsiz uint // 緩衝隊列的固定大小 buf unsafe.Pointer // 緩衝數組 sendx uint // 下一次發送的 index recvx uint // 下一次接收的 index
舉個例子,假設咱們初始化了一個帶緩衝的channel, ch := make(chan int, 3)
, 那麼它初始狀態的值爲:指針
qcount = 0 dataqsiz = 3 buf = [3]int{0, 0, 0} // 表示長度爲3的數組 sendx = 0 recvx = 0
第一步,向 channel 裏 send 一個值, ch <- 1
, 由於如今緩衝還沒滿,因此操做後狀態以下:code
qcount = 1 dataqsiz = 3 buf = [3]int{1, 0, 0} // 表示長度爲3的數組 sendx = 1 recvx = 0
快進兩部,連續向 channel 裏 send 兩個值 (2, 3),狀態以下:索引
qcount = 3 dataqsiz = 3 buf = [3]int{1, 2, 3} // 表示長度爲3的數組 sendx = 0 // 下一個發送的 index 回到了0 recvx = 0
從 channel 中 receive 一個值, <- ch
, 狀態以下:隊列
qcount = 2 dataqsiz = 3 buf = [3]int{1, 2, 3} // 表示長度爲3的數組 sendx = 0 // 下一個發送的 index 回到了0 recvx = 1 // 下一個接收的 index
咱們看下,若是 receive channel 時,channel 的 buffer中沒有數據是怎麼處理的。邏輯在 chanrecv
這個方法中,它的大體流程以下,僅保留了阻塞操做的代碼。
func chanrecv(t *chantype, c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { // 檢查 channdel 是否爲 nil // 當不阻塞時,檢查buffer大小,當前大小,檢查chennel是否關閉,看看是否能直接返回 // 檢查發送端是否有等待的goroutine,下部分會提到 // 當前buffer中有數據,則嘗試取出。 // 若是非阻塞,直接返回 // 沒有sender等待,buffer中沒有數據,則阻塞等待。 gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.selectdone = nil mysg.c = c gp.param = nil c.recvq.enqueue(mysg) //關鍵操做:設置 goroutine 狀態爲 waiting, 把 G 和 M 分離 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) // someone woke us up // 被喚醒,清理 sudog if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed }
這裏的操做就是 建立一個 當前 goroutine 的 sudog, 而後把這個 sudog 放入 channel 的接受者等待隊列;設置當前 G 的狀態,和 M分離,到這裏當前G就阻塞了,代碼不會執行下去。
當被喚醒後,執行sudog的清理操做。這裏接受buffer中的值的指針是 ep
這個變量,被喚醒後好像沒有向 ep
中賦值的操做。這個咱們下部分會講。
還剩最後一個疑問,當一個goroutine由於channel阻塞,另外一個goroutine是如何喚醒它的。
channel 中有兩個 waitq
類型的變量, 看下結構發現,就是sudog的鏈表,關鍵是 sudog。sudog中包含了goroutine的引用,注意一下 elem
這個變量,註釋說可能會指向stack。
type waitq struct { first *sudog last *sudog } type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this. g *g selectdone *uint32 // CAS to 1 to win select race (may point to stack) next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // waitlink is only accessed by g. acquiretime int64 releasetime int64 ticket uint32 waitlink *sudog // g.waiting list c *hchan // channel }
講阻塞部分的時候,咱們看到goroutine被調度以前,有一個 enqueue
操做,這時,當前G的sudog已經被存入recvq
中,咱們看下發送者這時的操做。
這裏的操做是,sender發送的值 直接被拷貝到 sudog.elem 了。而後喚醒 sudog.g ,這樣對面的receiver goroutine 就被喚醒了。具體請下面的註釋。
func chansend(t *chantype, c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { // 檢查工做 // 若是能從 chennel 的 recvq 彈出 sudog, 那麼直接send if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }) return true } // buffer有空餘空間,返回; 阻塞操做 } func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func()) { // 處理 index // 關鍵 if sg.elem != nil { // 這裏是根據 elemtype.size 複製內存 sendDirect(c.elemtype, sg, ep) sg.elem = nil } // 一些處理 // 從新設置 goroutine 的狀態,喚醒它 goready(gp, 4) } func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack. // Once we read sg.elem out of sg, it will no longer // be updated if the destination's stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) memmove(dst, src, t.size) } // memmove copies n bytes from "from" to "to". // in memmove_*.s //go:noescape func memmove(to, from unsafe.Pointer, n uintptr)
在看 chanrecv()
方法 時,發現了一個 block 參數,表明操做是否阻塞。通常狀況下,channel 都是阻塞的(不考慮buffer),那何時非阻塞呢?
第一個想到的就是 select, 在寫了default case的時候,其餘的channel是非阻塞的。
還有一個可能不經常使用,就是 channel 的反射 value, 能夠是非阻塞的,這個方法是public的,咱們先看下簡單的。
func (v Value) TryRecv() (x Value, ok bool) func (v Value) TrySend(x Value) bool
select 就複雜一點點,首先在源碼中發現一段註釋:
// compiler implements // // select { // case c <- v: // ... foo // default: // ... bar // } // // as // // if selectnbsend(c, v) { // ... foo // } else { // ... bar // } // func selectnbsend(t *chantype, c *hchan, elem unsafe.Pointer) (selected bool) { return chansend(t, c, elem, false, getcallerpc(unsafe.Pointer(&t))) } // compiler implements // // select { // case v = <-c: // ... foo // default: // ... bar // } // // as // // if selectnbrecv(&v, c) { // ... foo // } else { // ... bar // } // func selectnbrecv(t *chantype, elem unsafe.Pointer, c *hchan) (selected bool) { selected, _ = chanrecv(t, c, elem, false) return }
若是是一個 case + default 的模式,那麼編譯器就調用以上方法來實現。
若是是多個 case + default 的模式呢?select 在runtime究竟是如何執行的?寫個簡單的select編譯一下。
package main func main() { var ch chan int select { case <-ch: case ch <- 1: default: } }
go tool compile -S -l -N test.go > test.s
結果中找一下關鍵字,例如:
0x008c 00140 (test.go:5) CALL runtime.newselect(SB) 0x00ad 00173 (test.go:6) CALL runtime.selectrecv(SB) 0x00ec 00236 (test.go:7) CALL runtime.selectsend(SB) 0x0107 00263 (test.go:8) CALL runtime.selectdefault(SB) 0x0122 00290 (test.go:5) CALL runtime.selectgo(SB)
這裏 selectgo
是實際運行的方法,找一下,注意註釋。先檢查channel是否能操做,若是不能操做,就走 default 邏輯。
loop: // pass 1 - look for something already waiting var dfl *scase var cas *scase for i := 0; i < int(sel.ncase); i++ { cas = &scases[pollorder[i]] c = cas.c switch cas.kind { // 接受數據 case caseRecv: sg = c.sendq.dequeue() // 若是有 sender 在等待 if sg != nil { goto recv } // 當前buffer中有數據 if c.qcount > 0 { goto bufrecv } // 關閉的channel if c.closed != 0 { goto rclose } case caseSend: if raceenabled { racereadpc(unsafe.Pointer(c), cas.pc, chansendpc) } // 關閉 if c.closed != 0 { goto sclose } // 有 receiver 正在等待 sg = c.recvq.dequeue() if sg != nil { goto send } // 有空間接受 if c.qcount < c.dataqsiz { goto bufsend } // 走default case caseDefault: dfl = cas } } if dfl != nil { selunlock(scases, lockorder) cas = dfl goto retc }