大批量定時任務管理利器HashedWheelTimer

和同事討論一個定時審覈的需求,運營設定審覈經過的時間,到了這個時間以後,相關內容自動審覈經過,本是個小的需求,可是考慮到若是須要定時審覈的東西不少,這樣大量的定時任務帶來的一系列問題,而後逐步的討論到了netty的HashedWheelTimer這個的實現。java

方案一 單定時器方案

描述:

把全部須要定時審覈的資源放到redis中,例如sorted set中,須要審覈經過的時間做爲score值。後臺啓動一個定時器,定時輪詢sortedSet,當score值小於當前時間,則運行任務審覈經過。redis

問題

這個方案在小批量數據的狀況下沒有問題,可是在大批量任務的狀況下就會出現問題了,由於每次都要輪詢全量的數據,逐個判斷是否須要執行,一旦輪詢任務執行比較長,就會出現任務沒法按照定時的時間執行的問題。算法

方案二 多定時器方案

描述

每一個須要定時完成的任務都啓動一個定時任務,而後等待完成以後銷燬數據庫

問題

這個方案帶來的問題很明顯,定時任務比較多的狀況下,會啓動不少的線程,這樣服務器會承受不了以後崩潰。基本上不會採起這個方案。數組

方案三 借用redis的過時通知功能

描述

和方案一相似,針對每個須要定時審覈的任務,設定過時時間,過時時間也就是審覈經過的時間,訂閱redis的過時事件,當這個事件發生時,執行相應的審覈經過任務。服務器

問題

這個方案來講是借用了redis這種中間件來實現咱們的功能,這中實際上屬於redis的發佈訂閱功能中的一部分,針對redis發佈訂閱功能是不推薦咱們在生產環境中作業務操做的,一般redis內部(例如redis集羣節點上下線,選舉等等來使用),咱們業務系統使用它的這個事件會產生以下兩個問題 一、redis發佈訂閱的不穩定問題 二、redid發佈訂閱的可靠性問題 具體能夠參考 http://www.javashuo.com/article/p-drsjnhbe-kc.html (redis的發佈訂閱缺陷)jvm

方案四 Hash分層記時輪算法

也許你和我同樣都是第一次據說這個東西,這個東西就是專爲大批量定時任務管理而生。具體論文詳見參考文獻[2]ide

算法概要

簡要的說這個是一個輪,裏面有指針,指針會根據設定的時間單位旋轉,任務根據一些算法會落在相應的槽位上。以下圖 函數

首先會有一個輪,這個輪在這裏分紅了8個槽位,任務任務添加的時候會根據相應的算法對槽位個數取模,獲得任務會存儲在具體哪一個槽位,每一個槽位是一個鏈表結構,任務存儲了任務的過時時間(任務執行時間),任務須要執行須要指針轉的輪數,指針(tick) 每間隔一個單位的時間會往下走一個槽位,而後會查詢這個槽位上的存儲的任務,而且任務的存儲的剩餘輪數會減一當剩餘輪數小於等於零時,就會開始執行這個任務,執行以後會把任務從這個槽位上給刪除掉。學習

例如上圖: 槽位爲8個槽位 Bucket 指針每一個時間間隔(100ms)會往下走一個槽位,這個時間間隔叫作tickDuration 那至關於每隔8*100ms=800ms,會輪詢一圈。

HashedWheelTimer

算法理解起來比較簡單,而且也有成熟的實現,那就是在netty中有一個HashedWheelTimer這個類,把這個算法實現了出來。接下來分析分析一下它的這個代碼。

初始化

在這個類上定義的有幾個比較重要的屬性

/**
     *這個work是一個內部類,實現了Runable接口,是比較核心的一個類,包裝了具體任務的運行,把任務放到具體如何放到某個槽位上,指針往下走的具體方法,任務取消等。 
     */
    private final Worker worker = new Worker();
    /**
     *工做線程,這個就是整個HashedWheelTimer啓動的起點 
     */
    private final Thread workerThread;

    /**
     *當前任務的狀態,1表明任務已經開始執行,0任務初始化,2任務已關閉 
     */
    public static final int WORKER_STATE_INIT = 0;
    public static final int WORKER_STATE_STARTED = 1;
    public static final int WORKER_STATE_SHUTDOWN = 2;

    /**
     *這個很核心的一個概念,就是指針往下走的單位,在HashedWheelTimer這個類中,默認是100ms指針往下走一個單位 
     */
    private final long tickDuration;
    /**
     * 這個就是指的時間輪,有多少個槽位,wheel的大小就是多大,HashedWheelTimer中默認槽位有512個
     */
    private final HashedWheelBucket[] wheel;
    /**
     * 主要輔助計算任務會存儲在哪一個槽位上,mask =wheel.length-1 
     */
    private final int mask;

    /**
     *全部要執行的任務的任務隊列 
     */
    private final Queue<HashedWheelTimeout> timeouts = PlatformDependent.newMpscQueue();
    /**
     *全部要取消的任務的任務隊列 
     */
    private final Queue<HashedWheelTimeout> cancelledTimeouts = PlatformDependent.newMpscQueue();
    /**
     *HashedWheelTimer實例開始運行的時間,是納秒數,開始時間是System.nanotime() 
     */
    private volatile long startTime;

這些屬性的定義和概念映射到上面時間輪算法上就是下圖的樣子了。 HashedWheelTimer

HashedWheelTimer初始化主要是在它的構造函數中,提供了多種重載方式,只須要看最全的構造函數便可。

/**
     * Creates a new timer.
     * @param threadFactory        執行任務的工廠
     * @param tickDuration         指針往下走一步的時間間隔
     * @param unit                 指針往下走一步的時間單位,秒,毫秒。納秒等
     * @param ticksPerWheel        時間輪的大小,也就是槽位的個數
     */
    public HashedWheelTimer(
            ThreadFactory threadFactory,
            long tickDuration, TimeUnit unit, int ticksPerWheel, boolean leakDetection,
            long maxPendingTimeouts) {

        /**
         * 先校驗參數的合法性,對threadFactory,時間單位,時間間隔,時間輪大小作了限制
         */
        if (threadFactory == null) {
            throw new NullPointerException("threadFactory");
        }
        if (unit == null) {
            throw new NullPointerException("unit");
        }
        if (tickDuration <= 0) {
            throw new IllegalArgumentException("tickDuration must be greater than 0: " + tickDuration);
        }
        if (ticksPerWheel <= 0) {
            throw new IllegalArgumentException("ticksPerWheel must be greater than 0: " + ticksPerWheel);
        }

        // 建立槽位,實際上就是初始化HashedWheelBucket數組,直接new出來的
        wheel = createWheel(ticksPerWheel);
        //用來計算槽位的輔助變量,一下子會在Worker中尋找槽位時使用到
        mask = wheel.length - 1;
        ...
        //初始化線程,是用threadFactory建立出來的一個worker線程
        workerThread = threadFactory.newThread(worker);

      ...

    }

任務添加和運行

當須要添加一個定時任務的時候,是經過newTimeout方法添加的,添加的任務必須實現TimerTask接口的run方法。任務添加以後,無需顯式的開啓任務,添加以後任務會自動開啓,等到了執行的時間會被自動執行。客戶端使用的方式以下:

@Test
    public void testRun() throws Exception{
        final CountDownLatch latch = new CountDownLatch(1);

        HashedWheelTimer hashedWheelTimer = new HashedWheelTimer();
        hashedWheelTimer.newTimeout(timeout -> {
            System.out.println("hello world");
            latch.countDown();
        }, 5, TimeUnit.SECONDS);

        latch.await();
        System.out.println("執行結束");

    }

5秒鐘以後會被輸出"hello world",而後任務執行完畢。既然任務的添加和執行入口都是經過newTimeout這個方法搞定的,那就看一下這個方法裏面有哪些祕密吧。

@Override
    public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) {
        ...
        start();
        ...
        /**
         * 能夠看到任務存活的時間計算,當前時間的毫秒數加上咱們設定的時間,而後減去程序開始執行的時間。這是一個時間段
         */
        long deadline = System.nanoTime() + unit.toNanos(delay) - startTime;
        HashedWheelTimeout timeout = new HashedWheelTimeout(this, task, deadline);
        timeouts.add(timeout);
        return timeout;
    }

進去看了以後,這個方法很簡單,有兩個關鍵的方法調用 一、start(),這個方法主要是看當前HashedWheelTimer的狀態是否已經啓動,若是沒有啓動則會調用workThread線程的啓動方法。二、計算超時時間和任務添加。咱們傳進來的任務會被包裝成一個HashWheelTimeout這個類,包裝以後會把這個包裝類放到timeouts這個阻塞隊列中去,實際上這時候任務並無放到某個具體槽位中,只是先放到阻塞隊列中,等待work從這個隊列中取值而後放到具體的槽位上,HashedWheelTimer是一個雙向鏈表,上面圖中已經有這個類的類圖結構,再貼一次: HashedWheelTimeout

咱們傳進來的任務就是它的task屬性,而後會根據當前時間、過時時間和任務開始時間計算出它的deadline,同事計算出它剩餘的輪數(remainingRounds)。
任務執行其實是調用的它的expire方法。當expire的時候會調用具體的業務任務的run方法。

HashedWheelTimer的expire方法是何時被執行的呢。上面也也說到在HashedWheelTimer中有一個workThread,這裏面會運行work。能讀到這個地方來的人應該不多了吧,不過能到這個地方你是幸運的,由於work這個類也就是實現這個算法中最核心的一個類了,先來概覽一下這個類

這個類實現了Runable接口,也就說是一個線程類,而後它會被workTread調用執行啓動。

  • transferTimeoutsToBuckets 把新加入的定時任務從阻塞隊列中取出而後放到相應的bucket中
  • processCancelledTasks 把取消的定時任務從阻塞隊列中取出,而後從相應的bucket中remove掉
  • waitForNextTick 指針往下走的方法,通過一個時間單位,指針會往下走,指向下一個bucket

run方法會一直循環從阻塞隊列中取值,而後放到bucket中,指針循環往下走,對remainderRounds對於0的任務進行執行,不是0的減一

do {
                /**
                 * 裏面是一個Thread.sleep操做,模擬指針一步一步往下走的操做。
                 */
                final long deadline = waitForNextTick();
                if (deadline > 0) {
                    /**
                     * 計算任務將要落到槽位,這本應該是個取模運算,不過這裏用了一個小技巧,就是把取模運算換爲了「按位與」,由於「按位與」要比取模運算快的多,
                     * 這個技巧就是當mast的值爲2的n次方-1時,能達到取模的效果。這裏要感謝一下王洪濤的分享
                     */
                    int idx = (int) (tick & mask);
                    processCancelledTasks();
                    //取到具體bucket,而後把任務放從阻塞隊列中拿到,放到bucket中
                    HashedWheelBucket bucket =
                            wheel[idx];

                    transferTimeoutsToBuckets();
                    //這裏面會調用全部HashedWheelTimeout的方法,就是看他的剩餘的輪數是否是大於0,若是是的話則會被執行,不是的話剩餘輪數減1
                    bucket.expireTimeouts(deadline);
                    tick++;
                }
            } while (WORKER_STATE_UPDATER.get(HashedWheelTimer.this) == WORKER_STATE_STARTED);

小結

到此源碼部分的分析基本上也就完畢了。固然還有一些取消任務的操做沒有分析,這些無外乎是一些反向操做。再拿來源碼看一眼便可。這個裏面涉及到的東西比較多,有不少的java的高階的用法,其實是能夠嘗試借鑑的,例如自定義的阻塞隊列,這個隊列的特性是面向多個生產者單個消費者。還有被volatile修飾的變量,threadFactory的使用等等。經過學習源碼,可以理清思路,增加見識。

後繼

固然HashedWheelTimer這個類屬於全內存任務計算,一般在咱們真正的業務中,是不會把這些任務直接放到jvm內存中的,要否則重啓以後任務不都會消失了麼,這樣咱們須要重寫HashedWheelTimer,只須要對它任務的添加和獲取進行重寫到相應的持久化中間件中便可(例如數據庫或者es等等)

參考和引用

[1][redis的發佈訂閱缺陷]

[[2]][Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil] [Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil]: http://www.cs.columbia.edu/~nahum/w6998/papers/sosp87-timing-wheels.pdf "Hashed and Hierarchical Timing Wheels: Data Structures for the Efficient Implementation of a Timer Facil"

[[3]][Hashed and Hierarchical Timing Wheels] [Hashed and Hierarchical Timing Wheels]: http://www.cse.wustl.edu/~cdgill/courses/cs6874/TimingWheels.ppt "Hashed and Hierarchical Timing Wheels"

相關文章
相關標籤/搜索