手把手實現一條延時消息

前言

近期在維護公司的調度平臺,其中有個關鍵功能那就是定時任務;定時任務你們平時確定接觸的很多,好比 JDK 中的 TimerScheduledExecutorService、調度框架 Quartz 等。java

一般用於實現 XX 時間後的延時任務,或週期性任務;git

好比一個常見的業務場景:用戶下單 N 分鐘未能支付便自動取消訂單。github

實現這類需求一般有兩種方式:數據庫

  • 輪詢定時任務:給定週期內掃描全部未支付的訂單,查看時間是否到期。
  • 延時消息:訂單建立的時候發送一條 N 分鐘到期的信息,一旦消息消費後即可判斷訂單是否能夠取消。

先看第一種,這類方式實現較爲簡單,只須要啓動一個定時任務便可;但缺點一樣也很明顯,這個間隔掃描的時間很差控制。segmentfault

給短了會形成不少無心義的掃描,增大數據庫壓力,給長了又會使得偏差較大。api

固然最大的問題仍是效率較低,隨着訂單增多耗時會呈線性增加,最差的狀況甚至會出現上一波輪詢尚未掃描完,下一波調度又來了。數組


這時第二種方案就要顯得靠譜多了,經過延時消息能夠去掉沒必要要的訂單掃描,實時性也比較高。數據結構

延時消息

這裏咱們不過多討論這類需求如何實現;重點聊聊這個延時消息,看它是如何實現的,基於實現延時消息的數據結構還能實現定時任務。框架

我在以前的開源 IM 項目中也加入了此類功能,能夠很直觀的發送一條延時消息,效果以下:函數

使用 :delay hahah 2 發送了一條兩秒鐘的延時消息,另一個客戶端將會在兩秒鐘以後收到該消息。

具體的實現步驟會在後文繼續分析。

時間輪

要實現延時消息就不得不提到一種數據結構【時間輪】,時間輪聽這名字能夠很直觀的抽象出它的數據結構。

其實本質上它就是一個環形的數組,如圖所示,假設咱們建立了一個長度爲 8 的時間輪。


task0 = 當咱們須要新建一個 5s 延時消息,則只須要將它放到下標爲 5 的那個槽中。

task1 = 而若是是一個 10s 的延時消息,則須要將它放到下標爲 2 的槽中,但同時須要記錄它所對應的圈數,否則就和 2 秒的延時消息重複了。

task2= 當建立一個 21s 的延時消息時,它所在的位置就和 task0 相同了,都在下標爲 5 的槽中,因此爲了區別須要爲他加上圈數爲 2。

經過這張圖能夠更直觀的理解。

當咱們須要取出延時消息時,只須要每秒往下移動這個指針,而後取出該位置的全部任務便可。

固然取出任務以前還得判斷圈數是否爲 0 ,不爲 0 時說明該任務還得再輪幾圈,同時須要將圈數 -1 。

這樣就可避免輪詢全部的任務,不過若是時間輪的槽比較少,致使某一個槽上的任務很是多那效率也比較低,這就和 HashMaphash 衝突是同樣的。

編碼實現

理論講完後咱們來看看實際的編碼實現,爲此我建立了一個 RingBufferWheel 類。

它的主要功能以下:

  • 能夠添加指定時間的延時任務,在這個任務中能夠實現本身的業務邏輯。
  • 中止運行(包含強制中止和全部任務完成後中止)。
  • 查看待執行任務數量。

首先直接看看這個類是如何使用的。

我在這裏建立了 65 個延時任務,每一個任務都比前一個延後 1s 執行;同時自定義了一個 Job 類來實現本身的業務邏輯,最後調用 stop(false) 會在全部任務執行完畢後退出。

構造函數

先來看看其中的構造函數,這裏一共有兩個構造函數,用於接收一個線程池及時間輪的大小。

線程池的做用會在後面講到。

這裏的時間輪大小也是有講究的,它的長度必須得是 2∧n,至於爲何有這個要求後面也會講到。

默認狀況下會初始化一個長度爲 64 的數組。

添加任務

下面來看看添加任務的邏輯,根據咱們以前的那張抽象圖其實很容易實現。


首先咱們要定義一個 Task 類,用於抽象任務;它自己也是一個線程,一旦延時到期便會執行其中的 run 函數,因此使用時即可繼承該類,將業務邏輯寫在 run() 中便可。

它其中還有兩個成員變量,也很好理解。

  • cycleNum 用於記錄該任務所在時間輪的圈數。
  • key 在這裏其實就是延時時間。

//經過 key 計算應該存放的位置
    private Set<Task> get(int key) {
        int index = mod(key, bufferSize);
        return (Set<Task>) ringBuffer[index];
    }

    private int mod(int target, int mod) {
        // equals target % mod
        target = target + tick.get() ;
        return target & (mod - 1);
    }

首先是根據延時時間 (key) 計算出所在的位置,其實就和 HashMap 同樣的取模運算,只不過這裏使用了位運算替代了取模,同時效率會高上很多。

這樣也解釋了爲何數組長度必定得是 2∧n

而後查看該位置上是否存在任務,不存在就新建一個;存在天然就是將任務寫入這個集合並更新回去。

private int cycleNum(int target, int mod) {
        //equals target/mod
        return target >> Integer.bitCount(mod - 1);
    }
其中的 cycleNum() 天然是用於計算該任務所處的圈數,也是考慮到效率問題,使用位運算替代了除法。
private void put(int key, Set<Task> tasks) {
        int index = mod(key, bufferSize);
        ringBuffer[index] = tasks;
    }

put() 函數就很是簡單了,就是將任務寫入指定數組下標便可。

啓動時間輪

任務寫進去後下一步即是啓動這個時間輪了,我這裏定義了一個 start() 函數。

其實本質上就是開啓了一個後臺線程來作這個事情:

它會一直從時間輪中取出任務來運行,而運行這些任務的線程即是咱們在初始化時傳入的線程池;因此全部的延時任務都是由自定義的線程池調度完成的,這樣能夠避免時間輪的阻塞。

這裏調用的 remove(index) 很容易猜到是用於獲取當前數組中的全部任務。

邏輯很簡單就再也不贅述,不過其中的 size2Notify() 卻是值得說一下。

他是用於在中止任務時,主線程等待全部延時任務執行完畢的喚醒條件。這類用法幾乎是全部線程間通訊的常規套路,值得收入技能包。

中止時間輪

剛纔提到的喚醒主線程得配合這裏的中止方法使用:

若是是強制中止那便什麼也無論,直接更新中止標誌,同時關閉線程池便可。

但若是是軟中止(等待全部任務執行完畢)時,那就得經過上文提到的方式阻塞主線程,直到任務執行完畢後被喚醒。

CIM 中的應用

介紹了核心原理和基本 API 後,咱們來看看實際業務場景如何結合使用(背景是一個即時通信項目)。

我這裏所使用的場景在文初也提到了,就是真的發送一條延時消息;

現有的消息都是實時消息,因此要實現一個延時消息即是在現有的發送客戶端處將延時消息放入到這個時間輪中,在任務到期時再執行真正的消息發送邏輯。

因爲項目自己結合了 Spring,因此第一步天然是配置 bean

bean 配置好後其實就可使用了。

每當發送的是延時消息時,只須要將這個消息封裝爲一個 Job 放到時間輪中,而後在本身的業務類中完成業務便可。

後續能夠優化下 api,不用每次新增任務都要調用 start() 方法。

這樣一個延時消息的應用便完成了。

總結

時間輪這樣的應用還很是多,好比 Netty 中的 HashedWheelTimer 工具原理也差很少,能夠用於維護長鏈接心跳信息。

甚至 Kafka 在這基礎上還優化出了層級時間輪,這些都是後話了,你們感興趣的話能夠自行搜索資料或者抽時間我再完善一次。

這篇文章從前期準備到擼碼實現仍是花了很多時間,若是對你有幫助的話還請點贊轉發。

本文的全部源碼均可在此處查閱:

https://github.com/crossoverJie/cim

你的點贊與分享是對我最大的支持

相關文章
相關標籤/搜索