Go語言GC實現原理及源碼分析 (go1.15.7)[2]

Posted on 2021年3月20日 by luozhiyun
轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客: https://www.luozhiyun.com/arc...緩存

本文使用的 Go 的源碼1.15.7安全

stopTheWorldWithSema 與 startTheWorldWithSema

stopTheWorldWithSemastartTheWorldWithSema是一對用於暫停和恢復程序的核心函數。併發

func stopTheWorldWithSema() {
    _g_ := getg() 

    lock(&sched.lock)
    sched.stopwait = gomaxprocs
    // 標記 gcwaiting,調度時看見此標記會進入等待
    atomic.Store(&sched.gcwaiting, 1)
    // 發送搶佔信號
    preemptall() 
    // 暫停當前 P
    _g_.m.p.ptr().status = _Pgcstop // Pgcstop is only diagnostic.
    sched.stopwait--
    // 遍歷全部的 P ,修改 P 的狀態爲 _Pgcstop 中止運行
    for _, p := range allp {
        s := p.status
        if s == _Psyscall && atomic.Cas(&p.status, s, _Pgcstop) {
            if trace.enabled {
                traceGoSysBlock(p)
                traceProcStop(p)
            }
            p.syscalltick++
            sched.stopwait--
        }
    }
    // 中止空閒的 P 列表
    for {
        p := pidleget()
        if p == nil {
            break
        }
        p.status = _Pgcstop
        sched.stopwait--
    }
    wait := sched.stopwait > 0
    unlock(&sched.lock)
    if wait {
        for {
            //  等待 100 us
            if notetsleep(&sched.stopnote, 100*1000) {
                noteclear(&sched.stopnote)
                break
            }
            // 再次進行發送搶佔信號
            preemptall()
        }
    }
    // 安全檢測
    bad := ""
    if sched.stopwait != 0 {
        bad = "stopTheWorld: not stopped (stopwait != 0)"
    } else {
        for _, p := range allp {
            if p.status != _Pgcstop {
                bad = "stopTheWorld: not stopped (status != _Pgcstop)"
            }
        }
    }
    if atomic.Load(&freezing) != 0 {
        lock(&deadlock)
        lock(&deadlock)
    }
    if bad != "" {
        throw(bad)
    }
}

這個方法會經過sched.stopwait來檢測是否全部的P都已暫停。首先會經過調用preemptall發送搶佔信號進行搶佔全部運行中的G,而後遍歷P將全部狀態爲_Psyscall、空閒的P都暫停,若是仍有須要中止的P, 則等待它們中止。函數

func startTheWorldWithSema(emitTraceEvent bool) int64 {
    mp := acquirem() // disable preemption because it can be holding p in a local var
    // 判斷收到的 netpoll 事件並添加對應的G到待運行隊列
    if netpollinited() {
        list := netpoll(0) // non-blocking
        injectglist(&list)
    }
    lock(&sched.lock)

    procs := gomaxprocs
    if newprocs != 0 {
        procs = newprocs
        newprocs = 0
    }
  // 擴容或者縮容全局的處理器
    p1 := procresize(procs)
    // 取消GC等待標記
    sched.gcwaiting = 0
    // 若是 sysmon (後臺監控線程) 在等待則喚醒它
    if sched.sysmonwait != 0 {
        sched.sysmonwait = 0
        notewakeup(&sched.sysmonnote)
    }
    unlock(&sched.lock)
    // 喚醒有可運行任務的P
    for p1 != nil {
        p := p1
        p1 = p1.link.ptr()
        if p.m != 0 {
            mp := p.m.ptr()
            p.m = 0
            if mp.nextp != 0 {
                throw("startTheWorld: inconsistent mp->nextp")
            }
            mp.nextp.set(p)
            notewakeup(&mp.park)
        } else {
            // Start M to run P 
            newm(nil, p, -1)
        }
    } 
    startTime := nanotime()
    if emitTraceEvent {
        traceGCSTWDone()
    }
    // 若是有空閒的P,而且沒有自旋中的M則喚醒或者建立一個M
    wakep() 
    releasem(mp) 
    return startTime
}

建立後臺標記 Worker

func gcBgMarkStartWorkers() {
    // 遍歷全部 P
    for _, p := range allp {
        // 若是已啓動則不重複啓動
        if p.gcBgMarkWorker == 0 {
            // 爲全局每一個處理器建立用於執行後臺標記任務的 Goroutine
            go gcBgMarkWorker(p)
            // 啓動後等待該任務通知信號量 bgMarkReady 再繼續
            notetsleepg(&work.bgMarkReady, -1)
            noteclear(&work.bgMarkReady)
        }
    }
}

gcBgMarkStartWorkers會爲全局每一個 P 建立用於執行後臺標記任務的Goroutine,每個 Goroutine 都會運行gcBgMarkWorkernotetsleepg 會等待gcBgMarkWorker通知信號量bgMarkReady再繼續。ui

這裏雖然爲每一個P啓動了一個後臺標記任務, 可是能夠同時工做的只有25%,調度器在調度循環runtime.schedule中經過調用 gcController.findRunnableGCWorker方法進行控制。atom

在看這個方法以前,先來了解一個概念, Mark Worker Mode 標記工做模式,目前來講有三種,這三種是爲了保證後臺的標記線程的利用率。spa

type gcMarkWorkerMode int

const (
    // gcMarkWorkerDedicatedMode indicates that the P of a mark
    // worker is dedicated to running that mark worker. The mark
    // worker should run without preemption.
    gcMarkWorkerDedicatedMode gcMarkWorkerMode = iota

    // gcMarkWorkerFractionalMode indicates that a P is currently
    // running the "fractional" mark worker. The fractional worker
    // is necessary when GOMAXPROCS*gcBackgroundUtilization is not
    // an integer. The fractional worker should run until it is
    // preempted and will be scheduled to pick up the fractional
    // part of GOMAXPROCS*gcBackgroundUtilization.
    gcMarkWorkerFractionalMode

    // gcMarkWorkerIdleMode indicates that a P is running the mark
    // worker because it has nothing else to do. The idle worker
    // should run until it is preempted and account its time
    // against gcController.idleMarkTime.
    gcMarkWorkerIdleMode
)

經過代碼註釋能夠知道:pwa

  • gcMarkWorkerDedicatedMode :P 專門負責標記對象,不會被調度器搶佔;
  • gcMarkWorkerFractionalMode:主要是因爲如今默認標記線程的佔用率要爲 25%,因此若是 CPU 核數不是4的倍數,就沒法除得整數,啓動該類型的工做模式幫助垃圾收集達到利用率的目標;
  • gcMarkWorkerIdleMode:表示 P 當前只有標記線程在跑,沒有其餘能夠執行的 G ,它會運行垃圾收集的標記任務直到被搶佔;
func (c *gcControllerState) findRunnableGCWorker(_p_ *p) *g {
    ...
    // 原子減小對應的值, 若是減小後大於等於0則返回true, 不然返回false
    decIfPositive := func(ptr *int64) bool {
        if *ptr > 0 {
            if atomic.Xaddint64(ptr, -1) >= 0 {
                return true
            }
            // We lost a race
            atomic.Xaddint64(ptr, +1)
        }
        return false
    }
    // 減小dedicatedMarkWorkersNeeded, 成功時後臺標記任務的模式是Dedicated
    if decIfPositive(&c.dedicatedMarkWorkersNeeded) { 
        _p_.gcMarkWorkerMode = gcMarkWorkerDedicatedMode
    } else if c.fractionalUtilizationGoal == 0 {
        // No need for fractional workers.
        return nil
    } else {
        // 執行標記任務的時間
        delta := nanotime() - gcController.markStartTime 
        if delta > 0 && float64(_p_.gcFractionalMarkTime)/float64(delta) > c.fractionalUtilizationGoal {
            // Nope. No need to run a fractional worker.
            return nil
        }
        _p_.gcMarkWorkerMode = gcMarkWorkerFractionalMode
    }

    gp := _p_.gcBgMarkWorker.ptr()
    casgstatus(gp, _Gwaiting, _Grunnable) 
    return gp
}

看過個人《詳解Go語言調度循環源碼實現》的同窗應該都知道,搶佔調度運行到這裏的時候,一般是 P 搶佔不到 G 了,打算進行休眠了,所以在休眠以前能夠安全的進行標記任務的執行。線程

沒看過調度循環的同窗能夠看這裏:詳解Go語言調度循環源碼實現 https://www.luozhiyun.com/arc...code

併發掃描標記

併發掃描標記能夠大概歸納爲如下幾個部分:

  1. 將當前傳入的 P 打包成 parkInfo ,而後調用 gopark 讓當前 G 進入休眠,在休眠前會將 P 的 gcBgMarkWorker 與 G 進行綁定,等待喚醒;
  2. 根據 Mark Worker Mode 調用不一樣的策略調用 gcDrain 執行標記;
  3. 判斷是否全部後臺標記任務都完成, 而且沒有更多的任務,調用 gcMarkDone 準備進入完成標記階段;

後臺標記休眠等待

func gcBgMarkWorker(_p_ *p) {
    gp := getg()

    type parkInfo struct {
        m      muintptr  
        attach puintptr  
    }  
    gp.m.preemptoff = "GC worker init"
    // 初始化 park
    park := new(parkInfo)
    gp.m.preemptoff = ""
    // 設置當前的M並禁止搶佔
    park.m.set(acquirem())
    // 設置當前的P
    park.attach.set(_p_) 
    // 通知gcBgMarkStartWorkers能夠繼續處理
    notewakeup(&work.bgMarkReady)

    for { 
        // 讓當前 G 進入休眠
        gopark(func(g *g, parkp unsafe.Pointer) bool {
            park := (*parkInfo)(parkp)

            releasem(park.m.ptr()) 
            // 設置關聯的 P
            if park.attach != 0 {
                p := park.attach.ptr()
                park.attach.set(nil) 
                // 把當前的G設到P的gcBgMarkWorker成員
                if !p.gcBgMarkWorker.cas(0, guintptr(unsafe.Pointer(g))) { 
                    return false
                }
            }
            return true
        }, unsafe.Pointer(park), waitReasonGCWorkerIdle, traceEvGoBlock, 0)
        ...
    }
}

在 gcBgMarkStartWorkers 中咱們看到,它會遍歷全部的 P ,而後爲每一個 P 建立一個負責 Mark Work 的 G,這裏雖然爲每一個 P 啓動了一個後臺標記任務, 可是不可能每一個 P 都會去執行標記任務,後臺標記任務默認資源佔用率是 25%,因此 gcBgMarkWorker 中會初始化 park 並將 G 和 P 的 gcBgMarkWorker 進行綁定後進行休眠。

調度器在調度循環runtime.schedule中經過調用gcController.findRunnableGCWorker方法進行控制,讓哪些 Mark Work 能夠執行,上面代碼已經貼過了,這裏就不重複了

後臺標記

在喚醒後,咱們會根據gcMarkWorkerMode選擇不一樣的標記執行策略,不一樣的執行策略都會調用runtime.gcDrain:

func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { 
        ...
        // 檢查P的gcBgMarkWorker是否和當前的G一致, 不一致時結束當前的任務
        if _p_.gcBgMarkWorker.ptr() != gp {
            break
        }
        // 禁止G被搶佔
        park.m.set(acquirem())

        // 記錄開始時間
        startTime := nanotime()
        _p_.gcMarkWorkerStartTime = startTime

        decnwait := atomic.Xadd(&work.nwait, -1)

        systemstack(func() { 
            // 設置G的狀態爲等待中這樣它的棧能夠被掃描
            casgstatus(gp, _Grunning, _Gwaiting)
            // 判斷後臺標記任務的模式
            switch _p_.gcMarkWorkerMode {
            default:
                throw("gcBgMarkWorker: unexpected gcMarkWorkerMode")
            case gcMarkWorkerDedicatedMode:
                // 這個模式下P應該專心執行標記
                gcDrain(&_p_.gcw, gcDrainUntilPreempt|gcDrainFlushBgCredit)
                if gp.preempt { 
                    // 被搶佔時把本地運行隊列中的全部G都踢到全局運行隊列
                    lock(&sched.lock)
                    for {
                        gp, _ := runqget(_p_)
                        if gp == nil {
                            break
                        }
                        globrunqput(gp)
                    }
                    unlock(&sched.lock)
                }
                // 繼續執行標記
                gcDrain(&_p_.gcw, gcDrainFlushBgCredit)
            case gcMarkWorkerFractionalMode:
                // 執行標記
                gcDrain(&_p_.gcw, gcDrainFractional|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            case gcMarkWorkerIdleMode:
                // 執行標記, 直到被搶佔或者達到必定的量
                gcDrain(&_p_.gcw, gcDrainIdle|gcDrainUntilPreempt|gcDrainFlushBgCredit)
            }
            // 恢復G的狀態到運行中
            casgstatus(gp, _Gwaiting, _Grunning)
        }) 
        ...
    }
}

在上面已經講了不一樣的 Mark Worker Mode 的區別,不記得的同窗能夠往上翻一下。執行標記這部分主要在 switch 判斷中,根據不一樣的模式傳入不一樣的參數到 gcDrain 函數中執行。

須要注意的是,傳入到 gcDrain 中的是一個 gcWork 的結構體,它至關於每一個 P 的私有緩存空間,存放須要被掃描的對象,爲垃圾收集器提供了生產和消費任務的抽象,,該結構體持有了兩個重要的工做緩衝區 wbuf1 和 wbuf2:

當咱們向該結構體中增長或者刪除對象時,它總會先操做 wbuf1 緩衝區,一旦 wbuf1 緩衝區空間不足或者沒有對象,會觸發緩衝區的切換,而當兩個緩衝區空間都不足或者都爲空時,會從全局的工做緩衝區中插入或者獲取對象:

func (w *gcWork) tryGet() uintptr {
    wbuf := w.wbuf1
    ...
    // wbuf1緩衝區無數據時
    if wbuf.nobj == 0 {
        // wbuf1 與 wbuf2 進行對象互換
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
        wbuf = w.wbuf1
        if wbuf.nobj == 0 {
            owbuf := wbuf
            // 從 work 的 full 隊列中獲取
            wbuf = trygetfull()
            ...
        }
    } 
    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

當咱們向該結構體中增長或者刪除對象時,它總會先操做 wbuf1 緩衝區,一旦 wbuf1 緩衝區空間不足或者沒有對象,會觸發緩衝區的切換,而當兩個緩衝區空間都不足或者都爲空時,會從全局的工做緩衝區中插入或者獲取對象:

func (w *gcWork) tryGet() uintptr {
    wbuf := w.wbuf1
    ...
    // wbuf1緩衝區無數據時
    if wbuf.nobj == 0 {
        // wbuf1 與 wbuf2 進行對象互換
        w.wbuf1, w.wbuf2 = w.wbuf2, w.wbuf1
        wbuf = w.wbuf1
        if wbuf.nobj == 0 {
            owbuf := wbuf
            // 從 work 的 full 隊列中獲取
            wbuf = trygetfull()
            ...
        }
    } 
    wbuf.nobj--
    return wbuf.obj[wbuf.nobj]
}

繼續上面的 gcBgMarkWorker 方法,在標記完以後就要進行標記完成:

func gcBgMarkWorker(_p_ *p) {
    gp := getg()
    ...
    for { 
        ...  
        // 累加所用時間
        duration := nanotime() - startTime
        switch _p_.gcMarkWorkerMode {
        case gcMarkWorkerDedicatedMode:
            atomic.Xaddint64(&gcController.dedicatedMarkTime, duration)
            atomic.Xaddint64(&gcController.dedicatedMarkWorkersNeeded, 1)
        case gcMarkWorkerFractionalMode:
            atomic.Xaddint64(&gcController.fractionalMarkTime, duration)
            atomic.Xaddint64(&_p_.gcFractionalMarkTime, duration)
        case gcMarkWorkerIdleMode:
            atomic.Xaddint64(&gcController.idleMarkTime, duration)
        }

        incnwait := atomic.Xadd(&work.nwait, +1)

        // 判斷是否全部後臺標記任務都完成, 而且沒有更多的任務
        if incnwait == work.nproc && !gcMarkWorkAvailable(nil) { 
            // 取消和P的關聯
            _p_.gcBgMarkWorker.set(nil)
            // 容許G被搶佔
            releasem(park.m.ptr())
            // 準備進入完成標記階段
            gcMarkDone()

            // 休眠以前會從新關聯P
            // 由於上面容許被搶佔, 到這裏的時候可能就會變成其餘P
            // 若是從新關聯P失敗則這個任務會結束
            park.m.set(acquirem())
            park.attach.set(_p_)
        }
    }
}

gcBgMarkWorker 會根據 incnwait 來檢查是不是最後一個 worker,而後調用 gcMarkWorkAvailable 函數來校驗 gcwork的任務和全局任務是否已經所有都處理完了,若是都確認沒問題,那麼調用 gcMarkDone 進入完成標記階段。

標記掃描

下面咱們來看看 gcDrain:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    gp := getg().m.curg
    // 看到搶佔標誌時是否要返回
    preemptible := flags&gcDrainUntilPreempt != 0
    // 是否計算後臺的掃描量來減小協助線程和喚醒等待中的G
    flushBgCredit := flags&gcDrainFlushBgCredit != 0
    // 是否只執行必定量的工做
    idle := flags&gcDrainIdle != 0
    // 記錄初始的已掃描數量
    initScanWork := gcw.scanWork

    checkWork := int64(1<<63 - 1)
    var check func() bool
    if flags&(gcDrainIdle|gcDrainFractional) != 0 {
        // drainCheckThreshold 默認 100000
        checkWork = initScanWork + drainCheckThreshold
        if idle {
            check = pollWork
        } else if flags&gcDrainFractional != 0 {
            check = pollFractionalWorkerExit
        }
    } 
    // 若是根對象未掃描完, 則先掃描根對象
    if work.markrootNext < work.markrootJobs {
        // 一直循環直到被搶佔或 STW
        for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) {
            // 從根對象掃描隊列取出一個值
            job := atomic.Xadd(&work.markrootNext, +1) - 1
            if job >= work.markrootJobs {
                break
            }
            // 執行根對象掃描工做
            markroot(gcw, job)
            if check != nil && check() {
                goto done
            }
        }
    } 
    ...
}

gcDrain 函數在開始的時候,會根據 flags 不一樣而選擇不一樣的策略。

  • gcDrainUntilPreempt:當 G 被搶佔時返回;
  • gcDrainIdle:調用 runtime.pollWork,當 P 上包含其餘待執行 G 時返回;
  • gcDrainFractional:調用 runtime.pollFractionalWorkerExit,當 CPU 的佔用率超過 fractionalUtilizationGoal 的 20% 時返回;
    設置完 check 變量後就能夠執行 runtime.markroot進行根對象掃描,每次掃描完畢都會調用 check 函數校驗是否應該退出標記任務,若是是那麼就跳到 done 代碼塊中退出標記。

完成標記後會獲取待執行的任務:

func gcDrain(gcw *gcWork, flags gcDrainFlags) {
    ...
    // 根對象已經在標記隊列中, 消費標記隊列
    // 一直循環直到被搶佔或 STW
    for !(gp.preempt && (preemptible || atomic.Load(&sched.gcwaiting) != 0)) { 
        // 將本地一部分工做放回全局隊列中
        if work.full == 0 {
            gcw.balance()
        }
        // 獲取任務
        b := gcw.tryGetFast()
        if b == 0 {
            b = gcw.tryGet()
            if b == 0 { 
                wbBufFlush(nil, 0)
                b = gcw.tryGet()
            }
        }
        // 獲取不到對象, 標記隊列已爲空, 跳出循環
        if b == 0 { 
            break
        }
        // 掃描獲取到的對象
        scanobject(b, gcw)

        // 若是已經掃描了必定數量的對象,gcCreditSlack值是2000
        if gcw.scanWork >= gcCreditSlack {
            // 把掃描的對象數量添加到全局
            atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
            if flushBgCredit {
                // 記錄此次掃描的內存字節數用於減小輔助標記的工做量
                gcFlushBgCredit(gcw.scanWork - initScanWork)
                initScanWork = 0
            }
            checkWork -= gcw.scanWork
            gcw.scanWork = 0

            if checkWork <= 0 {
                checkWork += drainCheckThreshold
                if check != nil && check() {
                    break
                }
            }
        }
    }

done:
    // 把掃描的對象數量添加到全局
    if gcw.scanWork > 0 {
        atomic.Xaddint64(&gcController.scanWork, gcw.scanWork)
        if flushBgCredit {
            // 記錄此次掃描的內存字節數用於減小輔助標記的工做量
            gcFlushBgCredit(gcw.scanWork - initScanWork)
        }
        gcw.scanWork = 0
    }
}

這裏在獲取緩存隊列以前會調用runtime.gcWork.balance,會將gcWork緩存一部分工做放回全局隊列中,這個方法主要是用來平衡一下不一樣 P 的負載狀況。

而後獲取gcWork的緩存任務,並將獲取到的任務交給scanobject執行,該函數會從傳入的位置開始掃描,並會給找到的活躍對象上色。runtime.gcFlushBgCredit會記錄此次掃描的內存字節數用於減小輔助標記的工做量。

這裏我來總結一下gcWork出入隊狀況。gcWork的出隊就是咱們上面的scanobject方法,會獲取到 gcWork 緩存對象並執行,可是同時若是找到活躍對象也會再次的入隊到 gcWork 中。

除了 scanobject 之外,寫屏障、根對象掃描和棧掃描都會向 gcWork 中增長額外的灰色對象等待處理。

相關文章
相關標籤/搜索