RabbitMQ實現延遲消息竟然如此簡單,整個插件就完事了!

SpringBoot實戰電商項目mall(35k+star)地址: https://github.com/macrozheng/mall

摘要

RabbitMQ實現延遲消息的方式有兩種,一種是使用死信隊列實現,另外一種是使用延遲插件實現。死信隊列實現咱們之前曾經講過,具體參考《mall整合RabbitMQ實現延遲消息》,此次咱們講個更簡單的,使用延遲插件實現。html

學前準備

學習本文須要對RabbitMQ有所瞭解,還不瞭解的朋友能夠看下:《花了3天總結的RabbitMQ實用技巧,有點東西!》java

插件安裝

首先咱們須要下載並安裝RabbitMQ的延遲插件。
  • 去RabbitMQ的官網下載插件,插件地址:https://www.rabbitmq.com/comm...
  • 直接搜索rabbitmq_delayed_message_exchange便可找到咱們須要下載的插件,下載和RabbitMQ配套的版本,不要弄錯;

  • 將插件文件複製到RabbitMQ安裝目錄的plugins目錄下;

  • 進入RabbitMQ安裝目錄的sbin目錄下,使用以下命令啓用延遲插件;
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
  • 啓用插件成功後就能夠看到以下信息,以後從新啓動RabbitMQ服務便可。

實現延遲消息

接下來咱們須要在SpringBoot中實現延遲消息功能,此次依然沿用商品下單的場景。好比說有個用戶下單了,他60分鐘不支付訂單,訂單就會被取消,這就是一個典型的延遲消息使用場景。
  • 首先咱們須要在pom.xml文件中添加AMQP相關依賴;
<!--消息隊列相關依賴-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
  • 以後在application.yml添加RabbitMQ的相關配置;
spring:
  rabbitmq:
    host: localhost # rabbitmq的鏈接地址
    port: 5672 # rabbitmq的鏈接端口號
    virtual-host: /mall # rabbitmq的虛擬host
    username: mall # rabbitmq的用戶名
    password: mall # rabbitmq的密碼
    publisher-confirms: true #若是對異步消息須要回調必須設置爲true
  • 接下來建立RabbitMQ的Java配置,主要用於配置交換機、隊列和綁定關係;
/**
 * 消息隊列配置
 * Created by macro on 2018/9/14.
 */
@Configuration
public class RabbitMqConfig {

    /**
     * 訂單延遲插件消息隊列所綁定的交換機
     */
    @Bean
    CustomExchange  orderPluginDirect() {
        //建立一個自定義交換機,能夠發送延遲消息
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), "x-delayed-message",true, false,args);
    }

    /**
     * 訂單延遲插件隊列
     */
    @Bean
    public Queue orderPluginQueue() {
        return new Queue(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getName());
    }

    /**
     * 將訂單延遲插件隊列綁定到交換機
     */
    @Bean
    public Binding orderPluginBinding(CustomExchange orderPluginDirect,Queue orderPluginQueue) {
        return BindingBuilder
                .bind(orderPluginQueue)
                .to(orderPluginDirect)
                .with(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey())
                .noargs();
    }

}
  • 建立一個取消訂單消息的發出者,經過給消息設置x-delay頭來設置消息從交換機發送到隊列的延遲時間;
/**
 * 取消訂單消息的發出者
 * Created by macro on 2018/9/14.
 */
@Component
public class CancelOrderSender {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderSender.class);
    @Autowired
    private AmqpTemplate amqpTemplate;

    public void sendMessage(Long orderId,final long delayTimes){
        //給延遲隊列發送消息
        amqpTemplate.convertAndSend(QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getExchange(), QueueEnum.QUEUE_ORDER_PLUGIN_CANCEL.getRouteKey(), orderId, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                //給消息設置延遲毫秒值
                message.getMessageProperties().setHeader("x-delay",delayTimes);
                return message;
            }
        });
        LOGGER.info("send delay message orderId:{}",orderId);
    }
}
  • 建立一個取消訂單消息的接收者,用於處理訂單延遲插件隊列中的消息。
/**
 * 取消訂單消息的處理者
 * Created by macro on 2018/9/14.
 */
@Component
@RabbitListener(queues = "mall.order.cancel.plugin")
public class CancelOrderReceiver {
    private static Logger LOGGER =LoggerFactory.getLogger(CancelOrderReceiver.class);
    @Autowired
    private OmsPortalOrderService portalOrderService;
    @RabbitHandler
    public void handle(Long orderId){
        LOGGER.info("receive delay message orderId:{}",orderId);
        portalOrderService.cancelOrder(orderId);
    }
}
  • 而後在咱們的訂單業務實現類中添加以下邏輯,當下單成功以前,往消息隊列中發送一個取消訂單的延遲消息,這樣若是訂單沒有被支付的話,就能取消訂單了;
/**
 * 前臺訂單管理Service
 * Created by macro on 2018/8/30.
 */
@Service
public class OmsPortalOrderServiceImpl implements OmsPortalOrderService {
    private static Logger LOGGER = LoggerFactory.getLogger(OmsPortalOrderServiceImpl.class);
    @Autowired
    private CancelOrderSender cancelOrderSender;

    @Override
    public CommonResult generateOrder(OrderParam orderParam) {
        //todo 執行一系類下單操做,具體參考mall項目
        LOGGER.info("process generateOrder");
        //下單完成後開啓一個延遲消息,用於當用戶沒有付款時取消訂單(orderId應該在下單後生成)
        sendDelayMessageCancelOrder(11L);
        return CommonResult.success(null, "下單成功");
    }

    @Override
    public void cancelOrder(Long orderId) {
        //todo 執行一系類取消訂單操做,具體參考mall項目
        LOGGER.info("process cancelOrder orderId:{}",orderId);
    }

    private void sendDelayMessageCancelOrder(Long orderId) {
        //獲取訂單超時時間,假設爲60分鐘(測試用的30秒)
        long delayTimes = 30 * 1000;
        //發送延遲消息
        cancelOrderSender.sendMessage(orderId, delayTimes);
    }

}
  • 啓動項目後,在Swagger中調用下單接口;

  • 調用完成後查看控制檯日誌能夠發現,從消息發送和消息接收處理正好相差了30s,咱們設置的延遲時間。
2020-06-08 13:46:01.474  INFO 1644 --- [nio-8080-exec-1] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process generateOrder
2020-06-08 13:46:01.482  INFO 1644 --- [nio-8080-exec-1] c.m.m.tiny.component.CancelOrderSender   : send delay message orderId:11
2020-06-08 13:46:31.517  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.component.CancelOrderReceiver    : receive delay message orderId:11
2020-06-08 13:46:31.520  INFO 1644 --- [cTaskExecutor-4] c.m.m.t.s.i.OmsPortalOrderServiceImpl    : process cancelOrder orderId:11

兩種實現方式對比

咱們以前使用過死信隊列的方式,這裏咱們把兩種方式作個對比,先來聊下這兩種方式的實現原理。

死信隊列

死信隊列是這樣一個隊列,若是消息發送到該隊列並超過了設置的時間,就會被轉發到設置好的處理超時消息的隊列當中去,利用該特性能夠實現延遲消息。git

延遲插件

經過安裝插件,自定義交換機,讓交換機擁有延遲發送消息的能力,從而實現延遲消息。github

結論

因爲死信隊列方式須要建立兩個交換機(死信隊列交換機+處理隊列交換機)、兩個隊列(死信隊列+處理隊列),而延遲插件方式只需建立一個交換機和一個隊列,因此後者使用起來更簡單。spring

項目源碼地址

https://github.com/macrozheng...bash

公衆號

mall項目全套學習教程連載中,關注公衆號第一時間獲取。app

公衆號圖片

相關文章
相關標籤/搜索