解決RabbitMQ消息丟失問題和保證消息可靠性(一)

工做中常常用到消息中間件來解決系統間的解耦問題或者高併發消峯問題,可是消息的可靠性如何保證一直是個很大的問題,什麼狀況下消息就不見了?如何防止消息丟失?下面經過這篇文章,咱們就聊聊RabbitMQ 消息可靠性如何解決的?

本文分三部分說明

  1. RabbitMQ 消息丟失場景有哪些?
  2. 如何避免消息丟失?
  3. 如何設計部署消息中間件保證消息可靠性?

RabbitMQ 消息丟失場景有哪些?

首先咱們看下消息週期投遞過程:java

咱們把該圖分三部分,左中右,每部分都會致使消息丟失狀況,下面就詳細聊聊每一個階段消息是如何丟的:web

1.生產者生產消息到RabbitMQ Server 消息丟失場景

1) 外界環境問題致使:發生網絡丟包、網絡故障等形成RabbitMQ Server端收不到消息,由於生產環境的網絡是很複雜的,網絡抖動,丟包現象很常見,下面會講到針對這個問題是如何解決的。編程

2) 代碼層面,配置層面,考慮不全致使消息丟失bash

事例1:
通常狀況下,生產者使用Confirm模式投遞消息,若是方案不夠嚴謹,好比RabbitMQ Server 接收消息失敗後會發送nack消息通知生產者,生產者監聽消息失敗或者沒作任何事情,消息存在丟失風險;服務器

事例2:
生產者發送消息到exchange後,發送的路由和queue沒有綁定,消息會存在丟失狀況,下面會講到具體的例子,保證意外狀況的發生,即便發生,也在可控範圍內。網絡

2.RabbitMQ Server中存儲的消息丟失或可靠性不足

1)消息未徹底持久化,當機器重啓後,消息會所有丟失,甚至Queue也不見了架構

假如:你僅僅持久化了Message,而Exchange,Queue沒有持久化,這個持久化是無效的。 記得以前公司有一哥們忘記持久化Queue致使機器重啓後,Queue不見了,天然Message也丟失了。併發

2)單節點模式問題,若是某個節點掛了,消息就不能用了,業務可能癱瘓,只能等待異步

若是作了消息持久化方案,消息會持久化硬盤,機器重啓後消息不會丟失;可是還有一個極端狀況,這臺服務器磁盤忽然壞了(公司遇到過磁盤問題仍是不少的),消息持久化不了,非高可用狀態,這個模式生產環境慎重考慮。高併發

3)普通集羣模式:某個節點掛了,該節點上的消息不能用,有影響的業務癱瘓,只能等待節點恢復重啓可用(創建在消息持久化)

雖然這個模式進步了一點點,多個節點,可是消息仍是不能保證可靠,爲何呢?

由於RabbitMQ 集羣模式有點特殊,隊列的內容僅僅存在某一個節點上面,不會存在全部節點上面,全部節點僅僅存放消息結構和元數據(能夠理解爲索引,這也是爲了提升性能,若是每次把全部內容同步到全部節點是有開銷代價的)。 下面本身畫了一張圖介紹普通集羣丟失消息狀況:

這裏有三個節點,一般狀況下一個磁盤節點,兩個內存節點,首先先說明下, Queue1 內容僅僅存在節點note1上面,在建立隊列的時候已經固定了,note2,note3 僅僅存放的是元數據,這個必定要清楚,Producer發送消息到note2,note2 會同步元數據到其餘節點,內容會同步note1。

那咱們想下,圖中的Q1問題,note1掛了,這個節點的Queues所有暫時不可用,節點恢復後可用。

咱們說下圖片中備註2中的問題,Producer發送消息到note2,note2在同步note1前note1掛了,此時你的心情是怎麼樣的。。。後面會講具體的策略

4)鏡像模式:能夠解決上面的問題,可是仍是有意外狀況發生

好比:持久化的消息,保存到硬盤過程當中,當前隊列節點掛了,存儲節點硬盤又壞了,消息丟了,怎麼辦?下面會詳細介紹

3.RabbitMQ Server到消費者消息丟失

  1. 消費端接收到相關消息以後,消費端還沒來得及處理消息,消費端機器就宕機了,此時消息若是處理不當會有丟失風險,後面會講到如何處理這個狀況,消費端也有ack機制

如何避免消息丟失?

下面也是從三個方面介紹:

1.生產者生產消息到RabbitMQ Server 可靠性保證

2.RabbitMQ Server中存儲的消息如何保證

3.RabbitMQ Server到消費者消息如何不丟

1. 生產者生產消息到RabbitMQ Server可靠性保證

這個過程,消息可能會丟,好比發生網絡丟包、網絡故障等形成消息丟失,通常狀況下若是不採起措施,生產者沒法感知消息是否已經正確無誤的發送到exchange中,若是生產者能感知到的話,它能夠進行進一步的處理動做,好比從新投遞相關消息以確保消息的可靠性。

1.1 一般有一種方案能夠解決:就是 AMQP協議提供的一個事務機制

RabbitMQ客戶端中Channel 接口提供了幾個事務機制相關的方法:

channel.txSelect

channel.txCommit

channel.txRollback

源碼截圖以下:com.rabbitmq.client 包中public interface Channel extendsShutdownNotifier {}接口

在生產者發送消息以前,經過channel.txSelect開啓一個事務,接着發送消息, 若是消息投遞server失敗,進行事務回滾channel.txRollback,而後從新發送, 若是server收到消息,就提交事務channel.txCommit

可是,不多有人這麼幹,由於這是同步操做,一條消息發送以後會使發送端阻塞,以等待RabbitMQ Server的迴應,以後才能繼續發送下一條消息,生產者生產消息的吞吐量和性能都會大大下降。

1.2 幸運的是RabbitMQ提供了一個改進方案,即發送方確認機制(publisher confirm)

首先生產者經過調用channel.confirmSelect方法將信道設置爲confirm模式,一旦信道進入confirm模式,全部在該信道上面發佈的消息都會被指派一個惟一的ID(從1開始),一旦消息被投遞到全部匹配的隊列以後,RabbitMQ就會發送一個確認(Basic.Ack)給生產者(包含消息的惟一deliveryTag和multiple參數),這就使得生產者知曉消息已經正確到達了目的地了。

其實Confirm模式有三種方式實現:

  1. 串行confirm模式:producer每發送一條消息後,調用waitForConfirms()方法,等待broker端confirm,若是服務器端返回false或者在超時時間內未返回,客戶端進行消息重傳。
  2. 批量confirm模式:producer每發送一批消息後,調用waitForConfirms()方法,等待broker端confirm。
  3. 異步confirm模式:提供一個回調方法,broker confirm了一條或者多條消息後producer端會回調這個方法。 咱們分別來看看這三種confirm模式

串行confirm

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
    if (channel.waitForConfirms()) {
        System.out.println("發送成功");
    } else {
        //發送失敗這裏可進行消息從新投遞的邏輯
        System.out.println("發送失敗");
    }
}
複製代碼

批量confirm模式

for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}
if (channel.waitForConfirms()) {
    System.out.println("發送成功");
} else {
    System.out.println("發送失敗");
}
複製代碼

上面代碼是簡單版本的,生產環境絕對不是循環發送的,而是根據業務狀況, 各個客戶端程序須要按期(每x秒)或定量(每x條)或者二者結合來publish消息,而後等待服務器端confirm。相比普通confirm模式,批量能夠極大提高confirm效率。

可是有沒有發現什麼問題?

問題1: 批量發送的邏輯複雜化了。

問題2: 一旦出現confirm返回false或者超時的狀況時,客戶端須要將這一批次的消息所有重發,這會帶來明顯的重複消息數量,而且當消息常常丟失時,批量confirm性能應該是不升反降的。

異步confirm模式

Channel channel = channelManager.getPublisherChannel(namespaceName);
ProxiedConfirmListener confirmListener = new ProxiedConfirmListener();//監聽類
confirmListener.setChannelManager(channelManager);
confirmListener.setChannel(channel);
confirmListener.setNamespace(namespaceName);
confirmListener.addSuccessCallbacks(successCallbacks);
channel.addConfirmListener(confirmListener);
channel.confirmSelect();//開啓confirm模式
AMQP.BasicProperties messageProperties = null;
if (message.getProperty() instanceof AMQP.BasicProperties) {
    messageProperties = (AMQP.BasicProperties) message.getProperty();
}
confirmListener.toConfirm(channel.getNextPublishSeqNo(), rawMsg);
for(int i = 0;i<50;i++){
    channel.basicPublish(
            exchange, routingKey,
            mandatory, immediate,
            messageProperties,
            message.getContent()
    );
}
複製代碼

異步模式須要本身多寫一部分複雜的代碼實現,異步監聽類,監聽server端的通知消息,異步的好處性能會大幅度提高,發送完畢以後,能夠繼續發送其餘消息。 MQServer通知生產端ConfirmListener監聽類:用戶能夠繼承接口實現本身的實現類,處理消息確認機制,此處繼承類代碼省略,就是上面 ProxiedConfirmListener 類: 下面貼下要實現的接口:

package com.rabbitmq.client;

import java.io.IOException;

/**
 * Implement this interface in order to be notified of Confirm events.
 * Acks represent messages handled successfully; Nacks represent
 * messages lost by the broker.  Note, the lost messages could still
 * have been delivered to consumers, but the broker cannot guarantee
 * this.
 */
public interface ConfirmListener {
    /**
    ** handleAck RabbitMQ消息接收成功的方法,成功後業務能夠作的事情
    ** 發送端投遞消息前,須要把消息先存起來,好比用KV存儲,接收到ack後刪除
    **/
    void handleAck(long deliveryTag, boolean multiple)
        throws IOException;

    //handleNack RabbitMQ消息接收失敗的通知方法,用戶能夠在這裏從新投遞消息
    void handleNack(long deliveryTag, boolean multiple)
        throws IOException;
}
複製代碼

上面的接口頗有意思,若是是你的話,怎麼實現? 消息投遞前如何存儲消息,ack 和 nack 如何處理消息?

下面看下異步confirm的消息投遞流程:

解釋下這張圖片:

channel1 連續發類1,2,3條消息到RabbitMQ-Server,RabbitMQ-Server通知返回一條通知,裏面包含回傳給生產者的確認消息中的deliveryTag包含了確認消息的序號,此外還有一個參數multiple=true,表示到這個序號以前的全部消息都已經獲得了處理。這樣客戶端和服務端通知的次數就減小類,提高類性能。

channel3 發送的消息失敗了,生產端須要對投遞消息從新投遞,須要額外處理代碼。 那麼生產端須要作什麼事情呢?由於是異步的,生產端須要存儲消息而後根據server通知的消息,確認如何處理,因而咱們面臨的問題是:

第一:發送消息以前把消息存起來

第二:監聽ack 和 nack 並作響應處理

那麼怎麼存儲呢?

咱們分析下,可使用SortedMap 存儲,保證有序,可是有個問題高併發狀況下, 每秒可能幾千甚至上萬的消息投遞出去,消息的ack要等幾百毫秒的話,放內存可能有內存溢出的風險。因此建議採用KV存儲,KV存儲承載高併發能力高,性能好,可是要保證KV 高可用,單個有個缺點就是又引入了第三方中間件,複雜度升高。

解決了上面的問題,下面還會遇到一個問題,消息丟失的另外一個狀況?

事務機制和publisher confirm機制確保的是消息可以正確的發送至RabbitMQ,這裏的「發送至RabbitMQ」的含義是指消息被正確的發往至RabbitMQ的交換器,若是此交換器沒有匹配的隊列的話,那麼消息也將會丟失,怎麼辦?

這裏有兩個解決方案,

1. 使用mandatory 設置true

2. 利用備份交換機(alternate-exchange):實現沒有路由到隊列的消息

咱們看下RabbitMQ客戶端代碼方法

Channel 類中 發佈消息方法

void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body)
            throws IOException;
            
            
複製代碼

解釋下:basicPublish 方法中的,mandatory和immediate

/**
     * 當mandatory標誌位設置爲true時,若是exchange根據自身類型和消息routeKey沒法找到一個符合條件的queue, 那麼會調用basic.return方法將消息返回給生產者<br>
     * 當mandatory設置爲false時,出現上述情形broker會直接將消息扔掉。
     */
    @Setter(AccessLevel.PACKAGE)
    private boolean mandatory = false;

    /**
     * 當immediate標誌位設置爲true時,若是exchange在將消息路由到queue(s)時發現對於的queue上沒有消費者, 那麼這條消息不會放入隊列中。
     當immediate標誌位設置爲false時,exchange路由的隊列沒有消費者時,該消息會經過basic.return方法返還給生產者。
     * RabbitMQ 3.0版本開始去掉了對於immediate參數的支持,對此RabbitMQ官方解釋是:這個關鍵字違背了生產者和消費者之間解耦的特性,由於生產者不關心消息是否被消費者消費掉
     */
    @Setter(AccessLevel.PACKAGE)
    private boolean immediate;
    
複製代碼

因此爲了保證消息的可靠性,須要設置發送消息代碼邏輯。若是不單獨形式設置mandatory=false

使用mandatory 設置true的時候有個關鍵點要調整,生產者如何獲取到沒有被正確路由到合適隊列的消息呢?經過調用channel.addReturnListener來添加ReturnListener監聽器實現,只要發送的消息,沒有路由到具體的隊列,ReturnListener就會收到監聽消息。

channel.addReturnListener(new ReturnListener() {
            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP
                    .BasicProperties basicProperties, byte[] body) throws IOException {
                String message = new String(body);
                //進入該方法表示,沒路由到具體的隊列
                //監聽到消息,能夠從新投遞或者其它方案來提升消息的可靠性。
                System.out.println("Basic.Return返回的結果是:" + message);
            }
 });
複製代碼

此時有人問了,不想複雜化生產者的編程邏輯,又不想消息丟失,那麼怎麼辦? 還好RabbitMQ提供了一個叫作alternate-exchange東西,翻譯下就是備份交換器,這個幹什麼用呢?很簡單,它能夠將未被路由的消息存儲在另外一個exchange隊列中,再在須要的時候去處理這些消息。

那如何實現呢?

簡單一點能夠經過webui管理後臺設置,當你新建一個exchange業務的時候,能夠給它設置Arguments,這個參數就是 alternate-exchange,其實alternate-exchange就是一個普通的exchange,類型最好是fanout 方便管理

當你發送消息到你本身的exchange時候,對應key沒有路由到queue,就會自動轉移到alternate-exchange對應的queue,起碼消息不會丟失。

下面一張圖看下投遞過程:

那麼有人有個疑問,上面介紹了,兩種方式處理,發送的消息沒法路由到隊列的方案, 若是備份交換器和mandatory參數一塊兒使用,會有什麼效果?

答案是:mandatory參數無效

因爲篇幅太長,我會再分一篇文章出來說下面的內容

2. RabbitMQ Server中存儲的消息如何保證消息可靠性和高可用

待續...

3. RabbitMQ Server到消費者消息如何不丟

待續...

再聊聊大廠都是如何使用在生產環境的

待續...

END

若有收穫,請幫忙轉發,後續會有更好文章貢獻,您的鼓勵是做者最大的動力!

歡迎關注個人公衆號:架構師的修煉,得到獨家整理的學習資源和平常乾貨推送。

相關文章
相關標籤/搜索