[TOC]java
在許多業務場景中,咱們都會碰到延遲任務,定時任務這種需求。特別的,在網絡鏈接的場景中,經常會出現一些超時控制。因爲服務端的鏈接數量很大,這些超時任務的數量每每也是很龐大的。實現對大量任務的超時管理並非一個容易的事情。算法
本章咱們將介紹幾種用於實現超時任務的數據結構,而且最後分析 Netty 在超時任務上採起的結構和代碼。數組
歡迎加入技術交流羣186233599討論交流,也歡迎關注筆者公衆號:風火說。網絡
JDK 在 1.3 的時候引入了Timer
數據結構用於實現定時任務。Timer
的實現思路比較簡單,其內部有兩個主要屬性:數據結構
TaskQueue
:定時任務抽象類TimeTask
的列表。TimerThread
:用於執行定時任務的線程。Timer
結構還定義了一個抽象類TimerTask
而且繼承了Runnable
接口。業務系統實現了這個抽象類的run
方法用於提供具體的延時任務邏輯。多線程
TaskQueue
內部採用大頂堆的方式,依據任務的觸發時間進行排序。而TimerThread
則以死循環的方式從TaskQueue
獲取隊列頭,等待隊列頭的任務的超時時間到達後觸發該任務,而且將任務從隊列中移除。併發
Timer
的數據結構和算法都很容易理解。全部的超時任務都首先進入延時隊列。後臺超時線程不斷的從延遲隊列中獲取任務而且等待超時時間到達後執行任務。延遲隊列採用大頂堆排序,在延遲任務的場景中有三種操做,分別是:添加任務,提取隊列頭任務,查看隊列頭任務。數據結構和算法
查看隊列頭任務的事件複雜度是 O(1) 。而添加任務和提取隊列頭任務的時間複雜度都是 O(Log2n) 。當任務數量較大時,添加和刪除的開銷也是比較大的。此外,因爲Timer
內部只有一個處理線程,若是有一個延遲任務的處理消耗了較多的時間,會對應的延遲後續任務的處理。性能
因爲Timer
只有一個線程用來處理延遲任務,在任務數量不少的時候顯然是不足夠的。在 JDK1.5 引入線程池接口ExecutorService
後,也對應的提供了一個用於處理延時任務的ScheduledExecutorService
子類接口。該接口內部也同樣使用了一個使用小頂堆進行排序的延遲隊列存聽任務。線程池中的線程會在這個隊列上等待直到有任務能夠提取。this
ScheduledExecutorService
的實現上有一些特殊,只有一個線程可以提取到延遲隊列頭的任務,而且根據任務的超時時間進行等待。在這個等待期間,其餘的線程是沒法獲取任務的。這樣的實現是爲了不多個線程同時獲取任務,致使超時時間未到達就職務觸發或者在等待任務超時時間時有新的任務被加入而沒法響應。
因爲ScheduledExecutorService
可使用多個線程,這樣也緩解了由於個別任務執行時間長致使的後續任務被阻塞的狀況。不過延遲隊列也是同樣採用小頂堆的排序方式,所以添加任務和刪除任務的時間複雜度都是 O(Log2n) 。在任務數量很大的狀況下,性能表現比較差。
雖然Timer
和ScheduledThreadPoolExecutor
都提供了對延遲任務的支撐能力,可是因爲新增任務和提取任務的時間複雜度都是 O(Log2n) ,在任務數量很大,好比幾萬,十幾萬的時候,性能的開銷就變得很巨大。
那麼,是否存在新增任務和提取任務比 O(Log2n) 複雜度更低的數據結構呢?答案是存在的。在論文《Hashed and Hierarchical Timing Wheels》中設計了一種名爲時間輪( Timing Wheels )的數據結構,這種結構在處理延遲任務時,其新增任務和刪除任務的時間複雜度下降到了 O(1) 。
時間輪的數據結構很相似於咱們鐘錶上的數據指針,故而得名時間輪。其數據結構用圖示意以下
每個時間「格子」咱們稱之爲槽位,槽位中存放着延遲任務隊列。槽位自己表明着一個時間單位,好比 1 秒。時間輪擁有的槽位個數就是該時間輪可以處理的最大延遲跨度的任務,槽位的時間單位表明着時間輪的精度。這意味着小於時間單位的時間在該時間輪是沒法被區分的。
槽位上的延遲任務隊列中的任務都有相同的延遲時間。每個單位時間,指針都會移動到下一個槽位。當指針指向某一個槽位時,該槽位的延遲任務隊列中的任務都會被觸發。
當有一個延遲任務要插入時間輪時,首先計算其延遲時間與單位時間的餘值,從指針指向的當前槽位移動餘值的個數槽位,就是該延遲任務須要被放入的槽位。
舉個例子,時間輪有8個槽位,編號爲 0 ~ 7 。指針當前指向槽位 2 。新增一個延遲時間爲 4 秒的延遲任務,4 % 8 = 4,所以該任務會被插入 4 + 2 = 6,也就是槽位6的延遲任務隊列。
時間輪的槽位實現能夠採用循環數組的方式達成,也就是讓指針在越過數組的邊界後從新回到起始下標。歸納來講,能夠將時間輪的算法描述爲
用隊列來存儲延遲任務,同一個隊列中的任務,其延遲時間相同。用循環數組的方式來存儲元素,數組中的每個元素都指向一個延遲任務隊列。
有一個當前指針指向數組中的某一個槽位,每間隔一個單位時間,指針就移動到下一個槽位。被指針指向的槽位的延遲隊列,其中的延遲任務所有被觸發。
在時間輪中新增一個延遲任務,將其延遲時間除以單位時間獲得的餘值,從當前指針開始,移動餘值對應個數的槽位,就是延遲任務被放入的槽位。
基於這樣的數據結構,插入一個延遲任務的時間複雜度就降低到 O(1) 。而當指針指向到一個槽位時,該槽位鏈接的延遲任務隊列中的延遲任務所有被觸發。
延遲任務的觸發和執行不該該影響指針向後移動的時間精確性。所以通常狀況下,用於移動指針的線程只負責任務的觸發,任務的執行交由其餘的線程來完成。好比,能夠將槽位上的延遲任務隊列放入到額外的線程池中執行,而後在槽位上新建一個空白的新的延遲任務隊列用於後續任務的添加。
在基本原理中咱們分析了時間輪的基礎結構。不過當時咱們假設須要插入的延遲任務的時間不會超過期間輪的長度,也就是說每個槽位上的延遲任務隊列中的任務的延遲時間都是相同的。
在這種狀況下,要支持更大時間跨度的延遲任務,要麼增長時間輪的槽位數,要麼減小時間輪的精度,也就是每個槽位表明的單位時間。時間輪的精度顯然是一個業務上的硬性要求,那麼只能增長槽位數。假設要求精度爲 1 秒,要能支持延遲時間爲 1 天的延遲任務,時間輪的槽位數須要 60 × 60 × 24 = 86400 。這就須要消耗更多的內存。顯然,單純增長槽位數並非一個好的解決方案。
在論文中,針對大跨度的延遲任務支持,提供了兩種擴展方案。
在該方案中,算法引入了「輪次」的概念,延遲任務的延遲時間除以時間輪長度獲得的商值爲輪次。延遲任務的延遲時間除以時間輪長度獲得的餘數爲要插入的槽位偏移量。
當插入延遲任務時首先計算輪次和槽位偏移量,經過槽位偏移量肯定延遲任務插入的槽位。當指針指向某一個槽位時,對槽位指向的延遲任務隊列進行遍歷,其中輪次爲0的延遲任務所有觸發,其他任務則等待下一個週期。
經過引入輪次,就能夠在有限的槽位上支持無窮時間範圍的延遲任務。可是雖然插入任務的時間複雜度仍然是 O(1) ,可是在延遲任務觸發時卻須要遍歷延遲任務隊列來確認其輪次是否爲0。任務觸發時的時間複雜卻上升爲了 O(n) 。
對於這個狀況,還有一個變化的細節能夠採用,就是將延遲任務隊列按照輪次進行排序,比方說使用小頂堆對延遲任務隊列進行排序。這樣,當指針指向一個槽位觸發延遲任務時,只須要不斷的從隊列頭取出任務進行輪次檢查,一旦任務輪次不等於0就能夠中止。任務觸發的時間複雜度降低爲 O(1) 。對應的,因爲隊列是排序的了,任務插入的時候除了須要定位插入的槽位,還須要定位在隊列中的插入位置。插入的時間複雜度變化爲 O(1) 和 O(Log2n) ,n 爲該槽位上延遲任務隊列的長度。
看看手錶的設計,有秒針,分針,時針。像秒針與分針,雖然都有 60 格 ,可是各自的格子表明的時間長度不一樣。參考這個思路,咱們能夠聲明多個不一樣層級的時間輪,每個時間輪的槽位的時間跨度是其次級時間輪的總體時間範圍。
當低層級的時間輪的指針完整的走完一圈,其對應的高層級時間輪對應的移動一個槽位。而且高層級時間輪指針指向的槽位中的任務按照延遲時間計算,從新放入到低層級時間輪的不一樣槽位中。這樣的方式,保證了每個時間輪中的每個槽位的延遲任務隊列中的任務都具有相同時間精度的延遲時間。
以精度爲 1 秒,時間範圍爲 1 天的時間輪爲例子,能夠設計三級時間輪:秒級時間輪有 60 個槽位,每一個槽位的時間爲 1 秒;分鐘級時間輪有 60 個槽位,每一個槽位的時間爲 60 秒;小時級時間輪有24個槽位,每一個槽位的時間爲 60 分鐘。當秒級時間輪走完 60 秒後,秒級時間輪的指針再次指向下標爲0的槽位,而分鐘級時間輪的指針向後移動一個槽位,而且將該槽位上的延遲任務所有取出而且從新計算後放入秒級時間輪。
總共只須要 60 + 60 + 24 = 144 個槽位便可支撐。對比上面提到的單級時間輪須要 86400 個槽位而言,節省了至關的內存。
層級時間輪有兩種常見的作法:
時間輪算法的核心思想就是經過循環數組和指針移動的方式,將新增延遲任務的時間複雜度降低到 O(1) ,可是在具體實現上,包括如何處理更大時間跨度的延遲任務上,各家不一樣的實現都會有一些細節上的變化。下面咱們以 Netty 中都時間輪實現爲例子來進行代碼分析。
Netty 的實現自定義了一個超時器的接口io.netty.util.Timer
,其方法以下
public interface Timer {
//新增一個延時任務,入參爲定時任務TimerTask,和對應的延遲時間
Timeout newTimeout(TimerTask task, long delay, TimeUnit unit);
//中止時間輪的運行,而且返回全部未被觸發的延時任務
Set < Timeout > stop();
}
public interface Timeout {
Timer timer();
TimerTask task();
boolean isExpired();
boolean isCancelled();
boolean cancel();
}
複製代碼
Timeout
接口是對延遲任務的一個封裝,其接口方法說明其實現內部須要維持該延遲任務的狀態。後續咱們分析其實現內部代碼時能夠更容易的看到。
Timer
接口有惟一實現HashedWheelTimer
。首先來看其構造方法,以下
public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection, long maxPendingTimeouts) {
//省略代碼,省略參數非空檢查內容。
wheel = createWheel(ticksPerWheel);
mask = wheel.length - 1;
//省略代碼,省略槽位時間範圍檢查,避免溢出以及小於 1 毫秒。
workerThread = threadFactory.newThread(worker);
//省略代碼,省略資源泄漏追蹤設置以及時間輪實例個數檢查
}
複製代碼
首先是方法createWheel
,用於建立時間輪的核心數據結構,循環數組。來看下其方法內容
private static HashedWheelBucket[] createWheel(int ticksPerWheel)
{
//省略代碼,確認 ticksPerWheel 處於正確的區間
//將 ticksPerWheel 規範化爲 2 的次方冪大小。
ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
for(int i = 0; i < wheel.length; i++)
{
wheel[i] = new HashedWheelBucket();
}
return wheel;
}
複製代碼
數組的長度爲 2 的次方冪方便進行求商和取餘計算。
HashedWheelBucket
內部存儲着由HashedWheelTimeout
節點構成的雙向鏈表,而且存儲着鏈表的頭節點和尾結點,方便於任務的提取和插入。
方法HashedWheelTimer#newTimeout
用於新增延遲任務,下面來看下代碼
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
//省略代碼,用於參數檢查
start();
long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
if(delay > 0 && deadline < 0)
{
deadline = Long.MAX_VALUE;
}
HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
timeouts.add(timeout);
return timeout;
}
複製代碼
能夠看到,在新增任務的時候,任務並非直接進入到循環數組中,而是首先被放入到一個隊列,也就是屬性timeouts
,該隊列是一個 MPSC 類型的隊列,採用這個模式主要出於提高併發性能考慮,由於這個隊列只有線程workerThread
會進行任務提取操做。
該線程是在構造方法中經過調用workerThread = threadFactory.newThread(worker)
被建立。可是建立以後並非立刻執行線程的start
方法,其啓動的時機是這個時間輪第一次新增延遲任務的時候,也就是本方法中的start
方法的內容。下面是其代碼
public void start() {
switch(WORKER_STATE_UPDATER.get(this))
{
case WORKER_STATE_INIT:
if(WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED))
{
workerThread.start();
}
break;
case WORKER_STATE_STARTED:
break;
case WORKER_STATE_SHUTDOWN:
throw new IllegalStateException("cannot be started once stopped");
default:
throw new Error("Invalid WorkerState");
}
while(startTime == 0)
{
try
{
startTimeInitialized.await();
}
catch(InterruptedException ignore)
{
// Ignore - it will be ready very soon.
}
}
}
複製代碼
方法很明顯的分爲兩個部分,第一部分爲Switch
方法塊,經過對狀態變量的 CAS 操做,確保只有一個線程可以執行workerThread.start()
方法來啓動工做線程,避免併發異常。第二部分爲阻塞等待,經過CountDownLatch
類型變量startTimeInitialized
執行阻塞等待,用於等待工做線程workerThread
真正進入工做狀態。
從newTimeout
方法的角度來看,插入延遲任務首先是放入隊列中,以前分析數據結構的時候也說過任務的觸發是指針指向時間輪中某個槽位時進行,那麼必然存在一個須要將隊列中的延遲任務放入到時間輪的數組之中的工做。這個動做顯然就是就是由workerThread
工做線程來完成。下面就來看下這個線程的具體代碼內容。
工做線程是依託於HashedWheelTimer.Worker
這個實現了Runnable
接口的類進行工做的,那下面看下其對run
方法的實現代碼,以下
public void run() {
{//代碼塊①
startTime = System.nanoTime();
if(startTime == 0)
{
//使用startTime==0 做爲線程進入工做狀態模式標識,所以這裏從新賦值爲1
startTime = 1;
}
//通知外部初始化工做線程的線程,工做線程已經啓動完畢
startTimeInitialized.countDown();
}
{//代碼塊②
do {
final long deadline = waitForNextTick();
if(deadline > 0)
{
int idx = (int)(tick & mask);
processCancelledTasks();
HashedWheelBucket bucket = wheel[idx];
transferTimeoutsToBuckets();
bucket.expireTimeouts(deadline);
tick++;
}
} while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);
}
{//代碼塊③
for(HashedWheelBucket bucket: wheel)
{
bucket.clearTimeouts(unprocessedTimeouts);
}
for(;;)
{
HashedWheelTimeout timeout = timeouts.poll();
if(timeout == null)
{
break;
}
if(!timeout.isCancelled())
{
unprocessedTimeouts.add(timeout);
}
}
processCancelledTasks();
}
}
複製代碼
爲了方便閱讀,這邊將run
方法的內容分爲三個代碼塊。首先來看代碼塊①。經過系統調用System.nanoTime
爲啓動時間startTime
設置初始值,該變量表明瞭時間輪的基線時間,用於後續相對時間的計算。賦值完畢後,經過startTimeInitialized
變量對外部的等待線程進行通知。
接着來看代碼塊②。這是主要的工做部分,總體是在一個while
循環中,確保工做線程只在時間輪沒有被終止的時候工做。首先來看方法waitForNextTick
,在時間輪中,指針移動一次,稱之爲一個tick
,這個方法顯然內部應該是用於等待指針移動到下一個tick
,來看具體代碼,以下
private long waitForNextTick() {
long deadline = tickDuration * (tick + 1);
for(;;)
{
final long currentTime = System.nanoTime() - startTime;
long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
if(sleepTimeMs <= 0)
{
if(currentTime == Long.MIN_VALUE)
{
return -Long.MAX_VALUE;
}
else
{
return currentTime;
}
}
if(PlatformDependent.isWindows())
{
sleepTimeMs = sleepTimeMs / 10 * 10;
}
try
{
Thread.sleep(sleepTimeMs);
}
catch(InterruptedException ignored)
{
if(WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN)
{
return Long.MIN_VALUE;
}
}
}
}
複製代碼
整個方法的思路很簡單,前面說過,時間輪每移動一次指針,意味着一個tick
。這裏tick
能夠當作是指針移動的次數。因爲槽位的時間範圍是固定的,所以能夠簡單的計算出來指針移動到下一個槽位,理論上應該通過的時間,也就是long deadline = tickDuration * (tick + 1)
。以後再計算從時間輪啓動到當前,實際通過的時間,也就是long currentTime = System.nanoTime() - startTime
。兩者的差值就是線程所須要睡眠的時間。
若是差值小於0,意味着實際通過的時間超過了理論時間,此時已經超出了應該休眠的範圍,方法須要當即返回。因爲在這個方法的執行過程當中,可能會遇到時間輪被中止的狀況,所以使用一個特殊值來表達這個事件,也就是Long.MIN_VALUE
,這也是爲何currentTime
要避開這個值的緣由。
還有一點須要注意,Thread.sleep
方法的實現是依託於操做系統提供的中斷檢查,也就是操做系統會在每個中斷的時候去檢查是否有線程須要喚醒而且提供CPU資源。默認狀況下 Linux 的中斷間隔是 1 毫秒,而 Windows 的中斷間隔是 10 毫秒或者 15 毫秒,具體取決於硬件識別。
若是是在 Windows 平臺下,當方法調用Thread.sleep
傳入的參數不是10的整數倍時,其內部會調用系統方法timeBeginPeriod()
和timeEndPeriod()
來修改中斷週期爲 1 毫秒,而且在休眠結束後再次設置回默認值。這樣的目的是爲了保證休眠時間的準確性。可是在 Windows 平臺下,頻繁的調用修改中斷週期會致使 Windows 時鐘出現異常,大多數時候的表現是致使時鐘加快。這將致使好比嘗試休眠 10 秒時,實際上只休眠了 9 秒。因此在這裏,經過sleepTimeMs = sleepTimeMs / 10 * 10
保證了sleepTimeMs
是 10 的整數倍,從而避免了 Windows 的這個 BUG 。
當方法waitForNextTick
返回後,而且返回的值是正數,意味着當前tick
的休眠等待已經完成,能夠進行延遲任務的觸發處理了。經過int idx = (int)(tick & mask)
調用,肯定下一個被觸發延遲任務的槽位在循環數組中的下標。在處理觸發任務以前,首先將已經取消的延遲任務從槽位所指向的延遲任務隊列中刪除。每次調用HashedWheelTimer#newTimeout
新增延遲任務時都會返回一個Timeout
對象,能夠經過cancle
方法將這個延遲任務取消。當執行取消動做的時候,並不會直接從延遲隊列中刪除,而是將這個對象放入到取消隊列,也就是HashedWheelTimer.cancelledTimeouts
屬性。在準備遍歷槽位上延遲任務隊列以前,經過方法processCancelledTasks
來遍歷這個取消隊列,將其中的延遲任務從各自槽位上的延遲任務隊列中刪除。使用這種方式的好處在於延遲任務的刪除只有一個線程會進行,避免了多線程帶來的併發干擾,減小了開發難度。
在處理完取消的延遲任務後,調用方法transferTimeoutsToBuckets
來將新增延遲任務隊列HashedWheelTimer.timeouts
中的延遲任務分別添加到合適其延遲時間的槽位中。方法的代碼很簡單,就是循環不斷從timeouts
取出任務,而且計算其延遲時間與時間輪範圍的商值和餘數,結果分別爲其輪次與槽位下標。根據槽位下標將該任務添加到槽位對應的延遲任務隊列中。
在這裏能夠看到 Netty 做者對時間輪這一結構的併發設計,新增任務是向 MPSC 隊列新增元素實現。而槽位上的延遲任務隊列只有時間輪自己的線程可以進行新增和刪除,設計爲了 SPSC 模式。前者是爲了提升無鎖併發下的性能,後者則是經過約束,減小了設計難度。
transferTimeoutsToBuckets
方法每次最多隻會轉移 100000 個延遲任務到合適的槽位中,這是爲了不外部循環添加任務致使的餓死。方法執行完畢後,就到了槽位上延遲任務的觸發處理,也就是方法HashedWheelBucket#expireTimeouts
的功能,方法內的邏輯也很簡單。遍歷隊列,若是延遲任務的輪次不爲 0,則減 1。不然觸發任務執行方法,也就是HashedWheelTimeout#expire
。該方法內部依然經過 CAS 方式對狀態進行更新,避免方法的觸發和取消之間的競爭衝突。從這個方法的實現能夠看到,Netty 採用了輪次的方式來對超出時間輪範圍的延遲時間進行支持。多層級時間輪的實現相比輪次概念的實現更爲複雜,考慮到在網絡IO應用中,超出時間輪範圍的場景比較少,使用輪次的方式去支撐更大的時間,是一個相對容易實現的方案。
當須要被觸發的延遲任務都被觸發後,經過tick
加 1 來表達指針移動到下一個槽位。
外部線程經過調用HashedWheelTimer#stop
方法來中止時間輪,中止的方式很簡單,就是經過 CAS 調用來修改時間輪的狀態屬性。而在代碼塊②中經過循環的方式在每一次tick
都會檢查這個狀態位。代碼塊③的內容很簡單,遍歷全部的槽位,而且遍歷槽位的延遲任務隊列,將全部未到達延遲時間而且未取消的任務,都放入到一個集合中,最終將這個集合返回。這個集合內存儲的就是全部未能執行的延遲任務。
在處理大量延遲任務的場景中,時間輪是一個很高效的算法與數據結構。Netty 在對時間輪的實現上,在添加任務,過時任務,刪除任務等環節進行了一些細節上的調整。實際上,不一樣中間件中都有對時間輪的一些實現,各自也都有區別,可是核心都是圍繞在循環數組與槽位過時這個概念上。不一樣的細節變化有各自適合的場景和考量。