定時器的實現通常有如下幾種:linux
建立 Timer 的代碼:nginx
func NewTimer(d Duration) *Timer { c := make(chan Time, 1) t := &Timer{ C: c, r: runtimeTimer{ when: when(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }
建立 Ticker 的代碼:redis
func NewTicker(d Duration) *Ticker { if d <= 0 { panic(errors.New("non-positive interval for NewTicker")) } // Give the channel a 1-element time buffer. // If the client falls behind while reading, we drop ticks // on the floor until the client catches up. c := make(chan Time, 1) t := &Ticker{ C: c, r: runtimeTimer{ when: when(d), period: int64(d), f: sendTime, arg: c, }, } startTimer(&t.r) return t }
Timer 和 Ticker 都是調用的 startTimer(*runtimeTimer)
,區別是 Ticker 比 Timer 多了一個 period 字段。緩存
其中 startTimer()
的聲明以下:app
func startTimer(*runtimeTimer)
沒有實現,對應的是 runtime/time.go 中的以下函數:函數
// startTimer adds t to the timer heap. //go:linkname startTimer time.startTimer func startTimer(t *timer) { if raceenabled { racerelease(unsafe.Pointer(t)) } addtimer(t) }
這裏的參數是 runtime.timer,而傳進來時是 time.runtimeTimer,這兩個結構體字段是一一對應的:性能
// Interface to timers implemented in package runtime. // Must be in sync with ../runtime/time.go:/^type timer type runtimeTimer struct { tb uintptr i int when int64 period int64 f func(interface{}, uintptr) // NOTE: must not be closure arg interface{} seq uintptr } type timer struct { tb *timersBucket // the bucket the timer lives in i int // heap index // Timer wakes up at when, and then at when+period, ... (period > 0 only) // each time calling f(arg, now) in the timer goroutine, so f must be // a well-behaved function and not block. when int64 period int64 f func(interface{}, uintptr) arg interface{} seq uintptr }
在 startTimer()
中調用了 addtimer()
:ui
func addtimer(t *timer) { tb := t.assignBucket() lock(&tb.lock) ok := tb.addtimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } }
在 runtime/time.go 中有一個全局變量 timers:this
var timers [timersLen]struct { timersBucket // The padding should eliminate false sharing // between timersBucket values. pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte }
它的結構大概是這樣子的:code
timers 包含固定的 64 個 timersBucket,而每一個 timersBucket 中包含多個 *timer
(字段 t)。timersBucket 中的多個 timer 使用最小堆來組織的。
爲何是 64 個?
個數最好應該是 GOMAXPROCS 個,可是這樣的話就須要動態分配了,64 是根據內存使用和性能之間平衡得出的。
addtimer()
首先肯定一個 timersBucket,而後將 timer 放入這個 bucket 中。
怎麼肯定 bucket 的?
func (t *timer) assignBucket() *timersBucket { id := uint8(getg().m.p.ptr().id) % timersLen t.tb = &timers[id].timersBucket return t.tb }
根據當前 G 所在的 P 的 id。
而後是放入 bucket 中的邏輯:
func (tb *timersBucket) addtimerLocked(t *timer) bool { // when must never be negative; otherwise timerproc will overflow // during its delta calculation and never expire other runtime timers. if t.when < 0 { t.when = 1<<63 - 1 } t.i = len(tb.t) tb.t = append(tb.t, t) if !siftupTimer(tb.t, t.i) { return false } if t.i == 0 { // siftup moved to top: new earliest deadline. if tb.sleeping && tb.sleepUntil > t.when { tb.sleeping = false notewakeup(&tb.waitnote) } if tb.rescheduling { tb.rescheduling = false goready(tb.gp, 0) } if !tb.created { tb.created = true go timerproc(tb) } } return true }
首先是加入到 t 切片中,而後使用 siftupTimer()
來維護最小堆的性質。t.i == 0
說明當前 bucket 中沒有其餘 timer。
bucket 第一個添加 timer 時會啓動一個協程調用 timerproc
,代碼以下:
func timerproc(tb *timersBucket) { tb.gp = getg() for { lock(&tb.lock) tb.sleeping = false now := nanotime() delta := int64(-1) for { // 列表是空的,跳出循環 if len(tb.t) == 0 { delta = -1 break } // 堆上最小的 timer,最老的那個 t := tb.t[0] delta = t.when - now // 還沒到時間 if delta > 0 { break } ok := true // ticker,從新計算到期時間,不從堆上刪除 if t.period > 0 { // leave in heap but adjust next time to fire t.when += t.period * (1 + -delta/t.period) if !siftdownTimer(tb.t, 0) { ok = false } } else { // timer, remove from heap last := len(tb.t) - 1 if last > 0 { tb.t[0] = tb.t[last] tb.t[0].i = 0 } tb.t[last] = nil tb.t = tb.t[:last] if last > 0 { if !siftdownTimer(tb.t, 0) { ok = false } } t.i = -1 // mark as removed } f := t.f arg := t.arg seq := t.seq unlock(&tb.lock) if !ok { badTimer() } if raceenabled { raceacquire(unsafe.Pointer(t)) } f(arg, seq) lock(&tb.lock) } if delta < 0 || faketime > 0 { // No timers left - put goroutine to sleep. tb.rescheduling = true goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1) continue } // At least one timer pending. Sleep until then. tb.sleeping = true tb.sleepUntil = now + delta noteclear(&tb.waitnote) unlock(&tb.lock) notetsleepg(&tb.waitnote, delta) } }
若是當前 t 列表是空的,那麼 rescheduling = true
,而後將當前協程掛起。何時再喚醒呢? 在 addtimer()
中若是 rescheduling
爲 true,那麼就將協程喚醒繼續 for 循環。
若是堆上最小的元素(最早到期的)還沒到期,那麼 sleeping = true
,同時會 sleep 知道該元素到期。若是在 sleep 期間又添加了一個元素,而這個元素比堆上全部的 timer 都更快到期,在 addtimer()
中會經過 waitnote 來喚醒,繼續 for 循環來處理。
若是堆上最小的元素已經到期了,應該給這個到期的 timer.C 發送當前時間。若是 timer 是一個 Ticker,那麼會修改它的到期時間,不從堆上移走。若是 timer 是一個 Timer,是一次性的,那麼會從堆上刪除它。
如何計算 Ticker 的下次到期時間?
t.when += t.period * (1 + -delta/t.period)
這裏的 delta 是 t.when - now
的結果,表示距離過時時間已通過去了多久,計算新的過時時間時將這個值減去了。
處理 timer 就是調用 timer.f()
,對應的是 timer.sendTime()
:
func sendTime(c interface{}, seq uintptr) { // Non-blocking send of time on c. // Used in NewTimer, it cannot block anyway (buffer). // Used in NewTicker, dropping sends on the floor is // the desired behavior when the reader gets behind, // because the sends are periodic. select { case c.(chan Time) <- Now(): default: } }
Timer 和 Ticker 的 c 都是 make(chan Time, 1)
。對於 Timer 來講,由於有一個緩存,因此會執行到 case 分支。對於 Ticker 來講,由於會屢次調用這個方法,若是一直沒有從 Ticker.C 中拿取時間,那麼這裏會調用 default 分支,也就是後面的時間會被丟棄,以此來保證 timerproc
不會阻塞。
Timer 和 Ticker 都是經過 runtime/time.go 中的 stopTimer()
來中止的:
// stopTimer removes t from the timer heap if it is there. // It returns true if t was removed, false if t wasn't even there. //go:linkname stopTimer time.stopTimer func stopTimer(t *timer) bool { return deltimer(t) } // Delete timer t from the heap. // Do not need to update the timerproc: if it wakes up early, no big deal. func deltimer(t *timer) bool { if t.tb == nil { // t.tb can be nil if the user created a timer // directly, without invoking startTimer e.g // time.Ticker{C: c} // In this case, return early without any deletion. // See Issue 21874. return false } tb := t.tb lock(&tb.lock) removed, ok := tb.deltimerLocked(t) unlock(&tb.lock) if !ok { badTimer() } return removed } func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) { // t may not be registered anymore and may have // a bogus i (typically 0, if generated by Go). // Verify it before proceeding. i := t.i last := len(tb.t) - 1 if i < 0 || i > last || tb.t[i] != t { return false, true } if i != last { tb.t[i] = tb.t[last] tb.t[i].i = i } tb.t[last] = nil tb.t = tb.t[:last] ok = true if i != last { if !siftupTimer(tb.t, i) { ok = false } if !siftdownTimer(tb.t, i) { ok = false } } return true, ok }
timer.i
表示這個 timer 在堆上的索引。對於 Timer 來講,在到期後可能會從堆上刪掉了,這時 timerproc()
函數會將 timer.i 標記爲 -1。
刪除就是將 timer 和堆上最後一個元素交換,而後從 t 中刪除,最後從新維護下堆的性質。
若是不調用 Timer.Stop()/Ticker.Stop() 會發生什麼?
Timer 在到期後會被 timerproc()
函數刪除,但及時主動刪除能夠減輕 timersBucket 的壓力,尤爲是在定時器比較多的狀況下。
Ticker 若是不調用 Stop 會一直存在堆上。