SpringBoot實戰電商項目mall(35k+star)地址: https://github.com/macrozheng/mall
RabbitMQ實現延遲消息的方式有兩種,一種是使用死信隊列
實現,另外一種是使用延遲插件
實現。死信隊列
實現咱們之前曾經講過,具體參考《mall整合RabbitMQ實現延遲消息》,此次咱們講個更簡單的,使用延遲插件
實現。html
學習本文須要對RabbitMQ有所瞭解,還不瞭解的朋友能夠看下:《花了3天總結的RabbitMQ實用技巧,有點東西!》java
首先咱們須要下載並安裝RabbitMQ的延遲插件。
rabbitmq_delayed_message_exchange
便可找到咱們須要下載的插件,下載和RabbitMQ配套的版本,不要弄錯;
plugins
目錄下;
sbin
目錄下,使用以下命令啓用延遲插件;rabbitmq-plugins enable rabbitmq_delayed_message_exchange
接下來咱們須要在SpringBoot中實現延遲消息功能,此次依然沿用商品下單的場景。好比說有個用戶下單了,他60分鐘不支付訂單,訂單就會被取消,這就是一個典型的延遲消息使用場景。
pom.xml
文件中添加AMQP
相關依賴;<!--消息隊列相關依賴--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
application.yml
添加RabbitMQ的相關配置;spring: rabbitmq: host: localhost # rabbitmq的鏈接地址 port: 5672 # rabbitmq的鏈接端口號 virtual-host: /mall # rabbitmq的虛擬host username: mall # rabbitmq的用戶名 password: mall # rabbitmq的密碼 publisher-confirms: true #若是對異步消息須要回調必須設置爲true
/** * 消息隊列配置 * Created by macro on 2018/9/14. */ @Configuration public class RabbitMqConfig { /** * 訂單延遲插件消息隊列所綁定的交換機 */ @Bean CustomExchange orderPluginDirect() { //建立一個自定義交換機,能夠發送延遲消息 Map<String, Object> args = new HashMap<>(); args.put("x-delayed-type", "direct"); return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args); } /** * 訂單延遲插件隊列 */ @Bean public Queue orderPluginQueue() { return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName()); } /** * 將訂單延遲插件隊列綁定到交換機 */ @Bean public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) { return BindingBuilder .bind(orderPluginQueue) .to(orderPluginDirect) .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey()) .noargs(); } }
x-delay
頭來設置消息從交換機發送到隊列的延遲時間;/** * 取消訂單消息的發出者 * Created by macro on 2018/9/14. */ @Component public class CancelOrderSender { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class); @Autowired private AmqpTemplate amqpTemplate; public void sendMessage(Long orderId,final long delayTimes){ //給延遲隊列發送消息 amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { //給消息設置延遲毫秒值 message.getMessageProperties().setHeader("x-delay",delayTimes); return message; } }); LOGGER.info("send delay message orderId:{}",orderId); } }
/** * 取消訂單消息的處理者 * Created by macro on 2018/9/14. */ @Component @RabbitListener(queues = "mall.order.cancel.plugin") public class CancelOrderReceiver { private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class); @Autowired private OmsPortalOrderService portalOrderService; @RabbitHandler public void handle(Long orderId){ LOGGER.info("receive delay message orderId:{}",orderId); portalOrderService.cancelOrder(orderId); } }
/** * 前臺訂單管理Service * Created by macro on 2018/8/30. */ @Service public class OmsPortalOrderServiceImpl implements OmsPortalOrderService { private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class); @Autowired private CancelOrderSender cancelOrderSender; @Override public CommonResult generateOrder(OrderParam orderParam) { //todo 執行一系類下單操做,具體參考mall項目 LOGGER.info("process generateOrder"); //下單完成後開啓一個延遲消息,用於當用戶沒有付款時取消訂單(orderId應該在下單後生成) sendDelayMessageCancelOrder(11L); return CommonResult.success(null, "下單成功"); } @Override public void cancelOrder(Long orderId) { //todo 執行一系類取消訂單操做,具體參考mall項目 LOGGER.info("process cancelOrder orderId:{}",orderId); } private void sendDelayMessageCancelOrder(Long orderId) { //獲取訂單超時時間,假設爲60分鐘(測試用的30秒) long delayTimes = 30 * 1000; //發送延遲消息 cancelOrderSender.sendMessage(orderId, delayTimes); } }
30s
,咱們設置的延遲時間。2020-06-08 13:46:01.474 INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process generateOrder 2020-06-08 13:46:01.482 INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender : send delay message orderId:11 2020-06-08 13:46:31.517 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver : receive delay message orderId:11 2020-06-08 13:46:31.520 INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl : process cancelOrder orderId:11
咱們以前使用過死信隊列的方式,這裏咱們把兩種方式作個對比,先來聊下這兩種方式的實現原理。
死信隊列是這樣一個隊列,若是消息發送到該隊列並超過了設置的時間,就會被轉發到設置好的處理超時消息的隊列當中去,利用該特性能夠實現延遲消息。git
經過安裝插件,自定義交換機,讓交換機擁有延遲發送消息的能力,從而實現延遲消息。github
因爲死信隊列方式須要建立兩個交換機(死信隊列交換機+處理隊列交換機)、兩個隊列(死信隊列+處理隊列),而延遲插件方式只需建立一個交換機和一個隊列,因此後者使用起來更簡單。spring
https://github.com/macrozheng...bash
mall項目全套學習教程連載中,關注公衆號第一時間獲取。app