上篇已經記錄了發送數據到 channel 的三種情況的代碼邏輯,接下來是從 channel 接收數據的邏輯。
chanrecv 方法和 chansend 方法十分相似,開頭同樣先處理 channel 爲 nil,以及 channel 已關閉且緩衝區爲空的情況:
if c == nil { if !block { return } gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2) throw("unreachable") }
lock(&c.lock) if c.closed != 0 && c.qcount == 0 { if raceenabled { raceacquire(c.raceaddr()) } unlock(&c.lock) if ep != nil { typedmemclr(c.elemtype, ep) } return true, false }
下面分別截取三種情況的代碼段。
if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true }
if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if raceenabled { raceacquire(qp) racerelease(qp) } if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true }
··· // 上面條件都不知足,則只剩一種狀況:hchan.buf 爲空 // no sender available: block on this channel. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil gp.waiting = mysg mysg.g = gp mysg.isSelect = false mysg.c = c gp.param = nil c.recvq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
假設有一個 goroutine sender 和一個 goroutine receiver,如果 receiver 執行 chanrecv 方法的時候 buf 已經爲空了,從上面代碼的最後一行可以知道,goparkunlock 方法會使 receiver 阻塞。那麼 sender 寫數據進 channel 時,receiver 又是如何被 sender 喚醒的呢?
receiver 休眠後,sender 來了,sender 執行到以下代碼處。sender 從 recvq 隊列彈出 receiver,然後執行 send 方法。
if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
send 方法最後一行執行 goready 方法將 receiver 喚醒。
// send processes a send operation on an empty channel c.
// The value ep sent by the sender is copied to the receiver sg.
// The receiver is then woken up to go on its merry way.
// Channel c must be empty and locked.  send unlocks c with unlockf.
// sg must already be dequeued from c.
// ep must be non-nil and point to the heap or the caller's stack.
//
// Parameters:
//   - c:       the channel being sent on (locked by the caller).
//   - sg:      the sudog of the waiting receiver, already removed from c.recvq.
//   - ep:      pointer to the value being sent.
//   - unlockf: callback that releases the channel lock; it is invoked after
//     the value has been handed over and before the receiver is readied.
//   - skip:    forwarded to goready as skip+1 (presumably a caller-frame skip
//     count for tracing — confirm against goready).
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
	if raceenabled {
		if c.dataqsiz == 0 {
			// Unbuffered channel: synchronize sender and receiver directly.
			racesync(c, sg)
		} else {
			// Pretend we go through the buffer, even though
			// we copy directly. Note that we need to increment
			// the head/tail locations only when raceenabled.
			qp := chanbuf(c, c.recvx)
			raceacquire(qp)
			racerelease(qp)
			raceacquireg(sg.g, qp)
			racereleaseg(sg.g, qp)
			// Advance recvx with wrap-around at dataqsiz.
			c.recvx++
			if c.recvx == c.dataqsiz {
				c.recvx = 0
			}
			c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
		}
	}
	// Hand the value to the receiver only if it supplied a destination
	// (sg.elem is non-nil), then drop the sudog's reference to it.
	if sg.elem != nil {
		sendDirect(c.elemtype, sg, ep)
		sg.elem = nil
	}
	gp := sg.g
	// Release the channel lock before waking the receiver.
	unlockf()
	// A non-nil param tells the woken receiver that it was woken by a real
	// send: the receive path checks `gp.param == nil` to detect a close, and
	// closechan sets param to nil.
	gp.param = unsafe.Pointer(sg)
	if sg.releasetime != 0 {
		sg.releasetime = cputicks()
	}
	// Wake the receiver goroutine.
	goready(gp, skip+1)
}
receiver 喚醒後,繼續執行 chanrecv 方法剩下的語句:
// someone woke us up if mysg != gp.waiting { throw("G waiting list is corrupted") } gp.waiting = nil if mysg.releasetime > 0 { blockevent(mysg.releasetime-t0, 2) } closed := gp.param == nil gp.param = nil mysg.c = nil releaseSudog(mysg) return true, !closed
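close(ch) 對應的執行方法即爲 closechan 方法。主要有以下步驟:(1)channel 爲 nil 或者已經關閉時直接 panic;(2)把 c.closed 置爲 1;(3)依次彈出 recvq 和 sendq 中所有等待的 sudog,把 gp.param 置爲 nil,並把對應的 G 通過 schedlink 串成鏈表 glist;(4)釋放鎖之後,遍歷 glist,對每個 G 調用 goready 將其喚醒。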
// closechan implements close(ch).
//
// It panics if c is nil or already closed. Otherwise, with the channel lock
// held, it marks the channel closed and drains both wait queues: every
// waiting receiver and sender has its sudog detached, its gp.param set to nil,
// and its goroutine pushed onto a local list linked through g.schedlink.
// Only after the lock is released are the collected goroutines handed to
// goready.
func closechan(c *hchan) {
	if c == nil {
		panic(plainError("close of nil channel"))
	}

	lock(&c.lock)
	if c.closed != 0 {
		// Release the lock before panicking so it is not left held.
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}

	c.closed = 1

	// Head of the list of goroutines to wake, chained through g.schedlink.
	var glist *g

	// release all readers
	for {
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		// If the receiver supplied a destination, clear it and drop the
		// sudog's reference to it.
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		// A nil param is what the woken receiver reads as "channel closed"
		// (the receive path computes closed := gp.param == nil).
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		gp.schedlink.set(glist) // <------ statement a: gp points at the current list head
		glist = gp              // <------ statement b: gp becomes the new list head
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		gp.schedlink.set(glist) // <------ statement c: gp points at the current list head
		glist = gp              // <------ statement d: gp becomes the new list head
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for glist != nil {
		gp := glist
		glist = glist.schedlink.ptr() // <------ statement e: advance to the next G in the list
		// Unlink gp from the list before handing it to goready.
		gp.schedlink = 0
		goready(gp, 3)
	}
}

// ptr converts the guintptr back into the *g it refers to.
//
//go:nosplit
func (gp guintptr) ptr() *g { return (*g)(unsafe.Pointer(gp)) }

// set stores the address of g into the guintptr.
//
//go:nosplit
func (gp *guintptr) set(g *g) { *gp = guintptr(unsafe.Pointer(g)) }
上面代碼有個需要注意的點:schedlink 是 G 的一個屬性,用於指向下一個 G。
參考文章
http://legendtkl.com/2017/08/...