1.上一篇介紹了在 SpringBoot 中怎麼使用 RabbitMQ 來實現 RPC 功能,分享了可能踩到的坑及解決辦法;
2.本篇主要介紹消息可能會存在丟失的場景及解決思路,基本上涵蓋了可能會遇到的全部的場景。
持久化能夠提升 RabbitMQ 的可靠性,以防止在異常狀況(好比:重啓、關機、宕機等)下的數據丟失。 RabbitMQ 持久化分爲三部分:交換機的持久化、隊列的持久化、消息的持久化。java
交換機持久化是指將交換機的屬性數據存儲在磁盤上,當 MQ 的服務器發生意外或關閉以後,在重啓 RabbitMQ 時不須要從新手動或執行代碼去建立交換機了,交換機會自動被建立,至關於一直存在。
在建立交換機的時候將durable
參數設置爲true
便可。 好比,我聲明一個類型爲 direct 的交換機:spring
/** * 設置交換機,類型爲 direct * @return DirectExchange */ @Bean DirectExchange myExchange() { return new DirectExchange(QueueConstants.QUEUE\_EXCHANGE\_NAME, true, false); }
durable
參數設置爲true
,則交換機的元數據會被存儲在磁盤上,對於一個長期使用的交換機來講,建議將其設置爲持久化。若是不將隊列設置爲持久化,那麼在 RabbitMQ 服務重啓以後,相關隊列的元數據會丟失,數據也會丟失。隊列都沒有了,消息也找不到地方存儲了。
一樣,在建立隊列的時候將durable
參數設置爲true
便可。數據庫
/** * 建立隊列 */ @Bean public Queue myQueue() { return new Queue(QueueConstants.RPC_QUEUE1); }
RabbitMQ 的消息是依附於隊列存在的,因此要想消息持久化,那麼前提是隊列也必須設置持久化。segmentfault
在建立消息的時候,添加一個持久化消息的屬性(將delivery_mode
設置爲 2)。服務器
在 SpringBoot 中使用 rabbitTemplate 發送的消息默認就是持久化的,由於默認已經設置爲 delivery_mode = 2
,下面咱們經過查看源碼來驗證一下。微信
1> sendAndReceive
生產者發送消息的時候會使用 rabbitTemplate 的 sendAndReceive 接口來發送消息:網絡
@Nullable public Message sendAndReceive(String exchange, String routingKey, Message message) throws AmqpException { return this.sendAndReceive(exchange, routingKey, message, (CorrelationData)null); }
2> Message
第三個參數 Message 有一個 MessageProperties 屬性源碼分析
打開 Message.class:性能
public class Message implements Serializable { private static final long serialVersionUID = -7177590352110605597L; private static final String ENCODING = Charset.defaultCharset().name(); private static final Set<String> whiteListPatterns = new LinkedHashSet(Arrays.asList("java.util.*", "java.lang.*")); private final MessageProperties messageProperties; private final byte[] body;
3> MessageProperties
打開 MessageProperties.class :this
static { DEFAULT_DELIVERY_MODE = MessageDeliveryMode.PERSISTENT; DEFAULT_PRIORITY = 0; }
MessageDeliveryMode.class:
public enum MessageDeliveryMode { NON_PERSISTENT, PERSISTENT; private MessageDeliveryMode() { } public static int toInt(MessageDeliveryMode mode) { switch(mode) { case NON_PERSISTENT: return 1; case PERSISTENT: return 2; default: return -1; } }
經過源碼查看在 SpringBoot 中使用 rabbimqTemplate 發送的消息默認就是持久化的消息。
1.不知道生產者發送的消息到底是否已經到達 RabbitMQ Server; > 2.不知道生產者發送的消息是否已經成功的分配到隊列中去。
開啓消息發送確認,經過 ConfirmCallback 接口 和 ReturnCallback 接口 來保障。
備註
具體的操做方式能夠參考 RabbitMQ系列之消息確認機制 這篇文章。
消費者收到消息還沒來得及處理服務就宕機了。
消費端開啓消息確認(ACK),將消息設置爲手動確認:
# 開啓 ACK(消費者接收到消息時手動確認) spring.rabbitmq.listener.simple.acknowledge-mode=manual
這樣雖然服務宕機,可是在重啓以後,消費者仍然會消費到該條數據。
備註
具體的操做方式能夠參考 RabbitMQ 系列之消息確認機制 這篇文章。
持久化的消息成功存入 RabbitMQ 以後,若是在存入磁盤的這個過程當中 RabbitMQ 服務節點宕機、異常重啓等,消息還沒來得及存入磁盤。
可使用 RabbitMQ 的鏡集羣像模式進行部署,若是主節點在這個特殊的時間段內掛掉了,會自動切換到從節點,這樣就保證了高可用性,除非整個集羣都掛掉。
備註
瞭解更多關於 RabbitMQ 鏡像集羣模式能夠參考 RabbitMQ系列之部署模式 這篇文章。
好比設置爲持久化的消息,在保存到磁盤的過程當中,當前隊列節點掛了,存儲節點的磁盤也掛了。
歡迎你們關注個人微信公衆號閱讀更多文章: