Goroutine被動調度之一(18)

本文是《Go語言調度器源代碼情景分析》系列的第18篇,也是第四章《Goroutine被動調度》的第1小節。編程


前一章咱們詳細分析了調度器的調度策略,即調度器如何選取下一個進入運行的goroutine,但咱們還不清楚何時以及什麼狀況下會發生調度,從這一章開始咱們就來討論這個問題。緩存

整體說來,go語言的調度器會在如下三種狀況下對goroutine進行調度:併發

  1. goroutine執行某個操做因條件不知足須要等待而發生的調度;dom

  2. goroutine主動調用Gosched()函數讓出CPU而發生的調度;函數

  3. goroutine運行時間太長或長時間處於系統調用之中而被調度器剝奪運行權而發生的調度。ui

本章主要分析咱們稱之爲被動調度的第1種調度,剩下的兩種調度將在後面兩章分別進行討論。this

Demo例子atom

咱們以一個demo程序爲例來分析因阻塞而發生的被動調度。spa

package main

func start(c chan int) {
    c<-100
}

func main() {
    c:=make(chan int)

    go start(c)

    <-c
}

 該程序啓動時,main goroutine首先會建立一個無緩存的channel,而後啓動一個goroutine(爲了方便討論咱們稱它爲g2)向channel發送數據,而main本身則去讀取這個channel。線程

這兩個goroutine讀寫channel時必定會發生一次阻塞,不是main goroutine讀取channel時發生阻塞就是g2寫入channel時發生阻塞。

建立g2 goroutine

首先用gdb反彙編一下main函數,看看彙編代碼。

0x44f4d0<+0>: mov   %fs:0xfffffffffffffff8,%rcx
0x44f4d9<+9>: cmp   0x10(%rcx),%rsp
0x44f4dd<+13>: jbe   0x44f549 <main.main+121>
0x44f4df<+15>: sub   $0x28,%rsp
0x44f4e3<+19>: mov   %rbp,0x20(%rsp)
0x44f4e8<+24>: lea   0x20(%rsp),%rbp
0x44f4ed<+29>: lea   0xb36c(%rip),%rax       
0x44f4f4<+36>: mov   %rax,(%rsp)
0x44f4f8<+40>: movq   $0x0,0x8(%rsp)
0x44f501<+49>: callq    0x404330 <runtime.makechan>  #建立channel
0x44f506<+54>: mov   0x10(%rsp),%rax
0x44f50b<+59>: mov   %rax,0x18(%rsp)
0x44f510<+64>: movl   $0x8,(%rsp)
0x44f517<+71>: lea   0x240f2(%rip),%rcx       
0x44f51e<+78>: mov   %rcx,0x8(%rsp)
0x44f523<+83>: callq   0x42c1b0 <runtime.newproc> #建立goroutine
0x44f528<+88>: mov   0x18(%rsp),%rax
0x44f52d<+93>: mov   %rax,(%rsp)
0x44f531<+97>: movq   $0x0,0x8(%rsp)
0x44f53a<+106>: callq   0x405080 <runtime.chanrecv1> #從channel讀取數據
0x44f53f<+111>: mov   0x20(%rsp),%rbp
0x44f544<+116>: add   $0x28,%rsp
0x44f548<+120>: retq   
0x44f549<+121>: callq 0x447390 <runtime.morestack_noctxt>
0x44f54e<+126>: jmp   0x44f4d0 <main.main>

從main函數的彙編代碼咱們能夠看到,建立goroutine的go關鍵字被編譯器翻譯成了對runtime.newproc函數的調用,第二章咱們對這個函數的主要流程作過詳細分析,這裏簡單的回顧一下:

  1. 切換到g0棧;

  2. 分配g結構體對象;

  3. 初始化g對應的棧信息,並把參數拷貝到新g的棧上;

  4. 設置好g的sched成員,該成員包括調度g時所必須pc, sp, bp等調度信息;

  5. 調用runqput函數把g放入運行隊列;

  6. 返回

由於當時咱們的主要目標是調度器的初始化部分,因此並無詳細分析上述流程中的第5步,也就是runqput是如何把goroutine放入運行隊列的,如今就回頭分析一下這個過程,下面咱們直接從runqput函數開始。

經過runqput函數把goroutine掛入運行隊列

runtime/proc.go : 4746

// runqput tries to put g on the local runnable queue.
// If next is false, runqput adds g to the tail of the runnable queue.
// If next is true, runqput puts g in the _p_.runnext slot.
// If the run queue is full, runnext puts g on the global queue.
// Executed only by the owner P.
func runqput(_p_ *p, gp *g, next bool)   {
    if randomizeScheduler && next && fastrand() % 2 == 0  {
        next = false
    }

    if next  {
        //把gp放在_p_.runnext成員裏,
        //runnext成員中的goroutine會被優先調度起來運行
    retryNext:
        oldnext := _p_.runnext
        if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp)))  {
             //有其它線程在操做runnext成員,須要重試
            goto retryNext
        }
        if oldnext == 0  { //本來runnext爲nil,因此沒任何事情可作了,直接返回
            return
        }
        // Kick the old runnext out to the regular run queue.
        gp = oldnext.ptr() //本來存放在runnext的gp須要放入runq的尾部
    }

retry:
    //可能有其它線程正在併發修改runqhead成員,因此須要跟其它線程同步
    h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with consumers
    t := _p_.runqtail
    if t - h < uint32(len(_p_.runq))  { //判斷隊列是否滿了
        //隊列尚未滿,能夠放入
        _p_.runq[t % uint32(len(_p_.runq))].set(gp)
       
        // store-release, makes it available for consumption
        //雖然沒有其它線程併發修改這個runqtail,但其它線程會併發讀取該值以及p的runq成員
        //這裏使用StoreRel是爲了:
        //1,原子寫入runqtail
        //2,防止編譯器和CPU亂序,保證上一行代碼對runq的修改發生在修改runqtail以前
        //3,可見行屏障,保證當前線程對運行隊列的修改對其它線程立馬可見
        atomic.StoreRel(&_p_.runqtail, t + 1)
        return
    }
    //p的本地運行隊列已滿,須要放入全局運行隊列
    if runqputslow(_p_, gp, h, t) {
        return
    }
    // the queue is not full, now the put above must succeed
    goto retry
}

runqput函數流程很清晰,它首先嚐試把gp放入_p_的本地運行隊列,若是本地隊列滿了,則經過runqputslow函數把gp放入全局運行隊列。

runtime/proc.go : 4784

// Put g and a batch of work from local runnable queue on global queue.
// Executed only by the owner P.
func runqputslow(_p_ *p, gp *g, h, t uint32) bool  {
    var batch [len(_p_.runq) / 2 + 1]*g  //gp加上_p_本地隊列的一半

    // First, grab a batch from local queue.
    n := t - h
    n = n / 2
    if n != uint32(len(_p_.runq) / 2)  {
        throw("runqputslow: queue is not full")
    }
    for i := uint32(0); i < n; i++ { //取出p本地隊列的一半
        batch[i] = _p_.runq[(h+i) % uint32(len(_p_.runq))].ptr()
    }
    if !atomic.CasRel(&_p_.runqhead, h, h + n)  { // cas-release, commits consume
        //若是cas操做失敗,說明已經有其它工做線程從_p_的本地運行隊列偷走了一些goroutine,因此直接返回
        return false
    }
    batch[n] = gp

    if randomizeScheduler {
        for i := uint32(1); i <= n; i++ {
            j := fastrandn(i + 1)
            batch[i], batch[j] = batch[j], batch[i]
        }
    }

    // Link the goroutines.
    //全局運行隊列是一個鏈表,這裏首先把全部須要放入全局運行隊列的g連接起來,
    //減小後面對全局鏈表的鎖住時間,從而下降鎖衝突
    for i := uint32(0); i < n; i++  {
        batch[i].schedlink.set(batch[i+1])
    }
    var q gQueue
    q.head.set(batch[0])
    q.tail.set(batch[n])

    // Now put the batch on global queue.
    lock(&sched.lock)
    globrunqputbatch(&q, int32(n+1))
    unlock(&sched.lock)
    return true
}

runqputslow函數首先使用鏈表把從_p_的本地隊列中取出的一半連同gp一塊兒串聯起來,而後在加鎖成功以後經過globrunqputbatch函數把該鏈表鏈入全局運行隊列(全局運行隊列是使用鏈表實現的)。值的一提的是runqputslow函數並無一開始就把全局運行隊列鎖住,而是等全部的準備工做作完以後才鎖住全局運行隊列,這是併發編程加鎖的基本原則,須要儘可能減少鎖的粒度,下降鎖衝突的機率。

分析完runqput函數是如何把goroutine放入運行隊列以後,接下來咱們繼續分析main goroutine因讀取channel而發生的阻塞流程。

因讀取channel阻塞而發生的被動調度

從代碼邏輯的角度來講,咱們不能肯定main goroutine和新建立出來的g2誰先運行,但對於咱們分析來講咱們能夠假定某個goroutine先運行,由於無論誰先運行,都會阻塞在channel的讀或則寫上,因此這裏咱們假設main建立好g2後首先阻塞在了對channel的讀操做上。下面咱們看看讀取channel的過程。

從前面的反彙編代碼咱們知道讀取channel是經過調用runtime.chanrecv1函數來完成的,咱們就從它開始分析,不過在分析過程當中咱們不會把精力放在對channel的操做上,而是分析這個過程當中跟調度有關的細節。

runtime/chan.go : 403

// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
    chanrecv(c, elem, true)
}

// runtime/chan.go : 415
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
    ......
    //省略部分的代碼邏輯主要在判斷讀取操做是否能夠當即完成,若是不能當即完成
    //就須要把g掛在channel c的讀取隊列上,而後調用goparkunlock函數阻塞此goroutine
    goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
    ......
}

chanrecv1直接調用chanrecv函數實現讀取操做,chanrecv首先會判斷channel是否有數據可讀,若是有數據則直接讀取並返回,但若是沒有數據,則須要把當前goroutine掛入channel的讀取隊列之中並調用goparkunlock函數阻塞該goroutine.

runtime/proc.go : 304

// Puts the current goroutine into a waiting state and unlocks the lock.
// The goroutine can be made runnable again by calling goready(gp).
func goparkunlock(lock*mutex, reasonwaitReason, traceEvbyte, traceskipint) {
    gopark(parkunlock_c, unsafe.Pointer(lock), reason, traceEv, traceskip)
}

// runtime/proc.go : 276
// Puts the current goroutine into a waiting state and calls unlockf.
// If unlockf returns false, the goroutine is resumed.
// unlockf must not access this G's stack, as it may be moved between
// the call to gopark and the call to unlockf.
// Reason explains why the goroutine has been parked.
// It is displayed in stack traces and heap dumps.
// Reasons should be unique and descriptive.
// Do not re-use reasons, add new ones.
func gopark(unlockffunc(*g, unsafe.Pointer) bool, lockunsafe.Pointer, reason    waitReason, traceEvbyte, traceskipint) {
    ......
    // can't do anything that might move the G between Ms here.
    mcall(park_m) //切換到g0棧執行park_m函數
}

goparkunlock函數直接調用gopark函數,gopark則調用mcall從當前main goroutine切換到g0去執行park_m函數(mcall前面咱們分析過,其主要做用就是保存當前goroutine的現場,而後切換到g0棧去調用做爲參數傳遞給它的函數)

runtime/proc.go : 2581

// park continuation on g0.
func park_m(gp*g) {
    _g_ := getg()

    if trace.enabled {
        traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
    }

    casgstatus(gp, _Grunning, _Gwaiting)
    dropg()  //解除g和m之間的關係

    ......
   
    schedule()
}

park_m首先把當前goroutine的狀態設置爲_Gwaiting(由於它正在等待其它goroutine往channel裏面寫數據),而後調用dropg函數解除g和m之間的關係,最後經過調用schedule函數進入調度循環,schedule函數咱們也詳細分析過,它首先會從運行隊列中挑選出一個goroutine,而後調用gogo函數切換到被挑選出來的goroutine去運行。由於main goroutine在讀取channel被阻塞以前已經把建立好的g2放入了運行隊列,因此在這裏schedule會把g2調度起來運行,這裏完成了一次從main goroutine到g2調度(咱們假設只有一個工做線程在進行調度)。

喚醒阻塞在channel上的goroutine

g2 goroutine的入口是start函數,下面咱們就從該函數開始分析g2寫channel的流程,看它如何喚醒正在等待着讀取channel的main goroutine。仍是先來反彙編一下start函數的代碼:

0x44f480<+0>:mov   %fs:0xfffffffffffffff8,%rcx
0x44f489<+9>:cmp   0x10(%rcx),%rsp
0x44f48d<+13>:jbe   0x44f4c1 <main.start+65>
0x44f48f<+15>:sub   $0x18,%rsp
0x44f493<+19>:mov   %rbp,0x10(%rsp)
0x44f498<+24>:lea   0x10(%rsp),%rbp
0x44f49d<+29>:mov   0x20(%rsp),%rax
0x44f4a2<+34>:mov   %rax,(%rsp)
0x44f4a6<+38>:lea   0x2d71b(%rip),%rax       
0x44f4ad<+45>:mov   %rax,0x8(%rsp)
0x44f4b2<+50>:callq   0x404560 <runtime.chansend1> #寫channel
0x44f4b7<+55>:mov   0x10(%rsp),%rbp
0x44f4bc<+60>:add   $0x18,%rsp
0x44f4c0<+64>:retq   
0x44f4c1<+65>:callq    0x447390 <runtime.morestack_noctxt>
0x44f4c6<+70>:jmp   0x44f480 <main.start>

能夠看到,編譯器把對channel的發送操做翻譯成了對runtime.chansend1函數的調用

runtime/chan.go : 124

/ entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}

// runtime/chan.go : 142
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    ......
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        //能夠直接發送數據給sg
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    ......
}

// runtime/chan.go : 269
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    ......
    goready(gp, skip+1)
}

// runtime/proc.go : 310
func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

channel發送和讀取的流程相似,若是可以當即發送則當即發送並返回,若是不能當即發送則須要阻塞,在咱們這個場景中,由於main goroutine此時此刻正掛在channel的讀取隊列上等待數據,因此這裏直接調用send函數發送給main goroutine,send函數則調用goready函數切換到g0棧並調用ready函數來喚醒sg對應的goroutine,即正在等待讀channel的main goroutine。

runtime/proc.go : 639

// Mark gp ready to run.
func ready(gp *g, traceskip int, next bool) {
    ......
    // Mark runnable.
    _g_ := getg()
    ......
    // status is Gwaiting or Gscanwaiting, make Grunnable and put on runq
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next) //放入運行隊列
    if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
        //有空閒的p並且沒有正在偷取goroutine的工做線程,則須要喚醒p出來工做
        wakep()
    }
    ......
}

ready函數首先把須要喚醒的goroutine的狀態設置爲_Grunnable,而後把其放入運行隊列之中等待調度器的調度。

對於本章咱們分析的場景,執行到這裏main goroutine已經被放入了運行隊列,但還未被調度起來運行,而g2 goroutine在向channel寫完數據以後就從這裏的ready函數返回並退出了,從第二章咱們對goroutine的退出流程的分析能夠得知,在g2的退出過程當中將會在goexit0函數中調用schedule函數進入下一輪調度,從而把剛剛放入運行隊列的main goroutine調度起來運行。

在上面分析ready函數時咱們略過了一種狀況:若是當前有空閒的p並且沒有工做線程正在嘗試從各個工做線程的本地運行隊列偷取goroutine的話(沒有處於spinning狀態的工做線程),那麼就須要經過wakep函數把空閒的p喚醒起來工做。爲了避免讓篇幅過長,下一節咱們再來分析wakep如何去喚醒和建立新的工做線程。

相關文章
相關標籤/搜索