本文是《Go語言調度器源代碼情景分析》系列的第16篇,也是第三章《Goroutine調度策略》的第1小節。算法
在調度器概述一節咱們提到過,所謂的goroutine調度,是指程序代碼按照必定的算法在適當的時候挑選出合適的goroutine並放到CPU上去運行的過程。這句話揭示了調度系統須要解決的三大核心問題:數組
調度時機:何時會發生調度?併發
調度策略:使用什麼策略來挑選下一個進入運行的goroutine?負載均衡
切換機制:如何把挑選出來的goroutine放到CPU上運行?函數
對這三大問題的解決構成了調度器的全部工做,於是咱們對調度器的分析也必將圍繞着它們所展開。ui
第二章咱們已經詳細的分析了調度器的初始化以及goroutine的切換機制,本章將重點討論調度器如何挑選下一個goroutine出來運行的策略問題,而剩下的與調度時機相關的內容咱們將在第4~6章進行全面的分析。atom
再探schedule函數spa
在討論main goroutine的調度時咱們已經見過schedule函數,由於當時咱們的主要關注點在於main goroutine是如何被調度到CPU上運行的,因此並未對schedule函數如何挑選下一個goroutine出來運行作深刻的分析,如今是從新回到schedule函數詳細分析其調度策略的時候了。線程
runtime/proc.go : 2467指針
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { _g_ := getg() //_g_ = m.g0 ...... var gp *g ...... if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. //爲了保證調度的公平性,每一個工做線程每進行61次調度就須要優先從全局運行隊列中獲取goroutine出來運行, //由於若是隻調度本地運行隊列中的goroutine,則全局運行隊列中的goroutine有可能得不到運行 if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { lock(&sched.lock) //全部工做線程都能訪問全局運行隊列,因此須要加鎖 gp = globrunqget(_g_.m.p.ptr(), 1) //從全局運行隊列中獲取1個goroutine unlock(&sched.lock) } } if gp == nil { //從與m關聯的p的本地運行隊列中獲取goroutine gp, inheritTime = runqget(_g_.m.p.ptr()) if gp != nil && _g_.m.spinning { throw("schedule: spinning with local work") } } if gp == nil { //若是從本地運行隊列和全局運行隊列都沒有找到須要運行的goroutine, //則調用findrunnable函數從其它工做線程的運行隊列中偷取,若是偷取不到,則當前工做線程進入睡眠, //直到獲取到須要運行的goroutine以後findrunnable函數纔會返回。 gp, inheritTime = findrunnable() // blocks until work is available } ...... //當前運行的是runtime的代碼,函數調用棧使用的是g0的棧空間 //調用execte切換到gp的代碼和棧空間去運行 execute(gp, inheritTime) }
schedule函數分三步分別從各運行隊列中尋找可運行的goroutine:
第一步,從全局運行隊列中尋找goroutine。爲了保證調度的公平性,每一個工做線程每通過61次調度就須要優先嚐試從全局運行隊列中找出一個goroutine來運行,這樣才能保證位於全局運行隊列中的goroutine獲得調度的機會。全局運行隊列是全部工做線程均可以訪問的,因此在訪問它以前須要加鎖。
第二步,從工做線程本地運行隊列中尋找goroutine。若是不須要或不能從全局運行隊列中獲取到goroutine則從本地運行隊列中獲取。
第三步,從其它工做線程的運行隊列中偷取goroutine。若是上一步也沒有找到須要運行的goroutine,則調用findrunnable從其餘工做線程的運行隊列中偷取goroutine,findrunnable函數在偷取以前會再次嘗試從全局運行隊列和當前線程的本地運行隊列中查找須要運行的goroutine。
下面咱們先來看如何從全局運行隊列中獲取goroutine。
從全局運行隊列中獲取goroutine
從全局運行隊列中獲取可運行的goroutine是經過globrunqget函數來完成的,該函數的第一個參數是與當前工做線程綁定的p,第二個參數max表示最多能夠從全局隊列中拿多少個g到當前工做線程的本地運行隊列中來。
runtime/proc.go : 4663
// Try get a batch of G's from the global runnable queue. // Sched must be locked. func globrunqget(_p_ *p, max int32) *g { if sched.runqsize == 0 { //全局運行隊列爲空 return nil } //根據p的數量平分全局運行隊列中的goroutines n := sched.runqsize / gomaxprocs + 1 if n > sched.runqsize { //上面計算n的方法可能致使n大於全局運行隊列中的goroutine數量 n = sched.runqsize } if max > 0 && n > max { n = max //最多取max個goroutine } if n > int32(len(_p_.runq)) / 2 { n = int32(len(_p_.runq)) / 2 //最多隻能取本地隊列容量的一半 } sched.runqsize -= n //直接經過函數返回gp,其它的goroutines經過runqput放入本地運行隊列 gp := sched.runq.pop() //pop從全局運行隊列的隊列頭取 n-- for ; n > 0; n-- { gp1 := sched.runq.pop() //從全局運行隊列中取出一個goroutine runqput(_p_, gp1, false) //放入本地運行隊列 } return gp }
globrunqget函數首先會根據全局運行隊列中goroutine的數量,函數參數max以及_p_的本地隊列的容量計算出到底應該拿多少個goroutine,而後把第一個g結構體對象經過返回值的方式返回給調用函數,其它的則經過runqput函數放入當前工做線程的本地運行隊列。這段代碼值得一提的是,計算應該從全局運行隊列中拿走多少個goroutine時根據p的數量(gomaxprocs)作了負載均衡。
若是沒有從全局運行隊列中獲取到goroutine,那麼接下來就在工做線程的本地運行隊列中尋找須要運行的goroutine。
從工做線程本地運行隊列中獲取goroutine
從代碼上來看,工做線程的本地運行隊列其實分爲兩個部分,一部分是由p的runq、runqhead和runqtail這三個成員組成的一個無鎖循環隊列,該隊列最多可包含256個goroutine;另外一部分是p的runnext成員,它是一個指向g結構體對象的指針,它最多隻包含一個goroutine。
從本地運行隊列中尋找goroutine是經過runqget函數完成的,尋找時,代碼首先查看runnext成員是否爲空,若是不爲空則返回runnext所指的goroutine,並把runnext成員清零,若是runnext爲空,則繼續從循環隊列中查找goroutine。
runtime/proc.go : 4825
// Get g from local runnable queue. // If inheritTime is true, gp should inherit the remaining time in the // current time slice. Otherwise, it should start a new time slice. // Executed only by the owner P. func runqget(_p_ *p) (gp *g, inheritTime bool) { // If there's a runnext, it's the next G to run. //從runnext成員中獲取goroutine for { //查看runnext成員是否爲空,不爲空則返回該goroutine next := _p_.runnext if next == 0 { break } if _p_.runnext.cas(next, 0) { return next.ptr(), true } } //從循環隊列中獲取goroutine for { h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers t := _p_.runqtail if t == h { return nil, false } gp := _p_.runq[h%uint32(len(_p_.runq))].ptr() if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume return gp, false } } }
這裏首先須要注意的是不論是從runnext仍是從循環隊列中拿取goroutine都使用了cas操做,這裏的cas操做是必需的,由於可能有其餘工做線程此時此刻也正在訪問這兩個成員,從這裏偷取可運行的goroutine。
其次,代碼中對runqhead的操做使用了atomic.LoadAcq和atomic.CasRel,它們分別提供了load-acquire和cas-release語義。
對於atomic.LoadAcq來講,其語義主要包含以下幾條:
原子讀取,也就是說無論代碼運行在哪一種平臺,保證在讀取過程當中不會有其它線程對該變量進行寫入;
位於atomic.LoadAcq以後的代碼,對內存的讀取和寫入必須在atomic.LoadAcq讀取完成後才能執行,編譯器和CPU都不能打亂這個順序;
當前線程執行atomic.LoadAcq時能夠讀取到其它線程最近一次經過atomic.CasRel對同一個變量寫入的值,與此同時,位於atomic.LoadAcq以後的代碼,無論讀取哪一個內存地址中的值,均可以讀取到其它線程中位於atomic.CasRel(對同一個變量操做)以前的代碼最近一次對內存的寫入。
對於atomic.CasRel來講,其語義主要包含以下幾條:
原子的執行比較並交換的操做;
位於atomic.CasRel以前的代碼,對內存的讀取和寫入必須在atomic.CasRel對內存的寫入以前完成,編譯器和CPU都不能打亂這個順序;
線程執行atomic.CasRel完成後其它線程經過atomic.LoadAcq讀取同一個變量能夠讀到最新的值,與此同時,位於atomic.CasRel以前的代碼對內存寫入的值,能夠被其它線程中位於atomic.LoadAcq(對同一個變量操做)以後的代碼讀取到。
由於可能有多個線程會併發的修改和讀取runqhead,以及須要依靠runqhead的值來讀取runq數組的元素,因此須要使用atomic.LoadAcq和atomic.CasRel來保證上述語義。
咱們可能會問,爲何讀取p的runqtail成員不須要使用atomic.LoadAcq或atomic.load?由於runqtail不會被其它線程修改,只會被當前工做線程修改,此時沒有人修改它,因此也就不須要使用原子相關的操做。
最後,由p的runq、runqhead和runqtail這三個成員組成的這個無鎖循環隊列很是精妙,咱們會在後面的章節對這個循環隊列進行分析。
CAS操做與ABA問題
咱們知道使用cas操做須要特別注意ABA的問題,那麼runqget函數這兩個使用cas的地方會不會有問題呢?答案是這兩個地方都不會有ABA的問題。緣由分析以下:
首先來看對runnext的cas操做。只有跟_p_綁定的當前工做線程纔會去修改runnext爲一個非0值,其它線程只會把runnext的值從一個非0值修改成0值,然而跟_p_綁定的當前工做線程正在此處執行代碼,因此在當前工做線程讀取到值A以後,不可能有線程修改其值爲B(0)以後再修改回A。
再來看對runq的cas操做。當前工做線程操做的是_p_的本地隊列,只有跟_p_綁定在一塊兒的當前工做線程纔會由於往該隊列裏面添加goroutine而去修改runqtail,而其它工做線程不會往該隊列裏面添加goroutine,也就不會去修改runqtail,它們只會修改runqhead,因此,當咱們這個工做線程從runqhead讀取到值A以後,其它工做線程也就不可能修改runqhead的值爲B以後再第二次把它修改成值A(由於runqtail在這段時間以內不可能被修改,runqhead的值也就沒法越過runqtail再回繞到A值),也就是說,代碼從邏輯上已經杜絕了引起ABA的條件。
到此,咱們已經分析完工做線程從全局運行隊列和本地運行隊列獲取goroutine的代碼,因爲篇幅的限制,咱們下一節再來分析從其它工做線程的運行隊列偷取goroutine的流程。