1. 簡單介紹.html
A Timer optimized for approximated I/O timeout scheduling. java
關於Timer的介紹能夠看看這篇文章, 寫得不錯 : http://novoland.github.io/%E5%B9%B6%E5%8F%91/2014/07/26/%E5%AE%9A%E6%97%B6%E5%99%A8%EF%BC%88Timer%EF%BC%89%E7%9A%84%E5%AE%9E%E7%8E%B0.html git
能夠看到, HashedWheelTimer 主要用來高效處理大量定時任務, 且任務對時間精度要求相對不高, 好比連接超時管理等場景, 缺點是, 內存佔用相對較高.github
2. 簡單例子.
shell
1) 引入最新的Netty 5依賴 (不必定須要Netty5之前的版本里HashedWheelTimer就已存在)apache
<dependency> <groupId>io.netty</groupId> <artifactId>netty-all</artifactId> <version>5.0.0.Alpha2</version> </dependency>
2) 例子代碼(LocalTime是Java8的時間類)數組
import io.netty.util.HashedWheelTimer; import java.time.LocalTime; import java.util.concurrent.TimeUnit; public class Temp { public static void main(String[] args) throws Exception { //建立Timer, 精度爲100毫秒, HashedWheelTimer timer = new HashedWheelTimer(100, TimeUnit.MILLISECONDS, 16); System.out.println(LocalTime.now()); timer.newTimeout((timeout) -> { System.out.println(LocalTime.now()); System.out.println(timeout); }, 5, TimeUnit.SECONDS); //阻塞main線程 System.in.read(); } }
能夠看到輸出:app
14:55:13.735 14:55:18.845 HashedWheelTimer$HashedWheelTimeout(deadline: 101009856 ns ago, task: com.haogrgr.test.main.Temp$$Lambda$4/708890004@1b797119)
3. 原理簡介.函數
1) 原理如圖所示.this
能夠看到, 就像一個時鐘同樣, 那麼若是寫代碼來實現一個時鐘的話, 大概相似於這樣:
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import org.apache.commons.lang.math.IntRange; public class Temp { public static void main(String[] args) throws Exception { Clock clock = new Clock().start(); for (int i = 0; i < 5; i++) { clock.echo(); Thread.sleep(500); } } } final class Clock { static ScheduledExecutorService updater = Executors.newSingleThreadScheduledExecutor(); private int tick = 0; private int[] wheel = new IntRange(1, 60).toArray(); Clock start() { //1s累加一次tick, 當到60s時歸零 updater.scheduleAtFixedRate(() -> tick = ++tick % wheel.length, 0, 1, TimeUnit.SECONDS); return this; } void echo() { System.out.println("當前時鐘 : " + wheel[tick]); } }
運行輸出:
當前時鐘 : 1 當前時鐘 : 2 當前時鐘 : 2 當前時鐘 : 3 當前時鐘 : 3
回到主題, 來看看HashedWheelTimer的構造函數參數:
HashedWheelTimer( ThreadFactory threadFactory, //相似於Clock中的updater, 負責建立Worker線程. long tickDuration, //時間刻度之間的時長(默認100ms), 通俗的說, 就是多久tick++一次. TimeUnit unit, //tickDuration的單位. int ticksPerWheel //相似於Clock中的wheel的長度(默認512). ):
除了構造函數參數, 還有一個比較重要的概念, 輪(Round) : 一輪的時長爲 tickDuration * ticksPerWheel, 也就是轉一圈的時長.
其中Worker線程爲HashedWheelTimer的核心, 主要負責每過tickDuration時間就累加一次tick. 同時, 也負責執行到期的timeout任務, 同時, 也負責添加timeou任務到指定的wheel中.
當添加Timeout任務的時候, 會根據設置的時間, 來計算出須要等待的時間長度, 根據時間長度, 進而算出要通過多少次tick, 而後根據tick的次數來算出通過多少輪, 最終得出task在wheel中的位置.
例如, 若是任務設置爲在100s後執行. 若是按照默認的HashedWheelTimer配置(tickDuration爲100ms, wheel長爲512)則:
任務須要通過的tick數爲: (100 * 1000) / 100 = 1000次 (等待時長 / tickDuration) 任務須要通過的輪數爲 : 1000次 / 512次/輪 = 1輪 (tick總次數 / ticksPerWheel) 任務存放的wheel索引爲 : 1000 - 512 = 488 (走完n輪時間後, 還要多少個tick) 因此這裏任務須要通過一輪後, 還要等待488次tick, 纔會執行, 進而任務存放的wheel位置也就是488.
到這裏, 大概原理已經介紹完了, 接下來看源碼吧.
4. 主要成員.
1) HashedWheelTimer, 對外的類, 主要負責啓動Worker線程, 添加任務等.
2) Worker, 內部負責添加任務, 累加tick, 執行任務等.
3) HashedWheelTimeout, 任務的包裝類, 鏈表結構, 負責保存deadline, 輪數, 等.
4) HashedWheelBucket, wheel數組元素, 負責存放HashedWheelTimeout鏈表.
5. HashedWheelTimer源碼走讀.
下面是HashedWheelTimer的代碼, 去掉了一些非關鍵代碼.
public class HashedWheelTimer implements Timer { private static final AtomicIntegerFieldUpdater<HashedWheelTimer> WORKER_STATE_UPDATER; static { AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = PlatformDependent .newAtomicIntegerFieldUpdater(HashedWheelTimer.class, "workerState"); if (workerStateUpdater == null) { workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class, "workerState"); } WORKER_STATE_UPDATER = workerStateUpdater; } private final Worker worker = new Worker(); private final Thread workerThread; public static final int WORKER_STATE_INIT = 0; public static final int WORKER_STATE_STARTED = 1; public static final int WORKER_STATE_SHUTDOWN = 2; @SuppressWarnings({ "unused", "FieldMayBeFinal", "RedundantFieldInitialization" }) private volatile int workerState = WORKER_STATE_INIT; // 0 - init, 1 - started, 2 - shut down private final long tickDuration; private final HashedWheelBucket[] wheel; private final int mask; private final CountDownLatch startTimeInitialized = new CountDownLatch(1); private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue(); private final Queue<Runnable> cancelledTimeouts = PlatformDependent.newMpscQueue(); private volatile long startTime; //建立Timer public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { //校驗參數, 代碼略 //建立wheel數組, 和HashMap的entry數組長度相似, 爲2的次方. wheel = createWheel(ticksPerWheel); //用於計算任務存放wheel的索引 //由於wheel長度爲2的次方, 則, 若是長度爲16(10000), mask就爲15(1111) //那麼, 經過 n & mask 就能夠實現 相似於 n % mask, 而 & 更高效........ mask = wheel.length - 1; //tickDuration 不能大於 Long.MAX_VALUE / wheel.length, 也就是一輪的時間不能大於Long.MAX_VALUE 納秒 this.tickDuration = unit.toNanos(tickDuration); //建立worker線程 workerThread = threadFactory.newThread(worker); } //建立wheel數組 private static HashedWheelBucket[] createWheel(int ticksPerWheel) { //參數校驗, 略 //2的次方 ticksPerWheel = 1; while (ticksPerWheel < ticksPerWheel) { ticksPerWheel <<= 1; } //初始化 HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel; } //啓動Timer, 不須要顯示調用, 調用newTimeout時, 會自動調用該方法 public void start() { //初始爲WORKER_STATE_INIT, cas修改成WORKER_STATE_STARTED, 並啓動worker線程 switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } //等待worker啓動, 並初始化startTime完成 while (startTime == 0) { try { startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } } //中止Timer public Set<Timeout> stop() { //worker線程不能調用stop方法, 也就是咱們添加的Task中不能調用stop方法. if (Thread.currentThread() == workerThread) { throw new IllegalStateException(HashedWheelTimer.class.getSimpleName() + ".stop() cannot be called from " + TimerTask.class.getSimpleName()); } //cas修改狀態爲shutdown, 若是修改失敗, 則當前狀態只多是WORKER_STATE_INIT和WORKER_STATE_SHUTDOWN if (!WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_STARTED, WORKER_STATE_SHUTDOWN)) { WORKER_STATE_UPDATER.set(this, WORKER_STATE_SHUTDOWN);//老是設置爲WORKER_STATE_SHUTDOWN return Collections.emptySet();//狀態爲0和2時, 是沒有遺留任務的. } //中斷worker線程, worker線程中會輪詢Timer狀態的. boolean interrupted = false; while (workerThread.isAlive()) { workerThread.interrupt(); try { workerThread.join(100); } catch (InterruptedException ignored) { interrupted = true; } } //恢復中斷標誌 if (interrupted) { Thread.currentThread().interrupt(); } //返回未處理的任務 return worker.unprocessedTimeouts(); } //添加定時任務, delay爲延遲時間 public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { start();//未啓動, 則啓動 //任務先添加到timeouts隊列中, 等待下一個tick時, 再添加到對應的wheel中去. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); timeouts.add(timeout); return timeout; } }
註釋的比較詳細了, 代碼也比較簡單, 就很少說了, 看註釋吧.
添加任務時, 並非直接將人物添加到wheel中, 而是先放入隊列, 再等待Worker線程在下一次tick時, 將人物放入wheel中.
AtomicIntegerFieldUpdater是JUC的類, Netty會判斷, 當存在Unsafe時, 會使用Netty本身利用Unsafe實現的UnsafeAtomicIntegerFieldUpdater.
字數限制... 接第二篇...