golang核心原理-協程調度時機

golang調度模型

模型總攬

核心實體

Goroutines (G)

golang調度單元,golang能夠開啓成千上萬個g,每一個g能夠理解爲一個任務,等待被調度。其存儲了goroutine的執行stack信息、goroutine狀態以及goroutine的任務函數等。g只能感知到p,下文說的m對其透明的。golang

OSThread (M)

系統線程,實際執行g的狠角色,但m並不維護g的狀態,一切都是由幕後黑手p來控制。算法

Processor (P)

維護m執行時所須要的上下文,p的個數一般和cpu核數一致(能夠設置),表明gorotine的併發度。其維護了g的隊列。bash

實體間的關係

一圖勝千言,直接看這個經典的圖網絡

調度本質

即schedule函數,經過調度,放棄目前執行的g,選擇一個g來執行。選擇算法不是本文重點,這裏不作過多講述。併發

切換時機

  • 會阻塞的系統調用,好比文件io,網絡io;
  • time系列定時操做;
  • go func的時候, func執行完的時候;
  • 管道讀寫阻塞的狀況;
  • 垃圾回收以後。
  • 主動調用runtime.Gosched()

調度時機分析

阻塞性系統調用

系統調用,如read,golang重寫了全部系統調用,在系統調用加入了調度邏輯 拿read舉例框架

/usr/local/go/src/os/file.go:97
 
// Read reads up to len(b) bytes from the File.
// It returns the number of bytes read and an error, if any.
// EOF is signaled by a zero count with err set to io.EOF.
func (f *File) Read(b []byte) (n int, err error) {
	if f == nil {
		return 0, ErrInvalid
	}
	n, e := f.read(b)
	if n == 0 && len(b) > 0 && e == nil {
		return 0, io.EOF
	}
	if e != nil {
		err = &PathError{"read", f.name, e}
	}
	return n, err
}
複製代碼

嵌套到幾層,就不所有貼出來,跟究竟是以下函數:函數

func read(fd int, p []byte) (n int, err error) {
	var _p0 unsafe.Pointer
	if len(p) > 0 {
		_p0 = unsafe.Pointer(&p[0])
	} else {
		_p0 = unsafe.Pointer(&_zero)
	}
	r0, _, e1 := Syscall(SYS_READ, uintptr(fd), uintptr(_p0), uintptr(len(p)))
	n = int(r0)
	if e1 != 0 {
		err = errnoErr(e1)
	}
	return
}

func Syscall(trap, a1, a2, a3 uintptr) (r1, r2 uintptr, err Errno)
複製代碼

Syscall是彙編實現學習

TEXT    ·Syscall(SB),NOSPLIT,$0-56
    BL  runtime·entersyscall(SB)
    MOVD    a1+8(FP), R3
    MOVD    a2+16(FP), R4
    MOVD    a3+24(FP), R5
    MOVD    R0, R6
    MOVD    R0, R7
    MOVD    R0, R8
    MOVD    trap+0(FP), R9  // syscall entry
    SYSCALL R9
    BVC ok
    MOVD    $-1, R4
    MOVD    R4, r1+32(FP)   // r1
    MOVD    R0, r2+40(FP)   // r2
    MOVD    R3, err+48(FP)  // errno
    BL  runtime·exitsyscall(SB)
    RET
ok:
    MOVD    R3, r1+32(FP)   // r1
    MOVD    R4, r2+40(FP)   // r2
    MOVD    R0, err+48(FP)  // errno
    BL  runtime·exitsyscall(SB)
    RET
複製代碼

能夠看到,進入系統調用時,是調用entersyscall,當離開系統調用,會運行exitsyscallui

// Standard syscall entry used by the go syscall library and normal cgo calls.
//go:nosplit
func entersyscall(dummy int32) {
    reentersyscall(getcallerpc(unsafe.Pointer(&dummy)), getcallersp(unsafe.Pointer(&dummy)))
}

func reentersyscall(pc, sp uintptr) {
	_g_ := getg()
 
	// Disable preemption because during this function g is in Gsyscall status,
	// but can have inconsistent g->sched, do not let GC observe it.
	_g_.m.locks++
 
	// Entersyscall must not call any function that might split/grow the stack.
	// (See details in comment above.)
	// Catch calls that might, by replacing the stack guard with something that
	// will trip any stack check and leaving a flag to tell newstack to die.
	_g_.stackguard0 = stackPreempt
	_g_.throwsplit = true
 
	// Leave SP around for GC and traceback.
	save(pc, sp)
	_g_.syscallsp = sp
	_g_.syscallpc = pc
	casgstatus(_g_, _Grunning, _Gsyscall)
	if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
		systemstack(func() {
			print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
			throw("entersyscall")
		})
	}
 
	if trace.enabled {
		systemstack(traceGoSysCall)
		// systemstack itself clobbers g.sched.{pc,sp} and we might
		// need them later when the G is genuinely blocked in a
		// syscall
		save(pc, sp)
	}
 
	if atomic.Load(&sched.sysmonwait) != 0 { // TODO: fast atomic
		systemstack(entersyscall_sysmon)
		save(pc, sp)
	}
 
	if _g_.m.p.ptr().runSafePointFn != 0 {
		// runSafePointFn may stack split if run on this stack
		systemstack(runSafePointFn)
		save(pc, sp)
	}
 
	_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
	_g_.sysblocktraced = true
	_g_.m.mcache = nil
	_g_.m.p.ptr().m = 0
	atomic.Store(&_g_.m.p.ptr().status, _Psyscall)
	if sched.gcwaiting != 0 {
		systemstack(entersyscall_gcwait)
		save(pc, sp)
	}
 
	// Goroutines must not split stacks in Gsyscall status (it would corrupt g->sched).
	// We set _StackGuard to StackPreempt so that first split stack check calls morestack.
	// Morestack detects this case and throws.
	_g_.stackguard0 = stackPreempt
	_g_.m.locks--
}
複製代碼

進入系統調用時,p和m分離,當前運行的g狀態變爲_Gsyscall。this

_Gsyscall恢復時機:

  1. 當m執行完,調用exitsyscall從新和以前的p綁定,其中調度的仍是schedule函數;
  2. sysmon線程,發現該p必定時間沒有執行,會其分配一個新的m。此時進入調度。

time定時類操做

都拿time.Sleep舉例

// Sleep pauses the current goroutine for at least the duration d.
// A negative or zero duration causes Sleep to return immediately.
func Sleep(d Duration)

實際定義在runtime
// timeSleep puts the current goroutine to sleep for at least ns nanoseconds.
//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) {
    if ns <= 0 {
        return
    }

    t := getg().timer
    if t == nil {
        t = new(timer)
        getg().timer = t
    }
    *t = timer{}
    t.when = nanotime() + ns
    t.f = goroutineReady
    t.arg = getg()
    lock(&timers.lock)
    addtimerLocked(t)
    goparkunlock(&timers.lock, "sleep", traceEvGoSleep, 2)
}

複製代碼

goparkunlock 最終調用gopark

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason string, traceEv byte, traceskip int) {
	mp := acquirem()
	gp := mp.curg
	status := readgstatus(gp)
	if status != _Grunning && status != _Gscanrunning {
		throw("gopark: bad g status")
	}
	mp.waitlock = lock
	mp.waitunlockf = *(*unsafe.Pointer)(unsafe.Pointer(&unlockf))
	gp.waitreason = reason
	mp.waittraceev = traceEv
	mp.waittraceskip = traceskip
	releasem(mp)
	// can't do anything that might move the G between Ms here. mcall(park_m) } 複製代碼

mcall(fn) 是切換到g0,讓g0來調用fn,這裏咱們看下park_m定義 park_m

func park_m(gp *g) {mcall(park_m) 
	_g_ := getg()

	if trace.enabled {
		traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
	}

	casgstatus(gp, _Grunning, _Gwaiting)
	dropg()

	if _g_.m.waitunlockf != nil {
		fn := *(*func(*g, unsafe.Pointer) bool)(unsafe.Pointer(&_g_.m.waitunlockf))
		ok := fn(gp, _g_.m.waitlock)
		_g_.m.waitunlockf = nil
		_g_.m.waitlock = nil
		if !ok {
			if trace.enabled {
				traceGoUnpark(gp, 2)
			}
			casgstatus(gp, _Gwaiting, _Grunnable)
			execute(gp, true) // Schedule it back, never returns.
		}
	}
	schedule()
}
複製代碼

能夠看到,先把狀態轉化爲_Gwaiting, 再進行了一次schedule 針對_Gwaiting的g,須要調用goready,才能恢復。

新起一個協程和退出

新開一個協程,g狀態會變爲_GIdle,觸發調度。當協程執行完,會調用goexit1 此時狀態變爲_GDead _Gdead能夠被複用,或者被gc清除。

管道阻塞

chansend即c<-chanel的實現

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, "chan send (nil chan)", traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(unsafe.Pointer(c), callerpc, funcPC(chansend))
	}

    ........
    // 省略無關代碼
    ........
    
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.selectdone = nil
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	c.sendq.enqueue(mysg)
	goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}
複製代碼

能夠看到,實際仍是調用goparkunlock->gopark,來進行調度。

gc以後

stw以後,會從新選擇g開始執行。此處不對垃圾回收作過多擴展。

主動調用runtime.Gosched()

沒有找到非要調用runtime.Gosched的場景,主要做用仍是爲了調試,學習runtime吧

// Gosched yields the processor, allowing other goroutines to run. It does not
// suspend the current goroutine, so execution resumes automatically.
//go:nosplit
func Gosched() {
	mcall(gosched_m)
}
複製代碼

第一步就將環境切換到g0,而後執行一個叫gosched_m的函數

// Gosched continuation on g0.
func gosched_m(gp *g) {
	if trace.enabled {
		traceGoSched()
	}
	goschedImpl(gp)
}

func goschedImpl(gp *g) {
	status := readgstatus(gp)
	if status&^_Gscan != _Grunning {
		dumpgstatus(gp)
		throw("bad g status")
	}
	casgstatus(gp, _Grunning, _Grunnable)
	dropg()
	lock(&sched.lock)
	globrunqput(gp)
	unlock(&sched.lock)

	schedule()
}
複製代碼

能夠看到,當前g被設置爲_Grunnable,放入執行隊列。而後調用schedule,選擇一個合適的g進行執行。

總結

golang協程調度時機主要是阻塞性操做開始,結束。研究每一個場景相關代碼,便可對golang有更深的理解。這裏也分享一個閱讀源碼的小經驗,每次帶着一個特定問題去尋找答案,好比本文的調度時機,後面再看調度算法,垃圾回收,這樣每次能忽略無關因素,經過多個不一樣的主題,整個框架會愈來愈完善。

參考文章

A complete journey with Goroutines

Go's work-stealing scheduler

相關文章
相關標籤/搜索