延遲隊列的使用場景:1.未按時支付的訂單,30分鐘過時以後取消訂單;2.給活躍度比較低的用戶間隔N天以後推送消息,提升活躍度;3.過1分鐘給新註冊會員的用戶,發送註冊郵件等。html
實現延遲隊列的方式有兩種:java
注意: 延遲插件rabbitmq-delayed-message-exchange是在RabbitMQ 3.5.7及以上的版本才支持的,依賴Erlang/OPT 18.0及以上運行環境。git
因爲使用死信交換器相對曲折,本文重點介紹第二種方式,使用rabbitmq-delayed-message-exchange插件完成延遲隊列的功能。github
打開官網下載:http://www.rabbitmq.com/community-plugins.htmlspring
選擇相應的對應的版本「3.7.x」點擊下載。docker
注意: 下載的是.zip的安裝包,下載完以後須要手動解壓。api
拷貝插件到Docker:springboot
docker cp D:\rabbitmq_delayed_message_exchange-20171201-3.7.x.ez rabbit:/pluginsbash
RabbitMQ在Docker的安裝,請參照本系列的上一篇文章:http://www.apigo.cn/2018/09/11/springboot13/ide
進入docker內部:
docker exec -it rabbit /bin/bash
開啓插件:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
查詢安裝的全部插件:
rabbitmq-plugins list
安裝正常,效果以下圖:
重啓RabbitMQ,使插件生效
docker restart rabbit
import com.example.rabbitmq.mq.DirectConfig; import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; @Configuration public class DelayedConfig { final static String QUEUE_NAME = "delayed.goods.order"; final static String EXCHANGE_NAME = "delayedec"; @Bean public Queue queue() { return new Queue(DelayedConfig.QUEUE_NAME); } // 配置默認的交換機 @Bean CustomExchange customExchange() { Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); //參數二爲類型:必須是x-delayed-message return new CustomExchange(DelayedConfig.EXCHANGE_NAME, "x-delayed-message", true, false, args); } // 綁定隊列到交換器 @Bean Binding binding(Queue queue, CustomExchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DelayedConfig.QUEUE_NAME).noargs(); } }
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component public class DelayedSender { @Autowired private AmqpTemplate rabbitTemplate; public void send(String msg) { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("發送時間:" + sf.format(new Date())); rabbitTemplate.convertAndSend(DelayedConfig.EXCHANGE_NAME, DelayedConfig.QUEUE_NAME, msg, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("x-delay", 3000); return message; } }); } }
import org.springframework.amqp.rabbit.annotation.RabbitHandler; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; @Component @RabbitListener(queues = "delayed.goods.order") public class DelayedReceiver { @RabbitHandler public void process(String msg) { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); System.out.println("接收時間:" + sdf.format(new Date())); System.out.println("消息內容:" + msg); } }
import com.example.rabbitmq.RabbitmqApplication; import com.example.rabbitmq.mq.delayed.DelayedSender; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner; import java.text.SimpleDateFormat; import java.util.Date; @RunWith(SpringRunner.class) @SpringBootTest public class DelayedTest { @Autowired private DelayedSender sender; @Test public void Test() throws InterruptedException { SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd"); sender.send("Hi Admin."); Thread.sleep(5 * 1000); //等待接收程序執行以後,再退出測試 } }
執行結果以下:
發送時間:2018-09-11 20:47:51 接收時間:2018-09-11 20:47:54 消息內容:Hi Admin.
完整代碼訪問個人GitHub:https://github.com/vipstone/springboot-example/tree/master/springboot-rabbitmq
到此爲止咱們已經使用「rabbitmq-delayed-message-exchange」插件實現了延遲功能,可是須要注意的一點是,若是使用命令「rabbitmq-plugins disable rabbitmq_delayed_message_exchange」禁用了延遲插件,那麼全部未發送的延遲消息都將丟失。