RabbitMQ系列之怎麼確保消息不丟失

1.上一篇介紹了在 SpringBoot 中怎麼使用 RabbitMQ 來實現 RPC 功能,分享了可能踩到的坑及解決辦法;
2.本篇主要介紹消息可能會存在丟失的場景及解決思路,基本上涵蓋了可能會遇到的全部的場景。

一. 經過設置持久化

持久化能夠提升 RabbitMQ 的可靠性,以防止在異常狀況(好比:重啓、關機、宕機等)下的數據丟失。 RabbitMQ 持久化分爲三部分:交換機的持久化、隊列的持久化、消息的持久化。java

1. 什麼是交換機持久化

交換機持久化是指將交換機的屬性數據存儲在磁盤上,當 MQ 的服務器發生意外或關閉以後,在重啓 RabbitMQ 時不須要從新手動或執行代碼去建立交換機了,交換機會自動被建立,至關於一直存在。

2. 怎麼將交換機持久化

在建立交換機的時候將durable參數設置爲true便可。 好比,我聲明一個類型爲 direct 的交換機:spring

/**

 * 設置交換機,類型爲 direct

 * @return DirectExchange

 */

@Bean

DirectExchange myExchange() {

return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false);

}
說明
  • 經過將durable參數設置爲true,則交換機的元數據會被存儲在磁盤上,對於一個長期使用的交換機來講,建議將其設置爲持久化。

3. 隊列持久化

若是不將隊列設置爲持久化,那麼在 RabbitMQ 服務重啓以後,相關隊列的元數據會丟失,數據也會丟失。隊列都沒有了,消息也找不到地方存儲了。

4. 怎麼將隊列持久化

一樣,在建立隊列的時候將durable參數設置爲true便可。數據庫

/**
 * 建立隊列
 */
@Bean
public Queue myQueue() {
    return new Queue(QueueConstants.RPC_QUEUE1);
}
說明
  • durable 參數默認爲 false,只針對當前鏈接有效,當 RabbitMQ 服務重啓後數據會丟失;
  • 隊列的持久化能保證其自己的元數據不會因異常狀況而丟失,可是並不能保證內部所存儲的消息不會丟失;
  • 若是要確保消息不會丟失,就須要設置消息的持久化。

5. 消息持久化

RabbitMQ 的消息是依附於隊列存在的,因此要想消息持久化,那麼前提是隊列也必須設置持久化。segmentfault

6. 怎麼將消息持久化

在建立消息的時候,添加一個持久化消息的屬性(將delivery_mode設置爲 2)。服務器

SpringBoot中怎麼設置消息持久化

在 SpringBoot 中使用 rabbitTemplate 發送的消息默認就是持久化的,由於默認已經設置爲 delivery_mode = 2,下面咱們經過查看源碼來驗證一下。微信

源碼分析

1> sendAndReceive
生產者發送消息的時候會使用 rabbitTemplate 的 sendAndReceive 接口來發送消息:網絡

@Nullable
    public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException {
        return this.sendAndReceive(exchange, routingKey, message, (CorrelationData)null);
    }

2> Message
第三個參數 Message 有一個 MessageProperties 屬性源碼分析

打開 Message.class:性能

public class Message implements Serializable {
    private static final long serialVersionUID = -7177590352110605597L;
    private static final String ENCODING = Charset.defaultCharset().name();
    private static final Set<String> whiteListPatterns = new LinkedHashSet(Arrays.asList("java.util.*", "java.lang.*"));
    private final MessageProperties messageProperties;
    private final byte[] body;

3> MessageProperties
打開 MessageProperties.class :this

static {
        DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT;
        DEFAULT_PRIORITY = 0;
    }

MessageDeliveryMode.class:

public enum MessageDeliveryMode {
    NON_PERSISTENT,
    PERSISTENT;

    private MessageDeliveryMode() {
    }

    public static int toInt(MessageDeliveryMode mode) {
        switch(mode) {
        case NON_PERSISTENT:
            return 1;
        case PERSISTENT:
            return 2;
        default:
            return -1;
        }
    }
結論

經過源碼查看在 SpringBoot 中使用 rabbimqTemplate 發送的消息默認就是持久化的消息。

7. 總結

  1. 設置了隊列和消息的持久化,當 RabbitMQ 服務重啓以後,消息依舊會存在;
  2. 僅設置隊列持久化,重啓以後消息會丟失;
  3. 僅設置消息持久化,重啓以後隊列會消失,所以消息也就丟失了,因此只設置消息持久化而不設置隊列持久化是沒有意義的;
  4. 將全部的消息都設置爲持久化(寫入磁盤的速度比寫入內存的速度慢的多),可能會影響 RabbitMQ 的性能,對於可靠性不是那麼高的消息能夠不採用持久化來提升 RabbitMQ 的吞吐量。

二. 生產者開啓發送確認

場景

1.不知道生產者發送的消息到底是否已經到達 RabbitMQ Server; > 2.不知道生產者發送的消息是否已經成功的分配到隊列中去。

解決辦法

開啓消息發送確認,經過 ConfirmCallback 接口 和 ReturnCallback 接口 來保障。

備註
具體的操做方式能夠參考 RabbitMQ系列之消息確認機制 這篇文章。

三. 消費者開啓消息確認(ACK)

場景

消費者收到消息還沒來得及處理服務就宕機了。

解決辦法

消費端開啓消息確認(ACK),將消息設置爲手動確認:

# 開啓 ACK(消費者接收到消息時手動確認) 

spring.rabbitmq.listener.simple.acknowledge-mode=manual

這樣雖然服務宕機,可是在重啓以後,消費者仍然會消費到該條數據。
備註
具體的操做方式能夠參考 RabbitMQ 系列之消息確認機制 這篇文章。

四. 使用 RabbitMQ 的鏡集羣像模式進行部署 #### 場景

持久化的消息成功存入 RabbitMQ 以後,若是在存入磁盤的這個過程當中 RabbitMQ 服務節點宕機、異常重啓等,消息還沒來得及存入磁盤。

解決辦法

可使用 RabbitMQ 的鏡集羣像模式進行部署,若是主節點在這個特殊的時間段內掛掉了,會自動切換到從節點,這樣就保證了高可用性,除非整個集羣都掛掉。

備註
瞭解更多關於 RabbitMQ 鏡像集羣模式能夠參考 RabbitMQ系列之部署模式 這篇文章。

五. 消息補償機制

場景

好比設置爲持久化的消息,在保存到磁盤的過程當中,當前隊列節點掛了,存儲節點的磁盤也掛了。

解決辦法

  1. 因爲系統功能複雜,加上網絡不肯定性太多,因此消息補償機制須要創建在系統記錄了詳細的日誌,好比消息發送日誌,消息接收日誌,存入數據庫日誌等的前提下;
  2. 經過手動觸發或者定時掃描,從這些日誌中提取出符合消息補償要求的數據,進行消息補償。

關注微信公衆號

歡迎你們關注個人微信公衆號閱讀更多文章:
微信公衆號二維碼.jpg

相關文章
相關標籤/搜索