Posted on 2021年3月20日 by luozhiyun
轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客: https://www.luozhiyun.com/arc...緩存
本文使用的 Go 的源碼1.15.7安全
stopTheWorldWithSema
與startTheWorldWithSema
是一對用於暫停和恢復程序的核心函數。併發
func stopTheWorldWithSema() { _g_ := getg() lock(&sched.lock) sched.stopwait = gomaxprocs // 標記 gcwaiting,調度時看見此標記會進入等待 atomic.Store(&sched.gcwaiting, 1) // 發送搶佔信號 preemptall() // 暫停當前 P _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic. sched.stopwait-- // 遍歷全部的 P ,修改 P 的狀態爲 _Pgcstop 中止運行 for _, p := range allp { s := p.status if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) { if trace.enabled { traceGoSysBlock(p) traceProcStop(p) } p.syscalltick++ sched.stopwait-- } } // 中止空閒的 P 列表 for { p := pidleget() if p == nil { break } p.status = _Pgcstop sched.stopwait-- } wait := sched.stopwait > 0 unlock(&sched.lock) if wait { for { // 等待 100 us if notetsleep(&sched.stopnote, 100*1000) { noteclear(&sched.stopnote) break } // 再次進行發送搶佔信號 preemptall() } } // 安全檢測 bad := "" if sched.stopwait != 0 { bad = "stopTheWorld: not stopped (stopwait != 0)" } else { for _, p := range allp { if p.status != _Pgcstop { bad = "stopTheWorld: not stopped (status != _Pgcstop)" } } } if atomic.Load(&freezing) != 0 { lock(&deadlock) lock(&deadlock) } if bad != "" { throw(bad) } }
這個方法會經過sched.stopwait
來檢測是否全部的P都已暫停。首先會經過調用preemptall
發送搶佔信號進行搶佔全部運行中的G,而後遍歷P將全部狀態爲_Psyscall
、空閒的P都暫停,若是仍有須要中止的P, 則等待它們中止。函數
func startTheWorldWithSema(emitTraceEvent bool) int64 { mp := acquirem() // disable preemption because it can be holding p in a local var // 判斷收到的 netpoll 事件並添加對應的G到待運行隊列 if netpollinited() { list := netpoll(0) // non-blocking injectglist(&list) } lock(&sched.lock) procs := gomaxprocs if newprocs != 0 { procs = newprocs newprocs = 0 } // 擴容或者縮容全局的處理器 p1 := procresize(procs) // 取消GC等待標記 sched.gcwaiting = 0 // 若是 sysmon (後臺監控線程) 在等待則喚醒它 if sched.sysmonwait != 0 { sched.sysmonwait = 0 notewakeup(&sched.sysmonnote) } unlock(&sched.lock) // 喚醒有可運行任務的P for p1 != nil { p := p1 p1 = p1.link.ptr() if p.m != 0 { mp := p.m.ptr() p.m = 0 if mp.nextp != 0 { throw("startTheWorld: inconsistent mp->nextp") } mp.nextp.set(p) notewakeup(&mp.park) } else { // Start M to run P newm(nil, p, -1) } } startTime := nanotime() if emitTraceEvent { traceGCSTWDone() } // 若是有空閒的P,而且沒有自旋中的M則喚醒或者建立一個M wakep() releasem(mp) return startTime }
func gcBgMarkStartWorkers() { // 遍歷全部 P for _, p := range allp { // 若是已啓動則不重複啓動 if p.gcBgMarkWorker == 0 { // 爲全局每一個處理器建立用於執行後臺標記任務的 Goroutine go gcBgMarkWorker(p) // 啓動後等待該任務通知信號量 bgMarkReady 再繼續 notetsleepg(&work.bgMarkReady, -1) noteclear(&work.bgMarkReady) } } }
gcBgMarkStartWorkers
會爲全局每一個 P 建立用於執行後臺標記任務的Goroutine,每個 Goroutine 都會運行gcBgMarkWorker
,notetsleepg
會等待gcBgMarkWorker
通知信號量bgMarkReady
再繼續。ui
這裏雖然爲每一個P啓動了一個後臺標記任務, 可是能夠同時工做的只有25%,調度器在調度循環runtime.schedule
中經過調用 gcController.findRunnableGCWorker
方法進行控制。atom
在看這個方法以前,先來了解一個概念, Mark Worker Mode
標記工做模式,目前來講有三種,這三種是爲了保證後臺的標記線程的利用率。spa
type gcMarkWorkerMode int const ( // gcMarkWorkerDedicatedMode indicates that the P of a mark // worker is dedicated to running that mark worker. The mark // worker should run without preemption. gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota // gcMarkWorkerFractionalMode indicates that a P is currently // running the "fractional" mark worker. The fractional worker // is necessary when GOMAXPROCS*gcBackgroundUtilization is not // an integer. The fractional worker should run until it is // preempted and will be scheduled to pick up the fractional // part of GOMAXPROCS*gcBackgroundUtilization. gcMarkWorkerFractionalMode // gcMarkWorkerIdleMode indicates that a P is running the mark // worker because it has nothing else to do. The idle worker // should run until it is preempted and account its time // against gcController.idleMarkTime. gcMarkWorkerIdleMode )
經過代碼註釋能夠知道:pwa
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g { ... // 原子減小對應的值, 若是減小後大於等於0則返回true, 不然返回false decIfPositive := func(ptr *int64) bool { if *ptr > 0 { if atomic.Xaddint64(ptr, -1) >= 0 { return true } // We lost a race atomic.Xaddint64(ptr, +1) } return false } // 減小dedicatedMarkWorkersNeeded, 成功時後臺標記任務的模式是Dedicated if decIfPositive(&c.dedicatedMarkWorkersNeeded) { _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode } else if c.fractionalUtilizationGoal == 0 { // No need for fractional workers. return nil } else { // 執行標記任務的時間 delta := nanotime() - gcController.markStartTime if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal { // Nope. No need to run a fractional worker. return nil } _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode } gp := _p_.gcBgMarkWorker.ptr() casgstatus(gp, _Gwaiting, _Grunnable) return gp }
看過個人《詳解Go語言調度循環源碼實現》的同窗應該都知道,搶佔調度運行到這裏的時候,一般是 P 搶佔不到 G 了,打算進行休眠了,所以在休眠以前能夠安全的進行標記任務的執行。線程
沒看過調度循環的同窗能夠看這裏:詳解Go語言調度循環源碼實現 https://www.luozhiyun.com/arc... 。code
併發掃描標記能夠大概歸納爲如下幾個部分:
func gcBgMarkWorker(_p_ *p) { gp := getg() type parkInfo struct { m muintptr attach puintptr } gp.m.preemptoff = "GC worker init" // 初始化 park park := new(parkInfo) gp.m.preemptoff = "" // 設置當前的M並禁止搶佔 park.m.set(acquirem()) // 設置當前的P park.attach.set(_p_) // 通知gcBgMarkStartWorkers能夠繼續處理 notewakeup(&work.bgMarkReady) for { // 讓當前 G 進入休眠 gopark(func(g *g, parkp unsafe.Pointer) bool { park := (*parkInfo)(parkp) releasem(park.m.ptr()) // 設置關聯的 P if park.attach != 0 { p := park.attach.ptr() park.attach.set(nil) // 把當前的G設到P的gcBgMarkWorker成員 if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) { return false } } return true }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0) ... } }
在 gcBgMarkStartWorkers 中咱們看到,它會遍歷全部的 P ,而後爲每一個 P 建立一個負責 Mark Work 的 G,這裏雖然爲每一個 P 啓動了一個後臺標記任務, 可是不可能每一個 P 都會去執行標記任務,後臺標記任務默認資源佔用率是 25%,因此 gcBgMarkWorker 中會初始化 park 並將 G 和 P 的 gcBgMarkWorker 進行綁定後進行休眠。
調度器在調度循環runtime.schedule
中經過調用gcController.findRunnableGCWorker
方法進行控制,讓哪些 Mark Work 能夠執行,上面代碼已經貼過了,這裏就不重複了
在喚醒後,咱們會根據gcMarkWorkerMode
選擇不一樣的標記執行策略,不一樣的執行策略都會調用runtime.gcDrain
:
func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 檢查P的gcBgMarkWorker是否和當前的G一致, 不一致時結束當前的任務 if _p_.gcBgMarkWorker.ptr() != gp { break } // 禁止G被搶佔 park.m.set(acquirem()) // 記錄開始時間 startTime := nanotime() _p_.gcMarkWorkerStartTime = startTime decnwait := atomic.Xadd(&work.nwait, -1) systemstack(func() { // 設置G的狀態爲等待中這樣它的棧能夠被掃描 casgstatus(gp, _Grunning, _Gwaiting) // 判斷後臺標記任務的模式 switch _p_.gcMarkWorkerMode { default: throw("gcBgMarkWorker: unexpected gcMarkWorkerMode") case gcMarkWorkerDedicatedMode: // 這個模式下P應該專心執行標記 gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit) if gp.preempt { // 被搶佔時把本地運行隊列中的全部G都踢到全局運行隊列 lock(&sched.lock) for { gp, _ := runqget(_p_) if gp == nil { break } globrunqput(gp) } unlock(&sched.lock) } // 繼續執行標記 gcDrain(&_p_.gcw, gcDrainFlushBgCredit) case gcMarkWorkerFractionalMode: // 執行標記 gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit) case gcMarkWorkerIdleMode: // 執行標記, 直到被搶佔或者達到必定的量 gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit) } // 恢復G的狀態到運行中 casgstatus(gp, _Gwaiting, _Grunning) }) ... } }
在上面已經講了不一樣的 Mark Worker Mode 的區別,不記得的同窗能夠往上翻一下。執行標記這部分主要在 switch 判斷中,根據不一樣的模式傳入不一樣的參數到 gcDrain 函數中執行。
須要注意的是,傳入到 gcDrain 中的是一個 gcWork 的結構體,它至關於每一個 P 的私有緩存空間,存放須要被掃描的對象,爲垃圾收集器提供了生產和消費任務的抽象,,該結構體持有了兩個重要的工做緩衝區 wbuf1 和 wbuf2:
當咱們向該結構體中增長或者刪除對象時,它總會先操做 wbuf1 緩衝區,一旦 wbuf1 緩衝區空間不足或者沒有對象,會觸發緩衝區的切換,而當兩個緩衝區空間都不足或者都爲空時,會從全局的工做緩衝區中插入或者獲取對象:
func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 ... // wbuf1緩衝區無數據時 if wbuf.nobj == 0 { // wbuf1 與 wbuf2 進行對象互換 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 wbuf = w.wbuf1 if wbuf.nobj == 0 { owbuf := wbuf // 從 work 的 full 隊列中獲取 wbuf = trygetfull() ... } } wbuf.nobj-- return wbuf.obj[wbuf.nobj] }
當咱們向該結構體中增長或者刪除對象時,它總會先操做 wbuf1 緩衝區,一旦 wbuf1 緩衝區空間不足或者沒有對象,會觸發緩衝區的切換,而當兩個緩衝區空間都不足或者都爲空時,會從全局的工做緩衝區中插入或者獲取對象:
func (w *gcWork) tryGet() uintptr { wbuf := w.wbuf1 ... // wbuf1緩衝區無數據時 if wbuf.nobj == 0 { // wbuf1 與 wbuf2 進行對象互換 w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1 wbuf = w.wbuf1 if wbuf.nobj == 0 { owbuf := wbuf // 從 work 的 full 隊列中獲取 wbuf = trygetfull() ... } } wbuf.nobj-- return wbuf.obj[wbuf.nobj] }
繼續上面的 gcBgMarkWorker 方法,在標記完以後就要進行標記完成:
func gcBgMarkWorker(_p_ *p) { gp := getg() ... for { ... // 累加所用時間 duration := nanotime() - startTime switch _p_.gcMarkWorkerMode { case gcMarkWorkerDedicatedMode: atomic.Xaddint64(&gcController.dedicatedMarkTime, duration) atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1) case gcMarkWorkerFractionalMode: atomic.Xaddint64(&gcController.fractionalMarkTime, duration) atomic.Xaddint64(&_p_.gcFractionalMarkTime, duration) case gcMarkWorkerIdleMode: atomic.Xaddint64(&gcController.idleMarkTime, duration) } incnwait := atomic.Xadd(&work.nwait, +1) // 判斷是否全部後臺標記任務都完成, 而且沒有更多的任務 if incnwait == work.nproc && !gcMarkWorkAvailable(nil) { // 取消和P的關聯 _p_.gcBgMarkWorker.set(nil) // 容許G被搶佔 releasem(park.m.ptr()) // 準備進入完成標記階段 gcMarkDone() // 休眠以前會從新關聯P // 由於上面容許被搶佔, 到這裏的時候可能就會變成其餘P // 若是從新關聯P失敗則這個任務會結束 park.m.set(acquirem()) park.attach.set(_p_) } } }
gcBgMarkWorker 會根據 incnwait 來檢查是不是最後一個 worker,而後調用 gcMarkWorkAvailable 函數來校驗 gcwork的任務和全局任務是否已經所有都處理完了,若是都確認沒問題,那麼調用 gcMarkDone 進入完成標記階段。
下面咱們來看看 gcDrain:
func gcDrain(gcw *gcWork, flags gcDrainFlags) { gp := getg().m.curg // 看到搶佔標誌時是否要返回 preemptible := flags&gcDrainUntilPreempt != 0 // 是否計算後臺的掃描量來減小協助線程和喚醒等待中的G flushBgCredit := flags&gcDrainFlushBgCredit != 0 // 是否只執行必定量的工做 idle := flags&gcDrainIdle != 0 // 記錄初始的已掃描數量 initScanWork := gcw.scanWork checkWork := int64(1<<63 - 1) var check func() bool if flags&(gcDrainIdle|gcDrainFractional) != 0 { // drainCheckThreshold 默認 100000 checkWork = initScanWork + drainCheckThreshold if idle { check = pollWork } else if flags&gcDrainFractional != 0 { check = pollFractionalWorkerExit } } // 若是根對象未掃描完, 則先掃描根對象 if work.markrootNext < work.markrootJobs { // 一直循環直到被搶佔或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 從根對象掃描隊列取出一個值 job := atomic.Xadd(&work.markrootNext, +1) - 1 if job >= work.markrootJobs { break } // 執行根對象掃描工做 markroot(gcw, job) if check != nil && check() { goto done } } } ... }
gcDrain 函數在開始的時候,會根據 flags 不一樣而選擇不一樣的策略。
完成標記後會獲取待執行的任務:
func gcDrain(gcw *gcWork, flags gcDrainFlags) { ... // 根對象已經在標記隊列中, 消費標記隊列 // 一直循環直到被搶佔或 STW for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { // 將本地一部分工做放回全局隊列中 if work.full == 0 { gcw.balance() } // 獲取任務 b := gcw.tryGetFast() if b == 0 { b = gcw.tryGet() if b == 0 { wbBufFlush(nil, 0) b = gcw.tryGet() } } // 獲取不到對象, 標記隊列已爲空, 跳出循環 if b == 0 { break } // 掃描獲取到的對象 scanobject(b, gcw) // 若是已經掃描了必定數量的對象,gcCreditSlack值是2000 if gcw.scanWork >= gcCreditSlack { // 把掃描的對象數量添加到全局 atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 記錄此次掃描的內存字節數用於減小輔助標記的工做量 gcFlushBgCredit(gcw.scanWork - initScanWork) initScanWork = 0 } checkWork -= gcw.scanWork gcw.scanWork = 0 if checkWork <= 0 { checkWork += drainCheckThreshold if check != nil && check() { break } } } } done: // 把掃描的對象數量添加到全局 if gcw.scanWork > 0 { atomic.Xaddint64(&gcController.scanWork, gcw.scanWork) if flushBgCredit { // 記錄此次掃描的內存字節數用於減小輔助標記的工做量 gcFlushBgCredit(gcw.scanWork - initScanWork) } gcw.scanWork = 0 } }
這裏在獲取緩存隊列以前會調用runtime.gcWork.balance
,會將gcWork緩存一部分工做放回全局隊列中,這個方法主要是用來平衡一下不一樣 P 的負載狀況。
而後獲取gcWork的緩存任務,並將獲取到的任務交給scanobject執行,該函數會從傳入的位置開始掃描,並會給找到的活躍對象上色。runtime.gcFlushBgCredit
會記錄此次掃描的內存字節數用於減小輔助標記的工做量。
這裏我來總結一下gcWork出入隊狀況。gcWork的出隊就是咱們上面的scanobject
方法,會獲取到 gcWork 緩存對象並執行,可是同時若是找到活躍對象也會再次的入隊到 gcWork 中。
除了 scanobject 之外,寫屏障、根對象掃描和棧掃描都會向 gcWork 中增長額外的灰色對象等待處理。