HashedWheelTimer是採用一種定時輪的方式來管理和維護大量的Timer調度算法.Linux 內核中的定時器採用的就是這個方案。
環形結構能夠根據超時時間的 hash 值(這個 hash 值實際上就是ticks & mask)將 task 分佈到不一樣的槽位中, 當 tick 到那個槽位時, 只須要遍歷那個槽位的 task 便可知道哪些任務會超時(而使用線性結構, 你每次 tick 都須要遍歷全部 task), 因此, 咱們任務量大的時候, 相應的增長 wheel 的 ticksPerWheel 值, 能夠減小 tick 時遍歷任務的個數.算法
public HashedWheelTimer( ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 構造時間輪的槽位數,槽位數只能是2的冪次方 wheel = createWheel(ticksPerWheel); // 時間輪槽位數 mask = wheel.length - 1; // Convert tickDuration to nanos. // 初始化時間週期 this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } // 初始化輪詢時間輪的線程,使用這個線程週期性的輪詢時間輪 workerThread = threadFactory.newThread(worker); this.maxPendingTimeouts = maxPendingTimeouts; if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT && WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) { reportTooManyInstances(); } }
private static HashedWheelBucket[] createWheel(int ticksPerWheel) { if (ticksPerWheel <= 0) { throw new IllegalArgumentException( "ticksPerWheel must be greater than 0: " + ticksPerWheel); } if (ticksPerWheel > 1073741824) { throw new IllegalArgumentException( "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel); } // HashedWheelBucket數組長度是2的冪次方,獲取<=ticksPerWheel最大的2的冪次方 ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel); HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel]; for (int i = 0; i < wheel.length; i++) { wheel[i] = new HashedWheelBucket(); } return wheel; }
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 開啓時間輪輪詢 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 將定時任務封裝成HashedWheelTimeout HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); // 將定時任務存儲到任務鏈表中 timeouts.add(timeout); return timeout; }
public void start() { // 判斷HashedWheelTimer狀態,若是狀態開啓,則開啓輪詢線程 switch (WORKER_STATE_UPDATER.get(this)) { case WORKER_STATE_INIT: if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { // 阻塞當前線程,目的是保證輪詢線程workerThread開啓 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
public void run() { // Initialize the startTime. // 初始化開始時間 startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } // Notify the other threads waiting for the initialization at start(). // 喚醒阻塞的線程 startTimeInitialized.countDown(); do { // 根據週期時間tickDuration,進行週期性的tick下一個槽位 final long deadline = waitForNextTick(); if (deadline > 0) { // 獲取下一個槽位的角標 int idx = (int) (tick & mask); processCancelledTasks(); // 獲取該角標對應的HashedWheelBucket對象 HashedWheelBucket bucket = wheel[idx]; // 將存儲在鏈表timeOuts中的定時任務存儲到對應的槽位的HashedWheelBucket對象中 transferTimeoutsToBuckets(); // 執行槽位中定時任務 bucket.expireTimeouts(deadline); tick++; } } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (; ; ) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } processCancelledTasks(); }
private long waitForNextTick() { // 獲取下一個槽位的等待時間 long deadline = tickDuration * (tick + 1); for (; ; ) { // 獲取當前時間間隔 final long currentTime = System.nanoTime() - startTime; // 計算tick到下一個槽位須要等待的時間 long sleepTimeMs = (deadline - currentTime + 999999) / 1000000; // 當前時間間隔大於等於下一個槽位週期時間,不須要等待,直接返回(從這個地方就能夠得出HashedWheelTimer對時間精度要求不高,並非嚴格按照延遲時間來執行的) if (sleepTimeMs <= 0) { if (currentTime == Long.MIN_VALUE) { return -Long.MAX_VALUE; } else { return currentTime; } } if (isWindows()) { sleepTimeMs = sleepTimeMs / 10 * 10; } try { // 當前時間間隔小於下一個槽位週期時間,則進行休眠 Thread.sleep(sleepTimeMs); } catch (InterruptedException ignored) { if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) { return Long.MIN_VALUE; } } } }
private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 遍歷timeouts鏈表,默認遍歷鏈表100000個任務 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } // 任務的狀態等於取消,直接跳過 if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } // 設置任務須要輪詢的圈數,如:槽位=8,週期tickDuration=100ms,任務時間=900ms,則說明須要輪詢一圈後,才能會執行到該任務,即remainingRounds= 1,槽位角標stopIndex=1 long calculated = timeout.deadline / tickDuration; timeout.remainingRounds = (calculated - tick) / wheel.length; // Ensure we don't schedule for past. final long ticks = Math.max(calculated, tick); int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; // 將定時任務存儲到對應的HashedWheelBucket槽位中 bucket.addTimeout(timeout); } }
void expireTimeouts(long deadline) { // 獲取鏈表表頭任務 HashedWheelTimeout timeout = head; // process all timeouts while (timeout != null) { // 獲取表頭的下一個任務 HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { // 將要執行的任務從鏈表中刪除 next = remove(timeout); // 任務的時間小於間隔時間,執行任務 if (timeout.deadline <= deadline) { // 執行任務 timeout.expire(); } else { // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format( "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { timeout.remainingRounds--; } timeout = next; } }
public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { // **這個地方就是真正執行封裝的task任務,執行具體的任務邏輯** task.run(this); } catch (Throwable t) { if (logger.isWarnEnabled()) { logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }