【源碼學習】time.Timer 和 time.Ticker

time

Timer and Ticker

  • go version: v1.12.1

定時器的實現通常有如下幾種:linux

  • 最小堆,go 使用的這種。
  • 紅黑樹,nginx 使用的這種。
  • 鏈表,redis 使用的這種。
  • 時間輪,linux 使用的這種。

建立定時器

建立 Timer 的代碼:nginx

func NewTimer(d Duration) *Timer {
	c := make(chan Time, 1)
	t := &Timer{
		C: c,
		r: runtimeTimer{
			when: when(d),
			f:    sendTime,
			arg:  c,
		},
	}
	startTimer(&t.r)
	return t
}

建立 Ticker 的代碼:redis

func NewTicker(d Duration) *Ticker {
	if d <= 0 {
		panic(errors.New("non-positive interval for NewTicker"))
	}
	// Give the channel a 1-element time buffer.
	// If the client falls behind while reading, we drop ticks
	// on the floor until the client catches up.
	c := make(chan Time, 1)
	t := &Ticker{
		C: c,
		r: runtimeTimer{
			when:   when(d),
			period: int64(d),
			f:      sendTime,
			arg:    c,
		},
	}
	startTimer(&t.r)
	return t
}

Timer 和 Ticker 都是調用的 startTimer(*runtimeTimer),區別是 Ticker 比 Timer 多了一個 period 字段。緩存

其中 startTimer() 的聲明以下:app

func startTimer(*runtimeTimer)

沒有實現,對應的是 runtime/time.go 中的以下函數:函數

// startTimer adds t to the timer heap.
//go:linkname startTimer time.startTimer
func startTimer(t *timer) {
	if raceenabled {
		racerelease(unsafe.Pointer(t))
	}
	addtimer(t)
}

這裏的參數是 runtime.timer,而傳進來時是 time.runtimeTimer,這兩個結構體字段是一一對應的:性能

// Interface to timers implemented in package runtime.
// Must be in sync with ../runtime/time.go:/^type timer
type runtimeTimer struct {
	tb uintptr
	i  int

	when   int64
	period int64
	f      func(interface{}, uintptr) // NOTE: must not be closure
	arg    interface{}
	seq    uintptr
}

type timer struct {
	tb *timersBucket // the bucket the timer lives in
	i  int           // heap index

	// Timer wakes up at when, and then at when+period, ... (period > 0 only)
	// each time calling f(arg, now) in the timer goroutine, so f must be
	// a well-behaved function and not block.
	when   int64
	period int64
	f      func(interface{}, uintptr)
	arg    interface{}
	seq    uintptr
}

startTimer() 中調用了 addtimer():ui

func addtimer(t *timer) {
	tb := t.assignBucket()
	lock(&tb.lock)
	ok := tb.addtimerLocked(t)
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
}

在 runtime/time.go 中有一個全局變量 timers:this

var timers [timersLen]struct {
	timersBucket

	// The padding should eliminate false sharing
	// between timersBucket values.
	pad [cpu.CacheLinePadSize - unsafe.Sizeof(timersBucket{})%cpu.CacheLinePadSize]byte
}

它的結構大概是這樣子的:code

timers 包含固定的 64 個 timersBucket,而每一個 timersBucket 中包含多個 *timer(字段 t)。timersBucket 中的多個 timer 使用最小堆來組織的。

爲何是 64 個?

個數最好應該是 GOMAXPROCS 個,可是這樣的話就須要動態分配了,64 是根據內存使用和性能之間平衡得出的。

addtimer() 首先肯定一個 timersBucket,而後將 timer 放入這個 bucket 中。

怎麼肯定 bucket 的?

func (t *timer) assignBucket() *timersBucket {
	id := uint8(getg().m.p.ptr().id) % timersLen
	t.tb = &timers[id].timersBucket
	return t.tb
}

根據當前 G 所在的 P 的 id。

而後是放入 bucket 中的邏輯:

func (tb *timersBucket) addtimerLocked(t *timer) bool {
	// when must never be negative; otherwise timerproc will overflow
	// during its delta calculation and never expire other runtime timers.
	if t.when < 0 {
		t.when = 1<<63 - 1
	}
	t.i = len(tb.t)
	tb.t = append(tb.t, t)
	if !siftupTimer(tb.t, t.i) {
		return false
	}
	if t.i == 0 {
		// siftup moved to top: new earliest deadline.
		if tb.sleeping && tb.sleepUntil > t.when {
			tb.sleeping = false
			notewakeup(&tb.waitnote)
		}
		if tb.rescheduling {
			tb.rescheduling = false
			goready(tb.gp, 0)
		}
		if !tb.created {
			tb.created = true
			go timerproc(tb)
		}
	}
	return true
}

首先是加入到 t 切片中,而後使用 siftupTimer() 來維護最小堆的性質。t.i == 0 說明當前 bucket 中沒有其餘 timer。

bucket 第一個添加 timer 時會啓動一個協程調用 timerproc,代碼以下:

func timerproc(tb *timersBucket) {
	tb.gp = getg()
	for {
		lock(&tb.lock)
		tb.sleeping = false
		now := nanotime()
		delta := int64(-1)
		for {
            // 列表是空的,跳出循環
			if len(tb.t) == 0 {
				delta = -1
				break
            }
            // 堆上最小的 timer,最老的那個
			t := tb.t[0]
            delta = t.when - now
            // 還沒到時間
			if delta > 0 {
				break
			}
            ok := true
            // ticker,從新計算到期時間,不從堆上刪除
			if t.period > 0 {
				// leave in heap but adjust next time to fire
				t.when += t.period * (1 + -delta/t.period)
				if !siftdownTimer(tb.t, 0) {
					ok = false
				}
			} else {
				// timer, remove from heap
				last := len(tb.t) - 1
				if last > 0 {
					tb.t[0] = tb.t[last]
					tb.t[0].i = 0
				}
				tb.t[last] = nil
				tb.t = tb.t[:last]
				if last > 0 {
					if !siftdownTimer(tb.t, 0) {
						ok = false
					}
				}
				t.i = -1 // mark as removed
			}
			f := t.f
			arg := t.arg
			seq := t.seq
			unlock(&tb.lock)
			if !ok {
				badTimer()
			}
			if raceenabled {
				raceacquire(unsafe.Pointer(t))
			}
			f(arg, seq)
			lock(&tb.lock)
		}
		if delta < 0 || faketime > 0 {
			// No timers left - put goroutine to sleep.
			tb.rescheduling = true
			goparkunlock(&tb.lock, waitReasonTimerGoroutineIdle, traceEvGoBlock, 1)
			continue
		}
		// At least one timer pending. Sleep until then.
		tb.sleeping = true
		tb.sleepUntil = now + delta
		noteclear(&tb.waitnote)
		unlock(&tb.lock)
		notetsleepg(&tb.waitnote, delta)
	}
}

若是當前 t 列表是空的,那麼 rescheduling = true,而後將當前協程掛起。何時再喚醒呢?addtimer() 中若是 rescheduling 爲 true,那麼就將協程喚醒繼續 for 循環。

若是堆上最小的元素(最早到期的)還沒到期,那麼 sleeping = true,同時會 sleep 知道該元素到期。若是在 sleep 期間又添加了一個元素,而這個元素比堆上全部的 timer 都更快到期,在 addtimer() 中會經過 waitnote 來喚醒,繼續 for 循環來處理。

若是堆上最小的元素已經到期了,應該給這個到期的 timer.C 發送當前時間。若是 timer 是一個 Ticker,那麼會修改它的到期時間,不從堆上移走。若是 timer 是一個 Timer,是一次性的,那麼會從堆上刪除它。

如何計算 Ticker 的下次到期時間?

t.when += t.period * (1 + -delta/t.period)

這裏的 delta 是 t.when - now 的結果,表示距離過時時間已通過去了多久,計算新的過時時間時將這個值減去了。

處理 timer 就是調用 timer.f(),對應的是 timer.sendTime():

func sendTime(c interface{}, seq uintptr) {
	// Non-blocking send of time on c.
	// Used in NewTimer, it cannot block anyway (buffer).
	// Used in NewTicker, dropping sends on the floor is
	// the desired behavior when the reader gets behind,
	// because the sends are periodic.
	select {
	case c.(chan Time) <- Now():
	default:
	}
}

Timer 和 Ticker 的 c 都是 make(chan Time, 1)。對於 Timer 來講,由於有一個緩存,因此會執行到 case 分支。對於 Ticker 來講,由於會屢次調用這個方法,若是一直沒有從 Ticker.C 中拿取時間,那麼這裏會調用 default 分支,也就是後面的時間會被丟棄,以此來保證 timerproc 不會阻塞。

中止定時器

Timer 和 Ticker 都是經過 runtime/time.go 中的 stopTimer() 來中止的:

// stopTimer removes t from the timer heap if it is there.
// It returns true if t was removed, false if t wasn't even there.
//go:linkname stopTimer time.stopTimer
func stopTimer(t *timer) bool {
	return deltimer(t)
}

// Delete timer t from the heap.
// Do not need to update the timerproc: if it wakes up early, no big deal.
func deltimer(t *timer) bool {
	if t.tb == nil {
		// t.tb can be nil if the user created a timer
		// directly, without invoking startTimer e.g
		//    time.Ticker{C: c}
		// In this case, return early without any deletion.
		// See Issue 21874.
		return false
	}

	tb := t.tb

	lock(&tb.lock)
	removed, ok := tb.deltimerLocked(t)
	unlock(&tb.lock)
	if !ok {
		badTimer()
	}
	return removed
}

func (tb *timersBucket) deltimerLocked(t *timer) (removed, ok bool) {
	// t may not be registered anymore and may have
	// a bogus i (typically 0, if generated by Go).
	// Verify it before proceeding.
	i := t.i
	last := len(tb.t) - 1
	if i < 0 || i > last || tb.t[i] != t {
		return false, true
	}
	if i != last {
		tb.t[i] = tb.t[last]
		tb.t[i].i = i
	}
	tb.t[last] = nil
	tb.t = tb.t[:last]
	ok = true
	if i != last {
		if !siftupTimer(tb.t, i) {
			ok = false
		}
		if !siftdownTimer(tb.t, i) {
			ok = false
		}
	}
	return true, ok
}

timer.i 表示這個 timer 在堆上的索引。對於 Timer 來講,在到期後可能會從堆上刪掉了,這時 timerproc() 函數會將 timer.i 標記爲 -1。

刪除就是將 timer 和堆上最後一個元素交換,而後從 t 中刪除,最後從新維護下堆的性質。

若是不調用 Timer.Stop()/Ticker.Stop() 會發生什麼?

Timer 在到期後會被 timerproc() 函數刪除,但及時主動刪除能夠減輕 timersBucket 的壓力,尤爲是在定時器比較多的狀況下。

Ticker 若是不調用 Stop 會一直存在堆上。

相關文章
相關標籤/搜索