Go語言定時器的實現

微信公衆號:LinuGo,歡迎關注

咱們都知道,Time.sleep(d duration)方法會阻塞一個協程的執行直到d時間結束。golang

用法很簡單,但內部實現倒是大有文章,每一個go版本的timer的實現都有所不一樣,本文基於go1.14,接下來分別從宏觀和圍觀介紹一遍主要調度實現過程。安全


圖文演示

下面介紹一種最簡單的場景:微信

首先存在多個goroutine,GT爲有time.Sleep休眠的g,當GT被調度到m上執行時,場景以下圖。數據結構

此時執行到了time.Sleep代碼,GT會與m解綁,同時將該GT的sleep時間等信息記錄到P的timers字段上,此時GT處於Gwaiting狀態,不在運行隊列上,調度器會調度一個新的G2到M上執行。(在每次調度過程當中,會檢查P裏面記錄的定時器,看看有沒有要執行的。)併發

G2執行完了,當要進行下一輪調度時,調度器檢查本身記錄的定時器時發現,GT到時間了,是時候執行了。因爲任務緊急,GT就會被強行插入到P的運行隊列的對頭,保證能立刻被執行到。函數

接下來就會直接調度到GT執行了,睡眠結束。接下來跟隨這個簡單場景看一下源碼實現。oop

階段1、進入睡眠

首先調用time.Sleep(1)會通過編譯器識別//go:linkname連接進入到runtime.timeSleep(n int)方法。ui

//go:linkname timeSleep time.Sleep
func timeSleep(ns int64) { 
   if ns <= 0 { //判斷入參是否正常
      return
   }   
   gp := getg() //獲取當前的goroutine
   t := gp.timer //若是不存在timer,new一個
   if t == nil {
      t = new(timer)
      gp.timer = t
   }
   t.f = goroutineReady  //後面喚醒時候會用到,修改goroutine狀態爲goready
   t.arg = gp
   t.nextwhen = nanotime() + ns  //記錄上喚醒時間
   gopark(resetForSleep, unsafe.Pointer(t), waitReasonSleep, traceEvGoSleep, 1)  //調用gopark掛起goroutine
}

resetForSleep做爲一個函數入參,他的調用棧依次爲resettimer(t, t.nextwhen) -> modtimer(t, when, t.period, t.f, t.arg, t.seq)(後文會講到),在後面的modtimer裏面會將timer定時器加入到當前goroutine所在的p中,定時器在p中的結構爲一個四叉堆,最近的時間的放在最堆頂上,對於這個數據結構沒有作深刻研究。atom

接下來看一下gopark中重要的部分。spa

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) { 
   ......省略了大部分代碼     
   mp.waitlock = lock  //因爲runningG和p沒有鏈接,將timer賦值到當前m上,後面會給到p
   mp.waitunlockf = unlockf  //將函數付給m
   ......
   mcall(park_m) //將當前的g停放
}

看一下gopark裏面的mcall裏面的回調函數park_m中的部分。

func park_m(gp *g) { 
   _g_ := getg()  //獲取當前goroutine
   ......

   casgstatus(gp, _Grunning, _Gwaiting)  //將goroutine狀態設爲waiting
   dropg()

   if fn := _g_.m.waitunlockf; fn != nil { //獲取到mresetForSleep函數
      ok := fn(gp, _g_.m.waitlock) //返回值是true
      _g_.m.waitunlockf = nil  //清空該m的函數空間
      _g_.m.waitlock = nil //...
   ......      }
   schedule()  //觸發新的調速循環,可執行隊列中獲取g到m上進行調度
}

看一下resetForSleep回調函數,裏面依次調用了resettimer(t, t.nextwhen) -> modtimer(t, when, t.period, t.f, t.arg, t.seq),resettimer函數沒有什麼重要信息,只負責返回一個true,看一下modtimer函數。

func modtimer(t *timer, when, period int64, f func(interface{}, uintptr), arg interface{}, seq uintptr) {  
   ...... 
loop:  
   for { 
      switch status = atomic.Load(&t.status); status {
      ......
      case timerNoStatus, timerRemoved:   //因爲剛建立,因此timer爲默認值0,對應timerNoStatus
         mp = acquirem()
         if atomic.Cas(&t.status, status, timerModifying) {
            wasRemoved = true   //設置標誌位爲true
            break loop
         }
         releasem(mp)
              badTimer()
      }
   }

   t.period = period
   t.f = f //上文傳過來的goroutineReady函數,用於將g轉變爲runnable狀態
   t.arg = arg  //上文的g實例
   t.seq = seq 

   if wasRemoved { //會執行到此處
      t.when = when 
      pp := getg().m.p.ptr() //獲取當前的p的指針
      lock(&pp.timersLock) //加鎖,爲了併發安全,由於timer能夠去其餘的p偷取
      doaddtimer(pp, t) //添加定時器到當前的p
      unlock(&pp.timersLock) //解鎖
      if !atomic.Cas(&t.status, timerModifying, timerWaiting) { //轉變到timerWaiting
         badTimer() 
      } 
      ......
}

當觸發完gopark方法,該goroutine脫離當前的m掛起,進入gwaiting狀態,不在任何運行隊列上。對應上圖2。


階段2、恢復執行

執行的恢復會在shedule()或者findRunnable()函數上,內部checkTimers(pp, 0)方法,該方法內部會判斷p中timers堆頂的定時器,若是時間到了的話(當前時間大於計算的時間),調用 runtime.runOneTimer ,該方法裏面會一系列調用到goready方法釋放阻塞的goroutine,並將該goroutine放到運行隊列的第一個。

接下來看一下checkTimers函數:

func checkTimers(pp *p, now int64) (rnow, pollUntil int64, ran bool) { 

   ......省略掉調整計時器時間的一些步驟
   lock(&pp.timersLock) //加鎖
   adjusttimers(pp) //調整計時器的時間
   rnow = now
   if len(pp.timers) > 0 {
      if rnow == 0 {
         rnow = nanotime()
      }
      for len(pp.timers) > 0 {
         if tw := runtimer(pp, rnow); tw != 0 { //進入runtimer方法,攜帶系統時間參數與處理器
            if tw > 0 {
               pollUntil = tw
            }
            break
         }
         ran = true
      }
   }
......
}

進入runtimer方法,會查看p裏面的堆頂的定時器,檢查是否須要執行。

func runtimer(pp *p, now int64) int64 {  
   for {
      t := pp.timers[0] //遍歷堆頂的定時器
     .......
      switch s := atomic.Load(&t.status); s {
      case timerWaiting:  //通過time.Sleep的定時器會是waiting狀態
         if t.when > now {  //判斷是否超過期間
             // Not ready to run.
            return t.when
         }

         if !atomic.Cas(&t.status, s, timerRunning) {  //修改計時器狀態
            continue
         }
         runOneTimer(pp, t, now) //運行該計時器函數
         return 0        ........

接下來調用runOneTimer函數處理。

func runOneTimer(pp *p, t *timer, now int64) {  
........

   f := t.f //goready函數
   arg := t.arg  //就是以前傳入的goroutine
   seq := t.seq  //默認值0

   if t.period > 0 {
    .........  //因爲period爲默認值0,會走else裏面   
    } else {
      dodeltimer0(pp)  //刪除該計時器在p中,該timer在0座標位
      if !atomic.Cas(&t.status, timerRunning, timerNoStatus) {  //設置爲nostatus
         badTimer()
      }
   }.......
   unlock(&pp.timersLock)

   f(arg, seq) //執行goroutineReady方法,喚起等待的goroutine
   .........
}

看一下上面的f(arg,seq)即goroutineReady方法的實現,該函數的實現就是直接調用了goready方法喚起goroutine,對應上圖3:

func goroutineReady(arg interface{}, seq uintptr) {  
   goready(arg.(*g), 0) //該處傳入的第二個參數表明調度到運行隊列的位置,該處設置爲0,說明直接調度到運行隊列即將要執行的位置,等待被執行。
}

另外,系統監控sysmon函數也能夠觸發定時器的調用,該函數是一個循環檢查系統中是否擁有應該被運行可是還在等待的定時器,並調度他們運行。

對於time.NewTimer函數等,實現方法也是大體類似,只是回調函數變成了sendTime函數,該函數不會阻塞。調用該函數後,睡眠的goroutine會從channel中釋放並加入運行隊列,有興趣能夠本身研究一下。

以上就是整個time.sleep的調度過程,你能夠根據我總結的對照源碼一步一步看,確定會加深印象,深刻理解。

參考文章

【1】《Go計時器》https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-timer/

【2】《Golang定時器底層實現剖析》https://www.cyhone.com/articles/analysis-of-golang-timer/

相關文章
相關標籤/搜索