springboot + rabbitmq 用了消息確認機制,感受掉坑裏了

本文收錄在我的博客: www.chengxy-nds.top,技術資源共享,一塊兒進步

最近部門號召大夥多組織一些技術分享會,說是要活躍公司的技術氛圍,但早就看穿一切的我知道,這 T M 就是爲了刷KPI。不過,話說回來這的確是件好事,與其開那些沒味的扯皮會,多作技術交流仍是頗有助於我的成長的。javascript

因而乎我主動報名參加了分享,咳咳咳~ ,真的不是爲了那點KPI,就是想和大夥一塊兒學習學習!java

在這裏插入圖片描述

此次我分享的是 springboot + rabbitmq 如何實現消息確認機制,以及在實際開發中的一點踩坑經驗,其實總體的內容比較簡單,有時候事情就是這麼神奇,越是簡單的東西就越容易出錯。git

能夠看到使用了 RabbitMQ 之後,咱們的業務鏈路明顯變長了,雖然作到了系統間的解耦,但可能形成消息丟失的場景也增長了。例如:程序員

  • 消息生產者 - > rabbitmq服務器(消息發送失敗)
  • rabbitmq服務器自身故障致使消息丟失
  • 消息消費者 - > rabbitmq服務(消費消息失敗)

在這裏插入圖片描述
因此說能不使用中間件就儘可能不要用,若是爲了用而用只會徒增煩惱。開啓消息確認機制之後,儘管很大程度上保證了消息的準確送達,但因爲頻繁的確認交互,rabbitmq 總體效率變低,吞吐量降低嚴重,不是很是重要的消息真心不建議你用消息確認機制。github


下邊咱們先來實現springboot + rabbitmq消息確認機制,再對遇到的問題作具體分析。redis

1、準備環境

一、引入 rabbitmq 依賴包

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

二、修改 application.properties 配置

配置中須要開啓 發送端消費端 的消息確認。spring

spring.rabbitmq.host=127.0.0.1
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

# 發送者開啓 confirm 確認機制
spring.rabbitmq.publisher-confirms=true
# 發送者開啓 return 確認機制
spring.rabbitmq.publisher-returns=true
####################################################
# 設置消費端手動 ack
spring.rabbitmq.listener.simple.acknowledge-mode=manual
# 是否支持重試
spring.rabbitmq.listener.simple.retry.enabled=true

三、定義 Exchange 和 Queue

定義交換機 confirmTestExchange 和隊列 confirm_test_queue ,並將隊列綁定在交換機上。springboot

@Configuration
public class QueueConfig {

    @Bean(name = "confirmTestQueue")
    public Queue confirmTestQueue() {
        return new Queue("confirm_test_queue", true, false, false);
    }

    @Bean(name = "confirmTestExchange")
    public FanoutExchange confirmTestExchange() {
        return new FanoutExchange("confirmTestExchange");
    }

    @Bean
    public Binding confirmTestFanoutExchangeAndQueue(
            @Qualifier("confirmTestExchange") FanoutExchange confirmTestExchange,
            @Qualifier("confirmTestQueue") Queue confirmTestQueue) {
        return BindingBuilder.bind(confirmTestQueue).to(confirmTestExchange);
    }
}
rabbitmq 的消息確認分爲兩部分:發送消息確認 和 消息接收確認。

在這裏插入圖片描述

2、消息發送確認

發送消息確認:用來確認生產者 producer 將消息發送到 brokerbroker 上的交換機 exchange 再投遞給隊列 queue的過程當中,消息是否成功投遞。服務器

消息從 producerrabbitmq broker有一個 confirmCallback 確認模式。app

消息從 exchangequeue 投遞失敗有一個 returnCallback 退回模式。

咱們能夠利用這兩個Callback來確保消的100%送達。

一、 ConfirmCallback確認模式

消息只要被 rabbitmq broker 接收到就會觸發 confirmCallback 回調 。

@Slf4j
@Component
public class ConfirmCallbackService implements RabbitTemplate.ConfirmCallback {
    
    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {

        if (!ack) {
            log.error("消息發送異常!");
        } else {
            log.info("發送者爸爸已經收到確認,correlationData={} ,ack={}, cause={}", correlationData.getId(), ack, cause);
        }
    }
}

實現接口 ConfirmCallback ,重寫其confirm()方法,方法內有三個參數correlationDataackcause

  • correlationData:對象內部只有一個 id 屬性,用來表示當前消息的惟一性。
  • ack:消息投遞到broker 的狀態,true表示成功。
  • cause:表示投遞失敗的緣由。

但消息被 broker 接收到只能表示已經到達 MQ服務器,並不能保證消息必定會被投遞到目標 queue 裏。因此接下來須要用到 returnCallback

二、 ReturnCallback 退回模式

若是消息未能投遞到目標 queue 裏將觸發回調 returnCallback ,一旦向 queue 投遞消息未成功,這裏通常會記錄下當前消息的詳細投遞數據,方便後續作重發或者補償等操做。

@Slf4j
@Component
public class ReturnCallbackService implements RabbitTemplate.ReturnCallback {

    @Override
    public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
        log.info("returnedMessage ===> replyCode={} ,replyText={} ,exchange={} ,routingKey={}", replyCode, replyText, exchange, routingKey);
    }
}

實現接口ReturnCallback,重寫 returnedMessage() 方法,方法有五個參數message(消息體)、replyCode(響應code)、replyText(響應內容)、exchange(交換機)、routingKey(隊列)。

下邊是具體的消息發送,在rabbitTemplate中設置 ConfirmReturn 回調,咱們經過setDeliveryMode()對消息作持久化處理,爲了後續測試建立一個 CorrelationData對象,添加一個id10000000000

@Autowired
    private RabbitTemplate rabbitTemplate;

    @Autowired
    private ConfirmCallbackService confirmCallbackService;

    @Autowired
    private ReturnCallbackService returnCallbackService;

    public void sendMessage(String exchange, String routingKey, Object msg) {

        /**
         * 確保消息發送失敗後能夠從新返回到隊列中
         * 注意:yml須要配置 publisher-returns: true
         */
        rabbitTemplate.setMandatory(true);

        /**
         * 消費者確認收到消息後,手動ack回執回調處理
         */
        rabbitTemplate.setConfirmCallback(confirmCallbackService);

        /**
         * 消息投遞到隊列失敗回調處理
         */
        rabbitTemplate.setReturnCallback(returnCallbackService);

        /**
         * 發送消息
         */
        rabbitTemplate.convertAndSend(exchange, routingKey, msg,
                message -> {
                    message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
                    return message;
                },
                new CorrelationData(UUID.randomUUID().toString()));
    }

3、消息接收確認

消息接收確認要比消息發送確認簡單一點,由於只有一個消息回執(ack)的過程。使用@RabbitHandler註解標註的方法要增長 channel(信道)、message 兩個參數。

@Slf4j
@Component
@RabbitListener(queues = "confirm_test_queue")
public class ReceiverMessage1 {
    
    @RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("小富收到消息:{}", msg);

            //TODO 具體業務
            
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        }  catch (Exception e) {
            
            if (message.getMessageProperties().getRedelivered()) {
                
                log.error("消息已重複處理失敗,拒絕再次接收...");
                
                channel.basicReject(message.getMessageProperties().getDeliveryTag(), false); // 拒絕消息
            } else {
                
                log.error("消息即將再次返回隊列處理...");
                
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); 
            }
        }
    }
}

消費消息有三種回執方法,咱們來分析一下每種方法的含義。

一、basicAck

basicAck:表示成功確認,使用此回執方法後,消息會被rabbitmq broker 刪除。

void basicAck(long deliveryTag, boolean multiple)

deliveryTag:表示消息投遞序號,每次消費消息或者消息從新投遞後,deliveryTag都會增長。手動消息確認模式下,咱們能夠對指定deliveryTag的消息進行acknackreject等操做。

multiple:是否批量確認,值爲 true 則會一次性 ack全部小於當前消息 deliveryTag 的消息。

舉個栗子: 假設我先發送三條消息deliveryTag分別是五、六、7,可它們都沒有被確認,當我發第四條消息此時deliveryTag爲8,multiple設置爲 true,會將五、六、七、8的消息所有進行確認。

二、basicNack

basicNack :表示失敗確認,通常在消費消息業務異常時用到此方法,能夠將消息從新投遞入隊列。

void basicNack(long deliveryTag, boolean multiple, boolean requeue)

deliveryTag:表示消息投遞序號。

multiple:是否批量確認。

requeue:值爲 true 消息將從新入隊列。

三、basicReject

basicReject:拒絕消息,與basicNack區別在於不能進行批量操做,其餘用法很類似。

void basicReject(long deliveryTag, boolean requeue)

deliveryTag:表示消息投遞序號。

requeue:值爲 true 消息將從新入隊列。

4、測試

發送消息測試一下消息確認機制是否生效,從執行結果上看發送者發消息後成功回調,消費端成功的消費了消息。
在這裏插入圖片描述
用抓包工具Wireshark 觀察一下rabbitmq amqp協議交互的變化,也多了 ack 的過程。
在這裏插入圖片描述

5、踩坑日誌

一、不消息確認

這是一個很是沒技術含量的坑,但倒是很是容易犯錯的地方。

開啓消息確認機制,消費消息別忘了channel.basicAck,不然消息會一直存在,致使重複消費。
在這裏插入圖片描述

二、消息無限投遞

在我最開始接觸消息確認機制的時候,消費端代碼就像下邊這樣寫的,思路很簡單:處理完業務邏輯後確認消息, int a = 1 / 0 發生異常後將消息從新投入隊列。

@RabbitHandler
    public void processHandler(String msg, Channel channel, Message message) throws IOException {

        try {
            log.info("消費者 2 號收到:{}", msg);

            int a = 1 / 0;

            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

        } catch (Exception e) {

            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
        }
    }

可是有個問題是,業務代碼一旦出現 bug 99.9%的狀況是不會自動修復,一條消息會被無限投遞進隊列,消費端無限執行,致使了死循環。

在這裏插入圖片描述

本地的CPU被瞬間打滿了,你們能夠想象一下當時在生產環境致使服務死機,我是有多慌。

在這裏插入圖片描述
並且rabbitmq management 只有一條未被確認的消息。

在這裏插入圖片描述

通過測試分析發現,當消息從新投遞到消息隊列時,這條消息不會回到隊列尾部,還是在隊列頭部。

消費者會馬上消費這條消息,業務處理再拋出異常,消息再從新入隊,如此反覆進行。致使消息隊列處理出現阻塞,致使正常消息也沒法運行。

而咱們當時的解決方案是,先將消息進行應答,此時消息隊列會刪除該條消息,同時咱們再次發送該消息到消息隊列,異常消息就放在了消息隊列尾部,這樣既保證消息不會丟失,又保證了正常業務的進行。

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
// 從新發送消息到隊尾
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),
                    message.getMessageProperties().getReceivedRoutingKey(), MessageProperties.PERSISTENT_TEXT_PLAIN,
                    JSON.toJSONBytes(msg));

但這種方法並無解決根本問題,錯誤消息仍是會時不時報錯,後面優化設置了消息重試次數,達到了重試上限之後,手動確認,隊列刪除此消息,並將消息持久化入MySQL並推送報警,進行人工處理和定時任務作補償。

三、重複消費

如何保證 MQ 的消費是冪等性,這個須要根據具體業務而定,能夠藉助MySQL、或者redis 將消息持久化,經過再消息中的惟一性屬性校驗。

demoGitHub 地址 https://github.com/chengxy-nd...

原創不易,燃燒秀髮輸出內容,若是有一丟丟收穫,點個贊鼓勵一下吧!

習慣在VX看技術文章,想要獲取更多Java資源的同窗,能夠關注個人公衆號:程序員內點事,暗號:[666]
相關文章
相關標籤/搜索