SpringBoot2.3整合RabbitMQ實現延遲消費消息

1.源碼獲取地址

文章末尾有源代碼地址
https://www.sunnyblog.top/detail.html?id=1265257400324063232
本章節主要實現消息的延遲消費,在學習延遲消費以前必須先了解RabbitMQ兩個基本概念,消息的TTL和死信Exchange,經過這二者的組合來實現消息的延遲消費。
不想看原理講解的,直接經過標題6看代碼實現html

2.消息的TTL(Time To Live)

消息的TTL就是消息的存活時間。RabbitMQ能夠對隊列和消息分別設置TTL。對隊列設置就是隊列沒有消費者連着的保留時間,也能夠對每個單獨的消息作單獨的設置。超過了這個時間,咱們認爲這個消息就死了,稱之爲死信。app

3.死信交換器 Dead Letter Exchanges

  • 一個消息在知足以下條件下,會進死信路由,記住這裏是路由而不是隊列,一個路由能夠對應不少隊列。
  • 一個消息被Consumer拒收了,而且reject方法的參數裏requeue是false。也就是說不會被再次放在隊列裏,被其餘消費者使用。
  • 上面的消息的TTL到了,消息過時了
  • 隊列的長度限制滿了。排在前面的消息會被丟棄或者扔到死信路由上。 死信交換器(Dead Letter Exchange)其實就是一種普通的exchange,和建立其餘exchange沒有兩樣。只是在某一個設置Dead Letter Exchange的隊列中有消息過時了,會自動觸發消息的轉發,發送到Dead Letter Exchange中去。

4.實現延遲消費原理

file

  • 大概原理:首先發送消息到死信隊列,死信隊列設置ttl過時時間,到期以後會自動將消息發送到通常隊列實現消息的消費
  • 實現步驟以下
  • 建立死信交換器
  • 建立死信隊列
  • 將死信隊列與死信交換機綁定,不能是任意綁定了,而是要指定一個RoutingKey(路由key)消息的發送方在向 Exchange發送消息時,也必須指定消息的RoutingKey。Exchange再也不把消息交給每個綁定的隊列,而是根據消息的Routing Key進行判斷,只有隊列的Routingkey與消息的Routing key徹底一致,纔會接收到消息.
  • 建立正常交換器
  • 建立正常隊列
  • 將正常隊列綁定到正常交換器

5.基於案例實現消息的延遲消費

這裏咱們以最熟悉的12306購票爲例進行案例場景的分析,12306購票步驟以下:學習

  • 首先登陸12306根據日期和起點站等條件進行搶票下訂單
  • 搶到票訂單處於未支付狀態,並提示支付時間30分鐘內

file

  • 這裏就能夠使用到延時隊列,在下訂單完成的時候將訂單號發送到MQ的死信隊列,並設置30分鐘過時,30分鐘之後死信隊列的數據會轉發到正常隊列,從正常隊列中獲取到下訂單的訂單號,而後咱們根據訂單號查詢訂單的支付狀態,若是已經支付咱們不作任何操做,若是未支付取消訂單,關閉支付狀態,將票回滾到票池供其餘用戶購買

6.代碼實現

  • 在RabbitMQConfig中建立隊列、交換機以及綁定關係測試

    @Configuration
    public class RabbitMQConfig {ui

    /**
         * 測試發送消息到MQ
         * @return
*/
            @Bean
            public Queue testHello() {
                    return new Queue(SysConstant.QUEUE_TEST_HELLO);
            }


            /**
             * 死信交換機
             * @return
             */
            @Bean
            public DirectExchange sysOrderDelayExchange() {
                    return new DirectExchange(SysConstant.SYS_ORDER_DELAY_EXCHANGE);
            }

            /**
             * 死信隊列
             * @return
             */
            @Bean
            public Queue sysOrderDelayQueue() {
                    Map<String, Object> map = new HashMap<String, Object>(16);
                    map.put("x-dead-letter-exchange",SysConstant.SYS_ORDER_RECEIVE_EXCHANGE); //指定死信送往的交換機
                    map.put("x-dead-letter-routing-key", SysConstant.SYS_ORDER_RECEIVE_KEY); //指定死信的routingkey
                    return new Queue(SysConstant.SYS_ORDER_DELAY_QUEUE, true, false, false, map);
            }

            /**
             * 給死信隊列綁定死信交換機
             * @return
             */
            @Bean
            public Binding sysOrderDelayBinding() {
                    return BindingBuilder.bind(sysOrderDelayQueue()).to(sysOrderDelayExchange()).with(SysConstant.SYS_ORDER_DELAY_KEY);
            }

            /**
             * 死信接收交換機,用於接收死信隊列的消息
             * @return
             */
            @Bean
            public DirectExchange sysOrderReceiveExchange() {
                    return new DirectExchange(SysConstant.SYS_ORDER_RECEIVE_EXCHANGE);
            }

            /**
             * 死信接收隊列
             * @return
             */
            @Bean
            public Queue sysOrderReceiveQueue() {
                    return new Queue(SysConstant.SYS_ORDER_RECEIVE_QUEUE);
            }

            /**
             * 死信接收交換機綁定接收死信隊列消費隊列
             * @return
             */
            @Bean
            public Binding sysOrdeReceiveBinding() {
                    return BindingBuilder.bind(sysOrderReceiveQueue()).to(sysOrderReceiveExchange()).with(SysConstant.SYS_ORDER_RECEIVE_KEY);
            }
    }
  • 發送延時消息到死信交換器方法spa

    @Service
    public class MsgService {日誌

    @Autowired
        private RabbitTemplate rabbitTemplate;
        /**
         * 發送延時消息到mq
         * @param exchange 死信交換機
         * @param routeKey 路由key
         * @param data 發送數據
         * @param delayTime 過時時間,單位毫秒
*/
                    public void sendDelayMsgToMQ(String exchange, String routeKey, String data,int delayTime) {
                            rabbitTemplate.convertAndSend(exchange, routeKey, data, message -> {
                                    message.getMessageProperties().setExpiration(delayTime + "");
                                    return message;
                            });
                    }
            }
  • 監聽隊列消息ReceiveMsgListener類code

    /**orm

    * 獲取到的延時消息
        * 這裏接收到消息進行對應的業務處理(例如:取消訂單,關閉支付,回滾庫存等 ...)
        * @param msg
*/
            @RabbitListener(queues = SysConstant.SYS_ORDER_RECEIVE_QUEUE)
            @RabbitHandler
            public void getdelayMsg(String msg) {
                    log.info("MQ接收消息時間:{},消息內容:{}", DateUtil.formatDateTime(DateUtil.date()),msg);
                    log.info("------->這裏實現訂單關閉、支付關閉、回滾庫存業務邏輯...");
            }
  • 建立Controller向隊列發送消息,設置過時時間10秒htm

    @RestController
    @RequestMapping("mq")
    @Slf4j
    public class MQController {

    @Autowired
        private MsgService msgService;
    
        @GetMapping("sendMsg")
        public String sendMsg() {
                log.info("發送延時消息時間:" + DateUtil.formatDateTime(DateUtil.date()));
    
                OrderInfo orderInfo = new OrderInfo();
                orderInfo.setOrderId(IdUtil.fastSimpleUUID());
                orderInfo.setOrderState("待支付");
                orderInfo.setPayMoney(999.88);
                msgService.sendDelayMsgToMQ(SysConstant.SYS_ORDER_DELAY_EXCHANGE,SysConstant.SYS_ORDER_DELAY_KEY, JSONUtil.toJsonStr(orderInfo),10*1000);//1分鐘
                return JSONUtil.toJsonStr("發送延時消息成功");
        }

    }

  • 啓動服務,能夠看到MQ中建立對應的隊列和交換器

file
file

  • 控制檯日誌能夠看到發送消息與消費消息間隔時間是10s

file

7.更多MQ技術文檔獲取

https://www.sunnyblog.top/index.html?tagId=1264009609236971520

詳細開發技術文檔盡在 點擊這裏查看技術文檔 ;更多技術文章: https://www.sunnyblog.top;任何疑問加QQ羣諮詢:534073451
相關文章
相關標籤/搜索