轉載請聲明出處哦~,本篇文章發佈於luozhiyun的博客:https://www.luozhiyun.com/archives/444java
最近在工做中有一個需求,簡單來講就是在短期內會建立上百萬個定時任務,建立的時候會將對應的金額相加,防止超售,須要過半個小時再去核對數據,若是數據對不上就須要將加上的金額再減回去。git
這個需求若是用Go內置的Timer來作的話性能比較低下,由於Timer是使用最小堆來實現的,建立和刪除的時間複雜度都爲 O(log n)。若是使用時間輪的話則是O(1)性能會好不少。github
對於時間輪來講,我之前寫過一篇java版的時間輪算法分析:https://www.luozhiyun.com/archives/59,此次來看看Go語言的時間輪實現,順便你們有興趣的也能夠對比一下二者的區別,以及我寫文章的水平和一年多前有沒有提高,哈哈哈。算法
時間輪的運用實際上是很是的普遍的,在 Netty、Akka、Quartz、ZooKeeper、Kafka 等組件中都存在時間輪的蹤跡。下面用Go實現的時間輪是以Kafka的代碼爲原型來實現的,完整代碼:https://github.com/devYun/timingwheel。apache
在時間輪中存儲任務的是一個環形隊列,底層採用數組實現,數組中的每一個元素能夠存放一個定時任務列表。定時任務列表是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項,其中封裝了真正的定時任務。數組
時間輪由多個時間格組成,每一個時間格表明當前時間輪的基本時間跨度(tickMs)。時間輪的時間格個數是固定的,可用 wheelSize 來表示,那麼整個時間輪的整體時間跨度(interval)能夠經過公式 tickMs×wheelSize 計算得出。併發
時間輪還有一個錶盤指針(currentTime),用來表示時間輪當前所處的時間,currentTime 是 tickMs 的整數倍。currentTime指向的地方是表示到期的時間格,表示須要處理的時間格所對應的鏈表中的全部任務。app
以下圖是一個tickMs爲1s,wheelSize等於10的時間輪,每一格里面放的是一個定時任務鏈表,鏈表裏面存有真正的任務項:異步
初始狀況下表盤指針 currentTime 指向時間格0,若時間輪的 tickMs 爲 1ms 且 wheelSize 等於10,那麼interval則等於10s。以下圖此時有一個定時爲2s的任務插進來會存放到時間格爲2的任務鏈表中,用紅色標記。隨着時間的不斷推移,指針 currentTime 不斷向前推動,若是過了2s,那麼 currentTime 會指向時間格2的位置,會將此時間格的任務鏈表獲取出來處理。函數
若是當前的指針 currentTime 指向的是2,此時若是插入一個9s的任務進來,那麼新來的任務會服用原來的時間格鏈表,會存放到時間格1中
這裏所講的時間輪都是簡單時間輪,只有一層,整體時間範圍在 currentTime 和 currentTime+interval 之間。若是如今有一個15s的定時任務是須要從新開啓一個時間輪,設置一個時間跨度至少爲15s的時間輪纔夠用。可是這樣擴充是沒有底線的,若是須要一個1萬秒的時間輪,那麼就須要一個這麼大的數組去存放,不只佔用很大的內存空間,並且也會由於須要遍歷這麼大的數組從而拉低效率。
所以引入了層級時間輪的概念。
如圖是一個兩層的時間輪,第二層時間輪也是由10個時間格組成,每一個時間格的跨度是10s。第二層的時間輪的 tickMs 爲第一層時間輪的 interval,即10s。每一層時間輪的 wheelSize 是固定的,都是10,那麼第二層的時間輪的整體時間跨度 interval 爲100s。
圖中展現了每一個時間格對應的過時時間範圍, 咱們能夠清晰地看到, 第二層時間輪的第0個時間格的過時時間範圍是 [0,9]。也就是說, 第二層時間輪的一個時間格就能夠表示第一層時間輪的全部(10個)時間格;
若是向該時間輪中添加一個15s的任務,那麼當第一層時間輪容納不下時,進入第二層時間輪,並插入到過時時間爲[10,19]的時間格中。
隨着時間的流逝,當本來15s的任務還剩下5s的時候,這裏就有一個時間輪降級的操做,此時第一層時間輪的整體時間跨度已足夠,此任務被添加到第一層時間輪到期時間爲5的時間格中,以後再經歷5s後,此任務真正到期,最終執行相應的到期操做。
由於咱們這個Go語言版本的時間輪代碼是仿照Kafka寫的,因此在具體實現時間輪 TimingWheel 時還有一些小細節:
type TimingWheel struct { // 時間跨度,單位是毫秒 tick int64 // in milliseconds // 時間輪個數 wheelSize int64 // 總跨度 interval int64 // in milliseconds // 當前指針指向時間 currentTime int64 // in milliseconds // 時間格列表 buckets []*bucket // 延遲隊列 queue *delayqueue.DelayQueue // 上級的時間輪引用 overflowWheel unsafe.Pointer // type: *TimingWheel exitC chan struct{} waitGroup waitGroupWrapper }
tick、wheelSize、interval、currentTime都比較好理解,buckets字段表明的是時間格列表,queue是一個延遲隊列,全部的任務都是經過延遲隊列來進行觸發,overflowWheel是上層時間輪的引用。
type bucket struct { // 任務的過時時間 expiration int64 mu sync.Mutex // 相同過時時間的任務隊列 timers *list.List }
bucket裏面實際上封裝的是時間格里面的任務隊列,裏面放入的是相同過時時間的任務,到期後會將隊列timers拿出來進行處理。這裏有個有意思的地方是因爲會有多個線程併發的訪問bucket,因此須要用到原子類來獲取int64位的值,爲了保證32位系統上面讀取64位數據的一致性,須要進行64位對齊。具體的能夠看這篇:https://www.luozhiyun.com/archives/429,講的是對內存對齊的思考。
type Timer struct { // 到期時間 expiration int64 // in milliseconds // 要被執行的具體任務 task func() // Timer所在bucket的指針 b unsafe.Pointer // type: *bucket // bucket列表中對應的元素 element *list.Element }
Timer是時間輪的最小執行單元,是定時任務的封裝,到期後會調用task來執行任務。
例如如今初始化一個tick是1s,wheelSize是10的時間輪:
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() } func NewTimingWheel(tick time.Duration, wheelSize int64) *TimingWheel { // 將傳入的tick轉化成毫秒 tickMs := int64(tick / time.Millisecond) // 若是小於零,那麼panic if tickMs <= 0 { panic(errors.New("tick must be greater than or equal to 1ms")) } // 設置開始時間 startMs := timeToMs(time.Now().UTC()) // 初始化TimingWheel return newTimingWheel( tickMs, wheelSize, startMs, delayqueue.New(int(wheelSize)), ) } func newTimingWheel(tickMs int64, wheelSize int64, startMs int64, queue *delayqueue.DelayQueue) *TimingWheel { // 初始化buckets的大小 buckets := make([]*bucket, wheelSize) for i := range buckets { buckets[i] = newBucket() } // 實例化TimingWheel return &TimingWheel{ tick: tickMs, wheelSize: wheelSize, // currentTime必須是tickMs的倍數,因此這裏使用truncate進行修剪 currentTime: truncate(startMs, tickMs), interval: tickMs * wheelSize, buckets: buckets, queue: queue, exitC: make(chan struct{}), } }
初始化十分簡單,你們能夠看看上面的代碼註釋便可。
下面咱們看看start方法:
func (tw *TimingWheel) Start() { // Poll會執行一個無限循環,將到期的元素放入到queue的C管道中 tw.waitGroup.Wrap(func() { tw.queue.Poll(tw.exitC, func() int64 { return timeToMs(time.Now().UTC()) }) }) // 開啓無限循環獲取queue中C的數據 tw.waitGroup.Wrap(func() { for { select { // 從隊列裏面出來的數據都是到期的bucket case elem := <-tw.queue.C: b := elem.(*bucket) // 時間輪會將當前時間 currentTime 往前移動到 bucket的到期時間 tw.advanceClock(b.Expiration()) // 取出bucket隊列的數據,並調用addOrRun方法執行 b.Flush(tw.addOrRun) case <-tw.exitC: return } } }) }
這裏使用了util封裝的一個Wrap方法,這個方法會起一個goroutines異步執行傳入的函數,具體的能夠到我上面給出的連接去看源碼。
Start方法會啓動兩個goroutines。第一個goroutines用來調用延遲隊列的queue的Poll方法,這個方法會一直循環獲取隊列裏面的數據,而後將到期的數據放入到queue的C管道中;第二個goroutines會無限循環獲取queue中C的數據,若是C中有數據表示已經到期,那麼會先調用advanceClock方法將當前時間 currentTime 往前移動到 bucket的到期時間,而後再調用Flush方法取出bucket中的隊列,並調用addOrRun方法執行。
func (tw *TimingWheel) advanceClock(expiration int64) { currentTime := atomic.LoadInt64(&tw.currentTime) // 過時時間大於等於(當前時間+tick) if expiration >= currentTime+tw.tick { // 將currentTime設置爲expiration,從而推動currentTime currentTime = truncate(expiration, tw.tick) atomic.StoreInt64(&tw.currentTime, currentTime) // Try to advance the clock of the overflow wheel if present // 若是有上層時間輪,那麼遞歸調用上層時間輪的引用 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel != nil { (*TimingWheel)(overflowWheel).advanceClock(currentTime) } } }
advanceClock方法會根據到期時間來重新設置currentTime,從而推動時間輪前進。
func (b *bucket) Flush(reinsert func(*Timer)) { var ts []*Timer b.mu.Lock() // 循環獲取bucket隊列節點 for e := b.timers.Front(); e != nil; { next := e.Next() t := e.Value.(*Timer) // 將頭節點移除bucket隊列 b.remove(t) ts = append(ts, t) e = next } b.mu.Unlock() b.SetExpiration(-1) // TODO: Improve the coordination with b.Add() for _, t := range ts { reinsert(t) } }
Flush方法會根據bucket裏面timers列表進行遍歷插入到ts數組中,而後調用reinsert方法,這裏是調用的addOrRun方法。
func (tw *TimingWheel) addOrRun(t *Timer) { // 若是已通過期,那麼直接執行 if !tw.add(t) { // 異步執行定時任務 go t.task() } }
addOrRun會調用add方法檢查傳入的定時任務Timer是否已經到期,若是到期那麼異步調用task方法直接執行。add方法咱們下面會接着分析。
整個start執行流程如圖:
func main() { tw := timingwheel.NewTimingWheel(time.Second, 10) tw.Start() // 添加任務 tw.AfterFunc(time.Second*15, func() { fmt.Println("The timer fires") exitC <- time.Now().UTC() }) }
咱們經過AfterFunc方法添加一個15s的定時任務,若是到期了,那麼執行傳入的函數。
func (tw *TimingWheel) AfterFunc(d time.Duration, f func()) *Timer { t := &Timer{ expiration: timeToMs(time.Now().UTC().Add(d)), task: f, } tw.addOrRun(t) return t }
AfterFunc方法回根據傳入的任務到期時間,以及到期須要執行的函數封裝成Timer,調用addOrRun方法。addOrRun方法咱們上面已經看過了,會根據到期時間來決定是否須要執行定時任務。
下面咱們來看一下add方法:
func (tw *TimingWheel) add(t *Timer) bool { currentTime := atomic.LoadInt64(&tw.currentTime) // 已通過期 if t.expiration < currentTime+tw.tick { // Already expired return false // 到期時間在第一層環內 } else if t.expiration < currentTime+tw.interval { // Put it into its own bucket // 獲取時間輪的位置 virtualID := t.expiration / tw.tick b := tw.buckets[virtualID%tw.wheelSize] // 將任務放入到bucket隊列中 b.Add(t) // 若是是相同的時間,那麼返回false,防止被屢次插入到隊列中 if b.SetExpiration(virtualID * tw.tick) { // 將該bucket加入到延遲隊列中 tw.queue.Offer(b, b.Expiration()) } return true } else { // Out of the interval. Put it into the overflow wheel // 若是放入的到期時間超過第一層時間輪,那麼放到上一層中去 overflowWheel := atomic.LoadPointer(&tw.overflowWheel) if overflowWheel == nil { atomic.CompareAndSwapPointer( &tw.overflowWheel, nil, // 須要注意的是,這裏tick變成了interval unsafe.Pointer(newTimingWheel( tw.interval, tw.wheelSize, currentTime, tw.queue, )), ) overflowWheel = atomic.LoadPointer(&tw.overflowWheel) } // 往上遞歸 return (*TimingWheel)(overflowWheel).add(t) } }
add方法根據到期時間來分紅了三部分,第一部分是小於當前時間+tick,表示已經到期,那麼返回false執行任務便可;
第二部分的判斷會根據expiration是否小於時間輪的跨度,若是小於的話表示該定時任務能夠放入到當前時間輪中,經過取模找到buckets對應的時間格並放入到bucket隊列中,SetExpiration方法會根據傳入的參數來判斷是否已經執行過延遲隊列的Offer方法,防止重複插入;
第三部分表示該定時任務的時間跨度超過了當前時間輪,須要升級到上一層的時間輪中。須要注意的是,上一層的時間輪的tick是當前時間輪的interval,延遲隊列仍是同一個,而後設置爲指針overflowWheel,並調用add方法往上層遞歸。
到這裏時間輪已經講完了,不過還有須要注意的地方,咱們在用上面的時間輪實現中,使用了DelayQueue加環形隊列的方式實現了時間輪。對定時任務項的插入和刪除操做而言,TimingWheel時間複雜度爲 O(1),在DelayQueue中的隊列使用的是優先隊列,時間複雜度是O(log n),可是因爲buckets列表其實是很是小的,因此並不會影響性能。
https://github.com/RussellLuo/timingwheel