HashedWheelTimer定時任務算法解析

一、原理

HashedWheelTimer是採用一種定時輪的方式來管理和維護大量的Timer調度算法.Linux 內核中的定時器採用的就是這個方案。
一個HashedWheelTimer是環形結構,相似一個時鐘,分爲不少槽,一個槽表明一個時間間隔,每一個槽又對應一個相似Map結構的對象,使用雙向鏈表存儲定時任務,指針週期性的跳動,跳動到一個槽位,就執行該槽位的定時任務。
環形結構能夠根據超時時間的 hash 值(這個 hash 值實際上就是ticks & mask)將 task 分佈到不一樣的槽位中, 當 tick 到那個槽位時, 只須要遍歷那個槽位的 task 便可知道哪些任務會超時(而使用線性結構, 你每次 tick 都須要遍歷全部 task), 因此, 咱們任務量大的時候, 相應的增長 wheel 的 ticksPerWheel 值, 能夠減小 tick 時遍歷任務的個數.算法

二、結構圖

圖片描述

三、效率

3.1優勢

  1. 能夠添加、刪除、取消定時任務
  2. 能高效的處理大批定時任務

3.2缺點

  1. 對內存要求較高,佔用較高的內存
  2. 時間精度要求不高

四、結合源碼分析

首先來看HashedWheelTimer的構造函數,HashedWheelTimer有不少構造方法,可是最後都是調用一個:數組

public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel,
            long maxPendingTimeouts) {

        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // Normalize ticksPerWheel to power of two and initialize the wheel.
        // 構造時間輪的槽位數,槽位數只能是2的冪次方
        wheel = createWheel(ticksPerWheel);
        // 時間輪槽位數
        mask = wheel.length - 1;

        // Convert tickDuration to nanos.
        // 初始化時間週期
        this.tickDuration = unit.toNanos(tickDuration);

        // Prevent overflow.
        if (this.tickDuration >= Long.MAX_VALUE / wheel.length) {
            throw new IllegalArgumentException(String.format(
                    "tickDuration: %d (expected: 0 < tickDuration in nanos < %d",
                    tickDuration, Long.MAX_VALUE / wheel.length));
        }
        // 初始化輪詢時間輪的線程,使用這個線程週期性的輪詢時間輪
        workerThread = threadFactory.newThread(worker);

        this.maxPendingTimeouts = maxPendingTimeouts;

        if (INSTANCE_COUNTER.incrementAndGet() > INSTANCE_COUNT_LIMIT &&
                WARNED_TOO_MANY_INSTANCES.compareAndSet(false, true)) {
            reportTooManyInstances();
        }
    }

時間輪實際就是一個HashedWeelBucket數組,上面這個構造方法就是在初始化這個數組,槽位數就是數組長度,tickDuration是時間週期,workerThread線程用來輪詢數組;app

private static HashedWheelBucket[] createWheel(int ticksPerWheel) {
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException(
                    "ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }
        if (ticksPerWheel > 1073741824) {
            throw new IllegalArgumentException(
                    "ticksPerWheel may not be greater than 2^30: " + ticksPerWheel);
        }
        // HashedWheelBucket數組長度是2的冪次方,獲取<=ticksPerWheel最大的2的冪次方
        ticksPerWheel = normalizeTicksPerWheel(ticksPerWheel);
        HashedWheelBucket[] wheel = new HashedWheelBucket[ticksPerWheel];
        for (int i = 0; i < wheel.length; i++) {
            wheel[i] = new HashedWheelBucket();
        }
        return wheel;
    }

初始化的HashedWheelBucket數組的長度必須是2的冪次方。HashedWheelTimer初始化完了,記下來就是如何向時間輪裏添加定時任務,其實很簡單,只要調用newTimeOut()方法便可函數

public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        if (task == null) {
            throw new NullPointerException("task");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }

        long pendingTimeoutsCount = pendingTimeouts.incrementAndGet();

        if (maxPendingTimeouts > 0 && pendingTimeoutsCount > maxPendingTimeouts) {
            pendingTimeouts.decrementAndGet();
            throw new RejectedExecutionException("Number of pending timeouts ("
                    + pendingTimeoutsCount + ") is greater than or equal to maximum allowed pending "
                    + "timeouts (" + maxPendingTimeouts + ")");
        }

        // 開啓時間輪輪詢
        start();

        // Add the timeout to the timeout queue which will be processed on the next tick.
        // During processing all the queued HashedWheelTimeouts will be added to the correct HashedWheelBucket.
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;

        // Guard against overflow.
        if (delay > 0 && deadline < 0) {
            deadline = Long.MAX_VALUE;
        }
        // 將定時任務封裝成HashedWheelTimeout
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        // 將定時任務存儲到任務鏈表中
        timeouts.add(timeout);
        return timeout;
    }

在newTimeOut()方法中會去開啓輪詢時間輪的線程(即workerThread),接下來在看如何輪詢:oop

public void start() {
        // 判斷HashedWheelTimer狀態,若是狀態開啓,則開啓輪詢線程
        switch (WORKER_STATE_UPDATER.get(this)) {
            case WORKER_STATE_INIT:
                if (WORKER_STATE_UPDATER.compareAndSet(this, WORKER_STATE_INIT, WORKER_STATE_STARTED)) {
                    workerThread.start();
                }
                break;
            case WORKER_STATE_STARTED:
                break;
            case WORKER_STATE_SHUTDOWN:
                throw new IllegalStateException("cannot be started once stopped");
            default:
                throw new Error("Invalid WorkerState");
        }

        // Wait until the startTime is initialized by the worker.
        while (startTime == 0) {
            try {
                // 阻塞當前線程,目的是保證輪詢線程workerThread開啓
                startTimeInitialized.await();
            } catch (InterruptedException ignore) {
                // Ignore - it will be ready very soon.
            }
        }
    }

在這個方法中會去開啓workerThread線程,執行workerThread線程中run()方法源碼分析

public void run() {
            // Initialize the startTime.
            // 初始化開始時間
            startTime = System.nanoTime();
            if (startTime == 0) {
                // We use 0 as an indicator for the uninitialized value here, so make sure it's not 0 when initialized.
                startTime = 1;
            }

            // Notify the other threads waiting for the initialization at start().
            // 喚醒阻塞的線程
            startTimeInitialized.countDown();

            do {
                // 根據週期時間tickDuration,進行週期性的tick下一個槽位
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    // 獲取下一個槽位的角標
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    // 獲取該角標對應的HashedWheelBucket對象
                    HashedWheelBucket bucket =
                            wheel[idx];
                    // 將存儲在鏈表timeOuts中的定時任務存儲到對應的槽位的HashedWheelBucket對象中        
                    transferTimeoutsToBuckets();
                    // 執行槽位中定時任務
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

            // Fill the unprocessedTimeouts so we can return them from stop() method.
            for (HashedWheelBucket bucket : wheel) {
                bucket.clearTimeouts(unprocessedTimeouts);
            }
            for (; ; ) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    break;
                }
                if (!timeout.isCancelled()) {
                    unprocessedTimeouts.add(timeout);
                }
            }
            processCancelledTasks();
        }

在上面方法中,輪詢時間輪,執行對應槽位的定時任務,在執行以前,會先將存儲在鏈表中任務按照各自的時間放入對應的槽位中,接下來我們來看如何根據週期時間進行tickthis

private long waitForNextTick() {
            // 獲取下一個槽位的等待時間
            long deadline = tickDuration * (tick + 1);

            for (; ; ) {
                // 獲取當前時間間隔
                final long currentTime = System.nanoTime() - startTime;
                // 計算tick到下一個槽位須要等待的時間
                long sleepTimeMs = (deadline - currentTime + 999999) / 1000000;

                // 當前時間間隔大於等於下一個槽位週期時間,不須要等待,直接返回(從這個地方就能夠得出HashedWheelTimer對時間精度要求不高,並非嚴格按照延遲時間來執行的)
                if (sleepTimeMs <= 0) {
                    if (currentTime == Long.MIN_VALUE) {
                        return -Long.MAX_VALUE;
                    } else {
                        return currentTime;
                    }
                }
                if (isWindows()) {
                    sleepTimeMs = sleepTimeMs / 10 * 10;
                }

                try {
                    // 當前時間間隔小於下一個槽位週期時間,則進行休眠
                    Thread.sleep(sleepTimeMs);
                } catch (InterruptedException ignored) {
                    if (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_SHUTDOWN) {
                        return Long.MIN_VALUE;
                    }
                }
            }
        }

分析瞭如何實現時間間隔輪詢,接下來分析如何將任務存儲到HashedWheelBucket中spa

private void transferTimeoutsToBuckets() {
            // transfer only max. 100000 timeouts per tick to prevent a thread to stale the workerThread when it just
            // adds new timeouts in a loop.
            // 遍歷timeouts鏈表,默認遍歷鏈表100000個任務
            for (int i = 0; i < 100000; i++) {
                HashedWheelTimeout timeout = timeouts.poll();
                if (timeout == null) {
                    // all processed
                    break;
                }
                // 任務的狀態等於取消,直接跳過
                if (timeout.state() == HashedWheelTimeout.ST_CANCELLED) {
                    // Was cancelled in the meantime.
                    continue;
                }

                // 設置任務須要輪詢的圈數,如:槽位=8,週期tickDuration=100ms,任務時間=900ms,則說明須要輪詢一圈後,才能會執行到該任務,即remainingRounds= 1,槽位角標stopIndex=1
                long calculated = timeout.deadline / tickDuration;
                timeout.remainingRounds = (calculated - tick) / wheel.length;

                // Ensure we don't schedule for past.
                final long ticks = Math.max(calculated, tick);
                int stopIndex = (int) (ticks & mask);

                HashedWheelBucket bucket = wheel[stopIndex];
                // 將定時任務存儲到對應的HashedWheelBucket槽位中
                bucket.addTimeout(timeout);
            }
        }

HashedWheelBucket是一個包含雙向鏈表的對象,addTimeout將任務存儲到鏈表的末端線程

void expireTimeouts(long deadline) {
            // 獲取鏈表表頭任務
            HashedWheelTimeout timeout = head;

            // process all timeouts
            while (timeout != null) {
                // 獲取表頭的下一個任務
                HashedWheelTimeout next = timeout.next;
                if (timeout.remainingRounds <= 0) {
                    // 將要執行的任務從鏈表中刪除
                    next = remove(timeout);
                    // 任務的時間小於間隔時間,執行任務
                    if (timeout.deadline <= deadline) {
                        // 執行任務
                        timeout.expire();
                    } else {
                        // The timeout was placed into a wrong slot. This should never happen.
                        throw new IllegalStateException(String.format(
                                "timeout.deadline (%d) > deadline (%d)", timeout.deadline, deadline));
                    }
                } else if (timeout.isCancelled()) {
                    next = remove(timeout);
                } else {
                    timeout.remainingRounds--;
                }
                timeout = next;
            }
        }

上面這個方法就是遍歷槽位中鏈表中的任務進行執行指針

public void expire() {
            if (!compareAndSetState(ST_INIT, ST_EXPIRED)) {
                return;
            }

            try {
                // **這個地方就是真正執行封裝的task任務,執行具體的任務邏輯**
                task.run(this);
            } catch (Throwable t) {
                if (logger.isWarnEnabled()) {
                    logger.warn("An exception was thrown by " + TimerTask.class.getSimpleName() + '.', t);
                }
            }
        }

以上就是HashedWheelTimer執行的整個過程,在分析的過程當中最好仍是結合具體的實例去分析,這樣會更有利於本身的理解。

相關文章
相關標籤/搜索