如何用RabbitMQ實現延遲隊列

前言

jdkjuc 工具包中,提供了一種延遲隊列 DelayQueue。延遲隊列用處很是普遍,好比咱們最多見的場景就是在網購或者外賣平臺中發起一個訂單,若是不付款,通常 15 分鐘後就會被關閉,這個直接用定時任務是很差實現的,由於每一個用戶下單的時間並不肯定,因此這時候就須要用到延遲隊列。html

什麼是延遲隊列

延遲隊列自己也是隊列,只不過這個隊列是延遲的,意思就是說當咱們把一條消息放入延遲隊列,消息並不會馬上出隊,而是會在到達指定時間以後(或者說過了指定時間)纔會出隊,從而被消費者消費。java

利用死信隊列實現延遲隊列

RabbitMQ 中的死信隊列就是用來存儲特定條件下的消息,那麼假如咱們把這個條件設定爲指定時間過時(設定帶TTL 的消息或者隊列),就能夠用來實現延遲隊列的功能。瀏覽器

  1. 新建一個 TtlDelayRabbitConfig 配置類(省略了包名和導入),消息最開始發送至 ttl 消息隊列,這個隊列中全部的消息在 5 秒後過時,後期後會進入死信隊列:
@Configuration
public class TtlDelayRabbitConfig {

    //路由ttl消息交換機
    @Bean("ttlDelayFanoutExchange")
    public FanoutExchange fanoutExchange(){
        return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE");
    }

    //ttl消息隊列
    @Bean("ttlDelayQueue")
    public Queue ttlQueue(){
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("x-message-ttl", 5000);//隊列中全部消息5秒後過時
        map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過時後進入死信隊列
        return new Queue("TTL_QUEUE",false,false,false,map);
    }

    //Fanout交換機和productQueue綁定
    @Bean
    public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }

    //fanout死信交換機
    @Bean("ttlDelayDeadLetterExchange")
    public FanoutExchange deadLetterExchange(){
        return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");
    }

    //死信隊列
    @Bean("ttlDelayDeadLetterQueue")
    public Queue ttlDelayDeadLetterQueue(){
        return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE");
    }

    //死信隊列和死信交換機綁定
    @Bean
    public Binding deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){
        return BindingBuilder.bind(queue).to(fanoutExchange);
    }
}
  1. 新建一個消費者 TtlDelayConsumer 類,監聽死信隊列,這裏收到的消息都是生產者生產消息以後的 5 秒,也就是延遲了 5 秒的消息:
@Component
public class TtlDelayConsumer {

    @RabbitHandler
    @RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE")
    public void fanoutConsumer(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("【延遲隊列】【" + sdf.format(new Date()) + "】收到死信隊列消息:" + msg);
    }
}
  1. 新建一個 DelayQueueController 類作生產者來發送消息:
@RestController
@RequestMapping("/delay")
public class DelayQueueController {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping(value="/ttl/send")
    public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){
        rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
        return "succ";
    }
}
  1. 最後咱們在瀏覽器輸入地址 http://localhost:8080/delay/ttl/send?msg=測試ttl延遲隊列 進行測試,能夠看到每條消息都是在發送 5 秒以後才能收到消息:

TTL 延遲隊列的問題

假如咱們實際中,有的消息是 10 分鐘過時,有的是 20 分鐘過時,這時候咱們就須要創建多個隊列,一旦時間維度很是龐大,那麼就須要維護很是多的隊列。說到這裏,可能不少人會有疑問,咱們能夠針對單條信息設置過時時間,大可沒必要去定義多個隊列?app

然而事實真的是如此嗎?接下來咱們經過一個例子來驗證下。分佈式

  1. 把上面示例中 TtlDelayRabbitConfig 類中的隊列定義函數 x-message-ttl 屬性去掉,不過須要注意的是咱們須要先把這個隊列後臺刪除掉,不然同名隊列重複建立無效:
@Bean("ttlDelayQueue")
public Queue ttlQueue(){
    Map<String, Object> map = new HashMap<String, Object>();
    //        map.put("x-message-ttl", 5000);//註釋掉這個屬性,隊列不設置過時時間
    map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過時後進入死信隊列
    return new Queue("TTL_QUEUE",false,false,false,map);
}
  1. 而後將 DelayQueueController 類中的發送消息方法修改一下,對每條信息設置過時時間:
@GetMapping(value="/ttl/send")
    public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg,
                             @RequestParam(value = "time") String millTimes){
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setExpiration(millTimes);//單條消息設置過時時間,單位:毫秒
        Message message = new Message(msg.getBytes(), messageProperties);
        rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message);
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
        return "succ";
    }
  1. 而後執行 2 條消息發送,一條 10 秒過時,一條 5 秒過時,先發送 10 秒的:
http://localhost:8080/delay/ttl/send?msg=10秒過時消息&time=10000
http://localhost:8080/delay/ttl/send?msg=5秒過時消息&time=5000
  1. 執行以後獲得以下結果:

咱們看到,兩條消息都是 10 秒後過時,這是巧合嗎?並非,這是由於 RabbitMQ 中的機制就是若是前一條消息沒有出隊,那麼即便後一條消息已經失效,也必需要等前一條消息出隊以後才能出隊,因此這就是爲何通常都儘可能避免同一個隊列單條消息設置不一樣過時時間的作法。函數

死信隊列實現的延遲隊列缺點

經過以上兩個例子,使用死信隊列來實現延遲隊列,咱們能夠獲得幾個很明顯的缺點:工具

  • 若是有很是多的時間點(好比有的 10 分鐘過時,有的 20 分鐘過時等),則須要建立不一樣的交換機和隊列來實現消息的路由。
  • 單獨設置消息的 TTL 時可能會形成消息的阻塞。由於當前一條消息沒有出隊,後一條消息即便到期了也不能出隊。
  • 消息可能會有必定的延遲(上面的示例中就能夠看到有一點延遲)。

爲了不 TTL 和死信隊列可能形成的問題,因此就很是有必要用一種新的更好的方案來替代實現延遲隊列,這就是延時隊列插件。測試

利用插件實現延遲隊列

RabbitMQ3.5.7 版本以後,提供了一個插件(rabbitmq-delayed-message-exchange)來實現延遲隊列 ,同時需保證 Erlang/OPT 版本爲 18.0 以後。ui

安裝延遲隊列插件

  1. RabbitMQ版本在 3.5.7-3.7.x 的能夠執行如下命令進行下載(也能夠直接經過瀏覽器下載):
wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez

若是 RabbitMQ3.8 以後的版本,能夠點擊這裏,找到延遲隊列對應版本的插件,而後下載。插件

  1. 下載好以後,將插件上傳到 plugins 目錄下,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange 命令啓動插件。若是要禁止該插件,則能夠執行命令 rabbitmq-plugins disable rabbitmq_delayed_message_exchange(啓用插件後須要重啓 RabbitMQ 纔會生效)。

延遲隊列插件示例

  1. 新建一個 PluginDelayRabbitConfig 配置類:
@Configuration
public class PluginDelayRabbitConfig {
    @Bean("pluginDelayExchange")
    public CustomExchange pluginDelayExchange() {
        Map<String, Object> argMap = new HashMap<>();
        argMap.put("x-delayed-type", "direct");//必需要配置這個類型,能夠是direct,topic和fanout
        //第二個參數必須爲x-delayed-message
        return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap);
    }

    @Bean("pluginDelayQueue")
    public Queue pluginDelayQueue(){
        return new Queue("PLUGIN_DELAY_QUEUE");
    }

    @Bean
    public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,@Qualifier("pluginDelayExchange") CustomExchange customExchange){
        return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs();
    }
}
  1. 新建一個消費者類 PluginDelayConsumer
@Component
public class PluginDelayConsumer {

    @RabbitHandler
    @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")//監聽延時隊列
    public void fanoutConsumer(String msg){
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        System.out.println("【插件延遲隊列】【" + sdf.format(new Date()) + "】收到消息:" + msg);
    }
}
  1. 在上面示例中的 DelayQueueController 類,新增一個方法:
@GetMapping(value="/plugin/send")
public String pluginMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg){
    MessageProperties messageProperties = new MessageProperties();
    messageProperties.setHeader("x-delay",5000);//延遲5秒被刪除
    Message message = new Message(msg.getBytes(), messageProperties);
    amqpTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE","delay",message);//交換機和路由鍵必須和配置文件類中保持一致
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    System.out.println("消息發送成功【" + sdf.format(new Date()) + "】");
    return "succ";
}
  1. 接下來就能夠訪問地址 http://localhost:8080/delay/plugin/send?msg=插件延遲隊列消息 進行測試,能夠看到,消息在延時 5 秒以後被消費:

總結

延遲隊列的使用很是普遍,若是是單機部署,能夠考慮使用 jdk 自帶的 DelayQueue,分佈式部署能夠採用 RabbitMQRedis 等中間件來實現延遲隊列。本文主要介紹瞭如何利用 RabbitMQ 實現兩種延遲隊列的兩種方案,固然本文的例子只是引導,並無開啓回調等消息確認模式,若是想了解 RabbitMQ 消息的可靠性傳輸的,能夠點擊這裏

相關文章
相關標籤/搜索