【RabbitMQ】一文帶你搞定RabbitMQ延遲隊列

本文口味:魚香肉絲   預計閱讀:10分鐘html

1、說明

在上一篇中,介紹了RabbitMQ中的死信隊列是什麼,什麼時候使用以及如何使用RabbitMQ的死信隊列。相信經過上一篇的學習,對於死信隊列已經有了更多的瞭解,這一篇的內容也跟死信隊列息息相關,若是你還不瞭解死信隊列,那麼建議你先進行上一篇文章的閱讀。java

這一篇裏,咱們將繼續介紹RabbitMQ的高級特性,經過本篇的學習,你將收穫:git

  1. 什麼是延時隊列
  2. 延時隊列使用場景
  3. RabbitMQ中的TTL
  4. 如何利用RabbitMQ來實現延時隊列

2、本文大綱

如下是本文大綱:程序員

1.png

本文閱讀前,須要對RabbitMQ以及死信隊列有一個簡單的瞭解。github

3、什麼是延時隊列

延時隊列,首先,它是一種隊列,隊列意味着內部的元素是有序的,元素出隊和入隊是有方向性的,元素從一端進入,從另外一端取出。web

其次,延時隊列,最重要的特性就體如今它的延時屬性上,跟普通的隊列不同的是,普通隊列中的元素老是等着但願被早點取出處理,而延時隊列中的元素則是但願被在指定時間獲得取出和處理,因此延時隊列中的元素是都是帶時間屬性的,一般來講是須要被處理的消息或者任務。shell

簡單來講,延時隊列就是用來存放須要在指定時間被處理的元素的隊列。數據庫

4、延時隊列使用場景

那麼何時須要用延時隊列呢?考慮一下如下場景:架構

  1. 訂單在十分鐘以內未支付則自動取消。
  2. 新建立的店鋪,若是在十天內都沒有上傳過商品,則自動發送消息提醒。
  3. 帳單在一週內未支付,則自動結算。
  4. 用戶註冊成功後,若是三天內沒有登錄則進行短信提醒。
  5. 用戶發起退款,若是三天內沒有獲得處理則通知相關運營人員。
  6. 預約會議後,須要在預約的時間點前十分鐘通知各個與會人員參加會議。

這些場景都有一個特色,須要在某個事件發生以後或者以前的指定時間點完成某一項任務,如:發生訂單生成事件,在十分鐘以後檢查該訂單支付狀態,而後將未支付的訂單進行關閉;發生店鋪建立事件,十天後檢查該店鋪上新商品數,而後通知上新數爲0的商戶;發生帳單生成事件,檢查帳單支付狀態,而後自動結算未支付的帳單;發生新用戶註冊事件,三天後檢查新註冊用戶的活動數據,而後通知沒有任何活動記錄的用戶;發生退款事件,在三天以後檢查該訂單是否已被處理,如仍未被處理,則發送消息給相關運營人員;發生預約會議事件,判斷離會議開始是否只有十分鐘了,若是是,則通知各個與會人員。app

看起來彷佛使用定時任務,一直輪詢數據,每秒查一次,取出須要被處理的數據,而後處理不就完事了嗎?若是數據量比較少,確實能夠這樣作,好比:對於「若是帳單一週內未支付則進行自動結算」這樣的需求,若是對於時間不是嚴格限制,而是寬鬆意義上的一週,那麼天天晚上跑個定時任務檢查一下全部未支付的帳單,確實也是一個可行的方案。但對於數據量比較大,而且時效性較強的場景,如:「訂單十分鐘內未支付則關閉「,短時間內未支付的訂單數據可能會有不少,活動期間甚至會達到百萬甚至千萬級別,對這麼龐大的數據量仍舊使用輪詢的方式顯然是不可取的,極可能在一秒內沒法完成全部訂單的檢查,同時會給數據庫帶來很大壓力,沒法知足業務要求並且性能低下。

更重要的一點是,不!優!雅!

沒錯,做爲一名有追求的程序員,始終應該追求更優雅的架構和更優雅的代碼風格,寫代碼要像寫詩同樣優美。【滑稽】

這時候,延時隊列就能夠閃亮登場了,以上場景,正是延時隊列的用武之地。

既然延時隊列能夠解決不少特定場景下,帶時間屬性的任務需求,那麼如何構造一個延時隊列呢?接下來,本文將介紹如何用RabbitMQ來實現延時隊列。

5、RabbitMQ中的TTL

在介紹延時隊列以前,還須要先介紹一下RabbitMQ中的一個高級特性——TTL(Time To Live)

TTL是什麼呢?TTL是RabbitMQ中一個消息或者隊列的屬性,代表一條消息或者該隊列中的全部消息的最大存活時間,單位是毫秒。換句話說,若是一條消息設置了TTL屬性或者進入了設置TTL屬性的隊列,那麼這條消息若是在TTL設置的時間內沒有被消費,則會成爲「死信」(至於什麼是死信,請翻看上一篇)。若是同時配置了隊列的TTL和消息的TTL,那麼較小的那個值將會被使用。

那麼,如何設置這個TTL值呢?有兩種方式,第一種是在建立隊列的時候設置隊列的「x-message-ttl」屬性,以下:

Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

這樣全部被投遞到該隊列的消息都最多不會存活超過6s。

另外一種方式即是針對每條消息設置TTL,代碼以下:

AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

這樣這條消息的過時時間也被設置成了6s。

但這兩種方式是有區別的,若是設置了隊列的TTL屬性,那麼一旦消息過時,就會被隊列丟棄,而第二種方式,消息即便過時,也不必定會被立刻丟棄,由於消息是否過時是在即將投遞到消費者以前斷定的,若是當前隊列有嚴重的消息積壓狀況,則已過時的消息也許還能存活較長時間。

另外,還須要注意的一點是,若是不設置TTL,表示消息永遠不會過時,若是將TTL設置爲0,則表示除非此時能夠直接投遞該消息到消費者,不然該消息將會被丟棄。

6、如何利用RabbitMQ實現延時隊列

前一篇裏介紹了若是設置死信隊列,前文中又介紹了TTL,至此,利用RabbitMQ實現延時隊列的兩大要素已經集齊,接下來只須要將它們進行調和,再加入一點點調味料,延時隊列就能夠新鮮出爐了。

想一想看,延時隊列,不就是想要消息延遲多久被處理嗎,TTL則恰好能讓消息在延遲多久以後成爲死信,另外一方面,成爲死信的消息都會被投遞到死信隊列裏,這樣只須要消費者一直消費死信隊列裏的消息就萬事大吉了,由於裏面的消息都是但願被當即處理的消息。

從下圖能夠大體看出消息的流向:

23.png

生產者生產一條延時消息,根據須要延時時間的不一樣,利用不一樣的routingkey將消息路由到不一樣的延時隊列,每一個隊列都設置了不一樣的TTL屬性,並綁定在同一個死信交換機中,消息過時後,根據routingkey的不一樣,又會被路由到不一樣的死信隊列中,消費者只須要監聽對應的死信隊列進行處理便可。

下面來看代碼:

先聲明交換機、隊列以及他們的綁定關係:

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEA_NAME = "delay.queue.demo.business.queuea";
    public static final String DELAY_QUEUEB_NAME = "delay.queue.demo.business.queueb";
    public static final String DELAY_QUEUEA_ROUTING_KEY = "delay.queue.demo.business.queuea.routingkey";
    public static final String DELAY_QUEUEB_ROUTING_KEY = "delay.queue.demo.business.queueb.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEA_ROUTING_KEY = "delay.queue.demo.deadletter.delay_10s.routingkey";
    public static final String DEAD_LETTER_QUEUEB_ROUTING_KEY = "delay.queue.demo.deadletter.delay_60s.routingkey";
    public static final String DEAD_LETTER_QUEUEA_NAME = "delay.queue.demo.deadletter.queuea";
    public static final String DEAD_LETTER_QUEUEB_NAME = "delay.queue.demo.deadletter.queueb";

    // 聲明延時Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時隊列A 延時10s
    // 並綁定到對應的死信交換機
    @Bean("delayQueueA")
    public Queue delayQueueA(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這裏聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這裏聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEA_ROUTING_KEY);
        // x-message-ttl  聲明隊列的TTL
        args.put("x-message-ttl", 6000);
        return QueueBuilder.durable(DELAY_QUEUEA_NAME).withArguments(args).build();
    }

    // 聲明延時隊列B 延時 60s
    // 並綁定到對應的死信交換機
    @Bean("delayQueueB")
    public Queue delayQueueB(){
        Map<String, Object> args = new HashMap<>(2);
        // x-dead-letter-exchange    這裏聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這裏聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEB_ROUTING_KEY);
        // x-message-ttl  聲明隊列的TTL
        args.put("x-message-ttl", 60000);
        return QueueBuilder.durable(DELAY_QUEUEB_NAME).withArguments(args).build();
    }

    // 聲明死信隊列A 用於接收延時10s處理的消息
    @Bean("deadLetterQueueA")
    public Queue deadLetterQueueA(){
        return new Queue(DEAD_LETTER_QUEUEA_NAME);
    }

    // 聲明死信隊列B 用於接收延時60s處理的消息
    @Bean("deadLetterQueueB")
    public Queue deadLetterQueueB(){
        return new Queue(DEAD_LETTER_QUEUEB_NAME);
    }

    // 聲明延時隊列A綁定關係
    @Bean
    public Binding delayBindingA(@Qualifier("delayQueueA") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEA_ROUTING_KEY);
    }

    // 聲明業務隊列B綁定關係
    @Bean
    public Binding delayBindingB(@Qualifier("delayQueueB") Queue queue,
                                    @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEB_ROUTING_KEY);
    }

    // 聲明死信隊列A綁定關係
    @Bean
    public Binding deadLetterBindingA(@Qualifier("deadLetterQueueA") Queue queue,
                                    @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEA_ROUTING_KEY);
    }

    // 聲明死信隊列B綁定關係
    @Bean
    public Binding deadLetterBindingB(@Qualifier("deadLetterQueueB") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEB_ROUTING_KEY);
    }
}

接下來,建立兩個消費者,分別對兩個死信隊列的消息進行消費:

@Slf4j
@Component
public class DeadLetterQueueConsumer {

    @RabbitListener(queues = DEAD_LETTER_QUEUEA_NAME)
    public void receiveA(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},死信隊列A收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }

    @RabbitListener(queues = DEAD_LETTER_QUEUEB_NAME)
    public void receiveB(Message message, Channel channel) throws IOException {
        String msg = new String(message.getBody());
        log.info("當前時間:{},死信隊列B收到消息:{}", new Date().toString(), msg);
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }
}

而後是消息的生產者:

@Component
public class DelayMessageSender {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendMsg(String msg, DelayTypeEnum type){
        switch (type){
            case DELAY_10s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEA_ROUTING_KEY, msg);
                break;
            case DELAY_60s:
                rabbitTemplate.convertAndSend(DELAY_EXCHANGE_NAME, DELAY_QUEUEB_ROUTING_KEY, msg);
                break;
        }
    }
}

接下來,咱們暴露一個web接口來生產消息:

@Slf4j
@RequestMapping("rabbitmq")
@RestController
public class RabbitMQMsgController {

    @Autowired
    private DelayMessageSender sender;

    @RequestMapping("sendmsg")
    public void sendMsg(String msg, Integer delayType){
        log.info("當前時間:{},收到請求,msg:{},delayType:{}", new Date(), msg, delayType);
        sender.sendMsg(msg, Objects.requireNonNull(DelayTypeEnum.getDelayTypeEnumByValue(delayType)));
    }
}

準備就緒,啓動!

打開rabbitMQ的管理後臺,能夠看到咱們剛纔建立的交換機和隊列信息:

2.png

4.png

3.png

接下來,咱們來發送幾條消息,http://localhost:8080/rabbitmq/sendmsg?msg=testMsg1&delayType=1 http://localhost:8080/rabbitmq/sendmsg?msg=testMsg2&delayType=2

日誌以下:

2019-07-28 16:02:19.813  INFO 3860 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:02:19 CST 2019,收到請求,msg:testMsg1,delayType:1
2019-07-28 16:02:19.815  INFO 3860 --- [nio-8080-exec-9] .l.DirectReplyToMessageListenerContainer : SimpleConsumer [queue=amq.rabbitmq.reply-to, consumerTag=amq.ctag-o-qPpkWIkRm73DIrOIVhig identity=766339] started
2019-07-28 16:02:25.829  INFO 3860 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:02:25 CST 2019,死信隊列A收到消息:testMsg1
2019-07-28 16:02:41.326  INFO 3860 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:02:41 CST 2019,收到請求,msg:testMsg2,delayType:2
2019-07-28 16:03:41.329  INFO 3860 --- [ntContainer#0-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:03:41 CST 2019,死信隊列B收到消息:testMsg2

第一條消息在6s後變成了死信消息,而後被消費者消費掉,第二條消息在60s以後變成了死信消息,而後被消費掉,這樣,一個還算ok的延時隊列就打造完成了。

不過,等等,若是這樣使用的話,豈不是每增長一個新的時間需求,就要新增一個隊列,這裏只有6s和60s兩個時間選項,若是須要一個小時後處理,那麼就須要增長TTL爲一個小時的隊列,若是是預約會議室而後提早通知這樣的場景,豈不是要增長無數個隊列才能知足需求??

嗯,仔細想一想,事情並不簡單。

7、RabbitMQ延時隊列優化

顯然,須要一種更通用的方案才能知足需求,那麼就只能將TTL設置在消息屬性裏了。咱們來試一試。

增長一個延時隊列,用於接收設置爲任意延時時長的消息,增長一個相應的死信隊列和routingkey:

@Configuration
public class RabbitMQConfig {

    public static final String DELAY_EXCHANGE_NAME = "delay.queue.demo.business.exchange";
    public static final String DELAY_QUEUEC_NAME = "delay.queue.demo.business.queuec";
    public static final String DELAY_QUEUEC_ROUTING_KEY = "delay.queue.demo.business.queuec.routingkey";
    public static final String DEAD_LETTER_EXCHANGE = "delay.queue.demo.deadletter.exchange";
    public static final String DEAD_LETTER_QUEUEC_ROUTING_KEY = "delay.queue.demo.deadletter.delay_anytime.routingkey";
    public static final String DEAD_LETTER_QUEUEC_NAME = "delay.queue.demo.deadletter.queuec";

    // 聲明延時Exchange
    @Bean("delayExchange")
    public DirectExchange delayExchange(){
        return new DirectExchange(DELAY_EXCHANGE_NAME);
    }

    // 聲明死信Exchange
    @Bean("deadLetterExchange")
    public DirectExchange deadLetterExchange(){
        return new DirectExchange(DEAD_LETTER_EXCHANGE);
    }

    // 聲明延時隊列C 不設置TTL
    // 並綁定到對應的死信交換機
    @Bean("delayQueueC")
    public Queue delayQueueC(){
        Map<String, Object> args = new HashMap<>(3);
        // x-dead-letter-exchange    這裏聲明當前隊列綁定的死信交換機
        args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
        // x-dead-letter-routing-key  這裏聲明當前隊列的死信路由key
        args.put("x-dead-letter-routing-key", DEAD_LETTER_QUEUEC_ROUTING_KEY);
        return QueueBuilder.durable(DELAY_QUEUEC_NAME).withArguments(args).build();
    }

    // 聲明死信隊列C 用於接收延時任意時長處理的消息
    @Bean("deadLetterQueueC")
    public Queue deadLetterQueueC(){
        return new Queue(DEAD_LETTER_QUEUEC_NAME);
    }

    // 聲明延時列C綁定關係
    @Bean
    public Binding delayBindingC(@Qualifier("delayQueueC") Queue queue,
                                 @Qualifier("delayExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DELAY_QUEUEC_ROUTING_KEY);
    }

    // 聲明死信隊列C綁定關係
    @Bean
    public Binding deadLetterBindingC(@Qualifier("deadLetterQueueC") Queue queue,
                                      @Qualifier("deadLetterExchange") DirectExchange exchange){
        return BindingBuilder.bind(queue).to(exchange).with(DEAD_LETTER_QUEUEC_ROUTING_KEY);
    }
}

增長一個死信隊列C的消費者:

@RabbitListener(queues = DEAD_LETTER_QUEUEC_NAME)
public void receiveC(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當前時間:{},死信隊列C收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

再次啓動!而後訪問:http://localhost:8080/rabbitmq/delayMsg?msg=testMsg1delayTime=5000 來生產消息,注意這裏的單位是毫秒。

2019-07-28 16:45:07.033  INFO 31468 --- [nio-8080-exec-4] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:45:07 CST 2019,收到請求,msg:testMsg1,delayTime:5000
2019-07-28 16:45:11.694  INFO 31468 --- [nio-8080-exec-5] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:45:11 CST 2019,收到請求,msg:testMsg2,delayTime:5000
2019-07-28 16:45:12.048  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:45:12 CST 2019,死信隊列C收到消息:testMsg1
2019-07-28 16:45:16.709  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:45:16 CST 2019,死信隊列C收到消息:testMsg2

看起來彷佛沒什麼問題,但不要高興的太早,在最開始的時候,就介紹過,若是使用在消息屬性上設置TTL的方式,消息可能並不會按時「死亡「,由於RabbitMQ只會檢查第一個消息是否過時,若是過時則丟到死信隊列,索引若是第一個消息的延時時長很長,而第二個消息的延時時長很短,則第二個消息並不會優先獲得執行。

實驗一下:

2019-07-28 16:49:02.957  INFO 31468 --- [nio-8080-exec-8] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:49:02 CST 2019,收到請求,msg:longDelayedMsg,delayTime:20000
2019-07-28 16:49:10.671  INFO 31468 --- [nio-8080-exec-9] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 16:49:10 CST 2019,收到請求,msg:shortDelayedMsg,delayTime:2000
2019-07-28 16:49:22.969  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:longDelayedMsg
2019-07-28 16:49:22.970  INFO 31468 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 16:49:22 CST 2019,死信隊列C收到消息:shortDelayedMsg

咱們先發了一個延時時長爲20s的消息,而後發了一個延時時長爲2s的消息,結果顯示,第二個消息會在等第一個消息成爲死信後纔會「死亡「。

8、利用RabbitMQ插件實現延遲隊列

上文中提到的問題,確實是一個硬傷,若是不能實如今消息粒度上添加TTL,並使其在設置的TTL時間及時死亡,就沒法設計成一個通用的延時隊列。

那如何解決這個問題呢?不要慌,安裝一個插件便可:https://www.rabbitmq.com/community-plugins.html ,下載rabbitmq_delayed_message_exchange插件,而後解壓放置到RabbitMQ的插件目錄。

接下來,進入RabbitMQ的安裝目錄下的sbin目錄,執行下面命令讓該插件生效,而後重啓RabbitMQ。

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

而後,咱們再聲明幾個Bean:

@Configuration
public class DelayedRabbitMQConfig {
    public static final String DELAYED_QUEUE_NAME = "delay.queue.demo.delay.queue";
    public static final String DELAYED_EXCHANGE_NAME = "delay.queue.demo.delay.exchange";
    public static final String DELAYED_ROUTING_KEY = "delay.queue.demo.delay.routingkey";

    @Bean
    public Queue immediateQueue() {
        return new Queue(DELAYED_QUEUE_NAME);
    }

    @Bean
    public CustomExchange customExchange() {
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAYED_EXCHANGE_NAME, "x-delayed-message", true, false, args);
    }

    @Bean
    public Binding bindingNotify(@Qualifier("immediateQueue") Queue queue,
                                 @Qualifier("customExchange") CustomExchange customExchange) {
        return BindingBuilder.bind(queue).to(customExchange).with(DELAYED_ROUTING_KEY).noargs();
    }
}

controller層再添加一個入口:

@RequestMapping("delayMsg2")
public void delayMsg2(String msg, Integer delayTime) {
    log.info("當前時間:{},收到請求,msg:{},delayTime:{}", new Date(), msg, delayTime);
    sender.sendDelayMsg(msg, delayTime);
}

消息生產者的代碼也須要修改:

public void sendDelayMsg(String msg, Integer delayTime) {
    rabbitTemplate.convertAndSend(DELAYED_EXCHANGE_NAME, DELAYED_ROUTING_KEY, msg, a ->{
        a.getMessageProperties().setDelay(delayTime);
        return a;
    });
}

最後,再建立一個消費者:

@RabbitListener(queues = DELAYED_QUEUE_NAME)
public void receiveD(Message message, Channel channel) throws IOException {
    String msg = new String(message.getBody());
    log.info("當前時間:{},延時隊列收到消息:{}", new Date().toString(), msg);
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}

一切準備就緒,啓動!而後分別訪問如下連接:

http://localhost:8080/rabbitmq/delayMsg2?msg=msg1&delayTime=20000
http://localhost:8080/rabbitmq/delayMsg2?msg=msg2&delayTime=2000

日誌以下:

2019-07-28 17:28:13.729  INFO 25804 --- [nio-8080-exec-2] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 17:28:13 CST 2019,收到請求,msg:msg1,delayTime:20000
2019-07-28 17:28:20.607  INFO 25804 --- [nio-8080-exec-1] c.m.d.controller.RabbitMQMsgController   : 當前時間:Sun Jul 28 17:28:20 CST 2019,收到請求,msg:msg2,delayTime:2000
2019-07-28 17:28:22.624  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 17:28:22 CST 2019,延時隊列收到消息:msg2
2019-07-28 17:28:33.751  INFO 25804 --- [ntContainer#1-1] c.m.d.mq.DeadLetterQueueConsumer         : 當前時間:Sun Jul 28 17:28:33 CST 2019,延時隊列收到消息:msg1

第二個消息被先消費掉了,符合預期。至此,RabbitMQ實現延時隊列的部分就完結了。

9、總結

延時隊列在須要延時處理的場景下很是有用,使用RabbitMQ來實現延時隊列能夠很好的利用RabbitMQ的特性,如:消息可靠發送、消息可靠投遞、死信隊列來保障消息至少被消費一次以及未被正確處理的消息不會被丟棄。另外,經過RabbitMQ集羣的特性,能夠很好的解決單點故障問題,不會由於單個節點掛掉致使延時隊列不可用或者消息丟失。

固然,延時隊列還有不少其它選擇,好比利用Java的DelayQueu,利用Redis的zset,利用Quartz或者利用kafka的時間輪,這些方式各有特色,但就像爐石傳說通常,這些知識就比如手裏的卡牌,知道的越多,能夠用的卡牌也就越多,遇到問題便能遊刃有餘,因此須要大量的知識儲備和經驗積累才能打造出更出色的卡牌組合,讓本身解決問題的能力獲得更好的提高。

但另外一方面,隨着時間的流逝和閱歷的增加,愈來愈感受到本身的能力有限,沒法獨自面對紛繁複雜且多變的業務需求,在不少方面須要其餘人的協助才能很好的完成任務。也知道聞道有前後,術業有專攻,不會再狂妄自大,以爲本身能把全部事情都搞定,也將重心慢慢轉移到研究如何有效的進行團隊合做上來,我相信一個高度協調的團隊永遠比一我的戰鬥要更有價值。

花了一個週末的時間完成了這篇文章,文中全部的代碼都上傳到了github,https://github.com/MFrank2016/delayed-queue-demo若有須要能夠自行查閱,但願能對你有幫助,若是有錯誤的地方,歡迎指正,也歡迎關注個人公衆號進行留言交流。

TIM圖片20190714173105.png

相關文章
相關標籤/搜索