[MQ]再談延時隊列

小記

最近項目裏有需求,在接口調用完畢後將一些消息經過MQ通知給另外一個服務,而且由於業務的緣由,須要停留一分鐘再投遞到MQ,另外一個團隊來消費,我原本想用RabbitMQ(如下簡稱RMQ)來實現,但通過和同事討論決定不用RMQ來實現延時,RMQ只充當消息通知,延時在本地進行實現。本地採用一個單機的延時隊列,是我另外一個同事寫的簡單組件,拿過來直接用就好了,把功能作完,順利上線,可是以後問題仍是暴露了出來,MQ消費者那邊的團隊反饋消息有時候沒收到,因而我開始排查問題所在,確認了我業務代碼是沒問題的,可是看服務端的日誌顯示的確沒發出去,可是由於是本地的一個延時隊列,消息放在內存裏,我也無法查隊列裏的具體消息狀況,也不敢下結論是本地延時隊列的問題,由於開發環境和測試環境都沒出現問題。只能先跟蹤一下,次日把前面三天的日誌都拉下來看發現問題所在,許多實例都出現了消息在次日要麼就當天的很晚的時候才發出去的狀況,排除了多是由於虛擬機沒有同步宿主機的時間的緣由以後(由於延時隊列裏面是獲取的當前時間和消息建立時間的差來判斷時間間隔),我這下基本肯定這個是我同事寫的延時隊列的問題。html

問題來源

其實這個延時隊列邏輯很簡單,數據結構就是一個數組,入隊列就是把消息放在可用位置上,到了數量知足必定條件的時候就擴容,出隊列的時候全數組掃描,當碰到到期的消息的時候,將消息取出。其實如今看來這個延時隊列其實設計得不是很優雅,若是取元素,須要通過不少次掃描大數組,而且擴容的時候對內存的消耗也大,這裏代碼就不帖了,這個故事告訴咱們,一個組件要給別人用,必需要通過多方面專業的測試才行,此次的問題後來我和同事討論其實仍是由於有些case沒有測試到致使的。另外就是,選用組件的時候最好仍是選用已知的穩定的,由於通過檢驗的纔是出故障可能性比較小的。redis

扯一扯延時隊列

延時隊列在業務中常常會用到,好比網上買個東西,訂單生成了可是多少時間內沒支付就關閉訂單,定時邏輯等等。以前我在學RabbitMQ的時候也實現過相似的功能。具體能夠看RabbitMQ延時隊列,今天來整理一下去設計一個延時隊列須要些什麼而且有哪些方案。算法

  • 設計延時隊列(單機/分佈式)須要考慮哪些
    • 及時性 消費者端可否及時收到
    • 可靠性 消息不能像我那裏的問題同樣,沒有被及時消費
    • 可恢復 萬一出現問題,以前的數據須要可以恢復
    • 可撤回 尚未到延時時間的消息能夠撤回
    • 高可用 在一個實例失效的狀況下其餘實例還能繼續工做
    • 任務丟失補償 任務丟失了以後咋辦
    • .....

單機延時隊列

  • Java自帶的DelayQueue

來粗略看一下里面帶的結構有啥數組

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {

    private final transient ReentrantLock lock = new ReentrantLock();
    private final PriorityQueue<E> q = new PriorityQueue<E>();
    .....

能夠看到實際上這裏面的結構是一個PriorityQueue,咱們知道實際上這個隊列能夠用來作最大堆或者最小堆,取出的元素是經過比較器的規則比較出來的最大值或者最小值。再來看泛型裏的參數,是都須要實現一個Delayed的接口的,再來看看接口數據結構

public interface Delayed extends Comparable<Delayed> {

    /**
     * Returns the remaining delay associated with this object, in the
     * given time unit.
     *
     * @param unit the time unit
     * @return the remaining delay; zero or negative values indicate
     * that the delay has already elapsed
     */
    long getDelay(TimeUnit unit);
}

從註釋中,能夠得知這裏面的方法是用來獲取剩餘的時間的。而且這接口仍是實現了比較器的接口的,因此不難推出,這裏其實就是經過堆排序,來找到最先過時的元素。也就是最早應該出隊列的元素。分佈式

簡單作個demo,是實現如下Delayed的接口ide

public class DelayTask implements Delayed {

    /**
     * 消息編號
     * */
    private int index;

    /**
     * 延時時長+入隊時間的值
     * */
    private long dealAt;

    public DelayTask(long time,int index){
        this.dealAt = time;
        this.index = index;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(
                dealAt-System.currentTimeMillis(),
                TimeUnit.MILLISECONDS);
    }

    @Override
    public int compareTo(Delayed o) {
        if(getDelay(TimeUnit.MILLISECONDS)>o.getDelay(TimeUnit.MILLISECONDS)) {
            return 1;
        }else {
            return -1;
        }
    }
    //getter setter
}

作個簡單的測試測試

public class DelayQueueTest {

    public static void main(String[] args) throws InterruptedException{
        DelayQueue<DelayTask> queue = new DelayQueue<>();
        long currentTime = System.currentTimeMillis();
        long[] delayTimes = {10000L,5000L,15000L};
        for (int i = 0;i < delayTimes.length;i++) {
            DelayTask t = new DelayTask(delayTimes[i]+currentTime,i);
            queue.add(t);
        }
        while(!queue.isEmpty()) {
            DelayTask t = queue.take();
            if (t != null) {
                queue.poll();
                System.out.println("當前執行的任務編號爲:" + t.getIndex());
                long timeSpan = System.currentTimeMillis()-currentTime;
                System.out.println("時間間隔爲:"+timeSpan);
            }
            Thread.sleep(1000);
        }
    }
}

運行結果this

當前執行的任務編號爲:1
時間間隔爲:5001
當前執行的任務編號爲:0
時間間隔爲:10000
當前執行的任務編號爲:2
時間間隔爲:15000

那麼上面的方法有啥利弊呢,首先優勢確定是簡單,缺點也顯而易見,可靠性差,而且內存佔用的問題也很明顯。線程

  • 時間輪算法

像我前面提到我公司的同事的作法中,循環去遍歷整個數組去檢測消息是否達到延時時間的方法其實只能適用於小服務而且調用量不大的狀況,一旦像調用量大了起來,實際上輪詢整個數組去檢測消息是否達到延時時間是很低效的。那麼在這基礎上,能夠採用時間輪的辦法,一個時間輪表明一個週期,一個週期裏分爲幾個時間間隔,每個時間間隔裏包含在這一分鐘內全部的定時任務,時間輪在結構上是一個雙向鏈表。

如圖所示

avatar

假設這裏一個時間節點表明一分鐘,這裏一個時間輪也就是週期爲八分鐘,噹噹前時間到達時間節點2的時候,這說明1中的任務已經所有過時且處理完成,時間節點2對應的定時任務就開始處理。這樣作的優勢是能夠經過一個線程監控多個定時任務,可是缺點也很明顯,就是時間顆粒度由節點的間隔決定,而且這些任務的時間間隔還須要用一樣的時間顆粒度。而且須要考慮,不在時間週期裏的任務如何處理。而後延時隊列的其餘特性都還須要經過本身實現來補上。

代碼這裏先挖個坑,以後我補上。

分佈式中的延時隊列

  • Redis實現

Zset的排序功能,直接提供了很方便的解決辦法,只要咱們把Score設置爲定時任務預計執行時間的時間戳,也就是當前時間+延時的時間,這樣排序後首先拿到的就是最先過時的,命令也很簡單,就是

ZRANGEBYSCORE key min max

就能夠獲取到max對應時間戳以前的全部任務。這種作法的優勢是,許多功能redis都實現了,好比持久化,高可用性這些。可是缺點也有,那就是消息的延時和咱們輪詢讀redis的速度有關,獲取當前時間以前的定時任務,可能有任務離當前時間比較遠,而且消息過多的狀況下,redis自己會受必定影響

  • RabbitMQ實現

這個我在前面有寫過相似的文章。

RabbitMQ延時隊列

  • RocketMQ實現

阿里的開源消息隊列,但我目前還沒作太多瞭解,在我補齊這個中間件的技能點的時候一起補上。

相關文章
相關標籤/搜索