四種策略確保 RabbitMQ 消息發送可靠性!你用哪一種?

@[toc] 微服務能夠設計成消息驅動的微服務,響應式系統也能夠基於消息中間件來作,從這個角度來講,在互聯網應用開發中,消息中間件真的是過重要了。java

今天,以 RabbitMQ 爲例,鬆哥來和你們聊一聊消息中間消息發送可靠性的問題。git

注意,如下內容我主要和你們討論如何確保消息生產者將消息發送成功,並不涉及消息消費的問題。github

1. RabbitMQ 消息發送機制

你們知道,RabbitMQ 中的消息發送引入了 Exchange(交換機)的概念,消息的發送首先到達交換機上,而後再根據既定的路由規則,由交換機將消息路由到不一樣的 Queue(隊列)中,再由不一樣的消費者去消費。spring

大體的流程就是這樣,因此要確保消息發送的可靠性,主要從兩方面去確認:數據庫

  1. 消息成功到達 Exchange
  2. 消息成功到達 Queue

若是能確認這兩步,那麼咱們就能夠認爲消息發送成功了。markdown

若是這兩步中任一步驟出現問題,那麼消息就沒有成功送達,此時咱們可能要經過重試等方式去從新發送消息,屢次重試以後,若是消息仍是不能到達,則可能就須要人工介入了。併發

通過上面的分析,咱們能夠確認,要確保消息成功發送,咱們只須要作好三件事就能夠了:app

  1. 確認消息到達 Exchange。
  2. 確認消息到達 Queue。
  3. 開啓定時任務,定時投遞那些發送失敗的消息。

2. RabbitMQ 的努力

上面提出的三個步驟,第三步須要咱們本身實現,前兩步 RabbitMQ 則有現成的解決方案。dom

如何確保消息成功到達 RabbitMQ?RabbitMQ 給出了兩種方案:ide

  1. 開啓事務機制
  2. 發送方確認機制

這是兩種不一樣的方案,不能夠同時開啓,只能選擇其中之一,若是二者同時開啓,則會報以下錯誤:

咱們分別來看。如下全部案例都在 Spring Boot 中展開,文末能夠下載相關源碼。

2.1 開啓事務機制

Spring Boot 中開啓 RabbitMQ 事務機制的方式以下:

首先須要先提供一個事務管理器,以下:

@Bean
RabbitTransactionManager transactionManager(ConnectionFactory connectionFactory) {
    return new RabbitTransactionManager(connectionFactory);
}
複製代碼

接下來,在消息生產者上面作兩件事:添加事務註解並設置通訊信道爲事務模式:

@Service
public class MsgService {
    @Autowired
    RabbitTemplate rabbitTemplate;

    @Transactional
    public void send() {
        rabbitTemplate.setChannelTransacted(true);
        rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes());
        int i = 1 / 0;
    }
}
複製代碼

這裏注意兩點:

  1. 發送消息的方法上添加 @Transactional 註解標記事務。
  2. 調用 setChannelTransacted 方法設置爲 true 開啓事務模式。

這就 OK 了。

在上面的案例中,咱們在結尾來了個 1/0 ,這在運行時必然拋出異常,咱們能夠嘗試運行該方法,發現消息並未發送成功。

當咱們開啓事務模式以後,RabbitMQ 生產者發送消息會多出四個步驟:

  1. 客戶端發出請求,將信道設置爲事務模式。
  2. 服務端給出回覆,贊成將信道設置爲事務模式。
  3. 客戶端發送消息。
  4. 客戶端提交事務。
  5. 服務端給出響應,確認事務提交。

上面的步驟,除了第三步是原本就有的,其餘幾個步驟都是無緣無故多出來的。因此你們看到,事務模式其實效率有點低,這並不是一個最佳解決方案。咱們能夠想一想,什麼項目會用到消息中間件?通常來講都是一些高併發的項目,這個時候併發性能尤其重要。

因此,RabbitMQ 還提供了發送方確認機制(publisher confirm)來確保消息發送成功,這種方式,性能要遠遠高於事務模式,一塊兒來看下。

2.2 發送方確認機制

2.2.1 單條消息處理

首先咱們移除剛剛關於事務的代碼,而後在 application.properties 中配置開啓消息發送方確認機制,以下:

spring.rabbitmq.publisher-confirm-type=correlated
spring.rabbitmq.publisher-returns=true
複製代碼

第一行是配置消息到達交換器的確認回調,第二行則是配置消息到達隊列的回調。

第一行屬性的配置有三個取值:

  1. none:表示禁用發佈確認模式,默認即此。
  2. correlated:表示成功發佈消息到交換器後會觸發的回調方法。
  3. simple:相似 correlated,而且支持 waitForConfirms()waitForConfirmsOrDie() 方法的調用。

接下來咱們要開啓兩個監聽,具體配置以下:

@Configuration
public class RabbitConfig implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnsCallback {
    public static final String JAVABOY_EXCHANGE_NAME = "javaboy_exchange_name";
    public static final String JAVABOY_QUEUE_NAME = "javaboy_queue_name";
    private static final Logger logger = LoggerFactory.getLogger(RabbitConfig.class);
    @Autowired
    RabbitTemplate rabbitTemplate;
    @Bean
    Queue queue() {
        return new Queue(JAVABOY_QUEUE_NAME);
    }
    @Bean
    DirectExchange directExchange() {
        return new DirectExchange(JAVABOY_EXCHANGE_NAME);
    }
    @Bean
    Binding binding() {
        return BindingBuilder.bind(queue())
                .to(directExchange())
                .with(JAVABOY_QUEUE_NAME);
    }

    @PostConstruct
    public void initRabbitTemplate() {
        rabbitTemplate.setConfirmCallback(this);
        rabbitTemplate.setReturnsCallback(this);
    }

    @Override
    public void confirm(CorrelationData correlationData, boolean ack, String cause) {
        if (ack) {
            logger.info("{}:消息成功到達交換器",correlationData.getId());
        }else{
            logger.error("{}:消息發送失敗", correlationData.getId());
        }
    }

    @Override
    public void returnedMessage(ReturnedMessage returned) {
        logger.error("{}:消息未成功路由到隊列",returned.getMessage().getMessageProperties().getMessageId());
    }
}
複製代碼

關於這個配置類,我說以下幾點:

  1. 定義配置類,實現 RabbitTemplate.ConfirmCallbackRabbitTemplate.ReturnsCallback 兩個接口,這兩個接口,前者的回調用來肯定消息到達交換器,後者則會在消息路由到隊列失敗時被調用。
  2. 定義 initRabbitTemplate 方法並添加 @PostConstruct 註解,在該方法中爲 rabbitTemplate 分別配置這兩個 Callback。

這就能夠了。

接下來咱們對消息發送進行測試。

首先咱們嘗試將消息發送到一個不存在的交換機中,像下面這樣:

rabbitTemplate.convertAndSend("RabbitConfig.JAVABOY_EXCHANGE_NAME",RabbitConfig.JAVABOY_QUEUE_NAME,"hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
複製代碼

注意第一個參數是一個字符串,不是變量,這個交換器並不存在,此時控制檯會報以下錯誤:

接下來咱們給定一個真實存在的交換器,可是給一個不存在的隊列,像下面這樣:

rabbitTemplate.convertAndSend(RabbitConfig.JAVABOY_EXCHANGE_NAME,"RabbitConfig.JAVABOY_QUEUE_NAME","hello rabbitmq!".getBytes(),new CorrelationData(UUID.randomUUID().toString()));
複製代碼

注意此時第二個參數是一個字符串,不是變量。

能夠看到,消息雖然成功達到交換器了,可是沒有成功路由到隊列(由於隊列不存在)。

這是一條消息的發送,咱們再來看看消息的批量發送。

2.2.2 消息批量處理

若是是消息批量處理,那麼發送成功的回調監聽是同樣的,這裏再也不贅述。

這就是 publisher-confirm 模式。

相比於事務,這種模式下的消息吞吐量會獲得極大的提高。

3. 失敗重試

失敗重試分兩種狀況,一種是壓根沒找到 MQ 致使的失敗重試,另外一種是找到 MQ 了,可是消息發送失敗了。

兩種重試咱們分別來看。

3.1 自帶重試機制

前面所說的事務機制和發送方確認機制,都是發送方確認消息發送成功的辦法。若是發送方一開始就連不上 MQ,那麼 Spring Boot 中也有相應的重試機制,可是這個重試機制就和 MQ 自己沒有關係了,這是利用 Spring 中的 retry 機制來完成的,具體配置以下:

spring.rabbitmq.template.retry.enabled=true
spring.rabbitmq.template.retry.initial-interval=1000ms
spring.rabbitmq.template.retry.max-attempts=10
spring.rabbitmq.template.retry.max-interval=10000ms
spring.rabbitmq.template.retry.multiplier=2
複製代碼

從上往下配置含義依次是:

  • 開啓重試機制。
  • 重試起始間隔時間。
  • 最大重試次數。
  • 最大重試間隔時間。
  • 間隔時間乘數。(這裏配置間隔時間乘數爲 2,則第一次間隔時間 1 秒,第二次重試間隔時間 2 秒,第三次 4 秒,以此類推)

配置完成後,再次啓動 Spring Boot 項目,而後關掉 MQ,此時嘗試發送消息,就會發送失敗,進而致使自動重試。

3.2 業務重試

業務重試主要是針對消息沒有到達交換器的狀況。

若是消息沒有成功到達交換器,根據咱們第二小節的講解,此時就會觸發消息發送失敗回調,在這個回調中,咱們就能夠作文章了!

總體思路是這樣:

  1. 首先建立一張表,用來記錄發送到中間件上的消息,像下面這樣:

每次發送消息的時候,就往數據庫中添加一條記錄。這裏的字段都很好理解,有三個我額外說下:

  • status:表示消息的狀態,有三個取值,0,1,2 分別表示消息發送中、消息發送成功以及消息發送失敗。
  • tryTime:表示消息的第一次重試時間(消息發出去以後,在 tryTime 這個時間點還未顯示發送成功,此時就能夠開始重試了)。
  • count:表示消息重試次數。

其餘字段都很好理解,我就不一一囉嗦了。

  1. 在消息發送的時候,咱們就往該表中保存一條消息發送記錄,並設置狀態 status 爲 0,tryTime 爲 1 分鐘以後。
  2. 在 confirm 回調方法中,若是收到消息發送成功的回調,就將該條消息的 status 設置爲1(在消息發送時爲消息設置 msgId,在消息發送成功回調時,經過 msgId 來惟一鎖定該條消息)。
  3. 另外開啓一個定時任務,定時任務每隔 10s 就去數據庫中撈一次消息,專門去撈那些 status 爲 0 而且已通過了 tryTime 時間記錄,把這些消息拎出來後,首先判斷其重試次數是否已超過 3 次,若是超過 3 次,則修改該條消息的 status 爲 2,表示這條消息發送失敗,而且再也不重試。對於重試次數沒有超過 3 次的記錄,則從新去發送消息,而且爲其 count 的值+1。

大體的思路就是上面這樣,鬆哥這裏就不給出代碼了,鬆哥的 vhr 裏邊郵件發送就是這樣的思路來處理的,完整代碼你們能夠參考 vhr 項目(github.com/lenve/vhr)。

固然這種思路有兩個弊端:

  1. 去數據庫走一遭,可能拖慢 MQ 的 Qos,不過有的時候咱們並不須要 MQ 有很高的 Qos,因此這個應用時要看具體狀況。
  2. 按照上面的思路,可能會出現同一條消息重複發送的狀況,不過這都不是事,咱們在消息消費時,解決好冪等性問題就好了。

固然,你們也要注意,消息是否要確保 100% 發送成功,也要看具體狀況。

4. 小結

好啦,這就是關於消息生產者的一些常見問題以及對應的解決方案,下篇文章鬆哥和你們探討若是保證消息消費成功並解決冪等性問題。

本文涉及到的相關源代碼你們能夠在這裏下載:github.com/lenve/javab…

相關文章
相關標籤/搜索