A Deep Dive into Go: How Goroutines Are Implemented, and an Analysis of the Scheduler

While learning Go, the most striking feature is undoubtedly the goroutine. But what exactly is a goroutine? We can create one with nothing more than the go keyword, so how are all of these goroutines scheduled?
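
As a concrete reminder of what we are about to dissect, here is a minimal, illustrative program that starts a goroutine with the go keyword; everything that follows is about what the runtime does behind this single statement:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    wg.Add(1)
    // The go keyword is all it takes to create a goroutine;
    // as we will see below, the compiler turns this statement
    // into a call to runtime.newproc.
    go func() {
        defer wg.Done()
        fmt.Println("hello from a goroutine")
    }()
    wg.Wait()
}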

1. Structure Overview

While reading the Go source you run into g, p, and m everywhere, so let's first look at these key structures and how they relate to each other.

1.1. Data Structures

Here we only list the more important members of each struct.

1.1.1. G (goroutine)

A goroutine is the smallest unit of execution in the runtime.

type g struct {
    // Stack parameters.
    // stack describes the actual stack memory: [stack.lo, stack.hi).
    // stackguard0 is the stack pointer compared in the Go stack growth prologue.
    // It is stack.lo+StackGuard normally, but can be StackPreempt to trigger a preemption.
    // stackguard1 is the stack pointer compared in the C stack growth prologue.
    // It is stack.lo+StackGuard on g0 and gsignal stacks.
    // It is ~0 on other goroutine stacks, to trigger a call to morestackc (and crash).
    // Stack space currently used by this g; the stack struct has two members, lo and hi
    stack       stack   // offset known to runtime/cgo
    // Used to check whether stack growth is needed; used by Go code
    stackguard0 uintptr // offset known to liblink
    // Used to check whether stack growth is needed; used by native (C) code
    stackguard1 uintptr // offset known to liblink
    // The m this g is currently bound to
    m              *m      // current m; offset known to arm liblink
    // Scheduling data for this g; when the goroutine is switched out, its context is saved here so it can be restored later
    sched          gobuf
    // Current status of this g
    atomicstatus   uint32
    // ID of this g
    goid           int64
    // Address of the next g; guintptr's ptr/set methods read and write it. Together with sched.gfreeStack and sched.gfreeNoStack it links free g's into a list
    schedlink      guintptr
    // Whether this g may be preempted
    preempt        bool       // preemption signal, duplicates stackguard0 = stackpreempt
    // Whether this g must run on a particular M; a g that was interrupted sometimes has to resume on its original M
    lockedm        muintptr
}

1.1.2. P (processor)

A P holds the resources an M needs in order to run G's.

type p struct {
   lock mutex

   id          int32
   // Status of this p, described below
   status      uint32 // one of pidle/prunning/...
   // Address of the next p; see g.schedlink
   link        puintptr
   // The m associated with this p
   m           muintptr   // back-link to associated m (nil if idle)
   // Used for memory allocation; the m that owns this p uses this same mcache
   mcache      *mcache
  
   // Cache of goroutine ids, amortizes accesses to runtime·sched.goidgen.
   // A batch of goids fetched from sched and cached here, so that not every goid allocation hits runtime·sched.goidgen
     goidcache    uint64
     goidcacheend uint64

   // Queue of runnable goroutines. Accessed without lock.
   // The p-local queue of runnable goroutines
   runqhead uint32
   runqtail uint32
   runq     [256]guintptr
   // runnext, if non-nil, is a runnable G that was ready'd by
   // the current G and should be run next instead of what's in
   // runq if there's time remaining in the running G's time
   // slice. It will inherit the time left in the current time
   // slice. If a set of goroutines is locked in a
   // communicate-and-wait pattern, this schedules that set as a
   // unit and eliminates the (potentially large) scheduling
   // latency that otherwise arises from adding the ready'd
   // goroutines to the end of the run queue.
   // The next g to run; if nil, the next g comes from the run queue
   runnext guintptr

   // Available G's (status == Gdead)
   // List of g's whose status is _Gdead, kept for reuse
   gfree    *g
   gfreecnt int32
}

1.1.3. M(machine)

type m struct {
   // g0 is a special g used for scheduling and for running system calls
   g0      *g     // goroutine with scheduling stack
   // the g currently running on this m
   curg          *g       // current running goroutine
   // the p currently owned by this m
   p             puintptr // attached p for executing go code (nil if not executing go code)
   // the thread's local storage
   tls           [6]uintptr   // thread-local storage
   // when this m is woken up, it takes ownership of this p
   nextp         puintptr
   id            int64
   // if != "", keep curg running on this m (preemption is disabled)
   preemptoff    string // if != "", keep curg running on this m
   // spinning state: this m has run out of work and is actively looking for a g to run
   spinning      bool // m is out of work and is actively looking for work
   // whether this m is sleeping on a note
   blocked       bool // m is blocked on a note
   // the m sleeps and is woken through this note; its key points at an address whose value is changed to park or wake the m
   park          note
   // link in the list of all m's
   alllink       *m // on allm
   // the next m; together with sched.midle this links idle m's into a list
   schedlink     muintptr
   // the mcache; when an m owns a p, it hands its mcache to that p
   mcache        *mcache
   // counterpart of g.lockedm
   lockedg       guintptr
   // link in the list of m's waiting to be freed, chained through sched.freem
   freelink      *m      // on sched.freem
}

1.1.4. sched

type schedt struct {
   // global goid allocation counter
   goidgen  uint64
   // time of the last netpoll (i/o) query for g's
   lastpoll uint64

   lock mutex

   // When increasing nmidle, nmidlelocked, nmsys, or nmfreed, be
   // sure to call checkdead().
   // idle m's; together with m.schedlink this forms the idle m list
   midle        muintptr // idle m's waiting for work
   nmidle       int32    // number of idle m's waiting for work
   nmidlelocked int32    // number of locked m's waiting for work
   // id for the next m to be created; also counts how many m's have been created
   mnext        int64    // number of m's that have been created and next M ID
   // maximum number of m's allowed
   maxmcount    int32    // maximum number of m's allowed (or die)
   nmsys        int32    // number of system m's not counted for deadlock
   // number of m's that have been freed, i.e. have exited
   nmfreed      int64    // cumulative number of freed m's

   ngsys uint32 // number of system goroutines; updated atomically

   pidle      puintptr // idle p's
   npidle     uint32
   nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.

   // Global runnable queue.
   // This is the global run queue of g's; when a p's local queue is empty or too full, it rebalances against this queue
   // runqhead points at the g at the head, and each g.schedlink points at the next, forming a linked list
   runqhead guintptr
   runqtail guintptr
   runqsize int32

   // freem is the list of m's waiting to be freed when their
   // m.exited is set. Linked through m.freelink.
   // list of m's waiting to be freed
   freem *m
}

Before moving on, let's go over the status values.

1.1.5. g.status

  • _Gidle: the goroutine has just been created and has not been initialized yet
  • _Grunnable: the goroutine is on a run queue; it is not running yet and does not own its stack
  • _Grunning: a g in this state may be executing user code; it owns its own m and p
  • _Gsyscall: executing a system call
  • _Gwaiting: the goroutine is blocked, for example waiting on a channel
  • _Gdead: the g is currently unused; it may have just exited or may be in the middle of being initialized
  • _Gcopystack: the g's stack is being moved; a new stack is being allocated

1.1.6. p.status

  • _Pidle: idle; the p is not bound to any m
  • _Prunning: the p enters this state when an m acquires it; the m can then use the p's resources to run g's
  • _Psyscall: when the Go code being run enters a system call (for example when Go calls into native code), the p it was using moves to this state
  • _Pdead: when the number of p's is reduced at run time, the p's that are dropped end up in this state

1.1.7. m.status

m does not have an explicit status field the way p and g do, but in the analysis of the execution flow it is mainly in one of the following states (an example of observing them from the outside follows the list):

  • Running: it holds a p and is executing a g
  • Running native code: it is executing native code or blocked in a syscall
  • Sleeping: the m found no runnable g, went to sleep, and was added to the idle list
  • Spinning: its current work is finished and it is actively looking for the next runnable g
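
These m states are easiest to observe from the outside with the scheduler trace. As a quick, illustrative example (GODEBUG=schedtrace is a standard runtime debug option; the exact output format may vary by Go version), running the program below with GODEBUG=schedtrace=1000 prints a line every second showing the thread, spinning-thread, idle-proc, and run-queue counters discussed above:

package main

import (
    "runtime"
    "time"
)

// Run with: GODEBUG=schedtrace=1000 go run main.go
// Every second the runtime prints a line roughly like:
//   SCHED 1009ms: gomaxprocs=8 idleprocs=4 threads=6 spinningthreads=0 idlethreads=1 runqueue=0 [1 0 0 0 0 0 0 0]
// showing the m/p counters and run-queue lengths discussed above.
func main() {
    for i := 0; i < 4; i++ {
        go func() {
            for {
                runtime.Gosched() // stay runnable (and preemptible) so the trace shows activity
            }
        }()
    }
    time.Sleep(3 * time.Second)
}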

The structures above contain quite a few linked lists, and the g, m, and p structs also hold pointers to one another. So what does their relationship actually look like?

From the figure above we can briefly describe the relationship between m, p, and g.

2. Flow Overview

The figure below gives a rough overview of Go's entire scheduling flow.

Next, let's analyze the whole scheduling flow in detail from the source code (my assembly is not great, so the assembly parts are skipped 🤪).

3. Source Code Analysis

3.1. Initialization

Go's startup consists of four steps:

  1. call osinit: this just sets the global variable ncpu to the number of CPU cores
  2. call schedinit
  3. make & queue new G (runtime.newproc; go func() also goes through this function to create a goroutine)
  4. call runtime·mstart

Among these, schedinit is the scheduler initialization. Setting aside the memory-allocation and garbage-collection setup that also happens in schedinit, the scheduler-specific part roughly amounts to initializing the current m, setting the maximum maxmcount, and determining the number of p's and initializing them.

3.1.1. schedinit

schedinit initializes the current m, then determines the number of p's from the CPU core count obtained in osinit and from the GOMAXPROCS setting, and initializes them.

func schedinit() {
    // Get the pointer to the current g from TLS or a dedicated register
    _g_ := getg()
    // Set the maximum number of m's
    sched.maxmcount = 10000

    // Initialize the stack reuse pools
    stackinit()
    // Initialize the current m
    mcommoninit(_g_.m)

    // osinit set the global variable ncpu; here the number of p's is determined from the CPU core count and GOMAXPROCS
    procs := ncpu
    if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
        procs = n
    }
    // Create the configured number of p's
    if procresize(procs) != nil {
        throw("unknown runnable goroutine during bootstrap")
    }
}

3.1.2. mcommoninit

func mcommoninit(mp *m) {
    _g_ := getg()

    lock(&sched.lock)
    // Check that mnext will not overflow; it is about to be assigned to m.id
    if sched.mnext+1 < sched.mnext {
        throw("runtime: thread ID overflow")
    }
    mp.id = sched.mnext
    sched.mnext++
    // Check whether the number of m's exceeds maxmcount; throw if it does
    checkmcount()
    // Create a new g for handling signals and allocate its stack
    mpreinit(mp)
    if mp.gsignal != nil {
        mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
    }

    // Add to allm so garbage collector doesn't free g->m
    // when it is just in a register or thread-local storage.
    // The next two lines add the current m to the head of allm: point mp.alllink at the old head, then atomically store mp's address into allm
    mp.alllink = allm

    // NumCgoCall() iterates over allm w/o schedlock,
    // so we need to publish it safely.
    atomicstorep(unsafe.Pointer(&allm), unsafe.Pointer(mp))
    unlock(&sched.lock)

    // Allocate memory to hold a cgo traceback if the cgo call crashes.
    if iscgo || GOOS == "solaris" || GOOS == "windows" {
        mp.cgoCallers = new(cgoCallers)
    }
}

Here the m linked list comes into play. It can be represented by the figure below; the p and g lists are similar, they just use different struct fields.

3.1.3. Diagram of the allm list

3.1.4. procresize

procresize changes the number of p's, creating more or releasing extras as needed. During initialization there are no p's at all yet, so here its job is simply to create the configured number of p's.

procresize is not only called during initialization: when the user calls runtime.GOMAXPROCS manually, nprocs is reset and startTheWorld() is executed, and startTheWorld() calls procresize again with the new nprocs.
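
As a user-level illustration (application code, not runtime code), calling runtime.GOMAXPROCS at run time is what ends up driving this resize path:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    // GOMAXPROCS(0) only queries the current number of p's without changing it.
    fmt.Println("initial p count:", runtime.GOMAXPROCS(0))

    // Changing it stops the world, and startTheWorld() then calls procresize
    // with the new nprocs, as described above.
    prev := runtime.GOMAXPROCS(2)
    fmt.Println("previous:", prev, "now:", runtime.GOMAXPROCS(0))
}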

func procresize(nprocs int32) *p {
    old := gomaxprocs
    if old < 0 || nprocs <= 0 {
        throw("procresize: invalid arg")
    }
    // update statistics
    now := nanotime()
    if sched.procresizetime != 0 {
        sched.totaltime += int64(old) * (now - sched.procresizetime)
    }
    sched.procresizetime = now

    // Grow allp if necessary.
    // If the new number of p's is larger than before, create the extra ones
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        // Check whether allp's cap can already hold the new length; if so reslice, otherwise grow the slice
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's
    // Initialize the newly added p's
    for i := int32(0); i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
            pp.id = i
            pp.status = _Pgcstop
            pp.sudogcache = pp.sudogbuf[:0]
            for i := range pp.deferpool {
                pp.deferpool[i] = pp.deferpoolbuf[i][:0]
            }
            pp.wbBuf.reset()
            // allp is a slice, so just store the new p at the corresponding index
            atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
        }
        if pp.mcache == nil {
            // During bootstrap old == 0, and the first new p is given to the current m
            if old == 0 && i == 0 {
                if getg().m.mcache == nil {
                    throw("missing mcache?")
                }
                pp.mcache = getg().m.mcache // bootstrap
            } else {
                // Allocate an mcache for the p
                pp.mcache = allocmcache()
            }
        }
    }

    // free unused P's
    // Release the surplus p's; this path runs when the new number of p's is smaller than before
    // nprocs can be changed dynamically through runtime.GOMAXPROCS
    for i := nprocs; i < old; i++ {
        p := allp[i]
        // move all runnable goroutines to the global queue
        // Move the g's in this p's run queue to the global g queue
        for p.runqhead != p.runqtail {
            // pop from tail of local queue
            p.runqtail--
            gp := p.runq[p.runqtail%uint32(len(p.runq))].ptr()
            // push onto head of global queue
            globrunqputhead(gp)
        }
        // Move the g in runnext to the global queue as well
        if p.runnext != 0 {
            globrunqputhead(p.runnext.ptr())
            p.runnext = 0
        }
        // if there's a background worker, make it runnable and put
        // it on the global queue so it can clean itself up
        // If there is a GC background worker, make it runnable and put it on the global queue
        if gp := p.gcBgMarkWorker.ptr(); gp != nil {
            casgstatus(gp, _Gwaiting, _Grunnable)
            globrunqput(gp)
            // This assignment doesn't race because the
            // world is stopped.
            p.gcBgMarkWorker.set(nil)
        }
        // Clear the sudog buffer and cache as well as the deferpool
        for i := range p.sudogbuf {
            p.sudogbuf[i] = nil
        }
        p.sudogcache = p.sudogbuf[:0]
        for i := range p.deferpool {
            for j := range p.deferpoolbuf[i] {
                p.deferpoolbuf[i][j] = nil
            }
            p.deferpool[i] = p.deferpoolbuf[i][:0]
        }
        // Release this p's mcache
        freemcache(p.mcache)
        p.mcache = nil
        // Move this p's gfree list to the global free list
        gfpurge(p)
        // Mark the p as dead and leave it to its fate
        p.status = _Pdead
        // can't free P itself because it can be referenced by an M in syscall
    }

    // Trim allp.
    if int32(len(allp)) != nprocs {
        lock(&allpLock)
        allp = allp[:nprocs]
        unlock(&allpLock)
    }
    // If the current g already has a p that survives the resize, update that p's status and keep using it
    _g_ := getg()
    if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
        // continue to use the current P
        _g_.m.p.ptr().status = _Prunning
    } else {
        // release the current P and acquire allp[0]
        // The current g may have a p, but one that has just been released; stop using it and reallocate
        if _g_.m.p != 0 {
            _g_.m.p.ptr().m = 0
        }
        // Give allp[0] to the current g
        _g_.m.p = 0
        _g_.m.mcache = nil
        p := allp[0]
        p.m = 0
        p.status = _Pidle
        // Bind p, m and g together, point m.mcache at p.mcache, and set the p's status to _Prunning
        acquirep(p)
    }
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle
        // Use runqempty to check whether this p's local run queue is empty
        if runqempty(p) {
            // A p whose g run queue is empty goes onto sched's pidle list
            pidleput(p)
        } else {
            // p's with non-empty run queues are linked into a runnable list that is returned at the end
            p.m.set(mget())
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    stealOrder.reset(uint32(nprocs))
    var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
    atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
    return runnablePs
}
  • runqempty: this function is simple enough that we won't dig into it; it just uses p.runqtail == p.runqhead and p.runnext to decide whether there are any runnable g's
  • pidleput: sets the current p as sched.pidle and chains the idle p's together through p.link, just like the allm list in the diagram above; a simplified sketch of this idle list follows
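
A minimal sketch (with simplified stand-in types, not the runtime's) of how sched.pidle and p.link form that intrusive linked list of idle p's:

package main

import "fmt"

// Toy versions of the runtime structures, just to show the list mechanics.
type p struct {
    id   int32
    link *p // plays the role of p.link (puintptr) in the runtime
}

type schedt struct {
    pidle  *p // head of the idle list, like sched.pidle
    npidle int32
}

// pidleput pushes a p onto the head of the idle list.
func (s *schedt) pidleput(pp *p) {
    pp.link = s.pidle
    s.pidle = pp
    s.npidle++
}

// pidleget pops a p off the head, or returns nil if the list is empty.
func (s *schedt) pidleget() *p {
    pp := s.pidle
    if pp != nil {
        s.pidle = pp.link
        s.npidle--
    }
    return pp
}

func main() {
    var s schedt
    for i := int32(0); i < 3; i++ {
        s.pidleput(&p{id: i})
    }
    for pp := s.pidleget(); pp != nil; pp = s.pidleget() {
        fmt.Println("got idle p", pp.id)
    }
}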

3.2. Tasks

Creating a goroutine takes nothing more than a go func statement; the compiler translates go func into a call to newproc. So how does a newly created task end up running? Let's trace it from its creation.

3.2.1. newproc

newproc collects the arguments and the caller's pc, then switches to g0 to call newproc1, which does the real work of creating or reusing a g.

func newproc(siz int32, fn *funcval) {
    // Get the address of the first argument
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    // Get the currently running g
    gp := getg()
    // Get the caller's pc
    pc := getcallerpc()
    systemstack(func() {
        // Run newproc1 on the g0 (system) stack
        newproc1(fn, (*uint8)(argp), siz, gp, pc)
    })
}

3.2.2. newproc1

The job of newproc1 is to create or reuse an idle g, initialize it, and try to find a p and an m to run it.

func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
    _g_ := getg()

    if fn == nil {
        _g_.m.throwing = -1 // do not dump full stacks
        throw("go of nil func value")
    }
    // Disable preemption, because a p may be held in a local variable
    _g_.m.locks++ // disable preemption because it can be holding p in a local var
    siz := narg
    siz = (siz + 7) &^ 7

    // We could allocate a larger initial stack if necessary.
    // Not worth it: this is almost always an error.
    // 4*sizeof(uintreg): extra space added below
    // sizeof(uintreg): caller's LR (arm) or return address (x86, in gostartcall).
    // If there are too many arguments, throw; the initial stack is only 2KB
    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    // Try to get a free g; if none is available, create one and add it to allg
    // gfget first tries the p's local free list, and if that is empty it pulls a batch from the global free list into the local one
    newg := gfget(_p_)
    if newg == nil {
        newg = malg(_StackMin)
        casgstatus(newg, _Gidle, _Gdead)
        // A newly created g is added to the global allg, which is a slice; just append it
        allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
    }
    // Check that the g we got has a proper stack
    if newg.stack.hi == 0 {
        throw("newproc1: newg missing stack")
    }
    // Check that the g's status is as expected
    if readgstatus(newg) != _Gdead {
        throw("newproc1: new g is not Gdead")
    }
    // Reserve a little extra space in case of reads slightly beyond the frame
    totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
    // Align the size
    totalSize += -totalSize & (sys.SpAlign - 1) // align to spAlign
    sp := newg.stack.hi - totalSize
    spArg := sp
    // usesLR is false here (x86), so this block is not executed
    if usesLR {
        // caller's LR
        *(*uintptr)(unsafe.Pointer(sp)) = 0
        prepGoExitFrame(sp)
        spArg += sys.MinFrameSize
    }
    if narg > 0 {
        // Copy the arguments onto the new g's stack
        memmove(unsafe.Pointer(spArg), unsafe.Pointer(argp), uintptr(narg))
        // ... omitted ...
    }
    // Initialize the area that saves the execution context, and the basic state
    memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
    newg.sched.sp = sp
    newg.stktopsp = sp
    // The address of goexit is saved here; when the user function finishes, execution continues at this pc, i.e. in goexit
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    // Adjust the sched info: the saved pc (goexit) becomes the fake return address and pc is pointed at fn
    gostartcallfn(&newg.sched, fn)
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    if _g_.m.curg != nil {
        newg.labels = _g_.m.curg.labels
    }
    if isSystemGoroutine(newg) {
        atomic.Xadd(&sched.ngsys, +1)
    }
    newg.gcscanvalid = false
    casgstatus(newg, _Gdead, _Grunnable)
    // If the p's cached goid range is used up, fetch another batch from sched
    if _p_.goidcache == _p_.goidcacheend {
        // Sched.goidgen is the last allocated id,
        // this batch must be [sched.goidgen+1, sched.goidgen+GoidCacheBatch].
        // At startup sched.goidgen=0, so main goroutine receives goid=1.
        _p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
        _p_.goidcache -= _GoidCacheBatch - 1
        _p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
    }
    // Assign the goid
    newg.goid = int64(_p_.goidcache)
    _p_.goidcache++
    // Put the new g into the p's runnable queue
    runqput(_p_, newg, true)
    // If there is an idle p and no spinning m, wake an m up to run the g
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
        wakep()
    }
    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }
}
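
As an aside, the memmove above is what copies the go statement's arguments onto the new g's stack, which is why those arguments are evaluated and captured at the go statement rather than when the goroutine eventually runs. A small, self-contained example (application code, not runtime code) of this behavior:

package main

import (
    "fmt"
    "sync"
)

func main() {
    var wg sync.WaitGroup
    i := 1
    wg.Add(1)
    // The argument i is evaluated and copied for the new goroutine right here,
    // at the go statement (the memmove in newproc1 above).
    go func(v int) {
        defer wg.Done()
        fmt.Println("copied value:", v) // always prints 1
    }(i)
    i = 2 // does not affect the already-copied argument
    wg.Wait()
}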

3.2.2.1. gfget

The logic here is fairly simple: check whether the p has any free g's, and if not, fetch a batch from the global free-g list. This is one of the places where the p-local and global lists rebalance against each other.

func gfget(_p_ *p) *g {
retry:
    gp := _p_.gfree
    // If the local free list is empty and the global one is not, pull up to 32 g's down from the global list (fewer if it runs out)
    if gp == nil && (sched.gfreeStack != nil || sched.gfreeNoStack != nil) {
        lock(&sched.gflock)
        for _p_.gfreecnt < 32 {
            if sched.gfreeStack != nil {
                // Prefer Gs with stacks.
                gp = sched.gfreeStack
                sched.gfreeStack = gp.schedlink.ptr()
            } else if sched.gfreeNoStack != nil {
                gp = sched.gfreeNoStack
                sched.gfreeNoStack = gp.schedlink.ptr()
            } else {
                break
            }
            _p_.gfreecnt++
            sched.ngfree--
            gp.schedlink.set(_p_.gfree)
            _p_.gfree = gp
        }
        // Now that g's have been pulled from the global list, retry from the top
        unlock(&sched.gflock)
        goto retry
    }
    // If we got a g, check whether it has a stack and allocate one if not
    // Stack allocation works much like memory allocation: there are pools of fixed-size stacks to allocate from, and oversized stacks are allocated globally
    if gp != nil {
        _p_.gfree = gp.schedlink.ptr()
        _p_.gfreecnt--
        if gp.stack.lo == 0 {
            // Stack was deallocated in gfput. Allocate a new one.
            systemstack(func() {
                gp.stack = stackalloc(_FixedStack)
            })
            gp.stackguard0 = gp.stack.lo + _StackGuard
        } else {
            // ... omitted ...
        }
    }
    // Note: if neither the global list nor the p has a free g, the returned gp is still nil
    return gp
}

3.2.2.2. runqput

runqput puts the g into the p's local queue or into p.runnext; if the p's local queue is full, the g goes to the global queue, and half of the p's local queue is moved to the global queue along with it.

func runqput(_p_ *p, gp *g, next bool) {
    if randomizeScheduler && next && fastrand()%2 == 0 {
        next = false
    }
    // If next is true, put the g into p.runnext and kick out whatever g was there before
    if next {
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
            goto retryNext
        }
        if oldnext == 0 {
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr()
    }

retry:
    h := atomic.Load(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    // Check whether the p's queue is full; runq is a 256-element array, and overflow goes to the global queue
    if t-h < uint32(len(_p_.runq)) {
        _p_.runq[t%uint32(len(_p_.runq))].set(gp)
        atomic.Store(&_p_.runqtail, t+1) // store-release, makes the item available for consumption
        return
    }
    // Put the g (plus half the local queue) onto the global queue
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

3.2.2.3. runqputslow

func runqputslow(_p_ *p, gp *g, h, t uint32) bool {
    var batch [len(_p_.runq)/2 + 1]*g

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq)/2) {
        throw("runqputslow: queue is not full")
    }
    // Grab half of the p's local queue, starting from the head
    for i := uint32(0); i < n; i++ {
        batch[i] = _p_.runq[(h+i)%uint32(len(_p_.runq))].ptr()
    }
    if !atomic.Cas(&_p_.runqhead, h, h+n) { // cas-release, commits consume
        return false
    }
    batch[n] = gp

    // Link the goroutines.
    for i := uint32(0); i < n; i++ {
        batch[i].schedlink.set(batch[i+1])
    }

    // Now put the batch on global queue.
    // Put the batch at the tail of the global queue
    lock(&sched.lock)
    globrunqputbatch(batch[0], batch[n], int32(n+1))
    unlock(&sched.lock)
    return true
}

That essentially completes task creation; once created, a task just waits to be scheduled. As the code above shows, the priority when picking a task is p.runnext > p.runq > sched.runq; a simplified sketch of this pick order follows.
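
A minimal sketch (with made-up simplified types, not the runtime's) of what that priority order means when a p looks for the next g to run:

package main

import "fmt"

type g struct{ id int }

// Simplified stand-ins for the runtime structures.
type p struct {
    runnext *g
    runq    []*g // the runtime uses a fixed [256]guintptr ring buffer instead
}

type schedt struct {
    runq []*g // the global run queue (a linked list of g's in the runtime)
}

// pickG mirrors the priority p.runnext > p.runq > sched.runq.
func pickG(pp *p, s *schedt) *g {
    if gp := pp.runnext; gp != nil {
        pp.runnext = nil
        return gp
    }
    if len(pp.runq) > 0 {
        gp := pp.runq[0]
        pp.runq = pp.runq[1:]
        return gp
    }
    if len(s.runq) > 0 {
        gp := s.runq[0]
        s.runq = s.runq[1:]
        return gp
    }
    return nil // the real scheduler would go on to netpoll and work stealing
}

func main() {
    pp := &p{runnext: &g{id: 1}, runq: []*g{{id: 2}}}
    s := &schedt{runq: []*g{{id: 3}}}
    for gp := pickG(pp, s); gp != nil; gp = pickG(pp, s) {
        fmt.Println("run g", gp.id) // prints g 1, then 2, then 3
    }
}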

The state transitions of a g, from creation through execution to being put back on the free list, are roughly as shown in the figure below.

3.2.3 wakep

After newproc1 has created the task, it tries to wake up an m to run it.

func wakep() {
    // be conservative about spinning threads
    // Only one m should be spinning at a time; otherwise bail out
    if !atomic.Cas(&sched.nmspinning, 0, 1) {
        return
    }
    // Call startm to do the work
    startm(nil, true)
}

3.2.4 startm

startm schedules an existing m, or creates one, to run a p. If p == nil it tries to grab an idle p; the g's live in the p's queues, so only after getting a p can it get a g.

func startm(_p_ *p, spinning bool) {
    lock(&sched.lock)
    if _p_ == nil {
        // If no p was specified, get an idle p from sched.pidle
        _p_ = pidleget()
        if _p_ == nil {
            unlock(&sched.lock)
            // If no p could be obtained, undo the nmspinning increment
            if spinning {
                // The caller incremented nmspinning, but there are no idle Ps,
                // so it's okay to just undo the increment and give up.
                if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
                    throw("startm: negative nmspinning")
                }
            }
            return
        }
    }
    // First try to get an idle m from sched.midle
    mp := mget()
    unlock(&sched.lock)
    if mp == nil {
        // If there is no idle m, create one with m.spinning = true, bind the p to it, and return
        var fn func()
        if spinning {
            // The caller incremented nmspinning, so set m.spinning in the new M.
            fn = mspinning
        }
        newm(fn, _p_)
        return
    }
    // Sanity check: an idle m must not be spinning
    if mp.spinning {
        throw("startm: m is spinning")
    }
    // Sanity check: the idle m must not already have a p
    if mp.nextp != 0 {
        throw("startm: m has p")
    }
    if spinning && !runqempty(_p_) {
        throw("startm: p has runnable gs")
    }
    // The caller incremented nmspinning, so set m.spinning in the new M.
    // The caller already incremented nmspinning, so just set m.spinning here and attach the p
    mp.spinning = spinning
    mp.nextp.set(_p_)
    // Wake up the m
    notewakeup(&mp.park)
}

3.2.4.1. newm

newm creates a new m via allocm.

func newm(fn func(), _p_ *p) {
    // Create a new m
    mp := allocm(_p_, fn)
    // Bind the specified p to this new m as its nextp
    mp.nextp.set(_p_)
    // ... omitted ...
    // Create the OS thread
    newm1(mp)
}

3.2.4.2. newm1

func newm1(mp *m) {
    // The runtime/cgo package sets iscgo to true; not analyzed here
    if iscgo {
        var ts cgothreadstart
        if _cgo_thread_start == nil {
            throw("_cgo_thread_start missing")
        }
        ts.g.set(mp.g0)
        ts.tls = (*uint64)(unsafe.Pointer(&mp.tls[0]))
        ts.fn = unsafe.Pointer(funcPC(mstart))
        if msanenabled {
            msanwrite(unsafe.Pointer(&ts), unsafe.Sizeof(ts))
        }
        execLock.rlock() // Prevent process clone.
        asmcgocall(_cgo_thread_start, unsafe.Pointer(&ts))
        execLock.runlock()
        return
    }
    execLock.rlock() // Prevent process clone.
    newosproc(mp)
    execLock.runlock()
}

3.2.4.3. newosproc

newosproc creates a new OS thread that starts at mstart_stub, which does some low-level setup and then calls mstart to enter the scheduler; this is analyzed later in the execution flow.

func newosproc(mp *m) {
    stk := unsafe.Pointer(mp.g0.stack.hi)
    // Initialize an attribute object.
    var attr pthreadattr
    var err int32
    err = pthread_attr_init(&attr)

    // Finally, create the thread. It starts at mstart_stub, which does some low-level
    // setup and then calls mstart.
    var oset sigset
    sigprocmask(_SIG_SETMASK, &sigset_all, &oset)
    // Create the thread with mstart_stub as its start function; mstart_stub then calls mstart
    err = pthread_create(&attr, funcPC(mstart_stub), unsafe.Pointer(mp))
    sigprocmask(_SIG_SETMASK, &oset, nil)
    if err != 0 {
        write(2, unsafe.Pointer(&failthreadcreate[0]), int32(len(failthreadcreate)))
        exit(1)
    }
}

3.2.4.4. allocm

allocm first cleans up sched.freem, then creates and initializes a new m.

func allocm(_p_ *p, fn func()) *m {
    _g_ := getg()
    _g_.m.locks++ // disable GC because it can be called from sysmon
    if _g_.m.p == 0 {
        acquirep(_p_) // temporarily borrow p for mallocs in this function
    }

    // Release the free M list. We need to do this somewhere and
    // this may free up a stack we can use.
    // First drain the freem list
    if sched.freem != nil {
        lock(&sched.lock)
        var newList *m
        for freem := sched.freem; freem != nil; {
            if freem.freeWait != 0 {
                next := freem.freelink
                freem.freelink = newList
                newList = freem
                freem = next
                continue
            }
            stackfree(freem.g0.stack)
            freem = freem.freelink
        }
        sched.freem = newList
        unlock(&sched.lock)
    }

    mp := new(m)
    // The start function; judging from the startm call, fn is mspinning, which sets m.spinning to true
    mp.mstartfn = fn
    // Initialize the m (analyzed above)
    mcommoninit(mp)
    // In case of cgo or Solaris or Darwin, pthread_create will make us a stack.
    // Windows and Plan 9 will layout sched stack on OS stack.
    // Create the g0 for the new m
    if iscgo || GOOS == "solaris" || GOOS == "windows" || GOOS == "plan9" || GOOS == "darwin" {
        mp.g0 = malg(-1)
    } else {
        mp.g0 = malg(8192 * sys.StackGuardMultiplier)
    }
    // Point mp's g0 back at mp
    mp.g0.m = mp
    // If the current m is bound to the p that was passed in, release it, because that p will soon be bound to the new m
    if _p_ == _g_.m.p.ptr() {
        releasep()
    }

    _g_.m.locks--
    if _g_.m.locks == 0 && _g_.preempt { // restore the preemption request in case we've cleared it in newstack
        _g_.stackguard0 = stackPreempt
    }

    return mp
}

3.2.4.5. notewakeup

func notewakeup(n *note) {
    var v uintptr
    // Atomically set the note's key to locked
    for {
        v = atomic.Loaduintptr(&n.key)
        if atomic.Casuintptr(&n.key, v, locked) {
            break
        }
    }

    // Successfully set waitm to locked.
    // What was it before?
    // Decide what to do based on the previous value: 0 means nothing was waiting, locked means a double wakeup, otherwise it is the waiting m, so wake it up
    switch {
    case v == 0:
        // Nothing was waiting. Done.
    case v == locked:
        // Two notewakeups! Not allowed.
        throw("notewakeup - double wakeup")
    default:
        // Must be the waiting m. Wake it up.
        // Wake up the OS thread
        semawakeup((*m)(unsafe.Pointer(v)))
    }
}

At this point, then: after the task g has been created it has been put on a p's local queue or the global queue, and an idle m has been obtained (or a new one created) to run it. With m, p, and g all ready, the next step is the scheduler itself, which actually runs the task g.

3.3. Execution

From the analysis of startm we can see that there are two ways an m is obtained:

  • Newly created: in this case newosproc (under newm1) runs, and eventually mstart is called to enter the scheduler
  • An idle m is woken up: it resumes from wherever it went to sleep

So an m has two starting points for running g's: the thread start function mstart, and the schedule call it resumes in after being woken from sleep. Let's start from the beginning, that is, mstart; mstart also ends up in schedule.

3.3.1. mstart

func mstart() {
    _g_ := getg()

    osStack := _g_.stack.lo == 0
    if osStack {
        // Initialize stack bounds from system stack.
        // Cgo may have left stack size in stack.hi.
        // minit may update the stack bounds.
        // Carve the required stack bounds directly out of the system stack
        size := _g_.stack.hi
        if size == 0 {
            size = 8192 * sys.StackGuardMultiplier
        }
        _g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
        _g_.stack.lo = _g_.stack.hi - size + 1024
    }
    // Initialize stack guards so that we can start calling
    // both Go and C functions with stack growth prologues.
    _g_.stackguard0 = _g_.stack.lo + _StackGuard
    _g_.stackguard1 = _g_.stackguard0
    // Call mstart1 to do the rest
    mstart1()

    // Exit this thread.
    if GOOS == "windows" || GOOS == "solaris" || GOOS == "plan9" || GOOS == "darwin" {
        // Window, Solaris, Darwin and Plan 9 always system-allocate
        // the stack, but put it in _g_.stack before mstart,
        // so the logic above hasn't set osStack yet.
        osStack = true
    }
    // Exit the m. Normally mstart1 never returns once it calls schedule(), so there is no need to worry about OS threads being created and destroyed all the time
    mexit(osStack)
}

3.3.2. mstart1

func mstart1() {
    _g_ := getg()

    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }

    // Record the caller for use as the top of stack in mcall and
    // for terminating the thread.
    // We're never coming back to mstart1 after we call schedule,
    // so other calls can reuse the current frame.
    // Save the caller's pc, sp, etc.
    save(getcallerpc(), getcallersp())
    asminit()
    // Initialize the m's signal stack and signal mask
    minit()

    // Install signal handlers; after minit so that minit can
    // prepare the thread to be able to handle the signals.
    // Install the signal handlers
    if _g_.m == &m0 {
        mstartm0()
    }
    // If mstartfn was set, run it first
    if fn := _g_.m.mstartfn; fn != nil {
        fn()
    }

    if _g_.m.helpgc != 0 {
        _g_.m.helpgc = 0
        stopm()
    } else if _g_.m != &m0 {
        // Acquire nextp
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()
}

3.3.2.1. acquirep

acquirep mainly changes the p's status, binds the m and p to each other, and shares the p's mcache with the m.

func acquirep(_p_ *p) {
    // Do the part that isn't allowed to have write barriers.
    acquirep1(_p_)

    // have p; write barriers now allowed
    _g_ := getg()
    // Share the p's mcache with the m
    _g_.m.mcache = _p_.mcache
}

3.3.2.2. acquirep1

func acquirep1(_p_ *p) {
    _g_ := getg()

    // Bind the m and p to each other
    _g_.m.p.set(_p_)
    _p_.m.set(_g_.m)
    _p_.status = _Prunning
}

3.3.2.3. schedule

Now we reach the scheduling function itself. This is a logical loop made up of schedule, execute, the goroutine's fn, and goexit; even an m that has just been woken up resumes from its saved breakpoint and goes around this same loop.

func schedule() {
    _g_ := getg()

    if _g_.m.locks != 0 {
        throw("schedule: holding locks")
    }
    // If this m has a lockedg, it must not run anything else
    if _g_.m.lockedg != 0 {
        // Park this m until its locked g is runnable, then run that g
        stoplockedm()
        execute(_g_.m.lockedg.ptr(), false) // Never returns.
    }

    // We should not schedule away from a g that is executing a cgo call,
    // since the cgo call is using the m's g0 stack.
    if _g_.m.incgo {
        throw("schedule: in cgo")
    }

top:
    // Wait for GC (stop-the-world)
    if sched.gcwaiting != 0 {
        gcstopm()
        goto top
    }

    var gp *g
    var inheritTime bool

    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        // For fairness, check the global queue once every 61 schedule ticks
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        // If nothing was taken from the global queue, get a g from the p's local queue
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        // If the local queue has nothing either, start a full search: findrunnable looks at the global queue, netpoll, the local queue, and other p's local queues; analyzed in detail later
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {
        // If the m was spinning, reset the spinning state
        resetspinning()
    }

    if gp.lockedm != 0 {
        // Hands off own p to the locked m,
        // then blocks waiting for a new p.
        // If the g has a lockedm, hand the p over to that m and put this m to sleep waiting for a new p; when woken it resumes here and jumps back to top
        startlockedm(gp)
        goto top
    }
    // Run this g
    execute(gp, inheritTime)
}
3.3.2.3.1. stoplockedm

Because the current m is bound to a lockedg while the g to run is not that lockedg, this m cannot run it; so it hands over the p it holds and sleeps until its lockedg is scheduled.

func stoplockedm() {
    _g_ := getg()

    if _g_.m.lockedg == 0 || _g_.m.lockedg.ptr().lockedm.ptr() != _g_.m {
        throw("stoplockedm: inconsistent locking")
    }
    if _g_.m.p != 0 {
        // Schedule another M to run this p.
        // Release the current p and hand it off
        _p_ := releasep()
        handoffp(_p_)
    }
    incidlelocked(1)
    // Wait until another thread schedules lockedg again.
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    status := readgstatus(_g_.m.lockedg.ptr())
    if status&^_Gscan != _Grunnable {
        print("runtime:stoplockedm: g is not Grunnable or Gscanrunnable\n")
        dumpgstatus(_g_)
        throw("stoplockedm: not runnable")
    }
    // The old p was handed off; acquire nextp as the p to run with
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}
3.3.2.3.2. startlockedm

Dispatch the lockedm to run its lockedg.

func startlockedm(gp *g) {
    _g_ := getg()

    mp := gp.lockedm.ptr()
    if mp == _g_.m {
        throw("startlockedm: locked to me")
    }
    if mp.nextp != 0 {
        throw("startlockedm: m has p")
    }
    // directly handoff current P to the locked m
    incidlelocked(-1)
    // Hand the current p to the lockedm by setting lockedm.nextp, so the lockedm can acquire it after waking up
    _p_ := releasep()
    mp.nextp.set(_p_)
    // Once woken, the lockedm resumes where it went to sleep, i.e. inside schedule()
    notewakeup(&mp.park)
    stopm()
}
3.3.2.3.3. handoffp
func handoffp(_p_ *p) {
    // handoffp must start an M in any situation where
    // findrunnable would return a G to run on _p_.

    // if it has local work, start it straight away
    if !runqempty(_p_) || sched.runqsize != 0 {
        // Call startm to start scheduling on this p
        startm(_p_, false)
        return
    }

    // no local work, check that there are no spinning/idle M's,
    // otherwise our help is not required
    // Check whether there are any spinning m's or idle p's
    if atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) == 0 && atomic.Cas(&sched.nmspinning, 0, 1) { // TODO: fast atomic
        startm(_p_, true)
        return
    }
    lock(&sched.lock)

    if _p_.runSafePointFn != 0 && atomic.Cas(&_p_.runSafePointFn, 1, 0) {
        sched.safePointFn(_p_)
        sched.safePointWait--
        if sched.safePointWait == 0 {
            notewakeup(&sched.safePointNote)
        }
    }
    // If the global run queue is not empty, try to schedule it with startm
    if sched.runqsize != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // If this is the last running P and nobody is polling network,
    // need to wakeup another M to poll network.
    if sched.npidle == uint32(gomaxprocs-1) && atomic.Load64(&sched.lastpoll) != 0 {
        unlock(&sched.lock)
        startm(_p_, false)
        return
    }
    // Otherwise put the p on the global idle list; putting it back works the same way as for allm
    pidleput(_p_)
    unlock(&sched.lock)
}
3.3.2.3.4. execute

Now the g's code actually gets run.

func execute(gp *g, inheritTime bool) {
    _g_ := getg()
    // Change the g's status and disallow preemption
    casgstatus(gp, _Grunnable, _Grunning)
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard
    if !inheritTime {
        // Scheduling tick counter
        _g_.m.p.ptr().schedtick++
    }
    _g_.m.curg = gp
    gp.m = _g_.m
    // Start running the g's code
    gogo(&gp.sched)
}
3.3.2.3.5. gogo

The job of gogo is to switch to the g's stack and start executing the g's code; the assembly itself is not analyzed here. One question remains, though: after gogo has run the function, how do we get back into the scheduler?

Recall the line in newproc1 above, newg.sched.pc = funcPC(goexit) + sys.PCQuantum: the pc saved there is the address of goexit, so once the user code finishes, execution continues in the goexit function. A sketch of how gostartcall arranges this is shown below.
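
A toy model (deliberately simplified, not the runtime's real code) of what gostartcallfn/gostartcall do to the gobuf: the saved pc (goexit) is pushed onto the new stack as a fake return address, and pc is then pointed at fn, so that "returning" from the user function lands in goexit:

package main

import "fmt"

// Toy gobuf: sp is an index into a fake stack of named cells, pc names the
// next "instruction" to execute. This only illustrates the bookkeeping.
type gobuf struct {
    sp    int
    pc    string
    stack []string
}

// gostartcall pushes the old pc as the return address and points pc at fn.
func gostartcall(buf *gobuf, fn string) {
    buf.sp--                   // make room on the (downward-growing) stack
    buf.stack[buf.sp] = buf.pc // push goexit as the fake return address
    buf.pc = fn                // the g will start executing fn
}

func main() {
    buf := &gobuf{sp: 8, pc: "goexit", stack: make([]string, 9)}
    gostartcall(buf, "user fn")
    fmt.Println("start at:", buf.pc)                  // user fn
    fmt.Println("return address:", buf.stack[buf.sp]) // goexit
}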

3.3.2.3.6. goexit0

At the assembly level goexit simply calls runtime.goexit1, and goexit1 calls goexit0 via mcall, so let's analyze goexit0 directly.

goexit0 resets the g's state and reschedules, which lands us right back in schedule(), and the scheduling loop goes round and round.

func goexit0(gp *g) {
    _g_ := getg()
    // Switch the g's status to dead so it can go back onto the free list
    casgstatus(gp, _Grunning, _Gdead)
    if isSystemGoroutine(gp) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    // Clear the g's fields
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    // Note that gp's stack scan is now "valid" because it has no
    // stack.
    gp.gcscanvalid = true
    dropg()

    // Put the g back on the free list for reuse
    gfput(_g_.m.p.ptr(), gp)
    // Enter the scheduling loop again
    schedule()
}

That completes a single round of scheduling; the scheduler then enters the next round, over and over again.

3.3.2.3.7. findrunnable
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

    // The conditions here and in handoffp must agree: if
    // findrunnable would return a G to run, handoffp must start
    // an M.

top:
    _p_ := _g_.m.p.ptr()

    // local runq
    // Get a g from the p's local queue
    if gp, inheritTime := runqget(_p_); gp != nil {
        return gp, inheritTime
    }

    // global runq
    // Get a g from the global run queue
    if sched.runqsize != 0 {
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // Poll network.
    // This netpoll is only an optimization before we resort to stealing.
    // We can safely skip it if there are no waiters or a thread is blocked
    // in netpoll already. If there is any kind of logical race with that
    // blocked thread (e.g. it has already returned from netpoll, but does
    // not set lastpoll yet), this thread will do blocking netpoll below
    // anyway.
    // See whether netpoll has any g's that are already ready
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if gp := netpoll(false); gp != nil { // non-blocking
            // netpoll returns list of goroutines linked by schedlink.
            injectglist(gp.schedlink.ptr())
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    // If sched.npidle == procs - 1, every other p is idle, so there is no point in stealing from them
    procs := uint32(gomaxprocs)
    if atomic.Load(&sched.npidle) == procs-1 {
        // Either GOMAXPROCS=1 or everybody, except for us, is idle already.
        // New work can appear from returning syscall/cgocall, network or timers.
        // Neither of that submits to local run queues, so no point in stealing.
        goto stop
    }
    // If number of spinning M's >= number of busy P's, block.
    // This is necessary to prevent excessive CPU consumption
    // when GOMAXPROCS>>1 but the program parallelism is low.
    // If the number of spinning m's is at least half the number of busy p's, stop searching
    if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
        goto stop
    }
    // Mark the current m as spinning
    if !_g_.m.spinning {
        _g_.m.spinning = true
        atomic.Xadd(&sched.nmspinning, 1)
    }
    // Start stealing runnable g's from other p's
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2 // first look for ready queues with more than 1 g
            // Steal half of another p's tasks, and sometimes even its runnext (a bit greedy); the stealing itself is just array manipulation and is not analyzed here
            if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }

stop:
    // Take a snapshot of allp
    allpSnapshot := allp

    // return P and block
    lock(&sched.lock)

    if sched.runqsize != 0 {
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        return gp, false
    }
    if releasep() != _p_ {
        throw("findrunnable: wrong p")
    }
    pidleput(_p_)
    unlock(&sched.lock)

    wasSpinning := _g_.m.spinning
    if _g_.m.spinning {
        // Clear the spinning state, since the search is over
        _g_.m.spinning = false
        if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
            throw("findrunnable: negative nmspinning")
        }
    }

    // check all runqueues once again
    for _, _p_ := range allpSnapshot {
        if !runqempty(_p_) {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                if wasSpinning {
                    _g_.m.spinning = true
                    atomic.Xadd(&sched.nmspinning, 1)
                }
                goto top
            }
            break
        }
    }
    // poll network
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
        if _g_.m.p != 0 {
            throw("findrunnable: netpoll with p")
        }
        if _g_.m.spinning {
            throw("findrunnable: netpoll with spinning")
        }
        gp := netpoll(true) // block until new work is available
        atomic.Store64(&sched.lastpoll, uint64(nanotime()))
        if gp != nil {
            lock(&sched.lock)
            _p_ = pidleget()
            unlock(&sched.lock)
            if _p_ != nil {
                acquirep(_p_)
                injectglist(gp.schedlink.ptr())
                casgstatus(gp, _Gwaiting, _Grunnable)
                if trace.enabled {
                    traceGoUnpark(gp, 0)
                }
                return gp, false
            }
            injectglist(gp)
        }
    }
    stopm()
    goto top
}

The runtime really goes to great lengths to find a runnable g: even after reaching the stop label it refuses to give up and runs through the search once more. The search can be summarized roughly as follows:

  • get a runnable g from the p's own local queue
  • get a runnable g from the global queue
  • get an already-ready g from netpoll
  • steal a runnable g from another p's local queue, occasionally even its runnext (a bit cheeky)
  • if nothing can be found at all, call stopm
3.3.2.3.8. stopm

stopm puts the current m on the idle list and sleeps; once woken, it binds m.nextp to the m and returns to the scheduler.

func stopm() {
    _g_ := getg()
retry:
    lock(&sched.lock)
    // Put the current m on the sched.midle idle list
    mput(_g_.m)
    unlock(&sched.lock)
    // Sleep until woken up
    notesleep(&_g_.m.park)
    noteclear(&_g_.m.park)
    // Bind the p that was handed to us (nextp)
    acquirep(_g_.m.nextp.ptr())
    _g_.m.nextp = 0
}

3.4. Monitoring

3.4.1. sysmon

Go's monitoring is carried out by the sysmon function, which mainly does the following:

  • returns span physical memory that has been idle for more than 5 minutes to the operating system
  • forces a garbage collection if none has run for more than 2 minutes
  • adds netpoll results that have gone unhandled for a long time to the run queue
  • preempts g's that have been running for a long time
  • retakes p's that have been blocked in syscalls for a long time

The monitor thread does not run all the time: it starts out sleeping 20µs per cycle, doubles the sleep once it has been idle for a while (more than 50 consecutive idle cycles in the code below), and caps it at 10ms. A small sketch of this back-off follows.
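
A tiny sketch of that back-off arithmetic, using the same constants as the code below: 20µs while there is work, doubling only after more than 50 consecutive idle cycles, capped at 10ms.

package main

import "fmt"

func main() {
    delay := uint32(0)
    for idle := 0; idle <= 60; idle++ {
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after ~1ms of idling...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        if idle%10 == 0 || idle > 50 {
            fmt.Printf("idle=%d sleep=%dµs\n", idle, delay)
        }
    }
}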

func sysmon() {
    lock(&sched.lock)
    sched.nmsys++
    checkdead()
    unlock(&sched.lock)

    // If a heap span goes unused for 5 minutes after a garbage collection,
    // we hand it back to the operating system.
    scavengelimit := int64(5 * 60 * 1e9)

    if debug.scavenge > 0 {
        // Scavenge-a-lot for testing.
        forcegcperiod = 10 * 1e6
        scavengelimit = 20 * 1e6
    }

    lastscavenge := nanotime()
    nscavenge := 0

    lasttrace := int64(0)
    idle := 0 // how many cycles in succession we had not wokeup somebody
    delay := uint32(0)
    for {
        // Work out how long this iteration should sleep
        if idle == 0 { // start with 20us sleep...
            delay = 20
        } else if idle > 50 { // start doubling the sleep after 1ms...
            delay *= 2
        }
        if delay > 10*1000 { // up to 10ms
            delay = 10 * 1000
        }
        usleep(delay)
        // Put sysmon to sleep during STW (or when all p's are idle)
        if debug.schedtrace <= 0 && (sched.gcwaiting != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs)) {
            lock(&sched.lock)
            if atomic.Load(&sched.gcwaiting) != 0 || atomic.Load(&sched.npidle) == uint32(gomaxprocs) {
                atomic.Store(&sched.sysmonwait, 1)
                unlock(&sched.lock)
                // Make wake-up period small enough
                // for the sampling to be correct.
                maxsleep := forcegcperiod / 2
                if scavengelimit < forcegcperiod {
                    maxsleep = scavengelimit / 2
                }
                shouldRelax := true
                if osRelaxMinNS > 0 {
                    next := timeSleepUntil()
                    now := nanotime()
                    if next-now < osRelaxMinNS {
                        shouldRelax = false
                    }
                }
                if shouldRelax {
                    osRelax(true)
                }
                // Go to sleep
                notetsleep(&sched.sysmonnote, maxsleep)
                if shouldRelax {
                    osRelax(false)
                }
                lock(&sched.lock)
                // After waking, clear the wait state and carry on
                atomic.Store(&sched.sysmonwait, 0)
                noteclear(&sched.sysmonnote)
                idle = 0
                delay = 20
            }
            unlock(&sched.lock)
        }
        // trigger libc interceptors if needed
        if *cgo_yield != nil {
            asmcgocall(*cgo_yield, nil)
        }
        // poll network if not polled for more than 10ms
        lastpoll := int64(atomic.Load64(&sched.lastpoll))
        now := nanotime()
        // If netpoll is in use and has not been polled for more than 10ms, check whether anything is ready
        if netpollinited() && lastpoll != 0 && lastpoll+10*1000*1000 < now {
            atomic.Cas64(&sched.lastpoll, uint64(lastpoll), uint64(now))
            // Returns the list of goroutines whose results are ready
            gp := netpoll(false) // non-blocking - returns list of goroutines
            if gp != nil {
                incidlelocked(-1)
                // Add the returned list of g's to the global run queue
                injectglist(gp)
                incidlelocked(1)
            }
        }
        // retake P's blocked in syscalls
        // and preempt long running G's
        // Retake p's blocked too long in syscalls and preempt g's that have run too long
        if retake(now) != 0 {
            idle = 0
        } else {
            idle++
        }
        // check if we need to force a GC
        // Use gcTrigger.test() to check whether the forced-GC interval has been exceeded
        if t := (gcTrigger{kind: gcTriggerTime, now: now}); t.test() && atomic.Load(&forcegc.idle) != 0 {
            lock(&forcegc.lock)
            forcegc.idle = 0
            forcegc.g.schedlink = 0
            // Put the forcegc g on the run queue so it gets scheduled
            injectglist(forcegc.g)
            unlock(&forcegc.lock)
        }
        // scavenge heap once in a while
        // Check for spans unused for 5 minutes and return them to the OS
        if lastscavenge+scavengelimit/2 < now {
            mheap_.scavenge(int32(nscavenge), uint64(now), uint64(scavengelimit))
            lastscavenge = now
            nscavenge++
        }
        if debug.schedtrace > 0 && lasttrace+int64(debug.schedtrace)*1000000 <= now {
            lasttrace = now
            schedtrace(debug.scheddetail > 0)
        }
    }
}

Scanning netpoll and putting the resulting g's on the global queue is easy enough to understand, much like adding p's and m's earlier. Preemption is less obvious, though: you can't just preempt a g whenever you feel like it, so how is the preemption actually carried out?

3.4.2. retake

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

func retake(now int64) uint32 {
    n := 0
    // Prevent allp slice changes. This lock will be completely
    // uncontended unless we're already stopping the world.
    lock(&allpLock)
    // We can't use a range loop over allp because we may
    // temporarily drop the allpLock. Hence, we need to re-fetch
    // allp each time around the loop.
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        if _p_ == nil {
            // This can happen if procresize has grown
            // allp but not yet created new Ps.
            continue
        }
        pd := &_p_.sysmontick
        s := _p_.status
        if s == _Psyscall {
            // Retake P from syscall if it's there for more than 1 sysmon tick (at least 20us).
            // pd.syscalltick (i.e. _p_.sysmontick.syscalltick) is only updated here in sysmon, while _p_.syscalltick is updated on every syscall. So the first sysmon pass that sees a p in a syscall does not retake it; retaking only starts on the second pass, which means the interval is at least 20µs and at most 10ms
            t := int64(_p_.syscalltick)
            if int64(pd.syscalltick) != t {
                pd.syscalltick = uint32(t)
                pd.syscallwhen = now
                continue
            }
            // On the one hand we don't want to retake Ps if there is no other work to do,
            // but on the other hand we want to retake them eventually
            // because they can prevent the sysmon thread from deep sleep.
            // Don't retake if this p has no local work, there is already a spinning m or an idle p, and the syscall started less than 10ms ago
            if runqempty(_p_) && atomic.Load(&sched.nmspinning)+atomic.Load(&sched.npidle) > 0 && pd.syscallwhen+10*1000*1000 > now {
                continue
            }
            // Drop allpLock so we can take sched.lock.
            unlock(&allpLock)
            // Need to decrement number of idle locked M's
            // (pretending that one more is running) before the CAS.
            // Otherwise the M from which we retake can exit the syscall,
            // increment nmidle and report deadlock.
            incidlelocked(-1)
            // Retake the p by switching its status to idle
            if atomic.Cas(&_p_.status, s, _Pidle) {
                if trace.enabled {
                    traceGoSysBlock(_p_)
                    traceProcStop(_p_)
                }
                n++
                _p_.syscalltick++
                // Hand the p off (analyzed above)
                handoffp(_p_)
            }
            incidlelocked(1)
            lock(&allpLock)
        } else if s == _Prunning {
            // Preempt G if it's running for too long.
            // If the p is running and its g has been running too long, preempt it
            t := int64(_p_.schedtick)
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            // Only preempt if it has been running for more than 10ms
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            // Do the preemption
            preemptone(_p_)
        }
    }
    unlock(&allpLock)
    return uint32(n)
}

3.4.3. preemptone

In the comments on this function the authors themselves admit that this kind of preemption is not all that reliable 😂. Let's look at the implementation first.

func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    if mp == nil || mp == getg().m {
        return false
    }
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }
    // Set the preemption flag
    gp.preempt = true

    // Every call in a go routine checks for stack overflow by
    // comparing the current stack pointer to gp->stackguard0.
    // Setting gp->stackguard0 to StackPreempt folds
    // preemption into the normal stack overflow check.
    // Update stackguard0 so the next stack-overflow check is guaranteed to trip
    gp.stackguard0 = stackPreempt
    return true
}

Here the runtime sets gp.stackguard0 = stackPreempt, tricking the g into believing it has run out of stack space. The g then obediently goes off to grow its stack, and stack growth calls newstack to allocate a new stack and copy the old contents over; inside newstack there is a passage like this:

if preempt {
    if thisg.m.locks != 0 || thisg.m.mallocing != 0 || thisg.m.preemptoff != "" || thisg.m.p.ptr().status != _Prunning {
        // Let the goroutine keep running for now.
        // gp->preempt is set, so it will be preempted next time.
        gp.stackguard0 = gp.stack.lo + _StackGuard
        gogo(&gp.sched) // never return
    }
}

At this point newstack notices that the g was actually preempted, so the "out of stack" condition may well have been fake; but no matter, the g is sent back through the scheduler and no stack is grown for it. Both the Go authors and Yuhen (雨痕) have grumbled about this design, yet this style of preemption has been in place since Go 1.5 (possibly earlier) and has run reliably, which says something. One user-visible consequence is sketched below.
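
One consequence of this cooperative, prologue-based preemption (illustrative; behavior depends on the Go version): a loop that never calls a function never reaches a stack-growth prologue, so on the runtime generation described in this article it can never be preempted. With a single P, the program below never lets the other goroutine run and never prints the message; on Go 1.14 and later, which added signal-based asynchronous preemption, the message does appear (the main loop itself still spins forever in both cases).

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1)
    go fmt.Println("hello from the other goroutine")
    for {
        // Spin with no function calls: on the cooperative runtime described
        // here this loop is never preempted, so with one P the goroutine
        // above never gets a chance to run.
    }
}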

4. Summary

The most obvious design theme in the scheduler is reuse: the free list of g's, the free list of m's, and the free list of p's all avoid the resources that would otherwise be wasted on repeated creation and destruction.

The second theme is multi-level caching, in line with the design of the memory allocator: each p keeps its own queue of runnable g's and only rebalances against the global queue when it has none, or too many. Operations on the global queue need a lock, while local operations do not, which greatly reduces the resources spent on lock contention.

That covers most of the relationships and state transitions among g, m, and p. Since my assembly is rather weak, those parts were mostly skipped; I should dig into them more when I get the chance.
