golang調度學習-調度流程 (六) 搶佔調度

golang調度高效祕訣之一是它的搶佔式調度。當任務函數執行的時間超過了必定的時間,
sysmon方法會不斷的檢測全部p上任務的執行狀況,當有超過預約執行時間的g時,會發起搶佔。這一切也是在retake函數中實現的,上文描述了該函數在系統調用中的功能,這裏講下該函數如何執行搶佔。html

retake

retake()函數會遍歷全部的P,若是一個P處於執行狀態, 且已經連續執行了較長時間,就會被搶佔。retake()調用preemptone()將P的stackguard0設爲 stackPreempt(關於stackguard的詳細內容,能夠參考 Split Stacks),這將致使該P中正在執行的G進行下一次函數調用時,致使棧空間檢查失敗。進而觸發morestack()(彙編代碼,位於asm_XXX.s中)而後進行一連串的函數調用,主要的調用過程以下:
morestack()(彙編代碼)-> newstack() -> gopreempt_m() -> goschedImpl() -> schedule()
http://ga0.github.io/golang/2...git

func retake(now int64) uint32 {
    n := 0
    lock(&allpLock)
    
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        sysretake := false
        if s == _Prunning || s == _Psyscall {
            // Preempt G if it's running for too long.
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
            } else if pd.schedwhen+forcePreemptNS <= now {
                // 超時搶佔
                preemptone(_p_)
                // In case of syscall, preemptone() doesn't
                // work, because there is no M wired to P.
                sysretake = true
            }
        }
        //p在系統調用中或者被調用
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            t := int64(_p_.syscalltick)
            if !sysretake && int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
             //沒有能夠調度的任務且時間阻塞時間未到閥值,直接跳過
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            // 這裏出發了系統調用長時間阻塞的調度
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                //關鍵方法,將對長時間阻塞的p進行從新調度
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

preemptone

// 告訴處理器P上運行的goroutine中止。
// 此功能純粹是盡力而爲。 它可能會錯誤地沒法通知goroutine。 它能夠發送通知錯誤的goroutine。 即便它通知了正確的goroutine,但若是goroutine同時執行newstack,該goroutine可能會忽略該請求。 無需鎖定。 若是發出了搶佔請求,則返回true。 實際的搶佔將在未來的某個時候發生,而且將經過gp-> status指示,再也不處於「Grunning」狀態

func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }
        // 標記可搶佔
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    gp.stackguard0 = stackPreempt

    // Request an async preemption of this P.
    // gorotuine 中的每一個調用都會經過將當前堆棧指針與 gp->stackguard0 進行比較來檢查堆棧溢出。
        // 將 gp->stackguard0 設置爲 stackPreempt 會將搶佔摺疊爲正常的堆棧溢出檢查。
    if preemptMSupported && debug.asyncpreemptoff == 0 {
        _p_.preempt = true
        preemptM(mp)
    }

    return true
}

能夠看到只是設置了兩個參數,並無執行實際的搶佔工做,事實上這個過程是異步的,將在其餘的地方執行真正的搶佔操做。github

stackguard0自己是用來檢測goroutine的棧是否須要擴充的,當設置爲stackPreempt時,在執行函數的時候,便會觸發棧擴充,調用morestack()方法,morestack會調用newstack,該方法會擴充g的棧空間,也兼職了goroutine的搶佔功能。
preempt 爲搶佔的備用手段,在stackguard0設置stackPreempt且在newstack中未能被搶佔時,該標記也會在其餘地方設置stackguard0的值爲stackPreempt,再次觸發搶佔。golang

func newstack() {
    thisg := getg()
    gp := thisg.m.curg
    // 注意:若是另外一個線程即將嘗試搶佔gp,則stackguard0可能會在發生變化。
    // 因此如今讀一次,判斷是否被搶佔。
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
  
    if preempt {
        //如下狀況不會被搶佔
        if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
            // Let the goroutine keep running for now.
            // gp->preempt is set, so it will be preempted next time.
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) // never return
        }
    }
  
    if preempt {
        casgstatus(gp, _Grunning, _Gwaiting)
        //gc掃描搶佔
        if gp.preemptscan {
            for !castogscanstatus(gp, _Gwaiting, _Gscanwaiting) {
            }
            if !gp.gcscandone {
                //掃描當前gp棧
                gcw := &gp.m.p.ptr().gcw
                scanstack(gp, gcw)
                if gcBlackenPromptly {
                    gcw.dispose()
                }
                gp.gcscandone = true
            }
            gp.preemptscan = false
            gp.preempt = false
            casfrom_Gscanstatus(gp, _Gscanwaiting, _Gwaiting)
            // This clears gcscanvalid.
            casgstatus(gp, _Gwaiting, _Grunning)
            gp.stackguard0 = gp.stack.lo + _StackGuard
            gogo(&gp.sched) //  恢復後繼續執行
        }
        //轉換狀態爲 _Gwaiting
        casgstatus(gp, _Gwaiting, _Grunning)
        gopreempt_m(gp) // never return
    }
  ...
}

這裏最終會取消m和g的綁定,並將g放入全局隊列中,而後開始調度m執行新的任務app

以上是golang搶佔調度的基本內容,總結以下:異步

正常goroutine的搶佔都時由監控線程的sysmon發起的,超時執行的goroutine會被打上可搶佔的標誌。(gc scan階段也會發生搶佔,主要是爲了掃描正在運行的g的棧空間)
在任務的每一個函數中,編譯器會加上棧空間檢測代碼,有須要棧空間擴容或者搶佔便會進入morestack,而後調用newstack方法
newstack中會檢測是否搶佔和搶佔類型。gc掃描觸發的搶佔回掃描當前g棧上的內容,而後繼續執行當前g。而普通搶佔則會解綁當前g,將g放入全局隊列,而後繼續調度。async

handoffp

當系統調用時間過長的時候,會調用handoffp()方法:函數

// p的切換,系統調用或者綁定M時使用
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    //當前p有任務或者全局任務隊列有任務,觸發一次調度
        //startm()上文有描述,會獲取一個m來調度當前p的任務,當前p爲nil時,會調度其餘p任務隊列
    if !runqempty(_p_) || sched.runqsize != 0 {
        startm(_p_, false)
        return
    }
    //gc標記階段且當前p有標記任務,觸發調度
    if gcBlackenEnabled != 0 && gcMarkWorkAvailable(_p_) {
        startm(_p_, false)
        return
    }
    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // //有自旋m或空閒p,觸發調度
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)
    if sched.gcwaiting != 0 {
        _p_.status = _Pgcstop
        sched.stopwait--
        if sched.stopwait == 0 {
            notewakeup(&sched.stopnote)
        }
        unlock(&sched.lock)
        return
    }
    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    //全局隊列不爲空
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    // 其餘隊列能夠偷
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }

    // The scheduler lock cannot be held when calling wakeNetPoller below
    // because wakeNetPoller may call wakep which may call startm.
    when := nobarrierWakeTime(_p_)
    //實在沒任務,放入空閒隊列
    pidleput(_p_)
    unlock(&sched.lock)

    if when != 0 {
        wakeNetPoller(when)
    }
}
相關文章
相關標籤/搜索