本文首發地址:http://zqhxuyuan.github.io/2016/05/13/2016-05-13-Kafka-Book-Sample-TimingWheel/java
那麼Kafka的Timer定時器是如何存儲DelayedOperation,又是如何在有任務超時的時候能準確地輪詢出來。在Java中有多種方案能夠作到任務的延遲執行,好比java.util.Timer和TimerTask的調度,或者DelayedQueue和實現Delayed接口的線程。但這些對於Kafka這種動輒成千上萬個請求的分佈式系統而言都過於重量級,因此Kafka的Timer專門設計了TimingWheel這個數據結構來存儲大量的處理請求,不過它的底層仍是基於DelayedQueue實現的。 git
被放到延遲隊列的每一個元素必須實現Delayed接口,原本能夠直接將DelayedOperation放入隊列中(任務失效的時候是一個一個彈出),不過由於DelayedOperation數量級太大了,能夠將多個DelayedOperation組成一個TimerTaskList鏈表(在同一個列表中的全部任務的失效時間都很相近,但不必定都相等),以TimerTaskList做爲隊列的元素,因此失效時間會被設置到TimerTaskList上,當失效的時候,整個列表中的全部任務都會一塊兒失效。github
Purgatory將任務添加到Timer定時器,而且會在Reaper線程中調用advanceClock
不斷地移動內部的時鐘,使得超時的任務能夠被取出來執行。任務加入到TimingWheel中須要首先被包裝成TimerTaskEntry,而後TimingWheel會根據TimerTaskEntry的失效時間加入到某個TimerTaskList中(TimingWheel的某個bucket)。當TimerTaskList由於超時被輪詢出來並不必定表明裏面全部的TimerTaskEntry必定就超時,因此對於沒有超時的TimerTaskEntry須要從新加入到TimingWheel新的TimerTaskList中,對於超時的TimerTaskEntry則當即執行任務。不過timingWheel.add
添加任務時並不須要先判斷有沒有超時而後再作決定,而是無論三七二十一,先嚐試加入TimerTaskEntry,若是添加成功,那很好;若是沒有添加成功,說明這個任務要麼已經被取消了,要麼超時了。 apache
添加不成功有兩種狀況,1)被其餘線程完成後任務會被取消,這樣保證了只有最早完成的那個線程只會調用一次完成的方法,其餘線程就再也不須要執行這個任務了。2)任務超時了,但尚未被其餘線程完成即尚未被取消,當前線程就應該當即執行任務。數據結構
class Timer(taskExecutor:ExecutorService, tickMs:Long=1,wheelSize:Int=20, startMs: Long = System.currentTimeMillis) { val delayQueue = new DelayQueue[TimerTaskList]() //延遲隊列,按照失效時間排序 val taskCounter = new AtomicInteger(0) //內存級別的原子共享變量,全部時間輪同一個計數器 val timingWheel=new TimingWheel(tickMs,wheelSize,startMs,taskCounter,delayQueue) def add(timerTask: TimerTask) = { //1.DelayedOperation是一個TimerTask addTimerTaskEntry(new TimerTaskEntry(timerTask)) //2.被包裝成定時任務條目 } val reinsert=(entry:TimerTaskEntry) => addTimerTaskEntry(entry)//高階函數 //add和reinsert都會將TimerTaskEntry加入到時間輪,後者使用已有的TimerTaskEntry def addTimerTaskEntry(timerTaskEntry: TimerTaskEntry) { val addSuccess = timingWheel.add(timerTaskEntry) //3.添加到時間輪中 if (!addSuccess) { //添加不成功,要麼被取消,要麼超時了 if (!timerTaskEntry.cancelled) //尚未被取消,那就是超時了 taskExecutor.submit(timerTaskEntry.timerTask) //執行條目裏的定時任務 } } def advanceClock(timeoutMs: Long): Boolean = { //timeout是輪詢的最長等待時間 var bucket=delayQueue.poll(timeoutMs, TimeUnit.MILLISECONDS)//沒有到超時不會被輪詢出 if (bucket != null) { //從延遲隊列輪詢出存儲的TimerTaskList while (bucket != null) { //一次可能會輪詢出多個元素,當並不必定是延遲隊列全部元素 timingWheel.advanceClock(bucket.getExpiration()) bucket.flush(reinsert) //從新插入,函數的entry參數只有真正調用flush方法才能知道 bucket=delayQueue.poll() //當即再輪詢一次(不等待),直到poll出來沒有東西了才中止 } } } }
Timer使用TimingWheel時間輪來管理延遲的等待超時的請求,TimingWheel時間輪是一個存儲定時任務的數據結構:tickMs表示指針每隔1ms tick一次,wheelSize=20表示走完一圈要tick 20次,因此走完一圈總共要花費20*1ms=20ms,若是tickMs=1000ms,wheelSize=60,就和時鐘裏秒針的滴答徹底同樣了。taskCounter表示請求數量,若是請求完成則計數器的值會減小,delayQueue是延遲隊列用來存儲定時任務。能夠把Timer看作是定時器線程即模擬秒針每秒鐘走一次這個動做,而TimingWheel則負責在秒針tick一次以後將超時的任務完成掉。圖3-73舉例現實世界的時鐘和鬧鐘(指定時間點)/計時器(多長時間後),假設設置了一個計時器任務要在30秒後離開電腦休息一下,當啓動計時器時,時間一秒鐘一秒鐘地流逝,要執行的任務也漸漸臨近,剩餘的時間愈來愈少,當計時器中止時,時間已經走了30秒了,以前設置的任務就應該被執行。TimingWheel的工做原理和計時器是相似的,它容許在不一樣時刻加入不一樣計時器,並且對同一個時間點,也容許多個計時器同時觸發執行,好比有多個任務都要在30秒後同時執行。 分佈式
圖3-73 現實世界的時鐘/計時器示例函數
private[timer] class TimingWheel(tickMs:Long,wheelSize:Int,startMs:Long, taskCounter: AtomicInteger, queue: DelayQueue[TimerTaskList]) { val interval = tickMs * wheelSize val buckets = Array.tabulate[TimerTaskList](wheelSize) { _ => new TimerTaskList(taskCounter) } //每一個List共享taskCount計數器 var currentTime = startMs - (startMs % tickMs) @volatile var overflowWheel: TimingWheel = null def addOverflowWheel(): Unit = { if (overflowWheel == null) { //建立父級別的時間輪 overflowWheel = new TimingWheel( tickMs = interval, //低級別的整個範圍做爲父級別一個tick wheelSize = wheelSize, //bucket的數量不變 startMs = currentTime, //當前時間經過advanceClock會更新 taskCounter = taskCounter, queue //全局惟一的計數器和延遲隊列 ) } } def add(timerTaskEntry: TimerTaskEntry): Boolean = { val expiration = timerTaskEntry.expirationMs if (timerTaskEntry.cancelled) { //被其餘線程取消了(執行任務時會取消) false } else if (expiration < currentTime + tickMs) { //已經超時了 false } else if (expiration < currentTime + interval) { val virtualId = expiration / tickMs val bucket = buckets((virtualId % wheelSize.toLong).toInt) bucket.add(timerTaskEntry) //把任務根據失效時間點放到對應的bucket中 //設置bucket的失效時間點,而後把bucket加入隊列中 if (bucket.setExpiration(virtualId * tickMs)) queue.offer(bucket) true } else { if (overflowWheel == null) addOverflowWheel() overflowWheel.add(timerTaskEntry) } } def advanceClock(timeMs: Long): Unit = { if (timeMs >= currentTime + tickMs) { currentTime = timeMs - (timeMs % tickMs) if (overflowWheel != null) overflowWheel.advanceClock(currentTime) } } }
舉例外部的Purgatory添加任務到定時器中,而後經過Reaper線程的advanceClock移動時鐘的調用順序,假設當前時間currentTime是0s,時間輪的tickMs=1000ms=1s,時間輪大小=8,整個時間輪的範圍interval=1s*8=8s
,Reaper線程循環時輪詢一次隊列最長timeoutMs=200ms,添加的四個任務的超時時間分別是[A=0s,B=1s,C=1s,D=3s]。添加任務時,任務A=0s<currentTime+tickMs=0s+1s=1s
,因此任務A添加失敗,執行executor.submit
,表示任務A已經超時了;任務B/C/D知足1s=currentTime+tickMs<=expiration<currentTime+interval=8s
,因此能夠成功加入隊列,不會執行executor.submit
,表示它們都尚未到執行的時間點。B/C會加入bucket=1%8=1,D加入的bucket=3%8=3。而後Reaper線程開始調用advanceClock,當前時間=0s,在timeout=200ms內並不會從隊列中輪詢出來任何元素,由於即便200ms過去了,當前時間是200ms,而隊列中最早能夠被彈出來的都是1s,因此Reaper線程繼續調用了屢次輪詢方法:200ms,400ms,600ms,800ms。 性能
在800ms時間點調用輪詢時,timeoutMs=200ms,這時候能夠終於把bucket1輪詢出來(由於800ms時間點+200ms時間間隔=1000ms=1s,而恰好隊列中存在延遲時間=1s的bucket),這個bucket1中有兩個任務B/C,它們的失效時間都是1s,並且bucket級別的expiration=1s(加入任務時同時肯定bucket的失效時間),首先經過advanceClock更新currentTime=1s。spa
bucket.flush會嘗試將任務B/C從新加入隊列中,因爲此時currentTime已經被更新爲1s,B/C的超時時間=1s<currentTime+tickMs=1s+1s=2s
,因此此時添加到隊列會失敗,因而和最開始的A相似執行executor.submit
,也表示任務B/C在當前時間點=800ms時刻的輪詢過程當中超時了。因爲Reaper線程還有其餘工做因此即便每次輪詢的timeout=200ms,也不必定說每次發生輪詢的時間點就是[200m,400ms,600ms]這麼恰好,假設輪詢的時間點是900ms,那麼不用等timeout=200ms只過了100ms就能夠把失效時間=1s的bucket輪詢出來。圖3-74示例了Timer調用TimingWheel添加任務和時鐘移動的調用過程,這裏爲了簡單起見,尚未考慮overflowWheel
的場景。 線程
圖3-74 Timer和TimingWheel的調用示例(沒有二級時間輪)
失效時間和bucket選擇
任務的失效時間是一個肯定的時間點,因此無論當前時間是什麼,即便失效時間相同的兩個任務在不一樣時間點加入隊列,它們也會被放入同一個bucket中。固然根據任務的失效時間選擇不一樣的bucket還跟tickMs以及時間輪的大小有關,時間輪的expiration範圍expiration=[currentTime+tickMs,currentTime+interval)
。圖3-75中示例了在相同時間輪大小下三種不一樣的tickMs,當tickMs=1時,每一個bucket中任務的失效時間只有一個值,當tickMs=2時有兩種可能,當tickMs=8時就有8種可能,好比失效時間在120-127範圍內的任務都會被分配到bucket7中。即便是相同失效時間若是tickMs不一樣也會被放入不一樣的bucket,好比任務失效時間=103在tickMs=1時分配到bucket7,在tickMs=2時分配到bucket3。
圖3-75 相同時間輪,不一樣tickMs
由tickMs和時間輪的大小決定了這個時間輪全部任務失效時間的一個範圍,若是超過這個範圍,則不容許加入,圖3-76中當前時間等於100時,時間輪的範圍=[101..107],當前時間等於101時,範圍=[102..108]以此類推。當前時間所在的bucket其實是沒有任務的,由於任務的失效時間若是和當前時間相等說明任務已經失效了,不該該放入隊列中。
圖3-76 時鐘tick影響了時間輪的取值範圍
任務的失效時間和當前時間相等指的是徹底相等,好比tickMs=1s,時間輪大小=60,當前時間等於12:00:00,某個任務的超時時間是12:01:00。時鐘tick時每隔一秒走動一次:[12:00:01,..12:00:59,12:01:00],在12:01:00這一刻任務就超時了,不是12:01:00到下一次tick=12:01:01的一半12:01:00.500,也不是過了12:01:00後的下一次tick=12:01:01才超時,當剛恰好進入12:01:00.000時任務就超時了!好比你定了一個12:01:00的任務運行,你固然但願在那個時間點分絕不差地精準地執行任務,多一秒少一秒都不行!
層級時間輪
只有一個時間輪雖然在時間移動時能夠重用舊的bucket來保存失效時間更日後的任務,可是因爲時間輪所容許的範圍就那麼大,超過這個範圍的失效時間就沒法很好地存儲了。仍是以tickMs=1s,時間輪大小=60爲例,若是當前時間是12:00:00,你沒法設置12:01:01的任務,更談不上12:02:00以及失效時間更加日後的任務了。在《嵌入式系統的實時概念》第十一章提到使用一個外部的event flow buffer
來暫時存儲超過interval的事件,不過更好的方式是使用層級的時間輪。層級時間輪中假設時間輪大小都不變,可是tickMs則是不斷遞增,假設Level0的tickMs=1s,則Level1的tickMs=1s*60
=60s,Level2的tickMs=60s*60
=3600s以此類推。每一層的tickMs表示的是在當前時間輪中移動一格的粒度/單位,Level0=1s,Level1=60s,Level2=3600s。能夠用鐘錶的秒鐘,分針,時針的移動來理解這三個時間輪:秒針走動一格須要花費一秒,分針走動一格花費60秒,時針走動一格花費3600秒。並且更高層級的tickMs等於低一層的整個時間輪範圍,好比Level0的interval=1s*60
=60s,恰好做爲Level1的tickMs,Level1的interval=60s*60
=3600s,也做爲Level2的tickMs。也就是說Level1的一格等於Level0走完一圈,Level2的一格等於Level1走完一圈。
那麼爲何tickMs的單位不一樣,假設有幾個任務的失效時間分別是[20s,60s,70s,120s,3600s],若是全部時間輪的tickMs都是1s,總共須要3600/60=60個時間輪!既然每一個時間輪的tickMs都相等,跟直接用一個大小等於3600的時間輪是沒有任何區別的。而若是使用層級時間輪,總共只須要3個時間輪,20s在Level0的第20個單元格,60s和70s在Level1的第一個單元格內,120s在Level1的第二個單元格上,3600在Level2的第一個單元格上(思考下爲何要把60s這種恰好等於當前時間輪範圍的任務放在下一個時間輪,而不是當前時間輪上)。
confluent有篇博客詳細介紹了時間輪的改進和性能對比,圖3-77中當前指針指向任務①所在的bucket,則任務①已經超時了,當發生一次tick以後,任務①已經完全從隊列中移除了,tick一次以後當前指針指向了任務②所在的bucket,由於任務②也已經超時了,也就是說tick指針指向哪裏,那裏就已經超時了,在當前指針所指向的bucket裏的任務都應該被取出來執行。
圖3-77 時間輪tick到當前bucket,這個bucket的任務都超時
摘自:http://www.confluent.io/blog/apache-kafka-purgatory-hierarchical-timing-wheels
圖3-78有兩個時間輪分別是Level0和Level1,在Time0時加入了⑦⑧⑨三個任務,任務⑦在Level0的7-8之間,任務⑧⑨在Level1的8-16之間。這裏你可能會認爲⑧和⑨應該緊接着Level0的7-8的下一個應該放在Level1的0-8,這樣才叫作無縫銜接嘛。不過若是把⑧⑨放在Level1的0-8之間,1)
自己就不符合取值範圍,由於⑧⑨在0-8以外,而放在8-16之間正好知足⑧⑨的取值範圍。2)
Level1當前指向了0-8表示這個區間的全部任務都已經超時,若是⑧⑨放在這裏,那麼它們就都會超時,而此時連任務⑦都還沒超時,⑧⑨怎麼可能超時呢。3)
Level1的0-8這一格子對應了Level0的全部格子,因此Level1指向0-8表示任務在0-8之間的正在超時,不過具體0-8之間的任務則仍是以Level0爲準,這就比如在Time8時,Level1指向了8-16,表示8-16之間的任務正在超時,可是具體8-16之間的任務也是以Level0爲準。
在Time0以後發生一次Tick後,Level0的指針指向1-2,而Level2的指針沒有變化,而Level0的0消失,1添加了9。圖3-78中當前時間=Time7時,Level0的指針指向了7-8之間,Level2的指針仍是沒有變化,任務⑦超時。再次發生tick以後,指針移動到8-9(這裏已經不是0-1了),這時候Level1的指針終於移動了一格從原先的0-8移動到8-16(想象下秒針走了一圈60s,分針才終於挪動了一格)。而Level1原先在8-16之間存在任務⑧⑨,那麼是否是說這兩個任務同時失效了呢?實際上外界真實時鐘走動的粒度只和第一個時間輪Level0的tickMs相等,Level1走動一格只表示當前這一格的全部任務在Level0走完一圈後都會失效,就比如Level1指向0-8時表示Level0中任務時間在0-8之間只有Level0走完一圈纔會所有失效。所以須要把Level1的任務⑧⑨從Level1中解除出來,放到更細粒度的Level0中才能真正決定任務何時真正失效。因此在Timer8時,Level1的任務⑧⑨被一一放回Level0的各個bucket中,原先在Level1中擠在同一個單元格里的多個任務被分散在Level0的各個單元格中,這樣原先在Level1的各個任務如今就會參照Level0中的tickMs(也就是真實的tickMs)。
圖3-78 層級時間輪的收斂和發散
能夠這麼理解,在Time0到Time7之間,任務⑧⑨在Level1中蓄勢待發,可是由於Level0尚未走完一圈,Level1的指針不會移動,只有Level0走完一圈後,Level1纔會移動一次,並把Level1一格的任務按照Level0的tickMs粒度從新劃分。Level0表明的永遠是真實的時鐘移動,超時的任務必定是在Level0中被選中的,在其餘Level中的任務在接近超時的時候只會源源不斷地進入到Level0中。能夠認爲除了Level0,其餘Level都是虛擬出來的時間輪,這些更高級的時間輪由於tickMs粒度比較大,能夠存儲數據量更大的任務,可是不具有執行超時任務的能力,當高級別的時間輪發生一次tick後,須要把tick指向的全部任務移動到低級別的時間輪中,從而有機會被放到Level0中真正地執行。