One of the keys to the efficiency of Go's scheduler is preemptive scheduling. The sysmon monitor thread continuously checks the execution state of every P, and when it finds a G that has been running past the time limit (forcePreemptNS, 10ms), it initiates a preemption. This too is implemented in the retake() function: the previous section described that function's role in system calls, and this section covers how it performs preemption.
retake() iterates over all Ps. If a P is in the running state and its current G has been executing continuously for a long time, that G gets preempted. retake() calls preemptone(), which sets the running G's stackguard0 to stackPreempt (for details on stackguard, see Split Stacks). As a result, the next function call that G makes fails its stack-space check, which triggers morestack() (assembly code, in asm_XXX.s) and then a chain of further calls, roughly:
morestack() (assembly) -> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
http://ga0.github.io/golang/2...
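The effect is easy to observe with a small experiment. The sketch below is my own, not from the runtime sources: with a single P and signal-based preemption disabled (GODEBUG=asyncpreemptoff=1), a busy goroutine still gets preempted, because every call to a non-leaf function passes through the compiler-inserted stack check that preemptone() abuses.

```go
// Run with: GODEBUG=asyncpreemptoff=1 go run main.go
// A sketch of cooperative (stack-check based) preemption; timings are illustrative.
package main

import (
	"fmt"
	"runtime"
	"time"
)

//go:noinline
func inner() {}

//go:noinline
func outer() { inner() } // non-leaf, so it keeps its stack-check prologue

func main() {
	runtime.GOMAXPROCS(1) // a single P: the busy goroutine monopolizes it
	go func() {
		for {
			outer() // each entry compares SP against gp.stackguard0
		}
	}()
	// After forcePreemptNS (10ms) sysmon calls preemptone; the next outer()
	// call fails its stack check and falls into morestack -> newstack -> schedule.
	time.Sleep(100 * time.Millisecond)
	fmt.Println("main is running again: the busy goroutine was preempted")
}
```

With that picture in mind, here is retake() itself: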
```go
func retake(now int64) uint32 {
	n := 0
	lock(&allpLock)
	for i := 0; i < len(allp); i++ {
		_p_ := allp[i]
		if _p_ == nil {
			// This can happen if procresize has grown
			// allp but not yet created new Ps.
			continue
		}
		pd := &_p_.sysmontick
		s := _p_.status
		sysretake := false
		if s == _Prunning || s == _Psyscall {
			// Preempt G if it's running for too long.
			t := int64(_p_.schedtick)
			if int64(pd.schedtick) != t {
				pd.schedtick = uint32(t)
				pd.schedwhen = now
			} else if pd.schedwhen+forcePreemptNS <= now {
				// Running past forcePreemptNS (10ms): preempt.
				preemptone(_p_)
				// In case of syscall, preemptone() doesn't
				// work, because there is no M wired to P.
				sysretake = true
			}
		}
		// The P is in (or just back from) a system call.
		if s == _Psyscall {
			// Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
			t := int64(_p_.syscalltick)
			if !sysretake && int64(pd.syscalltick) != t {
				pd.syscalltick = uint32(t)
				pd.syscallwhen = now
				continue
			}
			// On the one hand we don't want to retake Ps if there is no other work to do,
			// but on the other hand we want to retake them eventually
			// because they can prevent the sysmon thread from deep sleep.
			// No runnable work and the blocking time is below the threshold: skip.
			if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
				continue
			}
			// Drop allpLock so we can take sched.lock.
			// This is where rescheduling of a long-blocked syscall is triggered.
			unlock(&allpLock)
			// Need to decrement number of idle locked M's
			// (pretending that one more is running) before the CAS.
			// Otherwise the M from which we retake can exit the syscall,
			// increment nmidle and report deadlock.
			incidlelocked(-1)
			if atomic.Cas(&_p_.status, s, _Pidle) {
				if trace.enabled {
					traceGoSysBlock(_p_)
					traceProcStop(_p_)
				}
				n++
				_p_.syscalltick++
				// Key call: hand the long-blocked P off to be rescheduled.
				handoffp(_p_)
			}
			incidlelocked(1)
			lock(&allpLock)
		}
	}
	unlock(&allpLock)
	return uint32(n)
}
```
```go
// Tell the goroutine running on processor P to stop.
// This function is purely best-effort. It can incorrectly fail to inform the
// goroutine. It can inform the wrong goroutine. Even if it informs the
// correct goroutine, that goroutine might ignore the request if it is
// simultaneously executing newstack.
// No lock needs to be held.
// Returns true if preemption request was issued.
// The actual preemption will happen at some point in the future
// and will be indicated by gp->status no longer being Grunning.
func preemptone(_p_ *p) bool {
	mp := _p_.m.ptr()
	if mp == nil || mp == getg().m {
		return false
	}
	gp := mp.curg
	if gp == nil || gp == mp.g0 {
		return false
	}

	// Mark the goroutine preemptible.
	gp.preempt = true

	// Every call in a goroutine checks for stack overflow by
	// comparing the current stack pointer to gp->stackguard0.
	// Setting gp->stackguard0 to stackPreempt folds
	// preemption into the normal stack overflow check.
	gp.stackguard0 = stackPreempt

	// Request an async preemption of this P (the signal-based path).
	if preemptMSupported && debug.asyncpreemptoff == 0 {
		_p_.preempt = true
		preemptM(mp)
	}
	return true
}
```
Notice that preemptone() merely sets a couple of flags; no actual preemption work happens here. The process is asynchronous: the real preemption is carried out elsewhere.
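One of those other places is preemptM, seen above: on Go 1.14+ it sends a signal to the target M so that even a loop without function calls can be interrupted. Here is a contrasting sketch of my own (assuming Go 1.14+): run as-is, the call-free loop below is broken up by the signal; run with GODEBUG=asyncpreemptoff=1 and it will likely spin forever, since there is no stack check for the stackguard0 trick to fail.

```go
package main

import (
	"runtime"
	"sync/atomic"
	"time"
)

func main() {
	runtime.GOMAXPROCS(1)
	var done int32
	go func() {
		time.Sleep(10 * time.Millisecond)
		atomic.StoreInt32(&done, 1)
	}()
	// atomic.LoadInt32 is a compiler intrinsic, so this loop body contains
	// no real call site and therefore no stack check to fail.
	for atomic.LoadInt32(&done) == 0 {
	}
	println("loop exited: preemptM delivered an async preemption signal")
}
```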
stackguard0 itself exists to detect whether a goroutine's stack needs to grow. When it is set to stackPreempt, the next function entry fails the stack check and calls morestack(); morestack calls newstack(), which normally expands the g's stack but also moonlights as the goroutine preemption hook.
The preempt field is a fallback: if stackguard0 was set to stackPreempt but newstack did not actually preempt the goroutine, other places in the runtime will notice the flag and set stackguard0 to stackPreempt again, triggering another preemption attempt.
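To make the mechanism concrete, here is a toy model (my sketch, not runtime source) of the prologue check and of newstack's grow-or-preempt decision. The constants are illustrative; the real stackPreempt is a sentinel close to the maximum uintptr, so no valid stack pointer can ever pass the comparison.

```go
package main

import "fmt"

// Toy sentinel: larger than any valid stack pointer, so once stackguard0
// holds it, the prologue check fails unconditionally on the next call.
const stackPreempt = ^uintptr(0)

type g struct {
	stackLo     uintptr
	stackguard0 uintptr
}

// prologue models the check the compiler inserts at function entry.
func prologue(gp *g, sp uintptr) {
	if sp < gp.stackguard0 {
		morestack(gp)
	}
}

// morestack models newstack's decision: preempt or genuinely grow.
func morestack(gp *g) {
	if gp.stackguard0 == stackPreempt {
		fmt.Println("stackguard0 == stackPreempt: reschedule this goroutine")
		return
	}
	fmt.Println("genuine overflow: grow the stack and copy it")
}

func main() {
	gp := &g{stackLo: 0x1000}
	gp.stackguard0 = gp.stackLo + 928 // 928 bytes, roughly _StackGuard
	prologue(gp, 0x2000)              // plenty of stack: check passes silently
	gp.stackguard0 = stackPreempt     // what preemptone() does
	prologue(gp, 0x2000)              // the same SP now fails the check
}
```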
```go
func newstack() {
	thisg := getg()
	gp := thisg.m.curg

	// NOTE: stackguard0 may change underfoot, if another thread
	// is about to try to preempt gp. So read it once and decide
	// from that single value whether gp is being preempted.
	preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt

	if preempt {
		// The following cases must not be preempted.
		if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
			// Let the goroutine keep running for now.
			// gp->preempt is set, so it will be preempted next time.
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // never return
		}
	}

	if preempt {
		casgstatus(gp, _Grunning, _Gwaiting)
		// Preemption requested by the GC stack scan.
		if gp.preemptscan {
			for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
			}
			if !gp.gcscandone {
				// Scan the current gp's stack.
				gcw := &gp.m.p.ptr().gcw
				scanstack(gp, gcw)
				if gcBlackenPromptly {
					gcw.dispose()
				}
				gp.gcscandone = true
			}
			gp.preemptscan = false
			gp.preempt = false
			casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
			// This clears gcscanvalid.
			casgstatus(gp, _Gwaiting, _Grunning)
			gp.stackguard0 = gp.stack.lo + _StackGuard
			gogo(&gp.sched) // resume and keep running
		}

		// An ordinary preemption: switch back to _Grunning and yield.
		casgstatus(gp, _Gwaiting, _Grunning)
		gopreempt_m(gp) // never return
	}
	...
}
```
This path ultimately unbinds the g from its m, puts the g on the global run queue, and then has the m schedule a new task.
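For reference, this is goschedImpl() from runtime/proc.go (comments mine), which performs exactly those steps:

```go
func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable) // running -> runnable again
	dropg()                               // break the m <-> curg binding
	lock(&sched.lock)
	globrunqput(gp)                       // push onto the global run queue
	unlock(&sched.lock)
	schedule()                            // let this m pick a new g
}
```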
That covers the basics of Go's preemptive scheduling. To summarize:

- Preemption of an ordinary goroutine is initiated by the sysmon monitor thread: a goroutine that has been running past the limit is marked preemptible. (Preemption also happens during the GC scan phase, mainly to scan the stack of a running g.)
- The compiler inserts stack-check code at the entry of (almost) every function; whenever the stack needs to grow, or a preemption is pending, execution enters morestack and then calls newstack.
- newstack determines whether a preemption was requested and of which kind. A preemption triggered by the GC scan scans the contents of the current g's stack and then resumes that g; an ordinary preemption unbinds the current g, puts it on the global run queue, and goes back to scheduling.
When a system call runs for too long, handoffp() is called:
```go
// Hands off a P, used on system calls or when an M is locked.
func handoffp(_p_ *p) {
	// handoffp must start an M in any situation where
	// findrunnable would return a G to run on _p_.

	// If this P has local work, or the global queue has work, trigger a
	// round of scheduling right away. startm() (described above) acquires
	// an M to run this P's tasks; with a nil P it would instead serve some
	// other P's queue.
	if !runqempty(_p_) || sched.runqsize != 0 {
		startm(_p_, false)
		return
	}
	// GC mark phase and this P has mark work: trigger scheduling.
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
		startm(_p_, false)
		return
	}
	// no local work, check that there are no spinning/idle M's,
	// otherwise our help is not required
	if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
		startm(_p_, true)
		return
	}
	lock(&sched.lock)
	if sched.gcwaiting != 0 {
		_p_.status = _Pgcstop
		sched.stopwait--
		if sched.stopwait == 0 {
			notewakeup(&sched.stopnote)
		}
		unlock(&sched.lock)
		return
	}
	if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
		sched.safePointFn(_p_)
		sched.safePointWait--
		if sched.safePointWait == 0 {
			notewakeup(&sched.safePointNote)
		}
	}
	// The global queue is not empty.
	if sched.runqsize != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}
	// If this is the last running P and nobody is polling network,
	// need to wakeup another M to poll network.
	if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
		unlock(&sched.lock)
		startm(_p_, false)
		return
	}

	// The scheduler lock cannot be held when calling wakeNetPoller below
	// because wakeNetPoller may call wakep which may call startm.
	when := nobarrierWakeTime(_p_)
	// Really no work to do: put the P on the idle list.
	pidleput(_p_)
	unlock(&sched.lock)

	if when != 0 {
		wakeNetPoller(when)
	}
}
```
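Note the order of the checks: handoffp first tries to keep the P busy (local run queue, global run queue, GC mark work), then makes sure at least one spinning M exists, honors a pending stop-the-world or safe-point request, and only when there is truly nothing to do parks the P on the idle list, possibly waking the netpoller for the next timer.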