時間輪算法(TimingWheel)是如何實現的?

前言

我在2. SOFAJRaft源碼分析—JRaft的定時任務調度器是怎麼作的?這篇文章裏已經講解過期間輪算法在JRaft中是怎麼應用的,可是我感受我並無講解清楚這個東西,致使看了這篇文章依然和沒看是同樣的,因此我打算從新說透時間輪算法。html

時間輪的應用並不是 JRaft 獨有,其應用場景還有不少,在 Netty、Akka、Quartz、ZooKeeper 、Kafka等組件中都存在時間輪的蹤跡。java

咱們下面講解的時間輪的實現以JRaft中的爲例子進行講解,由於JRaft這部分的代碼是參考Netty的,因此你們也能夠去Netty中去尋找源碼實現。git

時間輪用來解決什麼問題?

若是一個系統中存在着大量的調度任務,而大量的調度任務若是每個都使用本身的調度器來管理任務的生命週期的話,浪費cpu的資源而且很低效。github

時間輪是一種高效來利用線程資源來進行批量化調度的一種調度模型。把大批量的調度任務所有都綁定到同一個的調度器上面,使用這一個調度器來進行全部任務的管理(manager),觸發(trigger)以及運行(runnable)。可以高效的管理各類延時任務,週期任務,通知任務等等。算法

不過,時間輪調度器的時間精度可能不是很高,對於精度要求特別高的調度任務可能不太適合。由於時間輪算法的精度取決於,時間段「指針」單元的最小粒度大小,好比時間輪的格子是一秒跳一次,那麼調度精度小於一秒的任務就沒法被時間輪所調度。windows

時間輪結構

如圖,JRaft中時間輪(HashedWheelTimer)是一個存儲定時任務的環形隊列,底層採用數組實現,數組中的每一個元素能夠存放一個定時任務列表(HashedWheelBucket),HashedWheelBucket是一個環形的雙向鏈表,鏈表中的每一項表示的都是定時任務項(HashedWheelTimeout),其中封裝了真正的定時任務(TimerTask)。數組

時間輪由多個時間格組成,每一個時間格表明當前時間輪的基本時間跨度(tickDuration)。時間輪的時間格個數是固定的,可用 wheel.length 來表示。緩存

時間輪還有一個錶盤指針(tick),用來表示時間輪當前指針跳動的次數,能夠用tickDuration * (tick + 1)來表示下一次到期的任務,須要處理此時間格所對應的 HashedWheelBucket 中的全部任務。數據結構

時間輪運行邏輯

時間輪在啓動的時候會記錄一下當前啓動的時間賦值給startTime。時間輪在添加任務的時候首先會計算延遲時間(deadline),好比一個任務的延遲時間爲24ms,那麼會將當前的時間(currentTime)+24ms-時間輪啓動時的時間(startTime)。而後將任務封裝成HashedWheelTimeout加入到timeouts隊列中,做爲緩存。app

時間輪在運行的時候會將timeouts中緩存的HashedWheelTimeout任務取10萬個出來進行遍歷。
而後須要計算出幾個參數值:

  1. HashedWheelTimeout的總共延遲的次數:將每一個任務的延遲時間(deadline)/tickDuration 計算出tick須要總共跳動的次數;
  2. 計算時間輪round次數:根據計算的須要走的(總次數- 當前tick數量)/ 時間格個數(wheel.length)。好比tickDuration爲1ms,時間格個數爲20個,那麼時間輪走一圈須要20ms,那麼添加進一個延時爲24ms的數據,若是當前的tick爲0,那麼計算出的輪數爲1,指針沒運行一圈就會將round取出來減一,因此須要轉動到第二輪以後才能夠將輪數round減爲0以後纔會運行
  3. 計算出該任務須要放置到時間輪(wheel)的槽位,而後加入到槽位鏈表最後

將timeouts中的數據放置到時間輪wheel中以後,計算出當前時針走到的槽位的位置,並取出槽位中的鏈表數據,將deadline和當前的時間作對比,運行過時的數據。

源碼分析

構造器

public HashedWheelTimer(ThreadFactory threadFactory, long tickDuration, TimeUnit unit, int ticksPerWheel,
                        long maxPendingTimeouts) {

    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }
    //unit = MILLISECONDS
    if (unit == null) {
        throw new NullPointerException("unit");
    }
    if (tickDuration <= 0) {
        throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
    }
    if (ticksPerWheel <= 0) {
        throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
    }

    // Normalize ticksPerWheel to power of two and initialize the wheel.
    // 建立一個HashedWheelBucket數組
    // 建立時間輪基本的數據結構,一個數組。長度爲不小於ticksPerWheel的最小2的n次方
    wheel = createWheel(ticksPerWheel);
    // 這是一個標示符,用來快速計算任務應該呆的格子。
    // 咱們知道,給定一個deadline的定時任務,其應該呆的格子=deadline%wheel.length.可是%操做是個相對耗時的操做,因此使用一種變通的位運算代替:
    // 由於一圈的長度爲2的n次方,mask = 2^n-1後低位將所有是1,而後deadline&mast == deadline%wheel.length
    // java中的HashMap在進行hash以後,進行index的hash尋址尋址的算法也是和這個同樣的
    mask = wheel.length - 1;

    // Convert tickDuration to nanos.
    //tickDuration傳入是1的話,這裏會轉換成1000000
    this.tickDuration = unit.toNanos(tickDuration);

    // Prevent overflow.
    // 校驗是否存在溢出。即指針轉動的時間間隔不能太長而致使tickDuration*wheel.length>Long.MAX_VALUE
    if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
        throw new IllegalArgumentException(String.format(
            "tickDuration: %d (expected: 0 < tickDuration in nanos < %d", tickDuration, Long.MAX_VALUE
                                                                                        / wheel.length));
    }
    //將worker包裝成thread
    workerThread = threadFactory.newThread(worker);
    //maxPendingTimeouts = -1
    this.maxPendingTimeouts = maxPendingTimeouts;

    //若是HashedWheelTimer實例太多,那麼就會打印一個error日誌
    if (instanceCounter.incrementAndGet() > INSTANCE_COUNT_LIMIT
        && warnedTooManyInstances.compareAndSet(false, true)) {
        reportTooManyInstances();
    }
}

在這個構造器中有幾個細節須要注意:

  1. 調用createWheel方法建立的wheel數組必定是2次方數,好比傳入的ticksPerWheel是6,那麼初始化的wheel長度必定是8。這樣作是爲了讓mask & tick 來計算出槽位
  2. tickDuration用的是納秒
  3. 在構造裏面並不會裏面啓動時間輪,而是要等到有第一個任務加入到時間輪的時候才啓動。在構造器裏面會將工做線程worker封裝成workerThread

放入任務到時間輪中

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    if (unit == null) {
        throw new NullPointerException("unit");
    }

    long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

    if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
        pendingTimeouts.decrementAndGet();
        throw new RejectedExecutionException("Number of pending timeouts (" + pendingTimeoutsCount
                                             + ") is greater than or equal to maximum allowed pending "
                                             + "timeouts (" + maxPendingTimeouts + ")");
    }
    // 若是時間輪沒有啓動,則啓動
    start();

    // Add the timeout to the timeout queue which will be processed on the next tick.
    // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
    long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

    // Guard against overflow.
    //在delay爲正數的狀況下,deadline是不可能爲負數
    //若是爲負數,那麼說明超過了long的最大值
    if (delay > 0 && deadline < 0) {
        deadline = Long.MAX_VALUE;
    }
    // 這裏定時任務不是直接加到對應的格子中,而是先加入到一個隊列裏,而後等到下一個tick的時候,
    // 會從隊列裏取出最多100000個任務加入到指定的格子中
    HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
    //Worker會去處理timeouts隊列裏面的數據
    timeouts.add(timeout);
    return timeout;
}
  1. 若是時間輪沒有啓動,那麼就調用start方法啓動時間輪,啓動時間輪以後會爲startTime設置爲當前時間
  2. 計算延遲時間deadline
  3. 將task任務封裝到HashedWheelTimeout中,而後添加到timeouts隊列中進行緩存

start

private final CountDownLatch                                     startTimeInitialized   = new CountDownLatch(1);

public void start() {
    //workerState一開始的時候是0(WORKER_STATE_INIT),而後纔會設置爲1(WORKER_STATE_STARTED)
    switch (workerStateUpdater.get(this)) {
        case WORKER_STATE_INIT:
            //使用cas來獲取啓動調度的權力,只有競爭到的線程容許來進行實例啓動
            if (workerStateUpdater.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                //若是成功設置了workerState,那麼就調用workerThread線程
                workerThread.start();
            }
            break;
        case WORKER_STATE_STARTED:
            break;
        case WORKER_STATE_SHUTDOWN:
            throw new IllegalStateException("cannot be started once stopped");
        default:
            throw new Error("Invalid WorkerState");
    }

    // 等待worker線程初始化時間輪的啓動時間
    // Wait until the startTime is initialized by the worker.
    while (startTime == 0) {
        try {
            //這裏使用countDownLauch來確保調度的線程已經被啓動
            startTimeInitialized.await();
        } catch (InterruptedException ignore) {
            // Ignore - it will be ready very soon.
        }
    }
}

start方法會根據當前的workerState狀態來啓動時間輪。而且用了startTimeInitialized來控制線程的運行,若是workerThread沒有啓動起來,那麼newTimeout方法會一直阻塞在運行start方法中。若是不阻塞,newTimeout方法會獲取不到startTime。

啓動時間輪

時間輪的啓動在HashedWheelTimer的內部類Worker中。調用workerThread#start方法會調用Worker的run方法啓動時間輪。

下面咱們看時間輪啓動作了什麼,下面的分析不考慮任務被取消的狀況。

Worker#run

public void run() {
    // Initialize the startTime.
    startTime = System.nanoTime();
    if (startTime == 0) {
        // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
        startTime = 1;
    }

    //HashedWheelTimer的start方法會繼續往下運行
    // Notify the other threads waiting for the initialization at start().
    startTimeInitialized.countDown();

    do {
        //返回的是當前的nanoTime- startTime
        //也就是返回的是 每 tick 一次的時間間隔
        final long deadline = waitForNextTick();
        if (deadline > 0) {
            //算出時間輪的槽位
            int idx = (int) (tick & mask);
            //移除cancelledTimeouts中的bucket
            // 從bucket中移除timeout
            processCancelledTasks();
            HashedWheelBucket bucket = wheel[idx];
            // 將newTimeout()方法中加入到待處理定時任務隊列中的任務加入到指定的格子中
            transferTimeoutsToBuckets();
            bucket.expireTimeouts(deadline);
            tick++;
        }
    //    校驗若是workerState是started狀態,那麼就一直循環
    } while (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

    // Fill the unprocessedTimeouts so we can return them from stop() method.
    for (HashedWheelBucket bucket : wheel) {
        bucket.clearTimeouts(unprocessedTimeouts);
    }
    for (;;) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            break;
        }
        //若是有沒有被處理的timeout,那麼加入到unprocessedTimeouts對列中
        if (!timeout.isCancelled()) {
            unprocessedTimeouts.add(timeout);
        }
    }
    //處理被取消的任務
    processCancelledTasks();
}
  1. 時間輪運行的時候首先會記錄一下啓動時間(startTime),而後調用startTimeInitialized釋放外層的等待線程;
  2. 進入dowhile循環,調用waitForNextTick睡眠等待到下一次的tick指針的跳動,並返回當前時間減去startTime做爲deadline
  3. 因爲mask= wheel.length -1 ,wheel是2的次方數,因此能夠直接用tick & mask 計算出這次在wheel中的槽位
  4. 調用processCancelledTasks將cancelledTimeouts隊列中的任務取出來,並將當前的任務從時間輪中移除
  5. 調用transferTimeoutsToBuckets方法將timeouts隊列中緩存的數據取出加入到時間輪中
  6. 運行目前指針指向的槽位中的bucket鏈表數據

時間輪指針跳動

waitForNextTick

//sleep, 直到下次tick到來, 而後返回該次tick和啓動時間之間的時長
private long waitForNextTick() {
    //tickDuration這裏是100000
    //tick表示總tick數
    long deadline = tickDuration * (tick + 1);

    for (;;) {
        final long currentTime = System.nanoTime() - startTime;
        // 計算須要sleep的時間, 之因此加999999後再除10000000,前面是1因此這裏須要減去1,
        // 才能計算準確,還有經過這裏能夠看到 其實線程是以睡眠必定的時候再來執行下一個ticket的任務的,
        //這樣若是ticket的間隔設置的過小的話,系統會頻繁的睡眠而後啓動,
        //其實感受影響部分的性能,因此爲了更好的利用系統資源步長能夠稍微設置大點
        long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;
        //sleepTimeMs小於零表示走到了下一個時間輪位置
        if (sleepTimeMs <= 0) {
            if (currentTime == Long.MIN_VALUE) {
                return -Long.MAX_VALUE;
            } else {
                return currentTime;
            }
        }

        // Check if we run on windows, as if thats the case we will need
        // to round the sleepTime as workaround for a bug that only affect
        // the JVM if it runs on windows.
        //
        // See https://github.com/netty/netty/issues/356
        if (Platform.isWindows()) {
            sleepTimeMs = sleepTimeMs / 10 * 10;
        }

        try {
            Thread.sleep(sleepTimeMs);
        } catch (InterruptedException ignored) {
            if (workerStateUpdater.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                return Long.MIN_VALUE;
            }
        }
    }
}

能夠想象一下在時鐘的秒鐘上面秒與秒之間的時間是須要等待的,那麼waitForNextTick這個方法就是根據當前的時間計算出跳動到下個時間的間隔時間,並進行sleep操做,而後返回當前時間距離時間輪啓動時間的時間段。

轉移任務到時間輪中

在調用時間輪的方法加入任務的時候並無直接加入到時間輪中,而是緩存到了timeouts隊列中,因此在運行的時候須要將timeouts隊列中的任務轉移到時間輪數據的鏈表中

transferTimeoutsToBuckets

private void transferTimeoutsToBuckets() {
    // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
    // adds new timeouts in a loop.
    // 每次tick只處理10w個任務,以避免阻塞worker線程
    for (int i = 0; i < 100000; i++) {
        HashedWheelTimeout timeout = timeouts.poll();
        if (timeout == null) {
            // all processed
            break;
        }
        //已經被取消了;
        if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
            // Was cancelled in the meantime.
            continue;
        }
        //calculated = tick 次數
        long calculated = timeout.deadline / tickDuration;
        // 計算剩餘的輪數, 只有 timer 走夠輪數, 而且到達了 task 所在的 slot, task 纔會過時
        timeout.remainingRounds = (calculated - tick) / wheel.length;
        //若是任務在timeouts隊列裏面放久了, 以致於已通過了執行時間, 這個時候就使用當前tick, 也就是放到當前bucket, 此方法調用完後就會被執行
        final long ticks = Math.max(calculated, tick); // Ensure we don't schedule for past.
        //// 算出任務應該插入的 wheel 的 slot, slotIndex = tick 次數 & mask, mask = wheel.length - 1
        int stopIndex = (int) (ticks & mask);

        HashedWheelBucket bucket = wheel[stopIndex];
        //將timeout加入到bucket鏈表中
        bucket.addTimeout(timeout);
    }
}

在這個轉移方法中,寫死了一個循環,每次都只轉移10萬個任務。

而後根據HashedWheelTimeout的deadline延遲時間計算出時間輪須要運行多少次才能運行當前的任務,若是當前的任務延遲時間大於時間輪跑一圈所須要的時間,那麼就計算須要跑幾圈才能到這個任務運行。

最後計算出該任務在時間輪中的槽位,添加到時間輪的鏈表中。

運行時間輪中的任務

當指針跳到時間輪的槽位的時間,會將槽位的HashedWheelBucket取出來,而後遍歷鏈表,運行其中到期的任務。

expireTimeouts

// 過時並執行格子中的到期任務,tick到該格子的時候,worker線程會調用這個方法
//根據deadline和remainingRounds判斷任務是否過時
public void expireTimeouts(long deadline) {
    HashedWheelTimeout timeout = head;

    // process all timeouts
    //遍歷格子中的全部定時任務
    while (timeout != null) {
        // 先保存next,由於移除後next將被設置爲null
        HashedWheelTimeout next = timeout.next;
        if (timeout.remainingRounds <= 0) {
            //從bucket鏈表中移除當前timeout,並返回鏈表中下一個timeout
            next = remove(timeout);
            //若是timeout的時間小於當前的時間,那麼就調用expire執行task
            if (timeout.deadline <= deadline) {
                timeout.expire();
            } else {
                //不可能發生的狀況,就是說round已經爲0了,deadline卻>當前槽的deadline
                // The timeout was placed into a wrong slot. This should never happen.
                throw new IllegalStateException(String.format("timeout.deadline (%d) > deadline (%d)",
                        timeout.deadline, deadline));
            }
        } else if (timeout.isCancelled()) {
            next = remove(timeout);
        } else {
            //由於當前的槽位已通過了,說明已經走了一圈了,把輪數減一
            timeout.remainingRounds--;
        }
        //把指針放置到下一個timeout
        timeout = next;
    }
}

HashedWheelBucket是一個鏈表,因此咱們須要從head節點往下進行遍歷。若是鏈表沒有遍歷到鏈表尾部那麼就繼續往下遍歷。

獲取的timeout節點節點,若是剩餘輪數remainingRounds大於0,那麼就說明要到下一圈才能運行,因此將剩餘輪數減一;

若是當前剩餘輪數小於等於零了,那麼就將當前節點從bucket鏈表中移除,並判斷一下當前的時間是否大於timeout的延遲時間,若是是則調用timeout的expire執行任務。

相關文章
相關標籤/搜索