做者:孫偉html
協程和線程綁定關係有如下3種:git
由於線程切換須要很大的上下文,這種切換消耗了大量CPU時間,因此Go的並行單元並非傳統意義上的線程,而是採用更輕量的協程(goroutine)來處理,大大提升了並行度,所以Go被稱爲「最並行的語言」。github
既然每一個Goroutine都有本身的棧,那麼在建立Goroutine時,就要同時建立對應的棧。Goroutine在執行時,棧空間會不停增加。棧一般是連續增加的,因爲每一個進程中的各個線程共享虛擬內存空間,當有多個線程時,就須要爲每一個線程分配不一樣起始地址的棧。這就須要在分配棧以前先預估每一個線程棧的大小。若是線程數量很是多,就很容易棧溢出。golang
爲了解決這個問題,就有了Split Stacks 技術:建立棧時,只分配一塊比較小的內存,若是進行某次函數調用致使棧空間不足時,就會在其餘地方分配一塊新的棧空間。新的空間不須要和老的棧空間連續。函數調用的參數會拷貝到新的棧空間中,接下來的函數執行都在新棧空間中進行。Golang的棧管理方式與此相似,可是爲了更高的效率,使用了連續棧( Golang連續棧) 實現方式也是先分配一塊固定大小的棧,在棧空間不足時,分配一塊更大的棧,並把舊的棧所有拷貝到新棧中。這樣避免了Split Stacks方法可能致使的頻繁內存分配和釋放。 編程
Goroutine的執行是能夠被搶佔的。若是一個Goroutine一直佔用CPU,長時間沒有被調度過,就會被runtime搶佔掉,把CPU時間交給其餘Goroutine。 這個能夠經過 debug/goroutine 阻塞實現。api
P必須綁定在M上才能運行,M必須綁定了P才能運行,而通常狀況下,最多有MAXPROCS(一般等於CPU數量)個P,可是可能有不少個M,真正運行的只有綁定了M的P,因此P是真正的並行單元。 緩存
每一個P有一個本身的runnableG隊列,能夠從裏面拿出一個G來運行,同時也有一個全局的runnable G隊列,G經過P依附在M上面執行。不單獨使用全局的runnable G隊列的緣由是,分佈式的隊列有利於減少臨界區大小,想想多個線程同時請求可用的G的時候,若是隻有全局的資源,那麼這個全局的鎖會致使多少線程一直在等待。 數據結構
可是若是一個正在執行的G進入了阻塞,典型的例子就是等待IO,那麼他和它所在的M會在那邊等待,而上下文P會傳遞到其餘可用的M上面,這樣這個阻塞就不會影響程序的並行度。多線程
type g struct { // Stack parameters. // stack describes the actual stack memory: [stack.lo, stack.hi). // stackguard0 is the stack pointer compared in the Go stack growth prologue. // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption. // stackguard1 is the stack pointer compared in the C stack growth prologue. // It is stack.lo+StackGuard on g0 and gsignal stacks. // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash). stack stack // offset known to runtime/cgo //描述了真實的棧內存,包括上下界、 stackguard0 uintptr // offset known to liblink stackguard1 uintptr // offset known to liblink _panic *_panic // innermost panic - offset known to liblink _defer *_defer // innermost defer m *m // current m; offset known to arm liblink //當前的M sched gobuf //goroutine切換時,用於保存g的上下文 syscallsp uintptr // if status==Gsyscall, syscallsp = sched.sp to use during gc syscallpc uintptr // if status==Gsyscall, syscallpc = sched.pc to use during gc stktopsp uintptr // expected sp at top of stack, to check in traceback param unsafe.Pointer // passed parameter on wakeup 用於傳遞參數,睡眠時 其餘goroutine能夠設置param,喚醒時該goroutine能夠獲取 atomicstatus uint32 stackLock uint32 // sigprof/scang lock; TODO: fold in to atomicstatus goid int64 //goroutine 的ID waitsince int64 // approx time when the g become blocked g被阻塞的 大概時間 waitreason string // if status==Gwaiting schedlink guintptr preempt bool // preemption signal, duplicates stackguard0 = stackpreempt paniconfault bool // panic (instead of crash) on unexpected fault address preemptscan bool // preempted g does scan for gc gcscandone bool // g has scanned stack; protected by _Gscan bit in status gcscanvalid bool // false at start of gc cycle, true if G has not run since last scan; TODO: remove? throwsplit bool // must not split stack raceignore int8 // ignore race detection events sysblocktraced bool // StartTrace has emitted EvGoInSyscall about this goroutine sysexitticks int64 // cputicks when syscall has returned (for tracing) traceseq uint64 // trace event sequencer tracelastp puintptr // last P emitted an event for this goroutine lockedm muintptr //G被鎖定只能在這個M運行 sig uint32 writebuf []byte sigcode0 uintptr sigcode1 uintptr sigpc uintptr gopc uintptr // pc of go statement that created this goroutine startpc uintptr // pc of goroutine function racectx uintptr waiting *sudog // sudog structures this g is waiting on (that have a valid elem ptr); in lock order cgoCtxt []uintptr // cgo traceback context labels unsafe.Pointer // profiler labels timer *timer // cached timer for time.Sleep selectDone uint32 // are we participating in a select and did someone win the race? // Per-G GC state // gcAssistBytes is this G's GC assist credit in terms of // bytes allocated. If this is positive, then the G has credit // to allocate gcAssistBytes bytes without assisting. If this // is negative, then the G must correct this by performing // scan work. We track this in bytes to make it fast to update // and check for debt in the malloc hot path. The assist ratio // determines how this corresponds to scan work debt. gcAssistBytes int64 }
type gobuf struct { sp uintptr pc uintptr g guintptr ctxt unsafe.Pointer ret sys.Uintreg lr uintptr bp uintptr // for GOEXPERIMENT=framepointer }
其中最主要的固然是sched了,保存了goroutine的上下文。goroutine切換的時候不一樣於線程有OS來負責這部分數據,而是由一個gobuf對象來保存,這樣可以更加輕量級,再來看看gobuf的結構併發
type m struct { g0 *g // 帶有調度棧的goroutine gsignal *g // 處理信號的goroutine tls [6]uintptr // thread-local storage mstartfn func() curg *g // 當前運行的goroutine caughtsig guintptr p puintptr // 關聯p和執行的go代碼 nextp puintptr id int32 mallocing int32 // 狀態 spinning bool // m是否out of work blocked bool // m是否被阻塞 inwb bool // m是否在執行寫屏蔽 printlock int8 incgo bool // m在執行cgo嗎 fastrand uint32 ncgocall uint64 // cgo調用的總數 ncgo int32 // 當前cgo調用的數目 park note alllink *m // 用於連接allm schedlink muintptr mcache *mcache // 當前m的內存緩存 lockedg *g // 鎖定g在當前m上執行,而不會切換到其餘m createstack [32]uintptr // thread建立的棧 }
結構體M中有兩個G是須要關注一下的:
type p struct { lock mutex id int32 status uint32 // 狀態,能夠爲pidle/prunning/... link puintptr schedtick uint32 // 每調度一次加1 syscalltick uint32 // 每一次系統調用加1 sysmontick sysmontick m muintptr // 回鏈到關聯的m mcache *mcache racectx uintptr goidcache uint64 // goroutine的ID的緩存 goidcacheend uint64 // 可運行的goroutine的隊列 runqhead uint32 runqtail uint32 runq [256]guintptr runnext guintptr // 下一個運行的g sudogcache []*sudog sudogbuf [128]*sudog palloc persistentAlloc // per-P to avoid mutex pad [sys.CacheLineSize]byte }
其中P的狀態有Pidle, Prunning, Psyscall, Pgcstop, Pdead;在其內部隊列runqhead裏面有可運行的goroutine,P優先從內部獲取執行的g,這樣可以提升效率。
type schedt struct { goidgen uint64 lastpoll uint64 lock mutex midle muintptr // idle狀態的m nmidle int32 // idle狀態的m個數 nmidlelocked int32 // lockde狀態的m個數 mcount int32 // 建立的m的總數 maxmcount int32 // m容許的最大個數 ngsys uint32 // 系統中goroutine的數目,會自動更新 pidle puintptr // idle的p npidle uint32 nmspinning uint32 // 全局的可運行的g隊列 runqhead guintptr runqtail guintptr runqsize int32 // dead的G的全局緩存 gflock mutex gfreeStack *g gfreeNoStack *g ngfree int32 // sudog的緩存中心 sudoglock mutex sudogcache *sudog }
大多數須要的信息都已放在告終構體M、G和P中,schedt結構體只是一個殼。能夠看到,其中有M的idle隊列,P的idle隊列,以及一個全局的就緒的G隊列。schedt結構體中的Lock是很是必須的,若是M或P等作一些非局部的操做,它們通常須要先鎖住調度器。
goroutine調度器的代碼在/src/runtime/proc.go中,一些比較關鍵的函數分析以下。
schedule函數在runtime須要進行調度時執行,爲當前的P尋找一個能夠運行的G並執行它,尋找順序以下:
代碼以下:
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { _g_ := getg() if _g_.m.locks != 0 { throw("schedule: holding locks") } if _g_.m.lockedg != 0 { stoplockedm() execute(_g_.m.lockedg.ptr(), false) // Never returns. } // We should not schedule away from a g that is executing a cgo call, // since the cgo call is using the m's g0 stack. if _g_.m.incgo { throw("schedule: in cgo") } top: if sched.gcwaiting != 0 { gcstopm() goto top } if _g_.m.p.ptr().runSafePointFn != 0 { runSafePointFn() } var gp *g var inheritTime bool if trace.enabled || trace.shutdown { gp = traceReader() if gp != nil { casgstatus(gp, _Gwaiting, _Grunnable) traceGoUnpark(gp, 0) } } if gp == nil && gcBlackenEnabled != 0 { gp = gcController.findRunnableGCWorker(_g_.m.p.ptr()) } if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } } if gp == nil { gp, inheritTime = findrunnable() // blocks until work is available } // This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { resetspinning() } if gp.lockedm != 0 { // Hands off own p to the locked m, // then blocks waiting for a new p. startlockedm(gp) goto top } execute(gp, inheritTime) }
findrunnable函數負責給一個P尋找能夠執行的G,它的尋找順序以下:
代碼以下:
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() // The conditions here and in handoffp must agree: if // findrunnable would return a G to run, handoffp must start // an M. top: _p_ := _g_.m.p.ptr() if sched.gcwaiting != 0 { gcstopm() goto top } if _p_.runSafePointFn != 0 { runSafePointFn() } if fingwait && fingwake { if gp := wakefing(); gp != nil { ready(gp, 0, true) } } if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } // local runq if gp, inheritTime := runqget(_p_); gp != nil { return gp, inheritTime } // global runq if sched.runqsize != 0 { lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // Poll network. // This netpoll is only an optimization before we resort to stealing. // We can safely skip it if there are no waiters or a thread is blocked // in netpoll already. If there is any kind of logical race with that // blocked thread (e.g. it has already returned from netpoll, but does // not set lastpoll yet), this thread will do blocking netpoll below // anyway. if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if gp := netpoll(false); gp != nil { // non-blocking // netpoll returns list of goroutines linked by schedlink. injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // Steal work from other P's. procs := uint32(gomaxprocs) if atomic.Load(&sched.npidle) == procs-1 { // Either GOMAXPROCS=1 or everybody, except for us, is idle already. // New work can appear from returning syscall/cgocall, network or timers. // Neither of that submits to local run queues, so no point in stealing. goto stop } // If number of spinning M's >= number of busy P's, block. // This is necessary to prevent excessive CPU consumption // when GOMAXPROCS>>1 but the program parallelism is low. if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) { goto stop } if !_g_.m.spinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil { return gp, false } } } stop: // We have nothing to do. If we're in the GC mark phase, can // safely scan and blacken objects, and have work to do, run // idle-time marking rather than give up the P. if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) { _p_.gcMarkWorkerMode = gcMarkWorkerIdleMode gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } // Before we drop our P, make a snapshot of the allp slice, // which can change underfoot once we no longer block // safe-points. We don't need to snapshot the contents because // everything up to cap(allp) is immutable. allpSnapshot := allp // return P and block lock(&sched.lock) if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 { unlock(&sched.lock) goto top } if sched.runqsize != 0 { gp := globrunqget(_p_, 0) unlock(&sched.lock) return gp, false } if releasep() != _p_ { throw("findrunnable: wrong p") } pidleput(_p_) unlock(&sched.lock) // Delicate dance: thread transitions from spinning to non-spinning state, // potentially concurrently with submission of new goroutines. We must // drop nmspinning first and then check all per-P queues again (with // #StoreLoad memory barrier in between). If we do it the other way around, // another thread can submit a goroutine after we've checked all run queues // but before we drop nmspinning; as the result nobody will unpark a thread // to run the goroutine. // If we discover new work below, we need to restore m.spinning as a signal // for resetspinning to unpark a new worker thread (because there can be more // than one starving goroutine). However, if after discovering new work // we also observe no idle Ps, it is OK to just park the current thread: // the system is fully loaded so no spinning threads are required. // Also see "Worker thread parking/unparking" comment at the top of the file. wasSpinning := _g_.m.spinning if _g_.m.spinning { _g_.m.spinning = false if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("findrunnable: negative nmspinning") } } // check all runqueues once again for _, _p_ := range allpSnapshot { if !runqempty(_p_) { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } goto top } break } } // Check for idle-priority GC work again. if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) { lock(&sched.lock) _p_ = pidleget() if _p_ != nil && _p_.gcBgMarkWorker == 0 { pidleput(_p_) _p_ = nil } unlock(&sched.lock) if _p_ != nil { acquirep(_p_) if wasSpinning { _g_.m.spinning = true atomic.Xadd(&sched.nmspinning, 1) } // Go back to idle GC check. goto stop } } // poll network if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 { if _g_.m.p != 0 { throw("findrunnable: netpoll with p") } if _g_.m.spinning { throw("findrunnable: netpoll with spinning") } gp := netpoll(true) // block until new work is available atomic.Store64(&sched.lastpoll, uint64(nanotime())) if gp != nil { lock(&sched.lock) _p_ = pidleget() unlock(&sched.lock) if _p_ != nil { acquirep(_p_) injectglist(gp.schedlink.ptr()) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } injectglist(gp) } } stopm() goto top }
newproc函數負責建立一個能夠運行的G並將其放在當前的P的runnable G隊列中,它是相似」go func() { … }」語句真正被編譯器翻譯後的調用,核心代碼在newproc1函數。這個函數執行順序以下:
代碼以下:
// Go1.10.8版本默認stack大小爲2KB _StackMin = 2048 // 建立一個g對象,而後放到g隊列 // 等待被執行 // Create a new g running fn with narg bytes of arguments starting // at argp. callerpc is the address of the go statement that created // this. The new g is put on the queue of g's waiting to run. func newproc1(fn *funcval, argp *uint8, narg int32, callerpc uintptr) { _g_ := getg() if fn == nil { _g_.m.throwing = -1 // do not dump full stacks throw("go of nil func value") } _g_.m.locks++ // disable preemption because it can be holding p in a local var siz := narg siz = (siz + 7) &^ 7 // We could allocate a larger initial stack if necessary. // Not worth it: this is almost always an error. // 4*sizeof(uintreg): extra space added below // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall). if siz >= _StackMin-4*sys.RegSize-sys.RegSize { throw("newproc: function arguments too large for new goroutine") } _p_ := _g_.m.p.ptr() newg := gfget(_p_) if newg == nil { newg = malg(_StackMin) casgstatus(newg, _Gidle, _Gdead) allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack. } if newg.stack.hi == 0 { throw("newproc1: newg missing stack") } if readgstatus(newg) != _Gdead { throw("newproc1: new g is not Gdead") } totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign sp := newg.stack.hi - totalSize spArg := sp if usesLR { // caller's LR *(*uintptr)(unsafe.Pointer(sp)) = 0 prepGoExitFrame(sp) spArg += sys.MinFrameSize } if narg > 0 { memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg)) // This is a stack-to-stack copy. If write barriers // are enabled and the source stack is grey (the // destination is always black), then perform a // barrier copy. We do this *after* the memmove // because the destination stack may have garbage on // it. if writeBarrier.needed && !_g_.m.curg.gcscandone { f := findfunc(fn.fn) stkmap := (*stackmap)(funcdata(f, _FUNCDATA_ArgsPointerMaps)) // We're in the prologue, so it's always stack map index 0. bv := stackmapdata(stkmap, 0) bulkBarrierBitmap(spArg, spArg, uintptr(narg), 0, bv.bytedata) } } memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched)) newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) newg.gopc = callerpc newg.startpc = fn.fn if _g_.m.curg != nil { newg.labels = _g_.m.curg.labels } if isSystemGoroutine(newg) { atomic.Xadd(&sched.ngsys, +1) } newg.gcscanvalid = false casgstatus(newg, _Gdead, _Grunnable) if _p_.goidcache == _p_.goidcacheend { // Sched.goidgen is the last allocated id, // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch]. // At startup sched.goidgen=0, so main goroutine receives goid=1. _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch) _p_.goidcache -= _GoidCacheBatch - 1 _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch } newg.goid = int64(_p_.goidcache) _p_.goidcache++ if raceenabled { newg.racectx = racegostart(callerpc) } if trace.enabled { traceGoCreate(newg, newg.startpc) } runqput(_p_, newg, true) if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted { wakep() } _g_.m.locks-- if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack _g_.stackguard0 = stackPreempt } }
goexit函數是當G退出時調用的。這個函數對G進行一些設置後,將它放入free G列表中,供之後複用,以後調用schedule函數調度。
// goexit continuation on g0. func goexit0(gp *g) { _g_ := getg() //設置g的 status從 _Grunning變爲 _Gdead casgstatus(gp, _Grunning, _Gdead) if isSystemGoroutine(gp) { atomic.Xadd(&sched.ngsys, -1) } //對該g 進行釋放設置 基本爲nil /0 gp.m = nil locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.paniconfault = false gp._defer = nil // should be true already but just in case. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. gp.writebuf = nil gp.waitreason = "" gp.param = nil gp.labels = nil gp.timer = nil if gcBlackenEnabled != 0 && gp.gcAssistBytes > 0 { // Flush assist credit to the global pool. This gives // better information to pacing if the application is // rapidly creating an exiting goroutines. scanCredit := int64(gcController.assistWorkPerByte * float64(gp.gcAssistBytes)) atomic.Xaddint64(&gcController.bgScanCredit, scanCredit) gp.gcAssistBytes = 0 } // Note that gp's stack scan is now "valid" because it has no // stack. gp.gcscanvalid = true dropg() if _g_.m.lockedInt != 0 { print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n") throw("internal lockOSThread error") } _g_.m.lockedExt = 0 //把這個g 推到free G 列表 gfput(_g_.m.p.ptr(), gp) if locked { // The goroutine may have locked this thread because // it put it in an unusual kernel state. Kill it // rather than returning it to the thread pool. // Return to mstart, which will release the P and exit // the thread. if GOOS != "plan9" { // See golang.org/issue/22227. gogo(&_g_.m.g0.sched) } } schedule() }
handoffp函數將P從系統調用或阻塞的M中傳遞出去,若是P還有runnable G隊列,那麼新開一個M,調用startm函數,新開的M不空旋。
// Hands off P from syscall or locked M. // Always runs without a P, so write barriers are not allowed. //go:nowritebarrierrec func handoffp(_p_ *p) { // handoffp must start an M in any situation where // findrunnable would return a G to run on _p_. //若是這個P的隊列不爲空或調度內的size不爲空 那麼 進行startm 且不空旋 if !runqempty(_p_) || sched.runqsize != 0 { startm(_p_, false) return } //若是正在進行GC處理 同上 if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) { startm(_p_, false) return } //若是沒活可作了,檢查下有沒有 空閒/自旋的 M //不然 不須要咱們作自旋 if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic startm(_p_, true) return } //調度上鎖 將這個P 摘除走 lock(&sched.lock) if sched.gcwaiting != 0 { _p_.status = _Pgcstop sched.stopwait-- if sched.stopwait == 0 { notewakeup(&sched.stopnote) } unlock(&sched.lock) return } if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) { sched.safePointFn(_p_) sched.safePointWait-- if sched.safePointWait == 0 { notewakeup(&sched.safePointNote) } } if sched.runqsize != 0 { unlock(&sched.lock) startm(_p_, false) return } // If this is the last running P and nobody is polling network, // need to wakeup another M to poll network. if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 { unlock(&sched.lock) startm(_p_, false) return } pidleput(_p_) unlock(&sched.lock) }
startm函數調度一個M或者必要時建立一個M來運行指定的P。
// Schedules some M to run the p (creates an M if necessary). // If p==nil, tries to get an idle P, if no idle P's does nothing. // May run with m.p==nil, so write barriers are not allowed. // If spinning is set, the caller has incremented nmspinning and startm will // either decrement nmspinning or set m.spinning in the newly started M. //go:nowritebarrierrec func startm(_p_ *p, spinning bool) { //加鎖 lock(&sched.lock) if _p_ == nil { _p_ = pidleget() if _p_ == nil { unlock(&sched.lock) if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return } } mp := mget() unlock(&sched.lock) if mp == nil { var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_) return } if mp.spinning { throw("startm: m is spinning") } if mp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning mp.nextp.set(_p_) notewakeup(&mp.park) }
sysmon函數是Go runtime啓動時建立的,負責監控全部goroutine的狀態,判斷是否須要GC,進行netpoll等操做。sysmon函數中會調用retake函數進行搶佔式調度。
// Always runs without a P, so write barriers are not allowed. // //go:nowritebarrierrec func sysmon() { lock(&sched.lock) sched.nmsys++ checkdead() unlock(&sched.lock) // If a heap span goes unused for 5 minutes after a garbage collection, // we hand it back to the operating system. scavengelimit := int64(5 * 60 * 1e9) if debug.scavenge > 0 { // Scavenge-a-lot for testing. forcegcperiod = 10 * 1e6 scavengelimit = 20 * 1e6 } lastscavenge := nanotime() nscavenge := 0 lasttrace := int64(0) idle := 0 // how many cycles in succession we had not wokeup somebody delay := uint32(0) for { if idle == 0 { // start with 20us sleep... delay = 20 } else if idle > 50 { // start doubling the sleep after 1ms... delay *= 2 } if delay > 10*1000 { // up to 10ms delay = 10 * 1000 } usleep(delay) if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) { lock(&sched.lock) if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) { atomic.Store(&sched.sysmonwait, 1) unlock(&sched.lock) // Make wake-up period small enough // for the sampling to be correct. maxsleep := forcegcperiod / 2 if scavengelimit < forcegcperiod { maxsleep = scavengelimit / 2 } shouldRelax := true if osRelaxMinNS > 0 { next := timeSleepUntil() now := nanotime() if next-now < osRelaxMinNS { shouldRelax = false } } if shouldRelax { osRelax(true) } notetsleep(&sched.sysmonnote, maxsleep) if shouldRelax { osRelax(false) } lock(&sched.lock) atomic.Store(&sched.sysmonwait, 0) noteclear(&sched.sysmonnote) idle = 0 delay = 20 } unlock(&sched.lock) } // trigger libc interceptors if needed if *cgo_yield != nil { asmcgocall(*cgo_yield, nil) } // poll network if not polled for more than 10ms lastpoll := int64(atomic.Load64(&sched.lastpoll)) now := nanotime() if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now { atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now)) gp := netpoll(false) // non-blocking - returns list of goroutines if gp != nil { // Need to decrement number of idle locked M's // (pretending that one more is running) before injectglist. // Otherwise it can lead to the following situation: // injectglist grabs all P's but before it starts M's to run the P's, // another M returns from syscall, finishes running its G, // observes that there is no work to do and no other running M's // and reports deadlock. incidlelocked(-1) injectglist(gp) incidlelocked(1) } } // retake P's blocked in syscalls // and preempt long running G's if retake(now) != 0 { idle = 0 } else { idle++ } // check if we need to force a GC if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 { lock(&forcegc.lock) forcegc.idle = 0 forcegc.g.schedlink = 0 injectglist(forcegc.g) unlock(&forcegc.lock) } // scavenge heap once in a while if lastscavenge+scavengelimit/2 < now { mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit)) lastscavenge = now nscavenge++ } if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now { lasttrace = now schedtrace(debug.scheddetail > 0) } } }
枚舉全部的P 若是P在系統調用中(_Psyscall), 且通過了一次sysmon循環(20us~10ms), 則搶佔這個P, 調用handoffp解除M和P之間的關聯, 若是P在運行中(_Prunning), 且通過了一次sysmon循環而且G運行時間超過forcePreemptNS(10ms), 則搶佔這個P
並設置g.preempt = true,g.stackguard0 = stackPreempt。
爲何設置了stackguard就能夠實現搶佔?
由於這個值用於檢查當前棧空間是否足夠, go函數的開頭會比對這個值判斷是否須要擴張棧。
newstack函數判斷g.stackguard0等於stackPreempt, 就知道這是搶佔觸發的, 這時會再檢查一遍是否要搶佔。
搶佔機制保證了不會有一個G長時間的運行致使其餘G沒法運行的狀況發生。
func retake(now int64) uint32 { n := 0 // Prevent allp slice changes. This lock will be completely // uncontended unless we're already stopping the world. lock(&allpLock) // We can't use a range loop over allp because we may // temporarily drop the allpLock. Hence, we need to re-fetch // allp each time around the loop. for i := 0; i < len(allp); i++ { _p_ := allp[i] if _p_ == nil { // This can happen if procresize has grown // allp but not yet created new Ps. continue } pd := &_p_.sysmontick s := _p_.status if s == _Psyscall { // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us). t := int64(_p_.syscalltick) if int64(pd.syscalltick) != t { pd.syscalltick = uint32(t) pd.syscallwhen = now continue } // On the one hand we don't want to retake Ps if there is no other work to do, // but on the other hand we want to retake them eventually // because they can prevent the sysmon thread from deep sleep. if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now { continue } // Drop allpLock so we can take sched.lock. unlock(&allpLock) // Need to decrement number of idle locked M's // (pretending that one more is running) before the CAS. // Otherwise the M from which we retake can exit the syscall, // increment nmidle and report deadlock. incidlelocked(-1) if atomic.Cas(&_p_.status, s, _Pidle) { if trace.enabled { traceGoSysBlock(_p_) traceProcStop(_p_) } n++ _p_.syscalltick++ handoffp(_p_) } incidlelocked(1) lock(&allpLock) } else if s == _Prunning { // Preempt G if it's running for too long. t := int64(_p_.schedtick) if int64(pd.schedtick) != t { pd.schedtick = uint32(t) pd.schedwhen = now continue } if pd.schedwhen+forcePreemptNS > now { continue } preemptone(_p_) } } unlock(&allpLock) return uint32(n) }