看完這個實現以後,感受仍是要多看源碼,多研究。其實JRaft的定時任務調度器是基於Netty的時間輪來作的,若是沒有看過Netty的源碼,極可能並不知道時間輪算法,也就很難想到要去使用這麼優秀的定時調度算法了。java
對於介紹RepeatedTimer,我拿Node初始化的時候的electionTimer進行講解算法
this.electionTimer = new RepeatedTimer("JRaft-ElectionTimer", this.options.getElectionTimeoutMs()) { @Override protected void onTrigger() { handleElectionTimeout(); } @Override protected int adjustTimeout(final int timeoutMs) { //在必定範圍內返回一個隨機的時間戳 //爲了不同時發起選舉而致使失敗 return randomTimeout(timeoutMs); } };
由electionTimer的構造方法能夠看出RepeatedTimer須要傳入兩個參數,一個是name,另外一個是time數組
//timer是HashedWheelTimer private final Timer timer; //實例是HashedWheelTimeout private Timeout timeout; public RepeatedTimer(String name, int timeoutMs) { //name表明RepeatedTimer實例的種類,timeoutMs是超時時間 this(name, timeoutMs, new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048)); } public RepeatedTimer(String name, int timeoutMs, Timer timer) { super(); this.name = name; this.timeoutMs = timeoutMs; this.stopped = true; this.timer = Requires.requireNonNull(timer, "timer"); }
在構造器中會根據傳進來的值初始化一個name和一個timeoutMs,而後實例化一個timer,RepeatedTimer的run方法是由timer進行回調。在RepeatedTimer中會持有兩個對象,一個是timer,一個是timeout數據結構
對於一個RepeatedTimer實例,咱們能夠經過start方法來啓動它:app
public void start() { //加鎖,只能一個線程調用這個方法 this.lock.lock(); try { //destroyed默認是false if (this.destroyed) { return; } //stopped在構造器中初始化爲ture if (!this.stopped) { return; } //啓動完一次後下次就沒法再次往下繼續 this.stopped = false; //running默認爲false if (this.running) { return; } this.running = true; schedule(); } finally { this.lock.unlock(); } }
在調用start方法進行啓動後會進行一系列的校驗和賦值,從上面的賦值以及加鎖的狀況來看,這個是隻能被調用一次的。而後會調用到schedule方法中dom
private void schedule() { if(this.timeout != null) { this.timeout.cancel(); } final TimerTask timerTask = timeout -> { try { RepeatedTimer.this.run(); } catch (final Throwable t) { LOG.error("Run timer task failed, taskName={}.", RepeatedTimer.this.name, t); } }; this.timeout = this.timer.newTimeout(timerTask, adjustTimeout(this.timeoutMs), TimeUnit.MILLISECONDS); }
若是timeout不爲空,那麼會調用HashedWheelTimeout的cancel方法。而後封裝一個TimerTask實例,當執行TimerTask的run方法的時候會調用RepeatedTimer實例的run方法。而後傳入到timer中,TimerTask的run方法由timer進行調用,並將返回值賦值給timeout。ide
若是timer調用了TimerTask的run方法,那麼便會回調到RepeatedTimer的run方法中:
RepeatedTimer#runoop
public void run() { //加鎖 this.lock.lock(); try { //表示RepeatedTimer已經被調用過 this.invoking = true; } finally { this.lock.unlock(); } try { //而後會調用RepeatedTimer實例實現的方法 onTrigger(); } catch (final Throwable t) { LOG.error("Run timer failed.", t); } boolean invokeDestroyed = false; this.lock.lock(); try { this.invoking = false; //若是調用了stop方法,那麼將不會繼續調用schedule方法 if (this.stopped) { this.running = false; invokeDestroyed = this.destroyed; } else { this.timeout = null; schedule(); } } finally { this.lock.unlock(); } if (invokeDestroyed) { onDestroy(); } } protected void onDestroy() { // NO-OP }
這個run方法會由timer進行回調,若是沒有調用stop或destroy方法的話,那麼調用完onTrigger方法後會繼續調用schedule,而後一次次循環調用RepeatedTimer的run方法。ui
若是調用了destroy方法,在這裏會有一個onDestroy的方法,能夠由實現類override複寫執行一個鉤子。this
HashedWheelTimer經過必定的hash規則將不一樣timeout的定時任務劃分到HashedWheelBucket進行管理,而HashedWheelBucket利用雙向鏈表結構維護了某一時刻須要執行的定時任務列表
時間輪,是一個HashedWheelBucket數組,數組數量越多,定時任務管理的時間精度越精確。tick每走一格都會將對應的wheel數組裏面的bucket拿出來進行調度。
Worker繼承自Runnable,HashedWheelTimer必須經過Worker線程操做HashedWheelTimer中的定時任務。Worker是整個HashedWheelTimer的執行流程管理者,控制了定時任務分配、全局deadline時間計算、管理未執行的定時任務、時鐘計算、未執行定時任務回收處理。
是HashedWheelTimer的執行單位,維護了其所屬的HashedWheelTimer和HashedWheelBucket的引用、須要執行的任務邏輯、當前輪次以及當前任務的超時時間(不變)等,能夠認爲是自定義任務的一層Wrapper。
HashedWheelBucket維護了hash到其內的全部HashedWheelTimeout結構,是一個雙向隊列。
在初始化RepeatedTimer實例的時候會實例化一個HashedWheelTimer:
new HashedWheelTimer(new NamedThreadFactory(name, true), 1, TimeUnit.MILLISECONDS, 2048)
而後調用HashedWheelTimer的構造器:
private final HashedWheelBucket[] wheel; private final int mask; private final long tickDuration; private final Worker worker = new Worker(); private final Thread workerThread; private final long maxPendingTimeouts; private static final int INSTANCE_COUNT_LIMIT = 256; private static final AtomicInteger instanceCounter = new AtomicInteger(); private static final AtomicBoolean warnedTooManyInstances = new AtomicBoolean(); public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel) { tickDuration this(threadFactory, tickDuration, unit, ticksPerWheel, -1); } public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel, long maxPendingTimeouts) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } //unit = MILLISECONDS if (unit == null) { throw new NullPointerException("unit"); } if (tickDuration <= 0) { throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel); } // Normalize ticksPerWheel to power of two and initialize the wheel. // 建立一個HashedWheelBucket數組 // 建立時間輪基本的數據結構,一個數組。長度爲不小於ticksPerWheel的最小2的n次方 wheel = createWheel(ticksPerWheel); // 這是一個標示符,用來快速計算任務應該呆的格子。 // 咱們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.可是%操做是個相對耗時的操做,因此使用一種變通的位運算代替: // 由於一圈的長度爲2的n次方,mask = 2^n-1後低位將所有是1,而後deadline&mast == deadline%wheel.length // java中的HashMap在進行hash以後,進行index的hash尋址尋址的算法也是和這個同樣的 mask = wheel.length - 1; // Convert tickDuration to nanos. //tickDuration傳入是1的話,這裏會轉換成1000000 this.tickDuration = unit.toNanos(tickDuration); // Prevent overflow. // 校驗是否存在溢出。即指針轉動的時間間隔不能太長而致使tickDuration*wheel.length>Long.MAX_VALUE if (this.tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException(String.format( "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE / wheel.length)); } //將worker包裝成thread workerThread = threadFactory.newThread(worker); //maxPendingTimeouts = -1 this.maxPendingTimeouts = maxPendingTimeouts; //若是HashedWheelTimer實例太多,那麼就會打印一個error日誌 if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT && warnedTooManyInstances.compareAndSet(false, true)) { reportTooManyInstances(); } }
這個構造器裏面主要作一些初始化的工做。
時間輪算法中並不須要手動的去調用start方法來啓動,而是在添加節點的時候會啓動時間輪。
咱們在RepeatedTimer的schedule方法裏會調用newTimeout向時間輪中添加一個任務。
HashedWheelTimer#newTimeout
public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { if (task == null) { throw new NullPointerException("task"); } if (unit == null) { throw new NullPointerException("unit"); } long pendingTimeoutsCount = pendingTimeouts.incrementAndGet(); if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) { pendingTimeouts.decrementAndGet(); throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending " + "timeouts (" + maxPendingTimeouts + ")"); } // 若是時間輪沒有啓動,則啓動 start(); // Add the timeout to the timeout queue which will be processed on the next tick. // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket. long deadline = System.nanoTime() + unit.toNanos(delay) - startTime; // Guard against overflow. //在delay爲正數的狀況下,deadline是不可能爲負數 //若是爲負數,那麼說明超過了long的最大值 if (delay > 0 && deadline < 0) { deadline = Long.MAX_VALUE; } // 這裏定時任務不是直接加到對應的格子中,而是先加入到一個隊列裏,而後等到下一個tick的時候, // 會從隊列裏取出最多100000個任務加入到指定的格子中 HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline); //Worker會去處理timeouts隊列裏面的數據 timeouts.add(timeout); return timeout; }
在這個方法中,在校驗以後會調用start方法啓動時間輪,而後設置deadline,這個時間等於時間輪啓動的時間點+延遲的的時間;
而後新建一個HashedWheelTimeout實例,會直接加入到timeouts隊列中去,timeouts對列會在worker的run方法裏面取出來放入到wheel中進行處理。
而後咱們來看看start方法:
HashedWheelTimer#start
private static final AtomicIntegerFieldUpdater<HashedWheelTimer> workerStateUpdater = AtomicIntegerFieldUpdater.newUpdater(HashedWheelTimer.class,"workerState"); private volatile int workerState; //不須要你主動調用,當有任務添加進來的的時候他就會跑 public void start() { //workerState一開始的時候是0(WORKER_STATE_INIT),而後纔會設置爲1(WORKER_STATE_STARTED) switch (workerStateUpdater.get(this)) { case WORKER_STATE_INIT: //使用cas來獲取啓動調度的權力,只有競爭到的線程容許來進行實例啓動 if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) { //若是成功設置了workerState,那麼就調用workerThread線程 workerThread.start(); } break; case WORKER_STATE_STARTED: break; case WORKER_STATE_SHUTDOWN: throw new IllegalStateException("cannot be started once stopped"); default: throw new Error("Invalid WorkerState"); } // 等待worker線程初始化時間輪的啓動時間 // Wait until the startTime is initialized by the worker. while (startTime == 0) { try { //這裏使用countDownLauch來確保調度的線程已經被啓動 startTimeInitialized.await(); } catch (InterruptedException ignore) { // Ignore - it will be ready very soon. } } }
由這裏咱們能夠看出,啓動時間輪是不須要手動去調用的,而是在有任務的時候會自動運行,防止在沒有任務的時候空轉浪費資源。
在start方法裏面會使用AtomicIntegerFieldUpdater的方式來更新workerState這個變量,若是沒有啓動過那麼直接在cas成功以後調用start方法啓動workerThread線程。
若是workerThread還沒運行,那麼會在while循環中等待,直到workerThread運行爲止纔會往下運行。
時間輪的運轉是在Worker的run方法中進行的:
Worker#run
private final Set<Timeout> unprocessedTimeouts = new HashSet<>(); private long tick; public void run() { // Initialize the startTime. startTime = System.nanoTime(); if (startTime == 0) { // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized. startTime = 1; } //HashedWheelTimer的start方法會繼續往下運行 // Notify the other threads waiting for the initialization at start(). startTimeInitialized.countDown(); do { //返回的是當前的nanoTime- startTime //也就是返回的是 每 tick 一次的時間間隔 final long deadline = waitForNextTick(); if (deadline > 0) { //算出時間輪的槽位 int idx = (int) (tick & mask); //移除cancelledTimeouts中的bucket // 從bucket中移除timeout processCancelledTasks(); HashedWheelBucket bucket = wheel[idx]; // 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中 transferTimeoutsToBuckets(); bucket.expireTimeouts(deadline); tick++; } // 校驗若是workerState是started狀態,那麼就一直循環 } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED); // Fill the unprocessedTimeouts so we can return them from stop() method. for (HashedWheelBucket bucket : wheel) { bucket.clearTimeouts(unprocessedTimeouts); } for (;;) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { break; } //若是有沒有被處理的timeout,那麼加入到unprocessedTimeouts對列中 if (!timeout.isCancelled()) { unprocessedTimeouts.add(timeout); } } //處理被取消的任務 processCancelledTasks(); }
上面全部的過時但未被處理的bucket會在調用stop方法的時候返回unprocessedTimeouts隊列中的數據。因此unprocessedTimeouts中的數據只是作一個記錄,並不會再次被執行。
時間輪的全部處理過程都在do-while循環中被處理,咱們下面一個個分析
Worker#processCancelledTasks
private void processCancelledTasks() { for (;;) { HashedWheelTimeout timeout = cancelledTimeouts.poll(); if (timeout == null) { // all processed break; } try { timeout.remove(); } catch (Throwable t) { if (LOG.isWarnEnabled()) { LOG.warn("An exception was thrown while process a cancellation task", t); } } } }
這個方法至關的簡單,由於在調用HashedWheelTimer的stop方法的時候會將要取消的HashedWheelTimeout實例放入到cancelledTimeouts隊列中,因此這裏只須要循環把隊列中的數據取出來,而後調用HashedWheelTimeout的remove方法將本身在bucket移除就行了
HashedWheelTimeout#remove
void remove() { HashedWheelBucket bucket = this.bucket; if (bucket != null) { //這裏面涉及到鏈表的引用摘除,十分清晰易懂,想了解的能夠去看看 bucket.remove(this); } else { timer.pendingTimeouts.decrementAndGet(); } }
Worker#transferTimeoutsToBuckets
private void transferTimeoutsToBuckets() { // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just // adds new timeouts in a loop. // 每次tick只處理10w個任務,以避免阻塞worker線程 for (int i = 0; i < 100000; i++) { HashedWheelTimeout timeout = timeouts.poll(); if (timeout == null) { // all processed break; } //已經被取消了; if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) { // Was cancelled in the meantime. continue; } //calculated = tick 次數 long calculated = timeout.deadline / tickDuration; // 計算剩餘的輪數, 只有 timer 走夠輪數, 而且到達了 task 所在的 slot, task 纔會過時 timeout.remainingRounds = (calculated - tick) / wheel.length; //若是任務在timeouts隊列裏面放久了, 以致於已通過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完後就會被執行 final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past. //// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1 int stopIndex = (int) (ticks & mask); HashedWheelBucket bucket = wheel[stopIndex]; //將timeout加入到bucket鏈表中 bucket.addTimeout(timeout); } }
|_____________________|____________ worker啓動時間 timeout任務加入時間
在worker的run方法的do-while循環中,在根據當前的tick拿到wheel中的bucket後會調用expireTimeouts方法來處理這個bucket的到期任務
HashedWheelBucket#expireTimeouts
// 過時並執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法, //根據deadline和remainingRounds判斷任務是否過時 public void expireTimeouts(long deadline) { HashedWheelTimeout timeout = head; // process all timeouts //遍歷格子中的全部定時任務 while (timeout != null) { // 先保存next,由於移除後next將被設置爲null HashedWheelTimeout next = timeout.next; if (timeout.remainingRounds <= 0) { //從bucket鏈表中移除當前timeout,並返回鏈表中下一個timeout next = remove(timeout); //若是timeout的時間小於當前的時間,那麼就調用expire執行task if (timeout.deadline <= deadline) { timeout.expire(); } else { //不可能發生的狀況,就是說round已經爲0了,deadline卻>當前槽的deadline // The timeout was placed into a wrong slot. This should never happen. throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline)); } } else if (timeout.isCancelled()) { next = remove(timeout); } else { //由於當前的槽位已通過了,說明已經走了一圈了,把輪數減一 timeout.remainingRounds--; } //把指針放置到下一個timeout timeout = next; } }
expireTimeouts方法會根據當前tick到的槽位,而後獲取槽位中的bucket並找到鏈表中到期的timeout並執行
HashedWheelTimeout#task
public void expire() { if (!compareAndSetState(ST_INIT, ST_EXPIRED)) { return; } try { task.run(this); } catch (Throwable t) { if (LOG.isWarnEnabled()) { LOG.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t); } } }
這裏這個task就是在schedule方法中構建的timerTask實例,調用timerTask的run方法會調用到外層的RepeatedTimer的run方法,從而調用到RepeatedTimer子類實現的onTrigger方法。
到這裏Jraft的定時調度就講完了,感受仍是頗有意思的。