While learning Go, nothing is more impressive than goroutines. But what is a goroutine? The go keyword is all it takes to create one, so how are all these goroutines scheduled against each other?
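The Go source is full of g, p and m, so we start with the structures behind these names and how they relate to each other.
Only the more important fields of each struct are listed here.
A goroutine (g) is the smallest unit of execution in the runtime.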
type g struct { // Stack parameters. // stack describes the actual stack memory: [stack.lo, stack.hi). // stackguard0 is the stack pointer compared in the Go stack growth prologue. // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption. // stackguard1 is the stack pointer compared in the C stack growth prologue. // It is stack.lo+StackGuard on g0 and gsignal stacks. // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash). // 當前g使用的棧空間,stack結構包括 [lo, hi]兩個成員 stack stack // offset known to runtime/cgo // 用於檢測是否須要進行棧擴張,go代碼使用 stackguard0 uintptr // offset known to liblink // 用於檢測是否須要進行棧擴展,原生代碼使用的 stackguard1 uintptr // offset known to liblink // 當前g所綁定的m m *m // current m; offset known to arm liblink // 當前g的調度數據,當goroutine切換時,保存當前g的上下文,用於恢復 sched gobuf // g當前的狀態 atomicstatus uint32 // 當前g的id goid int64 // 下一個g的地址,經過guintptr結構體的ptr set函數能夠設置和獲取下一個g,經過這個字段和sched.gfreeStack sched.gfreeNoStack 能夠把 free g串成一個鏈表 schedlink guintptr // 判斷g是否容許被搶佔 preempt bool // preemption signal, duplicates stackguard0 = stackpreempt // g是否要求要回到這個M執行, 有的時候g中斷了恢復會要求使用原來的M執行 lockedm muintptr }
A P is the resource an M needs in order to run a G.
type p struct { lock mutex id int32 // p的狀態,稍後介紹 status uint32 // one of pidle/prunning/... // 下一個p的地址,可參考 g.schedlink link puintptr // p所關聯的m m muintptr // back-link to associated m (nil if idle) // 內存分配的時候用的,p所屬的m的mcache用的也是這個 mcache *mcache // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen. // 從sched中獲取並緩存的id,避免每次分配goid都從sched分配 goidcache uint64 goidcacheend uint64 // Queue of runnable goroutines. Accessed without lock. // p 本地的runnbale的goroutine造成的隊列 runqhead uint32 runqtail uint32 runq [256]guintptr // runnext, if non-nil, is a runnable G that was ready'd by // the current G and should be run next instead of what's in // runq if there's time remaining in the running G's time // slice. It will inherit the time left in the current time // slice. If a set of goroutines is locked in a // communicate-and-wait pattern, this schedules that set as a // unit and eliminates the (potentially large) scheduling // latency that otherwise arises from adding the ready'd // goroutines to the end of the run queue. // 下一個執行的g,若是是nil,則從隊列中獲取下一個執行的g runnext guintptr // Available G's (status == Gdead) // 狀態爲 Gdead的g的列表,能夠進行復用 gfree *g gfreecnt int32 }
type m struct { // g0是用於調度和執行系統調用的特殊g g0 *g // goroutine with scheduling stack // m當前運行的g curg *g // current running goroutine // 當前擁有的p p puintptr // attached p for executing go code (nil if not executing go code) // 線程的 local storage tls [6]uintptr // thread-local storage // 喚醒m時,m會擁有這個p nextp puintptr id int64 // 若是 !="", 繼續運行curg preemptoff string // if != "", keep curg running on this m // 自旋狀態,用於判斷m是否工做已結束,並尋找g進行工做 spinning bool // m is out of work and is actively looking for work // 用於判斷m是否進行休眠狀態 blocked bool // m is blocked on a note // m休眠和喚醒經過這個,note裏面有一個成員key,對這個key所指向的地址進行值的修改,進而達到喚醒和休眠的目的 park note // 全部m組成的一個鏈表 alllink *m // on allm // 下一個m,經過這個字段和sched.midle 能夠串成一個m的空閒鏈表 schedlink muintptr // mcache,m擁有p的時候,會把本身的mcache給p mcache *mcache // lockedm的對應值 lockedg guintptr // 待釋放的m的list,經過sched.freem 串成一個鏈表 freelink *m // on sched.freem }
type schedt struct { // 全局的go id分配 goidgen uint64 // 記錄的最後一次從i/o中查詢g的時間 lastpoll uint64 lock mutex // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be // sure to call checkdead(). // m的空閒鏈表,結合m.schedlink 就能夠組成一個空閒鏈表了 midle muintptr // idle m's waiting for work nmidle int32 // number of idle m's waiting for work nmidlelocked int32 // number of locked m's waiting for work // 下一個m的id,也用來記錄建立的m數量 mnext int64 // number of m's that have been created and next M ID // 最多容許的m的數量 maxmcount int32 // maximum number of m's allowed (or die) nmsys int32 // number of system m's not counted for deadlock // free掉的m的數量,exit的m的數量 nmfreed int64 // cumulative number of freed m's ngsys uint32 // number of system goroutines; updated atomically pidle puintptr // idle p's npidle uint32 nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go. // Global runnable queue. // 這個就是全局的g的隊列了,若是p的本地隊列沒有g或者太多,會跟全局隊列進行平衡 // 根據runqhead能夠獲取隊列頭的g,而後根據g.schedlink 獲取下一個,從而造成了一個鏈表 runqhead guintptr runqtail guintptr runqsize int32 // freem is the list of m's waiting to be freed when their // m.exited is set. Linked through m.freelink. // 等待釋放的m的列表 freem *m }
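A short digression on states.
The status of an m is not as explicit as that of a p or a g, but the analysis of the execution flow shows the following main states.
The structures above contain many linked lists, and g, m and p each hold fields that point at the others. So how exactly are they related?
From the figure above, the relationship between m, p and g can be described briefly.
The figure below gives a rough overview of the whole Go scheduling flow.
Next we analyze the whole scheduling flow from the source code (my assembly is not good, so the assembly parts are not analyzed 🤪).
Go's startup sequence has 4 steps.
Of these, schedinit initializes the scheduler. Setting aside the memory allocator and garbage collector setup that schedinit also performs, scheduler initialization amounts to initializing the scheduler itself, setting the maximum maxmcount, and determining the number of Ps and initializing them.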
schedinit initializes the current m here, determines the number of Ps from the CPU core count obtained by osinit and the GOMAXPROCS setting, and initializes them.
func schedinit() {
	// get the pointer to the current g from TLS or a dedicated register
	_g_ := getg()

	// maximum number of m's
	sched.maxmcount = 10000

	// initialize the stack reuse pools
	stackinit()
	// initialize the current m
	mcommoninit(_g_.m)

	// osinit sets the global ncpu; the number of Ps is determined here from
	// the CPU core count and the GOMAXPROCS setting
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	// create the configured number of Ps
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
}
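The decision about the number of Ps can be tried outside the runtime. The following is a minimal sketch under stated assumptions: procCount is a hypothetical helper, and strconv.Atoi stands in for the runtime's internal atoi32, so edge cases in parsing may differ.

```go
package main

import (
	"fmt"
	"os"
	"runtime"
	"strconv"
)

// procCount mirrors the decision in schedinit: start from the CPU count and
// let a positive GOMAXPROCS value override it. It is a hypothetical helper;
// the runtime itself uses atoi32 and gogetenv.
func procCount(ncpu int, env string) int {
	procs := ncpu
	if n, err := strconv.Atoi(env); err == nil && n > 0 {
		procs = n
	}
	return procs
}

func main() {
	fmt.Println(procCount(runtime.NumCPU(), os.Getenv("GOMAXPROCS")))
}
```

An unset, non-numeric or non-positive GOMAXPROCS leaves the CPU count in place, which matches the `ok && n > 0` condition in the listing.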
func mcommoninit(mp *m) { _g_ := getg() lock(&sched.lock) // 判斷mnext的值是否溢出,mnext須要賦值給m.id if sched.mnext+1 < sched.mnext { throw("runtime: thread ID overflow") } mp.id = sched.mnext sched.mnext++ // 判斷m的數量是否比maxmcount設定的要多,若是超出直接報異常 checkmcount() // 建立一個新的g用於處理signal,並分配棧 mpreinit(mp) if mp.gsignal != nil { mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard } // Add to allm so garbage collector doesn't free g->m // when it is just in a register or thread-local storage. // 接下來的兩行,首先將當前m放到allm的頭,而後原子操做,將當前m的地址,賦值給m,這樣就將當前m添加到了allm鏈表的頭了 mp.alllink = allm // NumCgoCall() iterates over allm w/o schedlock, // so we need to publish it safely. atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp)) unlock(&sched.lock) // Allocate memory to hold a cgo traceback if the cgo call crashes. if iscgo || GOOS == "solaris" || GOOS == "windows" { mp.cgoCallers = new(cgoCallers) } }
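This is where the m linked list first appears. It can be pictured as in the figure below; the p and g lists work the same way and differ only in which struct field links them.
procresize changes the number of Ps by removing the surplus or adding what is missing. During initialization there are no Ps yet, so its effect here is to initialize the configured number of Ps.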
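The way mcommoninit links a new m into allm can be sketched as an intrusive singly linked list with insertion at the head. This is a simplified, single-threaded model: machine, register and ids are hypothetical names, and the runtime additionally publishes the new head with an atomic store while holding sched.lock.

```go
package main

import "fmt"

// machine is a hypothetical stand-in for the runtime's m struct.
type machine struct {
	id      int64
	alllink *machine // next entry in the list of all machines
}

// allm is the list head, like the runtime's allm global.
var allm *machine

// register pushes mp onto the head of the list, as mcommoninit does.
func register(mp *machine) {
	mp.alllink = allm
	allm = mp
}

// ids walks the list from the head and collects the ids it visits.
func ids() []int64 {
	var out []int64
	for mp := allm; mp != nil; mp = mp.alllink {
		out = append(out, mp.id)
	}
	return out
}

func main() {
	for i := int64(0); i < 3; i++ {
		register(&machine{id: i})
	}
	fmt.Println(ids()) // newest entry comes first
}
```

The idle m list (sched.midle with m.schedlink) and the free g list follow the same pattern with a different link field.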
procresize is not only called during initialization. When the user calls runtime.GOMAXPROCS, nprocs is set again and startTheWorld() is executed, and startTheWorld() calls procresize once more with the new nprocs.
func procresize(nprocs int32) *p { old := gomaxprocs if old < 0 || nprocs <= 0 { throw("procresize: invalid arg") } // update statistics now := nanotime() if sched.procresizetime != 0 { sched.totaltime += int64(old) * (now - sched.procresizetime) } sched.procresizetime = now // Grow allp if necessary. // 若是新給的p的數量比原先的p的數量多,則新建增加的p if nprocs > int32(len(allp)) { // Synchronize with retake, which could be running // concurrently since it doesn't run on a P. lock(&allpLock) // 判斷allp 的cap是否知足增加後的長度,知足就直接使用,不知足,則須要擴張這個slice if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { nallp := make([]*p, nprocs) // Copy everything up to allp's cap so we // never lose old allocated Ps. copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // initialize new P's // 初始化新增的p for i := int32(0); i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) pp.id = i pp.status = _Pgcstop pp.sudogcache = pp.sudogbuf[:0] for i := range pp.deferpool { pp.deferpool[i] = pp.deferpoolbuf[i][:0] } pp.wbBuf.reset() // allp是一個slice,直接將新增的p放到對應的索引下面就ok了 atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } if pp.mcache == nil { // 初始化時,old=0,第一個新建的p給當前的m使用 if old == 0 && i == 0 { if getg().m.mcache == nil { throw("missing mcache?") } pp.mcache = getg().m.mcache // bootstrap } else { // 爲p分配內存 pp.mcache = allocmcache() } } } // free unused P's // 釋放掉多餘的p,當新設置的p的數量,比原先設定的p的數量少的時候,會走到這個流程 // 經過 runtime.GOMAXPROCS 就能夠動態的修改nprocs for i := nprocs; i < old; i++ { p := allp[i] // move all runnable goroutines to the global queue // 把當前p的運行隊列裏的g轉移到全局的g的隊列 for p.runqhead != p.runqtail { // pop from tail of local queue p.runqtail-- gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr() // push onto head of global queue globrunqputhead(gp) } // 把runnext裏的g也轉移到全局隊列 if p.runnext != 0 { globrunqputhead(p.runnext.ptr()) p.runnext = 0 } // if there's a background worker, make it runnable and put // it on the global queue so it can clean itself up // 若是有gc worker的話,修改g的狀態,而後再把它放到全局隊列中 if gp := 
p.gcBgMarkWorker.ptr(); gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) globrunqput(gp) // This assignment doesn't race because the // world is stopped. p.gcBgMarkWorker.set(nil) } // sudoig的buf和cache,以及deferpool所有清空 for i := range p.sudogbuf { p.sudogbuf[i] = nil } p.sudogcache = p.sudogbuf[:0] for i := range p.deferpool { for j := range p.deferpoolbuf[i] { p.deferpoolbuf[i][j] = nil } p.deferpool[i] = p.deferpoolbuf[i][:0] } // 釋放掉當前p的mcache freemcache(p.mcache) p.mcache = nil // 把當前p的gfree轉移到全局 gfpurge(p) // 修改p的狀態,讓他自生自滅去了 p.status = _Pdead // can't free P itself because it can be referenced by an M in syscall } // Trim allp. if int32(len(allp)) != nprocs { lock(&allpLock) allp = allp[:nprocs] unlock(&allpLock) } // 判斷當前g是否有p,有的話更改當前使用的p的狀態,繼續使用 _g_ := getg() if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // continue to use the current P _g_.m.p.ptr().status = _Prunning } else { // release the current P and acquire allp[0] // 若是當前g有p,可是擁有的是已經釋放的p,則再也不使用這個p,從新分配 if _g_.m.p != 0 { _g_.m.p.ptr().m = 0 } // 分配allp[0]給當前g使用 _g_.m.p = 0 _g_.m.mcache = nil p := allp[0] p.m = 0 p.status = _Pidle // 將p m g綁定,並把m.mcache指向p.mcache,並修改p的狀態爲_Prunning acquirep(p) } var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] if _g_.m.p.ptr() == p { continue } p.status = _Pidle // 根據 runqempty 來判斷當前p的g運行隊列是否爲空 if runqempty(p) { // g運行隊列爲空的p,放到 sched的pidle隊列裏面 pidleput(p) } else { // g 運行隊列不爲空的p,組成一個可運行隊列,並最後返回 p.m.set(mget()) p.link.set(runnablePs) runnablePs = p } } stealOrder.reset(uint32(nprocs)) var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32 atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs)) return runnablePs }
Creating a goroutine takes only go func. The compiler translates go func into a call to newproc. So how does the newly created task get run? We trace it starting from its creation.
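For reference, this is what the user side of that call looks like. A minimal sketch: squares is a hypothetical function, and sync.WaitGroup is used only so that the caller can wait for the goroutines to finish.

```go
package main

import (
	"fmt"
	"sync"
)

// squares starts one goroutine per input and waits for all of them.
func squares(in []int) []int {
	out := make([]int, len(in))
	var wg sync.WaitGroup
	for i, v := range in {
		wg.Add(1)
		go func(i, v int) { // each go statement becomes a newproc call
			defer wg.Done()
			out[i] = v * v // every goroutine writes only its own slot
		}(i, v)
	}
	wg.Wait()
	return out
}

func main() {
	fmt.Println(squares([]int{1, 2, 3}))
}
```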
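The newproc function obtains the arguments and the pc of the current g, then calls newproc1 on g0 to do the actual work of creating a g or reusing a free one.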
func newproc(siz int32, fn *funcval) {
	// address of the first argument
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	// the g that is currently executing
	gp := getg()
	// pc of the caller
	pc := getcallerpc()
	systemstack(func() {
		// run newproc1 on g0
		newproc1(fn, (*uint8)(argp), siz, gp, pc)
	})
}
newproc1 creates a g or obtains a free one, initializes it, and tries to find a p and an m to run it.
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) { _g_ := getg() if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } // 加鎖禁止被搶佔 _g_.m.locks++ // disable preemption because it can be holding p in a local var siz := narg siz = (siz + 7) &^ 7 // We could allocate a larger initial stack if necessary. // Not worth it: this is almost always an error. // 4*sizeof(uintreg): extra space added below // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall). // 若是參數過多,則直接拋出異常,棧大小是2k if siz >= _StackMin-4*sys.RegSize-sys.RegSize { throw("newproc: function arguments too large for new goroutine") } _p_ := _g_.m.p.ptr() // 嘗試獲取一個空閒的g,若是獲取不到,則新建一個,並添加到allg裏面 // gfget首先會嘗試從p本地獲取空閒的g,若是本地沒有的話,則從全局獲取一堆平衡到本地p newg := gfget(_p_) if newg == nil { newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) // 新建的g,添加到全局的 allg裏面,allg是一個slice, append進去便可 allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. } // 判斷獲取的g的棧是否正常 if newg.stack.hi == 0 { throw("newproc1: newg missing stack") } // 判斷g的狀態是否正常 if readgstatus(newg) != _Gdead { throw("newproc1: new g is not Gdead") } // 預留一點空間,防止讀取超出一點點 totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame // 空間大小進行對齊 totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign sp := newg.stack.hi - totalSize spArg := sp // usesLr 爲0,這裏不執行 if usesLR { // caller's LR *(*uintptr)(unsafe.Pointer(sp)) = 0 prepGoExitFrame(sp) spArg += sys.MinFrameSize } if narg > 0 { // 將參數拷貝入棧 memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) // ... 省略 ... 
} // 初始化用於保存現場的區域及初始化基本狀態 memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp // 這裏保存了goexit的地址,在用戶函數執行完成後,會根據pc來執行goexit newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) // 這裏調整 sched 信息,pc = goexit的地址 gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn if _g_.m.curg != nil { newg.labels = _g_.m.curg.labels } if isSystemGoroutine(newg) { atomic.Xadd(&sched.ngsys, +1) } newg.gcscanvalid = false casgstatus(newg, _Gdead, _Grunnable) // 若是p緩存的goid已經用完,本地再從sched批量獲取一點 if _p_.goidcache == _p_.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. // At startup sched.goidgen=0, so main goroutine receives goid=1. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) _p_.goidcache -= _GoidCacheBatch - 1 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch } // 分配goid newg.goid = int64(_p_.goidcache) _p_.goidcache++ // 把新的g放到 p 的可運行g隊列中 runqput(_p_, newg, true) // 判斷是否有空閒p,且是否須要喚醒一個m來執行g if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { wakep() } _g_.m.locks-- if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } }
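The logic of gfget is fairly simple: check whether the p has a free g and, if not, look in the global free g list. This is one of the places where a p's local state is balanced against the global state.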
func gfget(_p_ *p) *g { retry: gp := _p_.gfree // 本地的g隊列爲空,且全局隊列不爲空,則從全局隊列一次獲取至多32個下來,若是全局隊列不夠就算了 if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) { lock(&sched.gflock) for _p_.gfreecnt < 32 { if sched.gfreeStack != nil { // Prefer Gs with stacks. gp = sched.gfreeStack sched.gfreeStack = gp.schedlink.ptr() } else if sched.gfreeNoStack != nil { gp = sched.gfreeNoStack sched.gfreeNoStack = gp.schedlink.ptr() } else { break } _p_.gfreecnt++ sched.ngfree-- gp.schedlink.set(_p_.gfree) _p_.gfree = gp } // 已經從全局拿了g了,再去從頭開始判斷 unlock(&sched.gflock) goto retry } // 若是拿到了g,則判斷g是否有棧,沒有棧就分配 // 棧的分配跟內存分配差很少,首先建立幾個固定大小的棧的數組,而後到指定大小的數組裏面去分配就ok了,過大則直接全局分配 if gp != nil { _p_.gfree = gp.schedlink.ptr() _p_.gfreecnt-- if gp.stack.lo == 0 { // Stack was deallocated in gfput. Allocate a new one. systemstack(func() { gp.stack = stackalloc(_FixedStack) }) gp.stackguard0 = gp.stack.lo + _StackGuard } else { // ... 省略 ... } } // 注意: 若是全局沒有g,p也沒有g,則返回的gp仍是nil return gp }
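runqput puts g into the p's local queue or into p.runnext. If the local queue is full, g goes to the global queue, and half of the p's local queue is moved to the global queue along with it.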
func runqput(_p_ *p, gp *g, next bool) { if randomizeScheduler && next && fastrand()%2 == 0 { next = false } // 若是next爲true,則放入到p.runnext裏面,並把原先runnext的g交換出來 if next { retryNext: oldnext := _p_.runnext if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) { goto retryNext } if oldnext == 0 { return } // Kick the old runnext out to the regular run queue. gp = oldnext.ptr() } retry: h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers t := _p_.runqtail // 判斷p的隊列的長度是否超了, runq是一個長度爲256的數組,超出的話就會放到全局隊列了 if t-h < uint32(len(_p_.runq)) { _p_.runq[t%uint32(len(_p_.runq))].set(gp) atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption return } // 把g放到全局隊列 if runqputslow(_p_, gp, h, t) { return } // the queue is not full, now the put above must succeed goto retry }
func runqputslow(_p_ *p, gp *g, h, t uint32) bool { var batch [len(_p_.runq)/2 + 1]*g // First, grab a batch from local queue. n := t - h n = n / 2 if n != uint32(len(_p_.runq)/2) { throw("runqputslow: queue is not full") } // 獲取p後面的一半 for i := uint32(0); i < n; i++ { batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr() } if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume return false } batch[n] = gp // Link the goroutines. for i := uint32(0); i < n; i++ { batch[i].schedlink.set(batch[i+1]) } // Now put the batch on global queue. // 放到全局隊列隊尾 lock(&sched.lock) globrunqputbatch(batch[0], batch[n], int32(n+1)) unlock(&sched.lock) return true }
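Task creation is essentially done at this point; once created, the task just waits to be scheduled. From the above, the priority order is p.runnext > p.runq > sched.runq.
The state transitions of a g, from creation through completion to being put on the free list, are roughly as shown in the figure below.
After newproc1 has created the task, it tries to wake an m to run it.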
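The interplay of runnext, the local queue and the global queue can be modelled in a few lines. This is a single-threaded toy, not the runtime's lock-free code: proc, put and get are hypothetical names, gs are plain ints, and 0 marks an empty runnext slot.

```go
package main

import "fmt"

const localCap = 256 // size of p.runq in the listing above

// proc is a toy, single-threaded model of a P's run queues.
type proc struct {
	runnext    int // 0 means empty
	runq       [localCap]int
	head, tail uint32
}

// global is a stand-in for the global run queue in sched.
var global []int

// put enqueues g. With next set, g takes the runnext slot and the previous
// occupant, if any, is demoted to the local queue.
func (p *proc) put(g int, next bool) {
	if next {
		old := p.runnext
		p.runnext = g
		if old == 0 {
			return
		}
		g = old
	}
	if p.tail-p.head < localCap {
		p.runq[p.tail%localCap] = g
		p.tail++
		return
	}
	// Local queue full: move half of it, starting at the head, plus g to the global queue.
	n := (p.tail - p.head) / 2
	for i := uint32(0); i < n; i++ {
		global = append(global, p.runq[(p.head+i)%localCap])
	}
	p.head += n
	global = append(global, g)
}

// get dequeues in priority order: runnext, local queue, then global queue.
func (p *proc) get() (int, bool) {
	if g := p.runnext; g != 0 {
		p.runnext = 0
		return g, true
	}
	if p.head != p.tail {
		g := p.runq[p.head%localCap]
		p.head++
		return g, true
	}
	if len(global) > 0 {
		g := global[0]
		global = global[1:]
		return g, true
	}
	return 0, false
}

func main() {
	var p proc
	for g := 1; g <= 257; g++ {
		p.put(g, false)
	}
	p.put(999, true)
	first, _ := p.get()
	fmt.Println(first, p.tail-p.head, len(global))
}
```

The model ignores the periodic fairness check in schedule, which looks at the global queue first on every 61st tick.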
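func wakep() {
	// be conservative about spinning threads
	// only one m should be spinning at a time, otherwise return
	if !atomic.Cas(&sched.nmspinning, 0, 1) {
		return
	}
	// call startm to do the work
	startm(nil, true)
}
startm schedules an existing m or creates a new m to run a p. If p == nil, it tries to obtain an idle p. The gs sit in the p's queue, so a g can only be obtained once a p has been obtained.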
func startm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { // 若是沒有指定p, 則從sched.pidle獲取空閒的p _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) // 若是沒有獲取到p,重置nmspinning if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return } } // 首先嚐試從 sched.midle獲取一個空閒的m mp := mget() unlock(&sched.lock) if mp == nil { // 若是獲取不到空閒的m,則建立一個 mspining = true的m,並將p綁定到m上,直接返回 var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_) return } // 判斷獲取到的空閒m是不是spining狀態 if mp.spinning { throw("startm: m is spinning") } // 判斷獲取到的m是否有p if mp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. // 調用函數的父函數已經增長了nmspinning, 這裏只須要設置m.spining就ok了,同時把p綁上來 mp.spinning = spinning mp.nextp.set(_p_) // 喚醒m notewakeup(&mp.park) }
newm creates a new m through the allocm function.
func newm(fn func(), _p_ *p) {
	// create a new m
	mp := allocm(_p_, fn)
	// bind the given p to the new m
	mp.nextp.set(_p_)
	// ... omitted ...
	// create the system thread
	newm1(mp)
}
func newm1(mp *m) { // runtime cgo包會把iscgo設置爲true,這裏不分析 if iscgo { var ts cgothreadstart if _cgo_thread_start == nil { throw("_cgo_thread_start missing") } ts.g.set(mp.g0) ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0])) ts.fn = unsafe.Pointer(funcPC(mstart)) if msanenabled { msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts)) } execLock.rlock() // Prevent process clone. asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts)) execLock.runlock() return } execLock.rlock() // Prevent process clone. newosproc(mp) execLock.runlock() }
newosproc creates a new system thread that runs mstart_stub, which then calls mstart to enter scheduling; this is analyzed later in the execution flow.
func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) // Initialize an attribute object. var attr pthreadattr var err int32 err = pthread_attr_init(&attr) // Finally, create the thread. It starts at mstart_stub, which does some low-level // setup and then calls mstart. var oset sigset sigprocmask(_SIG_SETMASK, &sigset_all, &oset) // 建立線程,並傳入啓動啓動函數 mstart_stub, mstart_stub 以後調用mstart err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp)) sigprocmask(_SIG_SETMASK, &oset, nil) if err != 0 { write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate))) exit(1) } }
allocm first releases the m's on sched.freem, then creates a new m and initializes it.
func allocm(_p_ *p, fn func()) *m { _g_ := getg() _g_.m.locks++ // disable GC because it can be called from sysmon if _g_.m.p == 0 { acquirep(_p_) // temporarily borrow p for mallocs in this function } // Release the free M list. We need to do this somewhere and // this may free up a stack we can use. // 首先釋放掉freem列表 if sched.freem != nil { lock(&sched.lock) var newList *m for freem := sched.freem; freem != nil; { if freem.freeWait != 0 { next := freem.freelink freem.freelink = newList newList = freem freem = next continue } stackfree(freem.g0.stack) freem = freem.freelink } sched.freem = newList unlock(&sched.lock) } mp := new(m) // 啓動函數,根據startm調用來看,這個fn就是 mspinning, 會將m.mspinning設置爲true mp.mstartfn = fn // 初始化m,上面已經分析了 mcommoninit(mp) // In case of cgo or Solaris or Darwin, pthread_create will make us a stack. // Windows and Plan 9 will layout sched stack on OS stack. // 爲新的m建立g0 if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" { mp.g0 = malg(-1) } else { mp.g0 = malg(8192 * sys.StackGuardMultiplier) } // 爲mp的g0綁定本身 mp.g0.m = mp // 若是當前的m所綁定的是參數傳遞過來的p,解除綁定,由於參數傳遞過來的p稍後要綁定新建的m if _p_ == _g_.m.p.ptr() { releasep() } _g_.m.locks-- if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } return mp }
func notewakeup(n *note) {
	var v uintptr
	// set the note's key to locked
	for {
		v = atomic.Loaduintptr(&n.key)
		if atomic.Casuintptr(&n.key, v, locked) {
			break
		}
	}

	// Successfully set waitm to locked.
	// What was it before?
	// The previous value decides what happens next: 0 returns directly,
	// locked is a conflict, anything else is a waiting m that gets woken.
	switch {
	case v == 0:
		// Nothing was waiting. Done.
	case v == locked:
		// Two notewakeups! Not allowed.
		throw("notewakeup - double wakeup")
	default:
		// Must be the waiting m. Wake it up.
		// wake the system thread
		semawakeup((*m)(unsafe.Pointer(v)))
	}
}
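The sleep and wakeup pair behind m.park can be imitated in user code. This is only an analogy, assuming a channel in place of the OS-level semaphore the runtime uses: note here is a hypothetical type, and newNote, sleep and wakeup are illustrative names.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// note is a hypothetical one-shot wakeup, loosely modelled on the runtime's note.
type note struct {
	woken int32
	ch    chan struct{}
}

func newNote() *note { return &note{ch: make(chan struct{})} }

// wakeup releases the sleeper. A second wakeup panics, like the
// double-wakeup check in notewakeup.
func (n *note) wakeup() {
	if !atomic.CompareAndSwapInt32(&n.woken, 0, 1) {
		panic("double wakeup")
	}
	close(n.ch)
}

// sleep blocks until wakeup has been called.
func (n *note) sleep() { <-n.ch }

func main() {
	n := newNote()
	done := make(chan struct{})
	go func() {
		n.sleep() // parks until wakeup, similar in spirit to notesleep(&m.park)
		fmt.Println("worker woke up")
		close(done)
	}()
	n.wakeup()
	<-done
}
```

As with the runtime's note, a wakeup that arrives before the sleeper is not lost: sleep returns immediately once the channel is closed.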
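At this point the task g has been created and placed in the p's local queue or the global queue, and an idle m has been obtained or a new m created to run it. m, p and g are all ready; what follows is scheduling, which actually runs the task g.
The analysis of startm shows two ways to obtain an m: take an idle m from sched.midle and wake it with notewakeup, or create a new one with newm.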
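An m has two starting points for running a g: the thread start function mstart, and the scheduling function schedule that runs after a sleeping m is woken. We start from the beginning, that is, mstart; mstart also ends up in schedule.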
func mstart() { _g_ := getg() osStack := _g_.stack.lo == 0 if osStack { // Initialize stack bounds from system stack. // Cgo may have left stack size in stack.hi. // minit may update the stack bounds. // 從系統堆棧上直接劃出所需的範圍 size := _g_.stack.hi if size == 0 { size = 8192 * sys.StackGuardMultiplier } _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size))) _g_.stack.lo = _g_.stack.hi - size + 1024 } // Initialize stack guards so that we can start calling // both Go and C functions with stack growth prologues. _g_.stackguard0 = _g_.stack.lo + _StackGuard _g_.stackguard1 = _g_.stackguard0 // 調用mstart1來處理 mstart1() // Exit this thread. if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" { // Window, Solaris, Darwin and Plan 9 always system-allocate // the stack, but put it in _g_.stack before mstart, // so the logic above hasn't set osStack yet. osStack = true } // 退出m,正常狀況下mstart1調用schedule() 時,是再也不返回的,因此,不用擔憂系統線程的頻繁建立退出 mexit(osStack) }
func mstart1() { _g_ := getg() if _g_ != _g_.m.g0 { throw("bad runtime·mstart") } // Record the caller for use as the top of stack in mcall and // for terminating the thread. // We're never coming back to mstart1 after we call schedule, // so other calls can reuse the current frame. // 保存調用者的pc sp等信息 save(getcallerpc(), getcallersp()) asminit() // 初始化m的sigal的棧和mask minit() // Install signal handlers; after minit so that minit can // prepare the thread to be able to handle the signals. // 安裝sigal處理器 if _g_.m == &m0 { mstartm0() } // 若是設置了mstartfn,就先執行這個 if fn := _g_.m.mstartfn; fn != nil { fn() } if _g_.m.helpgc != 0 { _g_.m.helpgc = 0 stopm() } else if _g_.m != &m0 { // 獲取nextp acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } schedule() }
acquirep mainly changes the state of the p and binds m and p to each other, sharing the p's mcache with the m.
func acquirep(_p_ *p) {
	// Do the part that isn't allowed to have write barriers.
	acquirep1(_p_)

	// have p; write barriers now allowed
	_g_ := getg()
	// share the p's mcache with the m
	_g_.m.mcache = _p_.mcache
}
func acquirep1(_p_ *p) {
	_g_ := getg()

	// bind m and p to each other
	_g_.m.p.set(_p_)
	_p_.m.set(_g_.m)
	_p_.status = _Prunning
}
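Now we enter the scheduling function. schedule, execute, the goroutine fn and goexit form a logical loop; even an m that has just been woken resumes from the point where it went to sleep.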
func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } // 若是有lockg,中止執行當前的m if _g_.m.lockedg != 0 { // 解除lockedm的鎖定,並執行當前g stoplockedm() execute(_g_.m.lockedg.ptr(), false) // Never returns. } // We should not schedule away from a g that is executing a cgo call, // since the cgo call is using the m's g0 stack. if _g_.m.incgo { throw("schedule: in cgo") } top: // gc 等待 if sched.gcwaiting != 0 { gcstopm() goto top } var gp *g var inheritTime bool if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. // 爲了保證公平,每隔61次,從全局隊列上獲取g if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { // 全局隊列上獲取不到待運行的g,則從p local隊列中獲取 gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } } if gp == nil { // 若是p local獲取不到待運行g,則開始查找,這個函數會從 全局 io poll, p locl和其餘p local獲取待運行的g,後面詳細分析 gp, inheritTime = findrunnable() // blocks until work is available } // This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { // 若是m是自旋狀態,取消自旋 resetspinning() } if gp.lockedm != 0 { // Hands off own p to the locked m, // then blocks waiting for a new p. // 若是g有lockedm,則休眠上交p,休眠m,等待新的m,喚醒後從這裏開始執行,跳轉到top startlockedm(gp) goto top } // 開始執行這個g execute(gp, inheritTime) }
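The fairness rule in schedule can be isolated into a small function. This is a simplified sketch: pick is a hypothetical helper that models only the first two lookups, with slices of names standing in for the queues.

```go
package main

import "fmt"

// pick models the first two lookups in schedule: on every 61st tick the
// global queue is tried first, otherwise the local queue is used.
// It returns the chosen g and its source; ok is false when nothing was found.
func pick(tick uint32, local, global *[]string) (g, from string, ok bool) {
	if tick%61 == 0 && len(*global) > 0 {
		g, *global = (*global)[0], (*global)[1:]
		return g, "global", true
	}
	if len(*local) > 0 {
		g, *local = (*local)[0], (*local)[1:]
		return g, "local", true
	}
	return "", "", false // the runtime would fall through to findrunnable here
}

func main() {
	local := []string{"l1", "l2", "l3"}
	global := []string{"g1"}
	for _, tick := range []uint32{59, 60, 61, 62} {
		g, from, _ := pick(tick, &local, &global)
		fmt.Println(tick, g, from)
	}
}
```

Without the periodic check, two goroutines that keep respawning each other could keep the local queue busy forever and starve the global queue, which is the case the runtime comment describes.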
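stoplockedm: the current m is bound to a lockedg, and the current g is not that lockedg, so this m cannot run it. The m hands off the p bound to it and sleeps until lockedg is scheduled.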
func stoplockedm() { _g_ := getg() if _g_.m.lockedg == 0 || _g_.m.lockedg.ptr().lockedm.ptr() != _g_.m { throw("stoplockedm: inconsistent locking") } if _g_.m.p != 0 { // Schedule another M to run this p. // 釋放當前p _p_ := releasep() handoffp(_p_) } incidlelocked(1) // Wait until another thread schedules lockedg again. notesleep(&_g_.m.park) noteclear(&_g_.m.park) status := readgstatus(_g_.m.lockedg.ptr()) if status&^_Gscan != _Grunnable { print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n") dumpgstatus(_g_) throw("stoplockedm: not runnable") } // 上交了當前的p,將nextp設置爲可執行的p acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 }
startlockedm schedules the lockedm to run the lockedg.
func startlockedm(gp *g) { _g_ := getg() mp := gp.lockedm.ptr() if mp == _g_.m { throw("startlockedm: locked to me") } if mp.nextp != 0 { throw("startlockedm: m has p") } // directly handoff current P to the locked m incidlelocked(-1) // 移交當前p給lockedm,並設置爲lockedm.nextp,以便於lockedm喚醒後,能夠獲取 _p_ := releasep() mp.nextp.set(_p_) // m被喚醒後,從m休眠的地方開始執行,也就是schedule()函數中 notewakeup(&mp.park) stopm() }
func handoffp(_p_ *p) { // handoffp must start an M in any situation where // findrunnable would return a G to run on _p_. // if it has local work, start it straight away if !runqempty(_p_) || sched.runqsize != 0 { // 調用startm開始調度 startm(_p_, false) return } // no local work, check that there are no spinning/idle M's, // otherwise our help is not required // 判斷有沒有正在尋找p的m以及有沒有空閒的p if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } lock(&sched.lock) if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { sched.safePointFn(_p_) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } } // 若是 全局待運行g隊列不爲空,嘗試使用startm進行調度 if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } // 把p放入到全局的空閒隊列,放回隊列就很少說了,參考allm的放回 pidleput(_p_) unlock(&sched.lock) }
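execute starts running the g's code.
func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	// change the status of the g and clear its preemption request
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		// scheduling tick counter
		_g_.m.p.ptr().schedtick++
	}
	_g_.m.curg = gp
	gp.m = _g_.m

	// start running the g's code
	gogo(&gp.sched)
}
gogo switches to the g's stack and starts executing the g's code. The assembly is not analyzed here, but one question remains: after the function run by gogo finishes, how does the m get back into scheduling?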
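Back in newproc1, at the line newg.sched.pc = funcPC(goexit) + sys.PCQuantum, the saved pc value is the address of goexit. gostartcallfn then arranges for that address to act as the return address of the user function, so once the user code finishes, execution enters the goexit function.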
At the assembly level goexit simply calls runtime.goexit1, and goexit1 calls goexit0 through mcall, so we analyze goexit0 directly.
goexit0 resets the state of the g and schedules again, so control returns to schedule() and the cycle repeats.
func goexit0(gp *g) { _g_ := getg() // 轉換g的狀態爲dead,以放回空閒列表 casgstatus(gp, _Grunning, _Gdead) if isSystemGoroutine(gp) { atomic.Xadd(&sched.ngsys, -1) } // 清空g的狀態 gp.m = nil locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.paniconfault = false gp._defer = nil // should be true already but just in case. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil // Note that gp's stack scan is now "valid" because it has no // stack. gp.gcscanvalid = true dropg() // 把g放回空閒列表,以備複用 gfput(_g_.m.p.ptr(), gp) // 再次進入調度循環 schedule() }
That completes a single round of scheduling; the m enters schedule again and the cycle continues.
func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() // The conditions here and in handoffp must agree: if // findrunnable would return a G to run, handoffp must start // an M. top: _p_ := _g_.m.p.ptr() // local runq // 從p local 去獲取g if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq // 從全局的待運行d隊列獲取 if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // Poll network. // This netpoll is only an optimization before we resort to stealing. // We can safely skip it if there are no waiters or a thread is blocked // in netpoll already. If there is any kind of logical race with that // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. // 看看netpoll中有沒有已經準備好的g if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if gp := netpoll(false); gp != nil { // non-blocking // netpoll returns list of goroutines linked by schedlink. injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // Steal work from other P's. // 若是sched.pidle == procs - 1,說明全部的p都是空閒的,無需遍歷其餘p了 procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { // Either GOMAXPROCS=1 or everybody, except for us, is idle already. // New work can appear from returning syscall/cgocall, network or timers. // Neither of that submits to local run queues, so no point in stealing. goto stop } // If number of spinning M's >= number of busy P's, block. // This is necessary to prevent excessive CPU consumption // when GOMAXPROCS>>1 but the program parallelism is low. 
// 若是尋找p的m的數量,大於有g的p的數量的通常,就再也不去尋找了 if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } // 設置當前m的自旋狀態 if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // 開始竊取其餘p的待運行g了 for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g // 從其餘的p偷取通常的任務數量,還會隨機偷取p的runnext(過度了),偷取部分就不分析了,就是slice的操做而已 if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: // 對all作個鏡像備份 allpSnapshot := allp // return P and block lock(&sched.lock) if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) unlock(&sched.lock) wasSpinning := _g_.m.spinning if _g_.m.spinning { // 設置非自旋狀態,由於找p的工做已經結束了 _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again for _, _p_ := range allpSnapshot { if !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // poll network if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } if _g_.m.spinning { throw("findrunnable: netpoll with spinning") } gp := netpoll(true) // block until new work is available atomic.Store64(&sched.lastpoll, uint64(nanotime())) if gp != nil { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } injectglist(gp) } } stopm() goto top }
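It really is a bit desperate here: findrunnable goes to great lengths to find a runnable g. Even after jumping to the stop label it does not give up and searches all over again. The search order is roughly: the current P's local run queue, the global run queue, a non-blocking netpoll, then stealing from other Ps; after stop, the global run queue once more, every P's run queue once more, and finally a blocking netpoll before the m goes to sleep in stopm.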
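The lookup order in findrunnable can be sketched with a toy model. This is only an illustration under simplifying assumptions: `toyP` and this `findRunnable` are hypothetical names, netpoll, spinning and locking are left out, and the stolen batch is handled more simply than in the real runqsteal.

```go
package main

import "fmt"

// toyP is a hypothetical stand-in for the runtime's p: only a local queue of g ids.
type toyP struct {
	runq []int
}

// findRunnable mimics the lookup order only: local queue, then global queue,
// then stealing half of another P's queue. It returns the g id and where it came from.
func findRunnable(self *toyP, global *[]int, others []*toyP) (int, string) {
	if len(self.runq) > 0 {
		g := self.runq[0]
		self.runq = self.runq[1:]
		return g, "local"
	}
	if len(*global) > 0 {
		g := (*global)[0]
		*global = (*global)[1:]
		return g, "global"
	}
	for _, victim := range others {
		n := len(victim.runq)
		if n == 0 {
			continue
		}
		half := (n + 1) / 2 // take half of the victim's queue, rounding up
		stolen := victim.runq[:half]
		victim.runq = victim.runq[half:]
		// Run the first stolen g now and keep the rest in our own queue.
		self.runq = append(self.runq, stolen[1:]...)
		return stolen[0], "stolen"
	}
	return -1, "none" // the real code would park the m here (stopm)
}

func main() {
	global := []int{10}
	p0 := &toyP{runq: []int{1}}
	p1 := &toyP{runq: []int{20, 21, 22, 23}}
	for i := 0; i < 4; i++ {
		g, from := findRunnable(p0, &global, []*toyP{p1})
		fmt.Println(g, from)
	}
}
```

In this run p0 first drains its own queue, then the global queue, and only then steals two gs from p1, one of which it runs immediately.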
stopm puts the current m on the idle list and, once the m is woken up again, binds m.nextp to it.
func stopm() {
	_g_ := getg()

retry:
	lock(&sched.lock)
	// Put the current m on the sched.midle idle list.
	mput(_g_.m)
	unlock(&sched.lock)
	// Sleep until woken up.
	notesleep(&_g_.m.park)
	noteclear(&_g_.m.park)
	// Bind the p that was handed to this m.
	acquirep(_g_.m.nextp.ptr())
	_g_.m.nextp = 0
}
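Go's monitoring is done by the sysmon function. Judging from the code below, it mainly does the following: it polls netpoll if nobody has polled for more than 10ms and puts the ready gs on the global run queue; it retakes Ps stuck in syscalls and preempts gs that have been running too long; it forces a GC once the forced-GC interval has elapsed; and it returns heap spans that have gone unused for 5 minutes to the operating system.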
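The monitor thread does not run all the time. It starts with a 20us sleep per cycle; once more than 50 consecutive cycles have passed without retaking anything (about 1ms), the sleep doubles on every cycle, up to a maximum of 10ms.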
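To make the backoff concrete, here is a small sketch that copies only the delay arithmetic from the top of sysmon's loop. `nextDelay` is a hypothetical helper for illustration, not a runtime function.

```go
package main

import "fmt"

// nextDelay mirrors the delay computation at the top of sysmon's loop.
// idle is the number of consecutive cycles in which nothing was retaken;
// delay is the previous sleep in microseconds.
func nextDelay(idle int, delay uint32) uint32 {
	if idle == 0 { // start with a 20us sleep
		delay = 20
	} else if idle > 50 { // start doubling after about 1ms of idle cycles
		delay *= 2
	}
	if delay > 10*1000 { // cap at 10ms
		delay = 10 * 1000
	}
	return delay
}

func main() {
	delay := uint32(0)
	for idle := 0; idle <= 60; idle++ {
		delay = nextDelay(idle, delay)
		if idle == 0 || idle == 50 || idle == 51 || idle == 55 || idle == 60 {
			fmt.Printf("idle=%d delay=%dus\n", idle, delay)
		}
	}
	// idle=0 delay=20us
	// idle=50 delay=20us
	// idle=51 delay=40us
	// idle=55 delay=640us
	// idle=60 delay=10000us
}
```

The sleep stays at 20us for the first 50 idle cycles, then grows geometrically and reaches the 10ms cap within about ten more cycles.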
func sysmon() {
	lock(&sched.lock)
	sched.nmsys++
	checkdead()
	unlock(&sched.lock)

	// If a heap span goes unused for 5 minutes after a garbage collection,
	// we hand it back to the operating system.
	scavengelimit := int64(5 * 60 * 1e9)

	if debug.scavenge > 0 {
		// Scavenge-a-lot for testing.
		forcegcperiod = 10 * 1e6
		scavengelimit = 20 * 1e6
	}

	lastscavenge := nanotime()
	nscavenge := 0

	lasttrace := int64(0)
	idle := 0 // how many cycles in succession we had not wokeup somebody
	delay := uint32(0)
	for {
		// Work out how long to sleep in this cycle.
		if idle == 0 { // start with 20us sleep...
			delay = 20
		} else if idle > 50 { // start doubling the sleep after 1ms...
			delay *= 2
		}
		if delay > 10*1000 { // up to 10ms
			delay = 10 * 1000
		}
		usleep(delay)
		// Put sysmon to sleep during STW, or when every P is idle.
		if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
			lock(&sched.lock)
			if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
				atomic.Store(&sched.sysmonwait, 1)
				unlock(&sched.lock)
				// Make wake-up period small enough
				// for the sampling to be correct.
				maxsleep := forcegcperiod / 2
				if scavengelimit < forcegcperiod {
					maxsleep = scavengelimit / 2
				}
				shouldRelax := true
				if osRelaxMinNS > 0 {
					next := timeSleepUntil()
					now := nanotime()
					if next-now < osRelaxMinNS {
						shouldRelax = false
					}
				}
				if shouldRelax {
					osRelax(true)
				}
				// Go to sleep.
				notetsleep(&sched.sysmonnote, maxsleep)
				if shouldRelax {
					osRelax(false)
				}
				lock(&sched.lock)
				// After waking up, clear the sleeping state and carry on.
				atomic.Store(&sched.sysmonwait, 0)
				noteclear(&sched.sysmonnote)
				idle = 0
				delay = 20
			}
			unlock(&sched.lock)
		}
		// trigger libc interceptors if needed
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		// poll network if not polled for more than 10ms
		lastpoll := int64(atomic.Load64(&sched.lastpoll))
		now := nanotime()
		// If netpoll is initialized and has not been polled for more than 10ms, check for ready gs.
		if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
			atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
			// netpoll returns the list of goroutines whose results are ready.
			gp := netpoll(false) // non-blocking - returns list of goroutines
			if gp != nil {
				incidlelocked(-1)
				// Put the returned gs on the global run queue.
				injectglist(gp)
				incidlelocked(1)
			}
		}
		// retake P's blocked in syscalls
		// and preempt long running G's
		// Retake Ps that have been blocked in a syscall for a long time and preempt long-running gs.
		if retake(now) != 0 {
			idle = 0
		} else {
			idle++
		}
		// check if we need to force a GC
		// gcTrigger.test() reports whether the configured forced-GC interval has elapsed.
		if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
			lock(&forcegc.lock)
			forcegc.idle = 0
			forcegc.g.schedlink = 0
			// Put the GC g on the run queue and let the scheduler run it.
			injectglist(forcegc.g)
			unlock(&forcegc.lock)
		}
		// scavenge heap once in a while
		// Return spans that have been unused for 5 minutes to the operating system.
		if lastscavenge+scavengelimit/2 < now {
			mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
			lastscavenge = now
			nscavenge++
		}
		if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
			lasttrace = now
			schedtrace(debug.scheddetail > 0)
		}
	}
}
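Scanning netpoll and putting the gs on the global queue is easy to follow; it is much like the logic for adding p and m described earlier. Preemption is the part I did not really understand. You can't just say "preempt" and have it happen; wouldn't the preempted g lose face? And how is it actually done?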
const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
	n := 0
	// Prevent allp slice changes. This lock will be completely
	// uncontended unless we're already stopping the world.
	lock(&allpLock)
	// We can't use a range loop over allp because we may
	// temporarily drop the allpLock. Hence, we need to re-fetch
	// allp each time around the loop.
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			// pd.syscalltick (that is, _p_.sysmontick.syscalltick) is only updated by sysmon,
			// whereas _p_.syscalltick is updated on every syscall. So the first sysmon pass that
			// sees a new syscall only records it; the P can be retaken from the second pass on,
			// which is at least 20us and at most 10ms later.
			t := int64(_p_.syscalltick)
			if int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// Skip the retake if this P's run queue is empty, there is a spinning m or an idle P,
			// and the syscall has lasted less than 10ms so far.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			// Retake the P by switching its status to idle.
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// Hand the P off; handoffp was analyzed above.
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		} else if s == _Prunning {
			// Preempt G if it's running for too long.
			// If the P is running and its current g has been executing for too long, preempt it.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
				continue
			}
			// Do not preempt until the g has run for at least 10ms.
			if pd.schedwhen+forcePreemptNS > now {
				continue
			}
			// Request preemption.
			preemptone(_p_)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
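The "record on the first pass, act on a later pass" bookkeeping in retake can be sketched in isolation. This is a simplified model of the _Prunning branch only; `tick` and `shouldPreempt` are hypothetical names, and time is passed in by hand instead of coming from nanotime.

```go
package main

import "fmt"

const forcePreemptNS = 10 * 1000 * 1000 // 10ms, same value as in the excerpt

// tick is a hypothetical copy of the per-P bookkeeping that sysmon keeps.
type tick struct {
	schedtick uint32 // last schedtick value sysmon observed on the P
	schedwhen int64  // time at which that value was first observed
}

// shouldPreempt reports whether the g on a P has been running too long.
// cur is the P's own schedtick, which changes whenever the P schedules a g.
func shouldPreempt(pd *tick, cur uint32, now int64) bool {
	if pd.schedtick != cur {
		// Something new was scheduled since the last look: restart the clock.
		pd.schedtick = cur
		pd.schedwhen = now
		return false
	}
	// Same g as last time: preempt once it has run for 10ms or more.
	return pd.schedwhen+forcePreemptNS <= now
}

func main() {
	const ms = int64(1000 * 1000)
	pd := &tick{}
	fmt.Println(shouldPreempt(pd, 1, 0))     // false: first sighting, only recorded
	fmt.Println(shouldPreempt(pd, 1, 5*ms))  // false: same g, only 5ms
	fmt.Println(shouldPreempt(pd, 1, 10*ms)) // true: same g for 10ms
	fmt.Println(shouldPreempt(pd, 2, 11*ms)) // false: a new g was scheduled
}
```

Because sysmon only samples, a g is never preempted on the pass that first notices it, which is why the real delay is somewhat longer than 10ms.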
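In the comment on this function the authors themselves admit that this kind of preemption is not very reliable 😂. Let's look at the implementation first.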
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	// Set the preemption flag.
	gp.preempt = true

	// Every call in a go routine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to StackPreempt folds
	// preemption into the normal stack overflow check.
	// Update stackguard0 so that the next stack check is guaranteed to trip.
	gp.stackguard0 = stackPreempt
	return true
}
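Here the runtime sets gp.stackguard0 = stackPreempt, which tricks the g into believing it has run out of stack, so at its next function call it dutifully goes off to grow its stack. Stack growth calls newstack, which normally allocates a new stack and copies the contents of the old one over. Inside newstack there is the following snippet: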
if preempt {
	if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
		// Let the goroutine keep running for now.
		// gp->preempt is set, so it will be preempted next time.
		gp.stackguard0 = gp.stack.lo + _StackGuard
		gogo(&gp.sched) // never return
	}
}
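At this point newstack notices that the g has been flagged for preemption, so the "not enough stack" condition may well be fake. It does not care: the g is sent back to the scheduler and gets no bigger stack, unless the m is in a state where preemption is unsafe (the check in the snippet above), in which case the g keeps running and is preempted next time. Both the runtime authors and Yuhen (雨痕) have grumbled about this approach, but it has been around since Go 1.5 (possibly earlier) and has run stably ever since, which says a lot for it.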
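The trick of folding a preemption request into the stack check can be modeled with a toy. Everything here is hypothetical and heavily simplified: `toyG`, `prologue` and the sentinel value are made up for illustration, and the real check is emitted by the compiler in each function prologue rather than written as a Go function.

```go
package main

import "fmt"

// stackPreempt is an illustrative sentinel: larger than any stack pointer in this toy,
// so the "enough stack?" comparison always fails once it is installed as the guard.
const stackPreempt = ^uintptr(0)

// toyG is a hypothetical goroutine: a fake stack pointer plus the guard it is compared against.
type toyG struct {
	sp         uintptr
	stackguard uintptr // normally the lower stack bound plus a safety margin
	preempt    bool
}

// prologue models the check done on function entry and says what happens next.
func prologue(g *toyG, normalGuard uintptr) string {
	if g.sp > g.stackguard {
		return "run" // fast path: enough stack and no preemption request
	}
	// Slow path, the toy's counterpart of newstack.
	if g.preempt && g.stackguard == stackPreempt {
		g.preempt = false
		g.stackguard = normalGuard // restore the real guard
		return "yield"             // go back to the scheduler, no stack growth
	}
	return "grow" // a genuine stack shortage
}

func main() {
	g := &toyG{sp: 5000, stackguard: 1000}
	fmt.Println(prologue(g, 1000)) // run

	// What preemptone does: set the flag and poison the guard.
	g.preempt = true
	g.stackguard = stackPreempt
	fmt.Println(prologue(g, 1000)) // yield
	fmt.Println(prologue(g, 1000)) // run

	g.sp = 900                     // now the stack really is too small
	fmt.Println(prologue(g, 1000)) // grow
}
```

The toy also shows the weakness the author mentions: a g that never makes a function call never reaches the check, so the request is never noticed.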
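In the scheduler's design, the most obvious theme is reuse: there is a free list of gs, a free list of ms and a free list of ps, which avoids the cost of repeatedly creating and destroying them.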
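The second theme is multi-level caching, which matches the design of the memory allocator. Each p has its own queue of runnable gs and only balances against the global queue when its own queue is empty or too full. Operations on the global queue need a lock while operations on the local queue do not, which greatly reduces lock contention.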
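The local-then-global layering can be sketched as follows. This is a minimal model under stated assumptions: `localQueue`, `globalQueue` and `put` are hypothetical names, the capacity is 4 instead of the 256 slots of p.runq, and the local queue is a plain slice owned by one goroutine, whereas the runtime uses a lock-free ring buffer that other Ps can steal from.

```go
package main

import (
	"fmt"
	"sync"
)

const localCap = 4 // p.runq has 256 slots; 4 keeps the demo small

// globalQueue is shared by every P, so it is protected by a mutex.
type globalQueue struct {
	mu sync.Mutex
	gs []int
}

// localQueue belongs to a single P in this toy, so no lock is taken on the fast path.
type localQueue struct {
	gs []int
}

// put adds g to the local queue. When the local queue is full, half of it
// plus g is moved to the global queue in one batch under the lock.
func (l *localQueue) put(g int, global *globalQueue) {
	if len(l.gs) < localCap {
		l.gs = append(l.gs, g) // fast path, no lock
		return
	}
	half := len(l.gs) / 2
	batch := append([]int{}, l.gs[:half]...)
	batch = append(batch, g)
	l.gs = append([]int{}, l.gs[half:]...)

	global.mu.Lock()
	global.gs = append(global.gs, batch...)
	global.mu.Unlock()
}

func main() {
	global := &globalQueue{}
	local := &localQueue{}
	for g := 1; g <= 6; g++ {
		local.put(g, global)
	}
	fmt.Println("local:", local.gs)   // local: [3 4 6]
	fmt.Println("global:", global.gs) // global: [1 2 5]
}
```

Six puts take the lock only once here; moving a batch rather than a single g is what keeps the global lock off the common path.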
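That roughly covers the relationships and state transitions of g, m and p. My assembly is weak, so I mostly skipped those parts; I will need to learn more about them when I get the chance.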