面試官:知道時間輪算法嗎?在Netty和Kafka中如何應用的?爲何不用Timer、延時線程池?

你們好,我是yes。html

最近看 Kafka 看到了時間輪算法,記得之前看 Netty 也看到過這玩意,沒太過關注。今天就來看看時間輪究竟是什麼東西。面試

爲何要用時間輪算法來實現延遲操做?算法

延時操做 Java 不是提供了 Timer 麼?數組

還有 DelayQueue 配合線程池或者 ScheduledThreadPool 不香嗎?緩存

咱們先來簡單看看 Timer、DelayQueue 和 ScheduledThreadPool 的相關實現,看看它們是如何實現延時任務的,源碼之下無祕密。再來剖析下爲什麼 Netty 和 Kafka 特地實現了時間輪來處理延遲任務。markdown

若是在手機上閱讀其實純看字也行,不用看代碼,我都會先用文字描述清楚。不過電腦上看效果更佳。多線程

Timer

Timer 能夠實現延時任務,也能夠實現週期性任務。咱們先來看看 Timer 核心屬性和構造器。併發

核心就是一個優先隊列和封裝的執行任務的線程,從這咱們也能夠看到一個 Timer 只有一個線程執行任務。異步

再來看看如何實現延時和週期性任務的。我先簡單的歸納一下,首先維持一個小頂堆,即最快須要執行的任務排在優先隊列的第一個,根據堆的特性咱們知道插入和刪除的時間複雜度都是 O(logn)。分佈式

而後 TimerThread 不斷地拿排着的第一個任務的執行時間和當前時間作對比。若是時間到了先看看這個任務是否是週期性執行的任務,若是是則修改當前任務時間爲下次執行的時間,若是不是週期性任務則將任務從優先隊列中移除。最後執行任務。若是時間還未到則調用 wait() 等待。

再看下圖,整理下流程。

流程知道了再對着看下代碼,這塊就差很少了。看代碼不爽的能夠跳過代碼部分,影響不大。

先來看下 TaskQueue,就簡單看下插入任務的過程,就是個普通的堆插入操做。

再來看看 TimerThread 的 run 操做。

小結一下

能夠看出 Timer 實際就是根據任務的執行時間維護了一個優先隊列,而且起了一個線程不斷地拉取任務執行。

有什麼弊端呢?

首先優先隊列的插入和刪除的時間複雜度是O(logn),當數據量大的時候,頻繁的入堆出堆性能有待考慮。

而且是單線程執行,那麼若是一個任務執行的時間太久則會影響下一個任務的執行時間(固然你任務的run要是異步執行也行)。

而且從代碼能夠看到對異常沒有作什麼處理,那麼一個任務出錯的時候會致使以後的任務都沒法執行。

ScheduledThreadPoolExecutor

在說 ScheduledThreadPoolExecutor 以前咱們再看下 Timer 的註釋,註釋可都是乾貨千萬不要錯過。我作了點修改,突出了下重點。

Java 5.0 introduced ScheduledThreadPoolExecutor, It is effectively a more versatile replacement for the Timer, it allows multiple service threads. Configuring with one thread makes it equivalent to Timer。

簡單翻譯下:1.5 引入了 ScheduledThreadPoolExecutor,它是一個具備更多功能的 Timer 的替代品,容許多個服務線程。若是設置一個服務線程和 Timer 沒啥差異。

從註釋看出相對於 Timer ,可能就是單線程跑任務和多線程跑任務的區別。咱們來看下。

繼承了 ThreadPoolExecutor,實現了 ScheduledExecutorService。能夠定性操做就是正常線程池差很少了。區別就在於兩點,一個是 ScheduledFutureTask ,一個是 DelayedWorkQueue。

其實 DelayedWorkQueue 就是優先隊列,也是利用數組實現的小頂堆。而 ScheduledFutureTask 繼承自 FutureTask 重寫了 run 方法,實現了週期性任務的需求。

小結一下

ScheduledThreadPoolExecutor 大體的流程和 Timer 差很少,也是維護一個優先隊列,而後經過重寫 task 的 run 方法來實現週期性任務,主要差異在於能多線程運行任務,不會單線程阻塞

而且 Java 線程池的設定是 task 出錯會把錯誤吃了,無聲無息的。所以一個任務出錯也不會影響以後的任務

DelayQueue

Java 中還有個延遲隊列 DelayQueue,加入延遲隊列的元素都必須實現 Delayed 接口。延遲隊列內部是利用 PriorityQueue 實現的,因此仍是利用優先隊列!Delayed 接口繼承了Comparable 所以優先隊列是經過 delay 來排序的。

而後咱們再來看下延遲隊列是如何獲取元素的。

小結一下

也是利用優先隊列實現的,元素經過實現 Delayed 接口來返回延遲的時間。不過延遲隊列就是個容器,須要其餘線程來獲取和執行任務。

這下是搞明白了 Timer 、ScheduledThreadPool 和 DelayQueue,總結的說下它們都是經過優先隊列來獲取最先須要執行的任務,所以插入和刪除任務的時間複雜度都爲O(logn),而且 Timer 、ScheduledThreadPool 的週期性任務是經過重置任務的下一次執行時間來完成的。

問題就出在時間複雜度上,插入刪除時間複雜度是O(logn),那麼假設頻繁插入刪除次數爲 m,總的時間複雜度就是O(mlogn),這種時間複雜度知足不了 Kafka 這類中間件對性能的要求,而時間輪算法的插入刪除時間複雜度是O(1)。咱們來看看時間輪算法是如何實現的。

時間輪算法

俗話說藝術源於生活,技術也能從平常生活中找到靈感。我們先來看塊表,嗯金色的表。

都看清楚了吧,時間輪就是和手錶時鐘很類似的存在。時間輪用環形數組實現,數組的每一個元素能夠稱爲槽,和 HashMap同樣稱呼。

槽的內部用雙向鏈表存着待執行的任務,添加和刪除的鏈表操做時間複雜度都是 O(1),槽位自己也指代時間精度,好比一秒掃一個槽,那麼這個時間輪的最高精度就是 1 秒。

也就是說延遲 1.2 秒的任務和 1.5 秒的任務會被加入到同一個槽中,而後在 1 秒的時候遍歷這個槽中的鏈表執行任務。

從圖中能夠看到此時指針指向的是第一個槽,一共有八個槽0~7,假設槽的時間單位爲 1 秒,如今要加入一個延時 5 秒的任務,計算方式就是 5 % 8 + 1 = 6,即放在槽位爲 6,下標爲 5 的那個槽中。更具體的就是拼到槽的雙向鏈表的尾部。

而後每秒指針順時針移動一格,這樣就掃到了下一格,遍歷這格中的雙向鏈表執行任務。而後再循環繼續。

能夠看到插入任務從計算槽位到插入鏈表,時間複雜度都是O(1)。那假設如今要加入一個50秒後執行的任務怎麼辦?這槽好像不夠啊?難道要加槽嘛?和HashMap同樣擴容?

不是的,常見有兩種方式,一種是經過增長輪次的概念。50 % 8 + 1 = 3,即應該放在槽位是 3,下標是 2 的位置。而後 (50 - 1) / 8 = 6,即輪數記爲 6。也就是說當循環 6 輪以後掃到下標的 2 的這個槽位會觸發這個任務。Netty 中的 HashedWheelTimer 使用的就是這種方式。

還有一種是經過多層次的時間輪,這個和咱們的手錶就更像了,像咱們秒針走一圈,分針走一格,分針走一圈,時針走一格。

多層次時間輪就是這樣實現的。假設上圖就是第一層,那麼第一層走了一圈,第二層就走一格,能夠得知第二層的一格就是8秒,假設第二層也是 8 個槽,那麼第二層走一圈,第三層走一格,能夠得知第三層一格就是 64 秒。那麼一格三層,每層8個槽,一共 24 個槽時間輪就能夠處理最多延遲 512 秒的任務。

而多層次時間輪還會有降級的操做,假設一個任務延遲 500 秒執行,那麼剛開始加進來確定是放在第三層的,當時間過了 436 秒後,此時還須要 64 秒就會觸發任務的執行,而此時相對而言它就是個延遲 64 秒後的任務,所以它會被下降放在第二層中,第一層還放不下它。

再過個 56 秒,相對而言它就是個延遲 8 秒後執行的任務,所以它會再被降級放在第一層中,等待執行。

降級是爲了保證時間精度一致性。Kafka內部用的就是多層次的時間輪算法。

Netty中的時間輪

在 Netty 中時間輪的實現類是 HashedWheelTimer,代碼中的 wheel 就是上圖畫的循環數組,mask 的設計和HashMap同樣,經過限制數組的大小爲2的次方,利用位運算來替代取模運算,提升性能。tickDuration 就是每格的時間即精度。能夠看到配備了一個工做線程來處理任務的執行。

接下來咱們再來看看任務是如何添加的。

能夠看到任務並無直接添加到時間輪中,而是先入了一個 mpsc 隊列,我簡單說下 mpsc 是 JCTools 中的併發隊列,用在多個生產者可同時訪問隊列,但只有一個消費者會訪問隊列的狀況。篇幅有限,有興趣的朋友自行了解實現。

而後咱們再來看看工做線程是如何運做的。

很直觀沒什麼花頭,咱們先來看看 waitForNextTick,是如何獲得下一次執行時間的。

簡單的說就是經過 tickDuration 和此時已經滴答的次數算出下一次須要檢查的時間,時候未到就sleep等着。

再來看下任務如何入槽的。

註釋的很清楚了,實現也和上述分析的一致。

最後再來看下如何執行的。

就是經過輪數和時間雙重判斷,執行完了移除任務。

小結一下

整體上看 Netty 的實現就是上文說的時間輪經過輪數的實現,徹底一致。能夠看出時間精度由 TickDuration 把控,而且工做線程的除了處理執行到時的任務還作了其餘操做,所以任務不必定會被精準的執行。

並且任務的執行若是不是新起一個線程,或者將任務扔到線程池執行,那麼耗時的任務會阻塞下個任務的執行。

而且會有不少無用的 tick 推動,例如 TickDuration 爲1秒,此時就一個延遲350秒的任務,那就是有349次無用的操做。

可是從另外一面來看,若是任務都執行很快(固然你也能夠異步執行),而且任務數不少,經過分批執行,而且增刪任務的時間複雜度都是O(1)來講。時間輪仍是比經過優先隊列實現的延時任務來的合適些。

Kafka 中的時間輪

上面咱們說到 Kafka 中的時間輪是多層次時間輪實現,總的而言實現和上述說的思路一致。不過細節有些不一樣,而且作了點優化。

先看看添加任務的方法。在添加的時候就設置任務執行的絕對時間。

那麼時間輪是如何推進的呢?Netty 中是經過固定的時間間隔掃描,時候未到就等待來進行時間輪的推進。上面咱們分析到這樣會有空推動的狀況。

而 Kafka 就利用了空間換時間的思想,經過 DelayQueue,來保存每一個槽,經過每一個槽的過時時間排序。這樣擁有最先須要執行任務的槽會有優先獲取。若是時候未到,那麼 delayQueue.poll 就會阻塞着,這樣就不會有空推動的狀況發送。

咱們來看下推動的方法。

從上面的 add 方法咱們知道每次對比都是根據expiration < currentTime + interval 來進行對比的,而advanceClock 就是用來推動更新 currentTime 的。

小結一下

Kafka 用了多層次時間輪來實現,而且是按需建立時間輪,採用任務的絕對時間來判斷延期,而且對於每一個槽(槽內存放的也是任務的雙向鏈表)都會維護一個過時時間,利用 DelayQueue 來對每一個槽的過時時間排序,來進行時間的推動,防止空推動的存在。

每次推動都會更新 currentTime 爲當前時間戳,固然作了點微調使得 currentTime 是 tickMs 的整數倍。而且每次推動都會把能降級的任務從新插入降級。

能夠看到這裏的 DelayQueue 的元素是每一個槽,而不是任務,所以數量就少不少了,這應該是權衡了對於槽操做的延時隊列的時間複雜度與空推動的影響。

總結

首先介紹了 Timer、DelayQueue 和 ScheduledThreadPool,它們都是基於優先隊列實現的,O(logn) 的時間複雜度在任務數多的狀況下頻繁的入隊出隊對性能來講有損耗。所以適合於任務數很少的狀況。

Timer 是單線程的會有阻塞的風險,而且對異常沒有作處理,一個任務出錯 Timer 就掛了。而 ScheduledThreadPool 相比於 Timer 首先能夠多線程來執行任務,而且線程池對異常作了處理,使得任務之間不會有影響。

而且 Timer 和 ScheduledThreadPool 能夠週期性執行任務。 而 DelayQueue 就是個具備優先級的阻塞隊列。

對比而言時間輪更適合任務數很大的延時場景,它的任務插入和刪除時間複雜度都爲O(1)。對於延遲超過期間輪所能表示的範圍有兩種處理方式,一是經過增長一個字段-輪數,Netty 就是這樣實現的。二是多層次時間輪,Kakfa 是這樣實現的。

相比而言 Netty 的實現會有空推動的問題,而 Kafka 採用 DelayQueue 以槽爲單位,利用空間換時間的思想解決了空推動的問題。

能夠看出延遲任務的實現都不是很精確的,而且或多或少都會有阻塞的狀況,即便你異步執行,線程不夠的狀況下仍是會阻塞。

巨人的肩膀

《深刻理解Kafka:核心設計與實踐原理》

www.cnblogs.com/luozhiyun/p…


我是 yes,從一點點到億點點,咱們下篇見

往期推薦:

Kafka和RocketMQ底層存儲之那些你不知道的事

消息隊列面試熱點一鍋端

圖解+代碼|常見限流算法以及限流在單機分佈式場景下的思考

表弟面試被虐我教他緩存連招

面試官:說說Kafka處理請求的全流程

Kafka索引設計的亮點

Kafka日誌段讀寫分析

相關文章
相關標籤/搜索