Go語言調度器之盜取goroutine(17)

本文是《Go語言調度器源代碼情景分析》系列的第17篇,也是第三章《Goroutine調度策略》的第2小節。linux


 

上一小節咱們分析了從全局運行隊列與工做線程的本地運行隊列獲取goroutine的過程,這一小節咱們繼續分析因沒法從上述兩個隊列中拿到須要運行的goroutine而致使的從其它工做線程的本地運行隊列中盜取goroutine的過程。算法

findrunnable() 函數負責處理與盜取相關的邏輯,該函數代碼很繁雜,由於它還作了與gc和netpoll等相關的事情,爲了避免影響咱們的分析思路,這裏咱們仍然把不相關的代碼刪掉了,不過代碼仍是比較多,但總結起來就一句話:盡力去各個運行隊列中尋找goroutine,若是實在找不到則進入睡眠狀態。下面是代碼細節:app

runtime/proc.go : 2176dom

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	
    ......

	// local runq
    //再次看一下本地運行隊列是否有須要運行的goroutine
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
    //再看看全局運行隊列是否有須要運行的goroutine
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

    ......

	// Steal work from other P's.
    //若是除了當前工做線程還在運行外,其它工做線程已經處於休眠中,那麼也就不用去偷了,確定沒有
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
    // 這個判斷主要是爲了防止由於尋找可運行的goroutine而消耗太多的CPU。
    // 由於已經有足夠多的工做線程正在尋找可運行的goroutine,讓他們去找就行了,本身偷個懶去睡覺
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
        //設置m的狀態爲spinning
		_g_.m.spinning = true
        //處於spinning狀態的m數量加一
		atomic.Xadd(&sched.nmspinning, 1)
	}
    
    //從其它p的本地運行隊列盜取goroutine
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:
	
	......

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
  
	......
  
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
    
    // 當前工做線程解除與p之間的綁定,準備去休眠
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
    //把p放入空閒隊列
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
        //m即將睡眠,狀態再也不是spinning
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
    // 休眠以前再看一下是否有工做要作
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	......
    //休眠
	stopm()
	goto top
}

從上面的代碼能夠看到,工做線程在放棄尋找可運行的goroutine而進入睡眠以前,會反覆嘗試從各個運行隊列尋找須要運行的goroutine,可謂是全力以赴了。這個函數須要重點注意如下兩點:ide

第一點,工做線程M的自旋狀態(spinning)工做線程在從其它工做線程的本地運行隊列中盜取goroutine時的狀態稱爲自旋狀態。從上面代碼能夠看到,當前M在去其它p的運行隊列盜取goroutine以前把spinning標誌設置成了true,同時增長處於自旋狀態的M的數量,而盜取結束以後則把spinning標誌還原爲false,同時減小處於自旋狀態的M的數量,從後面的分析咱們能夠看到,當有空閒P又有goroutine須要運行的時候,這個處於自旋狀態的M的數量決定了是否須要喚醒或者建立新的工做線程。函數

第二點,盜取算法。盜取過程用了兩個嵌套for循環。內層循環實現了盜取邏輯,從代碼能夠看出盜取的實質就是遍歷allp中的全部p,查看其運行隊列是否有goroutine,若是有,則取其一半到當前工做線程的運行隊列,而後從findrunnable返回,若是沒有則繼續遍歷下一個p。但這裏爲了保證公平性,遍歷allp時並非固定的從allp[0]即第一個p開始,而是從隨機位置上的p開始,並且遍歷的順序也隨機化了,並非如今訪問了第i個p下一次就訪問第i+1個p,而是使用了一種僞隨機的方式遍歷allp中的每一個p,防止每次遍歷時使用一樣的順序訪問allp中的元素。下面是這個算法的僞代碼:ui

offset := uint32(random()) % nprocs
coprime := 隨機選取一個小於nprocs且與nprocs互質的數
for i := 0; i < nprocs; i++ {
    p := allp[offset]
    從p的運行隊列偷取goroutine
    if 偷取成功 {
        break
    }
    offset += coprime
    offset = offset % nprocs
}

下面舉例說明一下上述算法過程,現假設nprocs爲8,也就是一共有8個p。atom

若是第一次隨機選擇的offset = 6,coprime = 3(3與8互質,知足算法要求)的話,則從allp切片中偷取的下標順序爲6, 1, 4, 7, 2, 5, 0, 3,計算過程:spa

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

若是第二次隨機選擇的offset = 4,coprime = 5的話,則從allp切片中偷取的下標順序爲1, 6, 3, 0, 5, 2, 7, 4,計算過程:操作系統

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

能夠看到只要隨機數不同,偷取p的順序也不同,但能夠保證通過8次循環,每一個p都會被訪問到。能夠用數論知識證實,無論nprocs是多少,這個算法均可以保證通過nprocs次循環,每一個p均可以獲得訪問。

挑選出盜取的對象p以後,則調用runqsteal盜取p的運行隊列中的goroutine,runqsteal函數再調用runqgrap從p的隊列中批量拿出多個goroutine,這兩個函數自己比較簡單,但runqgrab有一個小細節須要注意一下,見下面代碼:

runtime/proc.go : 4854

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
		n := t - h        //計算隊列中有多少個goroutine
		n = n - n/2     //取隊列中goroutine個數的一半
		if n == 0 {
			......
			return ......
		}
        //小細節:按理說隊列中的goroutine個數最多就是len(_p_.runq),
        //因此n的最大值也就是len(_p_.runq)/2,那爲何須要這個判斷呢?
		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
			continue
		}
        
		......
	}
}

代碼中n的計算很簡單,從計算過程來看n應該是runq隊列中goroutine數量的一半,它的最大值不會超過隊列容量的一半,但爲何這裏的代碼卻恰恰要去判斷n是否大於隊列容量的一半呢?這裏關鍵點在於讀取runqhead和runqtail是兩個操做而非一個原子操做,當咱們讀取runqhead以後但還未讀取runqtail以前,若是有其它線程快速的在增長(這是徹底有可能的,其它偷取者從隊列中偷取goroutine會增長runqhead,而隊列的全部者往隊列中添加goroutine會增長runqtail)這兩個值,則會致使咱們讀取出來的runqtail已經遠遠大於咱們以前讀取出來放在局部變量h裏面的runqhead了,也就是代碼註釋中所說的h和t已經不一致了,因此這裏須要這個if判斷來檢測異常狀況。

工做線程進入睡眠

分析完盜取過程,咱們繼續回到findrunnable函數。

若是工做線程通過屢次努力一直找不到須要運行的goroutine則調用stopm進入睡眠狀態,等待被其它工做線程喚醒。

runtime/proc.go : 1918

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
   _g_ := getg()

   if _g_.m.locks != 0 {
      throw("stopm holding locks")
   }
   if _g_.m.p != 0 {
      throw("stopm holding p")
   }
   if _g_.m.spinning {
      throw("stopm spinning")
   }

   lock(&sched.lock)
   mput(_g_.m)   //把m結構體對象放入sched.midle空閒隊列
   unlock(&sched.lock)
   notesleep(&_g_.m.park)  //進入睡眠狀態
  
   //被其它工做線程喚醒
   noteclear(&_g_.m.park)
   acquirep(_g_.m.nextp.ptr())
   _g_.m.nextp = 0
}

stopm的核心是調用mput把m結構體對象放入sched的midle空閒隊列,而後經過notesleep(&m.park)函數讓本身進入睡眠狀態

note是go runtime實現的一次性睡眠和喚醒機制,一個線程能夠經過調用notesleep(*note)進入睡眠狀態,而另一個線程則能夠經過notewakeup(*note)把其喚醒。note的底層實現機制跟操做系統相關,不一樣系統使用不一樣的機制,好比linux下使用的futex系統調用,而mac下則是使用的pthread_cond_t條件變量,note對這些底層機制作了一個抽象和封裝,這種封裝給擴展性帶來了很大的好處,好比當睡眠和喚醒功能須要支持新平臺時,只須要在note層增長對特定平臺的支持便可,不須要修改上層的任何代碼。

回到stopm,當從notesleep函數返回後,須要再次綁定一個p,而後返回到findrunnable函數繼續從新尋找可運行的goroutine,一旦找到可運行的goroutine就會返回到schedule函數,並把找到的goroutine調度起來運行,如何把goroutine調度起來運行的代碼咱們已經分析過了。如今繼續看notesleep函數。

runtime/lock_futex.go : 139

func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)  //超時時間設置爲-1,表示無限期等待
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
  
        //使用循環,保證不是意外被喚醒
	for atomic.Load(key32(&n.key)) == 0 {
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns)
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}

notesleep函數調用futexsleep進入睡眠,這裏之因此須要用一個循環,是由於futexsleep有可能意外從睡眠中返回,因此從futexsleep函數返回後還須要檢查note.key是否仍是0,若是是0則表示並非其它工做線程喚醒了咱們,只是futexsleep意外返回了,須要再次調用futexsleep進入睡眠。

futexsleep調用futex函數進入睡眠。

runtime/os_linux.go : 32

// Atomically,
//	if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
	var ts timespec

	// Some Linux kernels have a bug where futex of
	// FUTEX_WAIT returns an internal error code
	// as an errno. Libpthread ignores the return value
	// here, and so can we: as it says a few lines up,
	// spurious wakeups are allowed.
	if ns < 0 {
         //調用futex進入睡眠
		futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
		return
	}

	// It's difficult to live within the no-split stack limits here.
	// On ARM and 386, a 64-bit divide invokes a general software routine
	// that needs more stack than we can afford. So we use timediv instead.
	// But on real 64-bit systems, where words are larger but the stack limit
	// is not, even timediv is too heavy, and we really need to use just an
	// ordinary machine instruction.
	if sys.PtrSize == 8 {
		ts.set_sec(ns / 1000000000)
		ts.set_nsec(int32(ns % 1000000000))
	} else {
		ts.tv_nsec = 0
		ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec)))))
	}
	futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

futex是go彙編實現的函數,主要功能就是執行futex系統調用進入操做系統內核進行睡眠。

runtime/sys_linux_amd64.s : 525

// int64 futex(int32 *uaddr, int32 op, int32 val,
//    struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    #下面的6條指令在爲futex系統調用準備參數
    MOVQ    addr+0(FP), DI
    MOVL    op+8(FP), SI
    MOVL    val+12(FP), DX
    MOVQ    ts+16(FP), R10
    MOVQ    addr2+24(FP), R8
    MOVL    val3+32(FP), R9
    
    MOVL    $SYS_futex, AX   #系統調用編號放入AX寄存器
    SYSCALL  #執行futex系統調用進入睡眠,從睡眠中被喚醒後接着執行下一條MOVL指令
    MOVL    AX, ret+40(FP)    #保存系統調用的返回值
    RET

futex系統的參數比較多,其函數原型爲

int64 futex(int32*uaddr, int32op, int32val, structtimespec*timeout, int32*uaddr2, int32val2);

這裏,futex系統調用爲咱們提供的功能爲若是 *uaddr == val 則進入睡眠,不然直接返回。順便說一下,爲何futex系統調用須要第三個參數val,須要在內核判斷*uaddr與val是否相等,而不能在用戶態先判斷它們是否相等,若是相等才進入內核睡眠豈不是更高效?緣由在於判斷*uaddr與val是否相等和進入睡眠這兩個操做必須是一個原子操做,不然會存在一個競態條件:若是不是原子操做,則當前線程在第一步判斷完*uaddr與val相等以後進入睡眠以前的這一小段時間內,有另一個線程經過喚醒操做把*uaddr的值修改了,這就會致使當前工做線程永遠處於睡眠狀態而無人喚醒它。而在用戶態沒法實現判斷與進入睡眠這兩步爲一個原子操做,因此須要內核來爲其實現原子操做。

咱們知道線程一旦進入睡眠狀態就中止了運行,那麼若是後來又有可運行的goroutine須要工做線程去運行,正在睡眠的線程怎麼知道有工做可作了呢?

從前面的代碼咱們已經看到,stopm調用notesleep時給它傳遞的參數是m結構體的park成員,而m又早已經過mput放入了全局的milde空閒隊列,這樣其它運行着的線程一旦發現有更多的goroutine須要運行時就能夠經過全局的m空閒隊列找處處於睡眠狀態的m,而後調用notewakeup(&m.park)將其喚醒,至於怎麼喚醒,咱們在其它章節繼續討論。

到此,咱們已經完整分析了調度器的調度策略,從下一章起咱們將開始討論有關調度的另一個話題:調度時機,即何時會發生調度。

相關文章
相關標籤/搜索