Netty源碼分析系列文章已接近尾聲,本文再來分析Netty中兩個常見組件:FastThreadLoca與HashedWheelTimer。
源碼分析基於Netty 4.1.52算法
FastThreadLocal比較簡單。
FastThreadLocal和FastThreadLocalThread是配套使用的。
FastThreadLocalThread繼承了Thread,FastThreadLocalThread#threadLocalMap 是一個InternalThreadLocalMap,該InternalThreadLocalMap對象只能用於當前線程。
InternalThreadLocalMap#indexedVariables是一個數組,存放了當前線程全部FastThreadLocal對應的值。
而每一個FastThreadLocal都有一個index,用於定位InternalThreadLocalMap#indexedVariables。
數組
FastThreadLocal#get微信
public final V get() { // #1 InternalThreadLocalMap threadLocalMap = InternalThreadLocalMap.get(); // #2 Object v = threadLocalMap.indexedVariable(index); if (v != InternalThreadLocalMap.UNSET) { return (V) v; } // #3 return initialize(threadLocalMap); }
#1
獲取該線程的InternalThreadLocalMap
若是是FastThreadLocalThread,直接獲取FastThreadLocalThread#threadLocalMap。
不然,從UnpaddedInternalThreadLocalMap.slowThreadLocalMap獲取該線程InternalThreadLocalMap。
注意,UnpaddedInternalThreadLocalMap.slowThreadLocalMap是一個ThreadLocal,這裏實際回退到使用ThreadLocal了。#2
每一個FastThreadLocal都有一個index。
經過該index,獲取InternalThreadLocalMap#indexedVariables中存放的值#3
找不到值,經過initialize方法構建新對象。源碼分析
能夠看到,FastThreadLocal中連hash算法都不用,經過下標獲取對應的值,複雜度爲log(1),天然很快啦。性能
HashedWheelTimer是Netty提供的時間輪調度器。
時間輪是一種充分利用線程資源進行批量化任務調度的調度模型,可以高效的管理各類延時任務。
簡單說,就是將延時任務存放到一個環形隊列中,並經過執行線程定時執行該隊列的任務。this
例如,
環形隊列上有60個格子,
執行線程每秒移動一個格子,則環形隊列每輪可存放1分鐘內的任務。
如今有兩個定時任務
task1,32秒後執行
task2,2分25秒後執行
而執行線程當前位於第6格子
則task1放到32+6=38格,輪數爲0
task2放到25+6=31個,輪數爲2
執行線程將執行當前格子輪數爲0的任務,並將其餘任務輪數減1。
spa
缺點,時間輪調度器的時間精度不高。
由於時間輪算法的精度取決於執行線程移動速度。
例如上面例子中執行線程每秒移動一個格子,則調度精度小於一秒的任務就沒法準時調用。線程
HashedWheelTimer關鍵字段code
// 任務執行器,負責執行任務 Worker worker = new Worker(); // 任務執行線程 Thread workerThread; // HashedWheelTimer狀態, 0 - init, 1 - started, 2 - shut down int workerState; // 時間輪隊列,使用數組實現 HashedWheelBucket[] wheel; // 暫存新增的任務 Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); // 已取消任務 Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
添加延遲任務 HashedWheelTimer#newTimeoutorm
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { ... // #1 start(); // #2 long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; ... HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; }
#1
若是HashedWheelTimer未啓動,則啓動該HashedWheelTimer
HashedWheelTimer#start方法負責是啓動workerThread線程#2
startTime是HashedWheelTimer啓動時間
deadline是相對HashedWheelTimer啓動的延遲時間
構建HashedWheelTimeout,添加到HashedWheelTimer#timeouts
時間輪運行 Worker#run
public void run() { ... // #1 startTimeInitialized.countDown(); do { // #2 final long deadline = waitForNextTick(); if (deadline > 0) { // #3 int idx = (int) (tick & mask); processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // #4 transferTimeoutsToBuckets(); // #5 bucket.expireTimeouts(deadline); // #6 tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // #7 ... }
#1
HashedWheelTimer#start方法阻塞HashedWheelTimer線程直到Worker啓動完成,這裏解除HashedWheelTimer線程阻塞。#2
計算下一格子開始執行的時間,而後sleep到下次格子開始執行時間#2
tick是從HashedWheelTimer啓動後移動的總格子數,這裏獲取tick對應的格子索引。
因爲Long類型足夠大,這裏並不考慮溢出問題。#4
將HashedWheelTimer#timeouts的任務遷移到對應的格子中#5
處理已到期任務#6
移動到下一個格子#7
這裏是HashedWheelTimer#stop後的邏輯處理,取消任務,中止時間輪
遷移任務 Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() { // #1 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { continue; } // #2 long calculated = timeout.deadline / tickDuration; // #3 timeout.remainingRounds = (calculated - tick) / wheel.length; // #4 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. // #5 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; bucket.addTimeout(timeout); } }
#1
注意,每次只遷移100000個任務,以避免阻塞線程#2
任務延遲時間/每格時間數, 獲得該任務需延遲的總格子移動數#3
(總格子移動數 - 已移動格子數)/每輪格子數,獲得輪數#4
若是任務在timeouts隊列放得過久致使已通過了執行時間,則使用當前tick, 也就是放到當前bucket,以便儘快執行該任務#5
計算tick對應格子索引,放到對應的格子位置
執行到期任務 HashedWheelBucket#expireTimeouts
public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; while (timeout != null) { HashedWheelTimeout next = timeout.next; // #1 if (timeout.remainingRounds <= 0) { // #2 next = remove(timeout); if (timeout.deadline <= deadline) { // #3 timeout.expire(); } else { throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { // #4 timeout.remainingRounds --; } timeout = next; } }
#1
選擇輪數小於等於0的任務#2
移除任務#3
修改狀態爲過時,並執行任務#4
其餘任務輪數減1
ScheduledExecutorService使用堆(DelayedWorkQueue)維護任務,新增任務複雜度爲O(logN)。
而 HashedWheelTimer 新增任務複雜度爲O(1),因此在任務很是多時, HashedWheelTimer 能夠表現出它的優點。
可是任務較少甚至沒有任務時,HashedWheelTimer的執行線程都須要不斷移動,也會形成性能消耗。
注意,HashedWheelTimer使用同一個線程調用和執行任務,若是某些任務執行時間太久,則影響後續定時任務執行。固然,咱們也能夠考慮在任務中另起線程執行邏輯。
另外,若是任務過多,也會致使任務長期滯留在HashedWheelTimer#timeouts中而不能及時執行。
若是您以爲本文不錯,歡迎關注個人微信公衆號,系列文章持續更新中。您的關注是我堅持的動力!