在 jdk
的 juc
工具包中,提供了一種延遲隊列 DelayQueue
。延遲隊列用處很是普遍,好比咱們最多見的場景就是在網購或者外賣平臺中發起一個訂單,若是不付款,通常 15
分鐘後就會被關閉,這個直接用定時任務是很差實現的,由於每一個用戶下單的時間並不肯定,因此這時候就須要用到延遲隊列。html
延遲隊列自己也是隊列,只不過這個隊列是延遲的,意思就是說當咱們把一條消息放入延遲隊列,消息並不會馬上出隊,而是會在到達指定時間以後(或者說過了指定時間)纔會出隊,從而被消費者消費。java
RabbitMQ
中的死信隊列就是用來存儲特定條件下的消息,那麼假如咱們把這個條件設定爲指定時間過時(設定帶TTL
的消息或者隊列),就能夠用來實現延遲隊列的功能。瀏覽器
TtlDelayRabbitConfig
配置類(省略了包名和導入),消息最開始發送至 ttl
消息隊列,這個隊列中全部的消息在 5
秒後過時,後期後會進入死信隊列:@Configuration public class TtlDelayRabbitConfig { //路由ttl消息交換機 @Bean("ttlDelayFanoutExchange") public FanoutExchange fanoutExchange(){ return new FanoutExchange("TTL_DELAY_FANOUT_EXCHANGE"); } //ttl消息隊列 @Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); map.put("x-message-ttl", 5000);//隊列中全部消息5秒後過時 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過時後進入死信隊列 return new Queue("TTL_QUEUE",false,false,false,map); } //Fanout交換機和productQueue綁定 @Bean public Binding bindTtlFanoutExchange(@Qualifier("ttlDelayQueue") Queue queue, @Qualifier("ttlDelayFanoutExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } //fanout死信交換機 @Bean("ttlDelayDeadLetterExchange") public FanoutExchange deadLetterExchange(){ return new FanoutExchange("TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE"); } //死信隊列 @Bean("ttlDelayDeadLetterQueue") public Queue ttlDelayDeadLetterQueue(){ return new Queue("TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE"); } //死信隊列和死信交換機綁定 @Bean public Binding deadLetterQueueBindExchange(@Qualifier("ttlDelayDeadLetterQueue") Queue queue, @Qualifier("ttlDelayDeadLetterExchange") FanoutExchange fanoutExchange){ return BindingBuilder.bind(queue).to(fanoutExchange); } }
TtlDelayConsumer
類,監聽死信隊列,這裏收到的消息都是生產者生產消息以後的 5
秒,也就是延遲了 5
秒的消息:@Component public class TtlDelayConsumer { @RabbitHandler @RabbitListener(queues = "TTL_DELAY_DEAD_LETTER_FANOUT_QUEUE") public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【延遲隊列】【" + sdf.format(new Date()) + "】收到死信隊列消息:" + msg); } }
DelayQueueController
類作生產者來發送消息:@RestController @RequestMapping("/delay") public class DelayQueueController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping(value="/ttl/send") public String clearVipInfo(@RequestParam(value = "msg",defaultValue = "no message") String msg){ rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",msg); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息發送成功【" + sdf.format(new Date()) + "】"); return "succ"; } }
http://localhost:8080/delay/ttl/send?msg=測試ttl延遲隊列
進行測試,能夠看到每條消息都是在發送 5
秒以後才能收到消息:假如咱們實際中,有的消息是 10
分鐘過時,有的是 20
分鐘過時,這時候咱們就須要創建多個隊列,一旦時間維度很是龐大,那麼就須要維護很是多的隊列。說到這裏,可能不少人會有疑問,咱們能夠針對單條信息設置過時時間,大可沒必要去定義多個隊列?app
然而事實真的是如此嗎?接下來咱們經過一個例子來驗證下。分佈式
TtlDelayRabbitConfig
類中的隊列定義函數 x-message-ttl
屬性去掉,不過須要注意的是咱們須要先把這個隊列後臺刪除掉,不然同名隊列重複建立無效:@Bean("ttlDelayQueue") public Queue ttlQueue(){ Map<String, Object> map = new HashMap<String, Object>(); // map.put("x-message-ttl", 5000);//註釋掉這個屬性,隊列不設置過時時間 map.put("x-dead-letter-exchange", "TTL_DELAY_DEAD_LETTER_FANOUT_EXCHANGE");//過時後進入死信隊列 return new Queue("TTL_QUEUE",false,false,false,map); }
DelayQueueController
類中的發送消息方法修改一下,對每條信息設置過時時間:@GetMapping(value="/ttl/send") public String ttlMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg, @RequestParam(value = "time") String millTimes){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setExpiration(millTimes);//單條消息設置過時時間,單位:毫秒 Message message = new Message(msg.getBytes(), messageProperties); rabbitTemplate.convertAndSend("TTL_DELAY_FANOUT_EXCHANGE","",message); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息發送成功【" + sdf.format(new Date()) + "】"); return "succ"; }
2
條消息發送,一條 10
秒過時,一條 5
秒過時,先發送 10
秒的:http://localhost:8080/delay/ttl/send?msg=10秒過時消息&time=10000 http://localhost:8080/delay/ttl/send?msg=5秒過時消息&time=5000
咱們看到,兩條消息都是 10
秒後過時,這是巧合嗎?並非,這是由於 RabbitMQ
中的機制就是若是前一條消息沒有出隊,那麼即便後一條消息已經失效,也必需要等前一條消息出隊以後才能出隊,因此這就是爲何通常都儘可能避免同一個隊列單條消息設置不一樣過時時間的作法。函數
經過以上兩個例子,使用死信隊列來實現延遲隊列,咱們能夠獲得幾個很明顯的缺點:工具
10
分鐘過時,有的 20
分鐘過時等),則須要建立不一樣的交換機和隊列來實現消息的路由。TTL
時可能會形成消息的阻塞。由於當前一條消息沒有出隊,後一條消息即便到期了也不能出隊。爲了不 TTL
和死信隊列可能形成的問題,因此就很是有必要用一種新的更好的方案來替代實現延遲隊列,這就是延時隊列插件。測試
在 RabbitMQ
的 3.5.7
版本以後,提供了一個插件(rabbitmq-delayed-message-exchange
)來實現延遲隊列 ,同時需保證 Erlang/OPT
版本爲 18.0
以後。ui
RabbitMQ
版本在 3.5.7-3.7.x
的能夠執行如下命令進行下載(也能夠直接經過瀏覽器下載):wget https://bintray.com/rabbitmq/community-plugins/download_file?file_path=rabbitmq_delayed_message_exchange-0.0.1.ez
若是 RabbitMQ
是 3.8
以後的版本,能夠點擊這裏,找到延遲隊列對應版本的插件,而後下載。插件
plugins
目錄下,執行 rabbitmq-plugins enable rabbitmq_delayed_message_exchange
命令啓動插件。若是要禁止該插件,則能夠執行命令 rabbitmq-plugins disable rabbitmq_delayed_message_exchange
(啓用插件後須要重啓 RabbitMQ
纔會生效)。PluginDelayRabbitConfig
配置類:@Configuration public class PluginDelayRabbitConfig { @Bean("pluginDelayExchange") public CustomExchange pluginDelayExchange() { Map<String, Object> argMap = new HashMap<>(); argMap.put("x-delayed-type", "direct");//必需要配置這個類型,能夠是direct,topic和fanout //第二個參數必須爲x-delayed-message return new CustomExchange("PLUGIN_DELAY_EXCHANGE","x-delayed-message",false, false, argMap); } @Bean("pluginDelayQueue") public Queue pluginDelayQueue(){ return new Queue("PLUGIN_DELAY_QUEUE"); } @Bean public Binding pluginDelayBinding(@Qualifier("pluginDelayQueue") Queue queue,@Qualifier("pluginDelayExchange") CustomExchange customExchange){ return BindingBuilder.bind(queue).to(customExchange).with("delay").noargs(); } }
PluginDelayConsumer
:@Component public class PluginDelayConsumer { @RabbitHandler @RabbitListener(queues = "PLUGIN_DELAY_QUEUE")//監聽延時隊列 public void fanoutConsumer(String msg){ SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("【插件延遲隊列】【" + sdf.format(new Date()) + "】收到消息:" + msg); } }
DelayQueueController
類,新增一個方法:@GetMapping(value="/plugin/send") public String pluginMsgSend(@RequestParam(value = "msg",defaultValue = "no message") String msg){ MessageProperties messageProperties = new MessageProperties(); messageProperties.setHeader("x-delay",5000);//延遲5秒被刪除 Message message = new Message(msg.getBytes(), messageProperties); amqpTemplate.convertAndSend("PLUGIN_DELAY_EXCHANGE","delay",message);//交換機和路由鍵必須和配置文件類中保持一致 SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("消息發送成功【" + sdf.format(new Date()) + "】"); return "succ"; }
http://localhost:8080/delay/plugin/send?msg=插件延遲隊列消息
進行測試,能夠看到,消息在延時 5
秒以後被消費:延遲隊列的使用很是普遍,若是是單機部署,能夠考慮使用 jdk
自帶的 DelayQueue
,分佈式部署能夠採用 RabbitMQ
,Redis
等中間件來實現延遲隊列。本文主要介紹瞭如何利用 RabbitMQ
實現兩種延遲隊列的兩種方案,固然本文的例子只是引導,並無開啓回調等消息確認模式,若是想了解 RabbitMQ
消息的可靠性傳輸的,能夠點擊這裏。