有了上篇的基本瞭解,能夠翻閱源碼了golang
// Go/src/runtime/chan.go type hchan struct { qcount uint // total data in the queue dataqsiz uint // size of the circular queue buf unsafe.Pointer // points to an array of dataqsiz elements elemsize uint16 closed uint32 elemtype *_type // element type sendx uint // send index recvx uint // receive index recvq waitq // list of recv waiters sendq waitq // list of send waiters // lock protects all fields in hchan, as well as several // fields in sudogs blocked on this channel. // // Do not change another G's status while holding this lock // (in particular, do not ready a G), as this can deadlock // with stack shrinking. lock mutex } type waitq struct { first *sudog last *sudog } // sudog represents a g in a wait list, such as for sending/receiving // on a channel. // // sudog is necessary because the g ↔ synchronization object relation // is many-to-many. A g can be on many wait lists, so there may be // many sudogs for one g; and many gs may be waiting on the same // synchronization object, so there may be many sudogs for one object. // // sudogs are allocated from a special pool. Use acquireSudog and // releaseSudog to allocate and free them. type sudog struct { // The following fields are protected by the hchan.lock of the // channel this sudog is blocking on. shrinkstack depends on // this for sudogs involved in channel ops. g *g // isSelect indicates g is participating in a select, so // g.selectDone must be CAS'd to win the wake-up race. isSelect bool next *sudog prev *sudog elem unsafe.Pointer // data element (may point to stack) // The following fields are never accessed concurrently. // For channels, waitlink is only accessed by g. // For semaphores, all fields (including the ones above) // are only accessed when holding a semaRoot lock. acquiretime int64 releasetime int64 ticket uint32 parent *sudog // semaRoot binary tree waitlink *sudog // g.waiting list or semaRoot waittail *sudog // semaRoot c *hchan // channel }
make(chan int,3)
帶有設置緩存大小的參數,則會分配一段連續空間,buf 指向這段內存空間c.buf = add(unsafe.Pointer(c), hchanSize)
分配 hchanSize 大小的空間,其中常量 hchanSize hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
func makechan(t *chantype, size int) *hchan { elem := t.elem // compiler checks this but be safe. ··· var c *hchan // 返回的是指針 switch { case size == 0 || elem.size == 0: // Queue or element size is zero. c = (*hchan)(mallocgc(hchanSize, nil, true)) // Race detector uses this location for synchronization. c.buf = c.raceaddr() case elem.kind&kindNoPointers != 0: // Elements do not contain pointers. // Allocate hchan and buf in one call. c = (*hchan)(mallocgc(hchanSize+uintptr(size)*elem.size, nil, true)) c.buf = add(unsafe.Pointer(c), hchanSize) // <============ default: // Elements contain pointers. c = new(hchan) c.buf = mallocgc(uintptr(size)*elem.size, elem, true) // <============== } ··· return c }
咱們截取不一樣狀況的代碼段。緩存
若是 chanel 爲空,即沒有用 make 分配內存,那麼會調用 gopark 方法數據結構
if c == nil { if !block { return false } gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2) throw("unreachable") }
而 gopark 方法會將當前的 goroutine 休眠,而後回調經過參數傳來的 unlockf 方法。注意看回上面的代碼,調用 gopark 方法時傳遞的 unlockf 參數爲 nil,因此會一直休眠。app
// Puts the current goroutine into a waiting state and calls unlockf. // If unlockf returns false, the goroutine is resumed. // unlockf must not access this G's stack, as it may be moved between // the call to gopark and the call to unlockf. // Reason explains why the goroutine has been parked. // It is displayed in stack traces and heap dumps. // Reasons should be unique and descriptive. // Do not re-use reasons, add new ones. func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { if reason != waitReasonSleep { checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy } mp := acquirem() gp := mp.curg status := readgstatus(gp) if status != _Grunning && status != _Gscanrunning { throw("gopark: bad g status") } mp.waitlock = lock mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf)) gp.waitreason = reason mp.waittraceev = traceEv mp.waittraceskip = traceskip releasem(mp) // can't do anything that might move the G between Ms here. mcall(park_m) }
這時, Go 語言啓動的時候會有一個 goroutine sysmon 一直檢測系統的運行狀況,其中有一個方法 checkdead(),當檢測到全部 goroutine 都處於休眠,即死鎖,便拋出錯誤。ui
// /Go/src/runtime/proc.go func checkdead() { ... throw("all goroutines are asleep - deadlock!") }
直接引起 panic:this
lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) }
lock(&c.lock) ··· if sg := c.recvq.dequeue(); sg != nil { // Found a waiting receiver. We pass the value we want to send // directly to the receiver, bypassing the channel buffer (if any). send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true }
send 方法判斷到接收方 sudog 的 elem 字段存有對應的內存空間地址值的話,調用 sendDirect 方法。這裏囉嗦一下,sudog 的 elem 是在 func chanrecv
方法中賦值的,將接收方 goroutine 用來接收數據的棧空間地址賦值給 elem。spa
// send processes a send operation on an empty channel c. // The value ep sent by the sender is copied to the receiver sg. // The receiver is then woken up to go on its merry way. // Channel c must be empty and locked. send unlocks c with unlockf. // sg must already be dequeued from c. // ep must be non-nil and point to the heap or the caller's stack. func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) { ··· if sg.elem != nil { sendDirect(c.elemtype, sg, ep) sg.elem = nil } gp := sg.g unlockf() gp.param = unsafe.Pointer(sg) if sg.releasetime != 0 { sg.releasetime = cputicks() } goready(gp, skip+1) }
sendDirect 方法中 memmove 方法直接拷貝 t.size 個字節到目的內存空間。指針
func sendDirect(t *_type, sg *sudog, src unsafe.Pointer) { // src is on our stack, dst is a slot on another stack. // Once we read sg.elem out of sg, it will no longer // be updated if the destination's stack gets copied (shrunk). // So make sure that no preemption points can happen between read & use. dst := sg.elem typeBitsBulkBarrier(t, uintptr(dst), uintptr(src), t.size) // No need for cgo write barrier checks because dst is always // Go memory. memmove(dst, src, t.size) }
if c.qcount < c.dataqsiz { // Space is available in the channel buffer. Enqueue the element to send. qp := chanbuf(c, c.sendx) if raceenabled { raceacquire(qp) racerelease(qp) } typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true }
// Block on the channel. Some receiver will complete our operation for us. gp := getg() mysg := acquireSudog() mysg.releasetime = 0 if t0 != 0 { mysg.releasetime = -1 } // No stack splits between assigning elem and enqueuing mysg // on gp.waiting where copystack can find it. mysg.elem = ep mysg.waitlink = nil mysg.g = gp mysg.isSelect = false mysg.c = c gp.waiting = mysg gp.param = nil c.sendq.enqueue(mysg) goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
goparkunlock 方法將當前 goroutine 休眠,而且釋放鎖資源code
// Puts the current goroutine into a waiting state and unlocks the lock. // The goroutine can be made runnable again by calling goready(gp). func goparkunlock(lock *mutex, reason waitReason, traceEv byte, traceskip int) { gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip) }
參考文章
http://legendtkl.com/2017/08/...