本文是《Go語言調度器源代碼情景分析》系列的第19篇,也是第四章《Goroutine被動調度》的第2小節。linux
本文須要重點關注:網絡
如何喚醒睡眠中的工做線程函數
如何建立新的工做線程oop
上一篇文章咱們分析到了ready函數經過把須要喚醒的goroutine放入運行隊列來喚醒它,本文接着上文繼續分析。ui
喚醒空閒的Patom
爲了充分利用CPU,ready函數在喚醒goroutine以後會去判斷是否須要啓動新工做線程出來工做,判斷規則是,若是當前有空閒的p並且沒有工做線程正在嘗試從各個工做線程的本地運行隊列偷取goroutine的話(沒有處於spinning狀態的工做線程),那麼就須要把空閒的p喚醒起來工做,詳見下面的ready函數:spa
runtime/proc.go : 639操作系統
// Mark gp ready to run. func ready(gp *g, traceskip int, next bool) { ...... // Mark runnable. _g_ := getg() ...... // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq casgstatus(gp, _Gwaiting, _Grunnable) runqput(_g_.m.p.ptr(), gp, next) //放入運行隊列 if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 { //有空閒的p並且沒有正在偷取goroutine的工做線程,則須要喚醒p出來工做 wakep() } ...... }
而喚醒空閒的p是由wakep函數完成的。線程
runtime/proc.go : 2051指針
// Tries to add one more P to execute G's. // Called when a G is made runnable (newproc, ready). func wakep() { // be conservative about spinning threads if !atomic.Cas(&sched.nmspinning, 0, 1) { return } startm(nil, true) }
wakep首先經過cas操做再次確認是否有其它工做線程正處於spinning狀態,這裏之因此須要使用cas操做再次進行確認,緣由在於,在當前工做線程經過以下條件
atomic.Load(&sched.npidle) != 0 & &atomic.Load(&sched.nmspinning) == 0
判斷到須要啓動工做線程以後到真正啓動工做線程以前的這一段時間以內,若是已經有工做線程進入了spinning狀態而在四處尋找須要運行的goroutine,這樣的話咱們就沒有必要再啓動一個多餘的工做線程出來了。
若是cas操做成功,則繼續調用startm建立一個新的或喚醒一個處於睡眠狀態的工做線程出來工做。
runtime/proc.go : 1947
// Schedules some M to run the p (creates an M if necessary). // If p==nil, tries to get an idle P, if no idle P's does nothing. // May run with m.p==nil, so write barriers are not allowed. // If spinning is set, the caller has incremented nmspinning and startm will // either decrement nmspinning or set m.spinning in the newly started M. //go:nowritebarrierrec func startm(_p_ *p, spinning bool) { lock(&sched.lock) if _p_ == nil { //沒有指定p的話須要從p的空閒隊列中獲取一個p _p_ = pidleget() //從p的空閒隊列中獲取空閒p if _p_ == nil { unlock(&sched.lock) if spinning { // The caller incremented nmspinning, but there are no idle Ps, // so it's okay to just undo the increment and give up. //spinning爲true表示進入這個函數以前已經對sched.nmspinning加了1,須要還原 if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 { throw("startm: negative nmspinning") } } return //沒有空閒的p,直接返回 } } mp := mget() //從m空閒隊列中獲取正處於睡眠之中的工做線程,全部處於睡眠狀態的m都在此隊列中 unlock(&sched.lock) if mp == nil { //沒有處於睡眠狀態的工做線程 var fn func() if spinning { // The caller incremented nmspinning, so set m.spinning in the new M. fn = mspinning } newm(fn, _p_) //建立新的工做線程 return } if mp.spinning { throw("startm: m is spinning") } if mp.nextp != 0 { throw("startm: m has p") } if spinning && !runqempty(_p_) { throw("startm: p has runnable gs") } // The caller incremented nmspinning, so set m.spinning in the new M. mp.spinning = spinning mp.nextp.set(_p_) //喚醒處於休眠狀態的工做線程 notewakeup(&mp.park) }
startm函數首先判斷是否有空閒的p結構體對象,若是沒有則直接返回,若是有則須要建立或喚醒一個工做線程出來與之綁定,從這裏能夠看出所謂的喚醒p,其實就是把空閒的p利用起來。
在確保有能夠綁定的p對象以後,startm函數首先嚐試從m的空閒隊列中查找正處於休眠狀態的工做線程,若是找到則經過notewakeup函數喚醒它,不然調用newm函數建立一個新的工做線程出來。
下面咱們首先分析notewakeup函數是如何喚醒工做線程的,而後再討論newm函數建立工做線程的流程。
喚醒睡眠中的工做線程
在第三章咱們討論過,當找不到須要運行的goroutine時,工做線程會經過notesleep函數睡眠在m.park成員上,因此這裏使用m.park成員做爲參數調用notewakeup把睡眠在該成員之上的工做線程喚醒。
runtime/lock_futex.go : 130
func notewakeup(n *note) { //設置n.key = 1, 被喚醒的線程經過查看該值是否等於1來肯定是被其它線程喚醒仍是意外從睡眠中甦醒 old := atomic.Xchg(key32(&n.key), 1) if old != 0 { print("notewakeup - double wakeup (", old, ")\n") throw("notewakeup - double wakeup") } //調用futexwakeup喚醒 futexwakeup(key32(&n.key), 1) }
notewakeup函數首先使用atomic.Xchg設置note.key值爲1,這是爲了使被喚醒的線程能夠經過查看該值是否等於1來肯定是被其它線程喚醒仍是意外從睡眠中甦醒了過來,若是該值爲1則表示是被喚醒的,能夠繼續工做了,但若是該值爲0則表示是意外甦醒,須要再次進入睡眠,工做線程甦醒以後的處理邏輯咱們已經在notesleep函數中見過,因此這裏略過。
把note.key的值設置爲1後,notewakeup函數繼續調用futexwakeup函數
runtime/os_linux.go : 66
// If any procs are sleeping on addr, wake up at most cnt. //go:nosplit func futexwakeup(addr *uint32, cnt uint32) { //調用futex函數喚醒工做線程 ret := futex(unsafe.Pointer(addr), _FUTEX_WAKE_PRIVATE, cnt, nil, nil, 0) if ret >= 0 { return } // I don't know that futex wakeup can return // EAGAIN or EINTR, but if it does, it would be // safe to loop and call futex again. systemstack(func() { print("futexwakeup addr=", addr, " returned ", ret, "\n") }) *(*int32)(unsafe.Pointer(uintptr(0x1006))) = 0x1006 }
對於Linux平臺來講,工做線程經過note睡眠實際上是經過futex系統調用睡眠在內核之中,因此喚醒處於睡眠狀態的線程也須要經過futex系統調用進入內核來喚醒,因此這裏的futexwakeup又繼續調用包裝了futex系統調用的futex函數來實現喚醒睡眠在內核中的工做線程。
runtime/sys_linux_amd64.s : 525
// int64 futex(int32 *uaddr, int32 op, int32 val, //struct timespec *timeout, int32 *uaddr2, int32 val2); TEXT runtime·futex(SB),NOSPLIT,$0 MOVQ addr+0(FP), DI #這6條指令在爲futex系統調用準備參數 MOVL op+8(FP), SI MOVL val+12(FP), DX MOVQ ts+16(FP), R10 MOVQ addr2+24(FP), R8 MOVL val3+32(FP), R9 MOVL $SYS_futex, AX #futex系統調用編號放入AX寄存器 SYSCALL #系統調用,進入內核 MOVL AX, ret+40(FP) #系統調用經過AX寄存器返回返回值,這裏把返回值保存到內存之中 RET
futex函數由彙編代碼寫成,前面的幾條指令都在爲futex系統調用準備參數,參數準備完成以後則經過SYSCALL指令進入操做系統內核完成線程的喚醒功能,內核在完成喚醒工做以後當前工做線程則從內核返回到futex函數繼續執行SYSCALL指令以後的代碼並按函數調用鏈原路返回,繼續執行其它代碼,而被喚醒的工做線程則由內核負責在適當的時候調度到CPU上運行。
看完喚醒流程,下面咱們來分析工做線程的建立。
建立工做線程
回到startm函數,若是沒有正處於休眠狀態的工做線程,則須要調用newm函數新建一個工做線程。
runtime/proc.go : 1807
// Create a new m. It will start off with a call to fn, or else the scheduler. // fn needs to be static and not a heap allocated closure. // May run with m.p==nil, so write barriers are not allowed. //go:nowritebarrierrec func newm(fn func(), _p_ *p) { mp := allocm(_p_, fn) mp.nextp.set(_p_) ...... newm1(mp) }
newm首先調用allocm函數從堆上分配一個m結構體對象,而後調用newm1函數。
runtime/proc.go : 1843
func newm1(mp *m) { //省略cgo相關代碼....... execLock.rlock() // Prevent process clone. newosproc(mp) execLock.runlock() }
newm1繼續調用newosproc函數,newosproc的主要任務是調用clone函數建立一個系統線程,而新建的這個系統線程將從mstart函數開始運行。
runtime/os_linux.go : 143
// May run with m.p==nil, so write barriers are not allowed. //go:nowritebarrier func newosproc(mp *m) { stk := unsafe.Pointer(mp.g0.stack.hi) ...... ret := clone(cloneFlags, stk, unsafe.Pointer(mp), unsafe.Pointer(mp.g0), unsafe.Pointer(funcPC(mstart))) ...... } //clone系統調用的Flags選項 cloneFlags = _CLONE_VM | /* share memory */ //指定父子線程共享進程地址空間 _CLONE_FS | /* share cwd, etc */ _CLONE_FILES | /* share fd table */ _CLONE_SIGHAND | /* share sig handler table */ _CLONE_SYSVSEM | /* share SysV semaphore undo lists (see issue #20763) */ _CLONE_THREAD /* revisit - okay for now */ //建立子線程而不是子進程
clone函數是由彙編語言實現的,該函數使用clone系統調用完成建立系統線程的核心功能。咱們分段來看
runtime/sys_linux_amd64.s : 539
// int32 clone(int32 flags, void *stk, M *mp, G *gp, void (*fn)(void)); TEXT runtime·clone(SB),NOSPLIT,$0 MOVL flags+0(FP), DI//系統調用的第一個參數 MOVQ stk+8(FP), SI //系統調用的第二個參數 MOVQ $0, DX //第三個參數 MOVQ $0, R10 //第四個參數 // Copy mp, gp, fn off parent stack for use by child. // Careful:Linux system call clobbers CXand R11. MOVQ mp+16(FP), R8 MOVQ gp+24(FP), R9 MOVQ fn+32(FP), R12 MOVL $SYS_clone, AX SYSCALL
clone函數首先用了4條指令爲clone系統調用準備參數,該系統調用一共須要四個參數,根據Linux系統調用約定,這四個參數須要分別放入rdi, rsi,rdx和r10寄存器中,這裏最重要的是第一個參數和第二個參數,分別用來指定內核建立線程時須要的選項和新線程應該使用的棧。由於即將被建立的線程與當前線程共享同一個進程地址空間,因此這裏必須爲子線程指定其使用的棧,不然父子線程會共享同一個棧從而形成混亂,從上面的newosproc函數能夠看出,新線程使用的棧爲m.g0.stack.lo~m.g0.stack.hi這段內存,而這段內存是newm函數在建立m結構體對象時從進程的堆上分配而來的。
準備好系統調用的參數以後,還有另一件很重的事情須要作,那就是把clone函數的其它幾個參數(mp, gp和線程入口函數)保存到寄存器中,之因此須要在系統調用以前保存這幾個參數,緣由在於這幾個參數目前還位於父線程的棧之中,而一旦經過系統調用把子線程建立出來以後,子線程將會使用咱們在clone系統調用時給它指定的棧,因此這裏須要把這幾個參數先保存到寄存器,等子線程從系統調用返回後直接在寄存器中獲取這幾個參數。這裏要注意的是雖然這個幾個參數值保存在了父線程的寄存器之中,但建立子線程時,操做系統內核會把父線程的全部寄存器幫咱們複製一份給子線程,因此當子線程開始運行時就能拿到父線程保存在寄存器中的值,從而拿到這幾個參數。這些準備工做完成以後代碼調用syscall指令進入內核,由內核幫助咱們建立系統線程。
clone系統調用完成後實際上就多了一個操做系統線程,新建立的子線程和當前線程都得從系統調用返回而後繼續執行後面的代碼,那麼從系統調用返回以後咱們怎麼知道哪一個是父線程哪一個是子線程,從而來決定它們的執行流程?使用過fork系統調用的讀者應該知道,咱們須要經過返回值來判斷父子線程,系統調用的返回值若是是0則表示這是子線程,不爲0則表示這個是父線程。用c代碼來描述大概就是這個樣子:
if (clone(...) == 0) { //子線程 子線程代碼 } else {//父線程 父線程代碼 }
雖然這裏只有一次clone調用,但它卻返回了2次,一次返回到父線程,一次返回到子線程,而後2個線程各自執行本身的代碼流程。
回到clone函數,下面代碼的第一條指令就在判斷系統調用的返回值,若是是子線程則跳轉到後面的代碼繼續執行,若是是父線程,它建立子線程的任務已經完成,因此這裏把返回值保存在棧上以後就直接執行ret指令返回到newosproc函數了。
runtime/sys_linux_amd64.s : 555
// In parent, return. CMPQ AX, $0 #判斷clone系統調用的返回值 JEQ 3(PC) / #跳轉到子線程部分 MOVL AX, ret+40(FP) #父線程須要執行的指令 RET #父線程須要執行的指令
而對於子線程來講,還有不少初始化工做要作,下面是子線程須要繼續執行的指令。
runtime/sys_linux_amd64.s : 561
# In child, on new stack. #子線程須要繼續執行的指令 MOVQ SI, SP #設置CPU棧頂寄存器指向子線程的棧頂,這條指令看起來是多餘的?內核應該已經把SP設置好了 # If g or m are nil, skip Go-related setup. CMPQ R8, $0 # m,新建立的m結構體對象的地址,由父線程保存在R8寄存器中的值被複制到了子線程 JEQ nog CMPQ R9, $0 # g,m.g0的地址,由父線程保存在R9寄存器中的值被複制到了子線程 JEQ nog # Initialize m->procid to Linux tid MOVL $SYS_gettid, AX #經過gettid系統調用獲取線程ID(tid) SYSCALL MOVQ AX, m_procid(R8) #m.procid = tid #Set FS to point at m->tls. #新線程剛剛建立出來,還未設置線程本地存儲,即m結構體對象還未與工做線程關聯起來, #下面的指令負責設置新線程的TLS,把m對象和工做線程關聯起來 LEAQ m_tls(R8), DI #取m.tls字段的地址 CALL runtime·settls(SB) #In child, set up new stack get_tls(CX) MOVQ R8, g_m(R9) # g.m = m MOVQ R9, g(CX) # tls.g = &m.g0 CALL runtime·stackcheck(SB) nog: # Call fn CALL R12 #這裏調用mstart函數 ......
這段代碼的第一條指令把CPU寄存器的棧頂指針設置爲新線程的的棧頂,這條指令看起來是多餘的,由於咱們在clone系統調用時已經把棧信息告訴操做系統了,操做系統在把新線程調度起來運行時已經幫咱們把CPU的rsp寄存器設置好了,這裏應該不必本身去設置。接下來的4條指令判斷m和g是否爲nil,若是是則直接去執行fn函數,對於咱們這個流程來講,由於如今正在建立工做線程,因此m和g(實際上是m.g0)都不爲空,於是須要繼續對m進行初始化。
對新建立出來的工做線程的初始化過程從上面代碼片斷的第6條指令開始,它首先經過系統調用獲取到子線程的線程id,並賦值給m.procid,而後調用settls設置線程本地存儲並經過把m.g0的地址放入線程本地存儲之中,從而實現了m結構體對象與工做線程之間的關聯,settls函數咱們已經在第二章詳細分析過,因此這裏直接跳過。
新工做線程的初始化完成以後,便開始執行mstart函數,咱們在第二章也見過該函數,主線程初始化完成以後也是調用的它。回憶一下,mstart函數首先會去設置m.g0的stackguard成員,而後調用mstart1()函數把當前工做線程的g0的調度信息保存在m.g0.sched成員之中,最後經過調用schedule函數進入調度循環。
總結
本章僅以讀寫channel爲例分析了goroutine因操做被阻塞而發生的被動調度,其實發生被動調度的狀況還比較多,好比因讀寫網絡鏈接而阻塞、加鎖被阻塞或select操做阻塞等等都會發生被動調度,讀者能夠自行閱讀相關源代碼。
本章還分析了睡眠中的工做線程是如何被喚起起來工做的以及新工做線程的建立和初始化流程。