RabbitMQ延遲消息發送

爲何使用延遲消息?java

不一樣於同步消息,有些業務場景下但願能夠實現延遲必定時間再消費消息。git

典型的場景有微信、支付寶等第三方支付回調接口,會在用戶支付後3秒、5秒、30秒等等時間後向應用服務器發送回調請求,確保應用服務器能夠正確收到消息。數據庫

那有些朋友就會說了,把須要定時處理的數據存到數據庫中用定時任務就能夠實現,爲何還弄個異步消息。增長後臺維護成本。服務器

使用定時任務固然沒有問題能夠實現該問題。在小數據量狀況下沒有問題。但當數據量交大的時候怎麼辦?若是每一個任務的延遲時間不一樣怎麼辦?微信

其餘方式實現消息隊列數據結構

名稱 實現方式 詳細說明
Redis 使用zset數據結構 使用zset的score屬性存放執行時間戳,起一個死循環的線程不斷的取第一個Key值,若是當前時間戳大於該Key的socre 值時將它取出來消費,注意不須要遍歷整個Zset集合,以避免形成性能浪費
定時任務 給定週期掃描待處理消息 使用該方式間隔時間很差控制,給短會形成無心義的掃描,增長數據庫壓力,給長了偏差較大
定時任務 動態建立惟一性定時任務 一次性的任務會增長數據庫存儲,須要定時清理,如相差時間較近的任務較多,也會形成性能較差
時間輪 自定義 自定義一個時間輪的數據結構,啓動一個後臺線程,延遲一秒,獲取時間輪中的任務啓動子線程獨立執行時間輪的任務

如何選擇消息中間件?app

中間件 是否原生支持 說明
RocketMQ 支持 不支持任意時間的延遲消息的設置,僅支持內置預設值的延遲時間間隔的延遲消息。預設值的延遲時間間隔爲:1s、 5s、 10s、 30s、 1m、 2m、 3m、 4m、 5m、 6m、 7m、 8m、 9m、 10m、 20m、 30m、 1h、 2h
RabbitMQ 不支持 可以使用消息的TTL和死信Exchange實現
Kafka 不支持 可以使用TimingWheel 實現
AcitveMQ 支持

因本身在使用RabbitMQ作爲消息中間件,因此直接選用了RabbitMQ來實現。dom

實現以前異步

在實現以前咱們先須要知道RabbitMQ如下兩個概念。性能

  • TTL(Time To Live)消息過時時間。

消息若是在隊列中一直沒有被消費而且存在時間超過了TTL,消息就會變成了"死信" (Dead Message),後續沒法再被消費了。

  • DLX(Dead-Letter-Exchange)死信交換器。

它的做用實際上是用來接收死信消息(dead message)的。

  1. 消息被拒絕(Basic.Reject/Basic.Nack) ,井且設置requeue 參數爲false
  2. 消息過時
  3. 隊列達到最大長度

由於消息若是未被正常消費並設置了requeue爲false時會進入死信隊列,咱們能夠監控消費死信隊列中消息,來觀察和分析系統的問題。

RabbitMQ能夠從兩種維度設置消息過時時間,分別是隊列和消息自己。兩種方式哪一個時間小先執行哪一個。

實現思路

想到有兩種實現方式和效果。甚至能夠結合使用。

第一種:設定固定幾個延遲時間(像RocketMQ中間件)

固定延遲消息隊列

第二種:實現自定義任意時間延遲

自定義任意時間延遲

以上兩種方式各有優缺點,我本身實現的是第二種,下面詳細說明

圖中後半段死信路由與應用消費基本相同,只要在消費端綁將一個正常隊列與死信路由綁定就行。

/**
 * @Author: maomao
 * @Date: 2019-09-04 18:34
 */
@Slf4j
@Component
public class FreeCloudMQConsume {
    @RabbitListener(bindings = {@QueueBinding(value = @Queue(value = "free.cloud.out.mq",durable = "true"),
                                 exchange = @Exchange(value = "free.cloud.die.exchange",type = ExchangeTypes.TOPIC),
                                 key = "free.cloud.out.mq.dead.message.#")})
    public void print(String message){
        log.info("print 5 ---- > {}",message);
    }
}

調用方發送消息

/**
     * 建立延遲隊列,會隨指定延遲時間+5秒後刪除隊列
     * @param queueName
     * @param delayMillis
     * @return
     */
    private static Queue createDelayQueue(String queueName, Integer delayMillis) {
        /**
         * 隊列名稱  //死信時間 ,死信從新投遞的交換機 ,路由到隊列的routingKey
         */
        String time = String.valueOf(System.currentTimeMillis());
        String delayQueueName = queueName + ".delay_" + delayMillis + "_" + time;
        return QueueBuilder.durable(delayQueueName)
                //設置消息失效時間
                .withArgument("x-message-ttl",delayMillis * 1000)
                //設置隊列自動刪除時間 ,比消息延遲時間多5秒
                .withArgument("x-expires", (delayMillis + 5) * 1000)
                //設置死信路由
                .withArgument("x-dead-letter-exchange", "free.cloud.die.exchange")
                //設置死信路由routingKey
                .withArgument("x-dead-letter-routing-key", queueName + ".dead.message." + time)
                .build();
    }

    /**
     * 發送延遲消息
     * @param queueName
     * @param message
     * @param delayMillis
     */
    public static void sendDelayMessage(String queueName,Object message,Integer delayMillis){
        //死信消息隊列(動態建立,會銷燬)
        Queue delayQueue = createDelayQueue(queueName, delayMillis);
        //建立隊列
        addQueue(delayQueue);
        //延遲消息路由Key
        StringBuilder delayRoutingKey = new StringBuilder(queueName + ".delay");
        delayRoutingKey.append(".").append(message.hashCode() + "_" + RandomUtil.randomString(5));
        //綁定延遲路由
        RabbitMqUtil.addBinding(delayQueue,delayExchange,delayRoutingKey.toString());
        getRabbitTemplate().convertAndSend("free.cloud.delay.exchange",delayRoutingKey.toString(),message);
    }

以上是自定義延遲消息的關鍵實現代碼,完整代碼能夠 點擊這裏 獲取

效果

相關文章
相關標籤/搜索