RabbitMQ 之消息的可靠性投遞

默認狀況下,消息發送端發送消息給 RabbitMQ 後,RabbitMQ 是不會返回任何信息的。 那麼咱們怎麼知道消息是中途丟失了仍是到達了 broker 呢?spring

RabbitMQ 提供了兩種確認消息是否投遞成功的方法bash

  • 設置 channel 爲 transaction 模式,經過 AMQP 事務機制實現,這也是 AMQP 協議層面提供的解決方案
  • 設置 channel 爲 confirm 模式,這是 RabbitMQ 提供的解決方案

兩種模式不能共存 服務器

channel 的 transaction 模式

RabbitMQ 中與事務機制有關的方法有三個網絡

  • txSelect:用於將當前 channel 設置成 transaction 模式
  • txCommit:用於提交事務
  • txRollback:用於回滾事務

在經過 txSelect 開啓事務以後,咱們即可以發佈消息給broker代理服務器了,若是txCommit提交成功了,則消息必定到達了broker了,若是在 txCommit 執行以前 broker 異常崩潰或者因爲其餘緣由拋出異常,這個時候咱們即可以捕獲異常經過 txRollback 回滾事務了框架

事務確實可以解決 producer 與 broker 之間消息確認的問題,只有消息成功被 broker 接收,事務提交才能成功,不然咱們即可以在捕獲異常進行事務回滾操做,同時進行消息重發,可是使用事務機制的話會下降RabbitMQ的性能。異步

RabbitMQ 提供了一個更好的方案,使用 channel 信道的 confirm 模式。ide

channel 的 confirm 模式

生產者經過調用 channel 的 confirmSelect 方法將 channel 設置爲 confirm 模式. 該模式下,全部在該信道上發佈的消息都會被分派一個惟一的ID(從1開始),當消息被投遞到全部匹配的隊列後,broker 就會發送一個(包含消息的惟一 ID 的)確認給發送端, 若是 RabbitMQ 由於自身內部錯誤致使消息丟失,就會發送一條nack消息,發送端的 Confirm Listener 會去監聽應答post

broker回傳給發送端的確認消息中 deliver-tag 域包含了確認消息的ID,此外 broker 也能夠設置 basic.ack 的 multiple 域,表示到這個ID以前的全部消息都已經獲得了處理性能

confirm模式最大的好處在於他是異步的,生產者能夠在等信道返回的同時繼續發送下一條消息。spa

延伸

上面說到「當消息被投遞到全部匹配的隊列後,broker 就會發送一個(包含消息的惟一 ID 的)確認給發送端」,萬一發送確認後, rabbitMq 崩潰了,消息隊列中的消息就都沒了,這時候發送端還覺得消息還在隊列中。

爲了防止這種狀況的發送,rabbitMq 須要對隊列和消息進行持久化。

當消息和隊列開啓持久化以後,確認信息會等到消息寫入磁盤以後再發出

Confirm 的三種使用方式

  • 普通確認:每發送一條消息後,調用channel.waitForConfirms()方法,同步等待服務器端confirm。其實是一種串行confirm了。
  • 批量確認:每發送一批消息後,調用channel.waitForConfirms()方法,同步等待服務器端confirm
  • 異步確認:爲channel添加一個監聽器,rabbitmq 會回調這個方法,示例代碼以下
// 添加一個確認監聽
channel.addConfirmListener(new ConfirmListener() {
    //消息失敗處理
    @Override
    public void handleNack(long deliveryTag, boolean multiple) throws IOException {
        //deliveryTag;惟一消息標籤
        //multiple:是否批量
        System.err.println("-------no ack!-----------");
    }
    //消息成功處理
    @Override
    public void handleAck(long deliveryTag, boolean multiple) throws IOException {
        System.err.println("-------ack!-----------");
    }
});
複製代碼

實際開發中不多會直接使用 RabbitMQ 包,更多時候是使用 spring 提供的 spring-rabbit ,它封裝了 RabbitMQ 並將它集成到了 spring 框架中。

這裏給出 springBoot 版的異步確認的詳細實現,供你們參考。

RabbitMQ confirm 模式之異步確認

批量 confirm 模式 和 異步confirm模式效率更高些,可是批量confirm模式下,若是 rabbitMq 返回失敗,那麼須要從新發送這一批消息,建議最好仍是選擇異步confirm模式

超時

RabbitMQ 的響應可能會超時,超時多是消息沒有到達 mq ,也有多是網絡延遲致使的。對於響應超時的消息,一般被認定爲投遞失敗。

對於投遞失敗的消息,須要進行消息補償

消息補償

消息補償由發送端本身設計,常見的設計方案爲

數據落庫,消息狀態打標

如圖

  • step1: 發送業務消息前,先將業務數據和消息狀態入庫,並將消息狀態初始化爲發送中
  • step2: 發送業務消息,設置超時時間,同時異步監聽 RabbitMQ 響應
  • step3: RabbitMQ 返回響應
  • step4: 根據響應結果,更新消息狀態,投遞成功則將消息狀態設置成成功
  • step5: 定時任務找出狀態爲發送中,且時間超時的
  • stem6: 從新投遞
  • stem7: 通過上述步驟屢次(一般是3次)循環後,依然失敗的,設置消息狀態爲失敗
  • stem8: 人工去解決狀態爲失敗的消息

重複投遞

消息補償機制可能會致使重複投遞,重複投遞可能致使消費端重複消費。但重複投遞又沒法徹底避免,所以消費端須要防止重複消費

總結

要保證 RabbitMQ 的消息可靠性投遞,須要作到如下幾點

  • 消息發送端開啓 channel 的 confirm 模式
  • 消息發送端異步接收 RabbitMQ 響應
  • RabbitMQ 對隊列和消息進行持久化
  • 消息發送端創建消息投遞失敗的補償機制

相關文章

RabbitMQ 之消息的可靠性消費

RabbitMQ 持久化

相關文章
相關標籤/搜索