最近看了 @JavaGuide 發佈的一篇『面試官問我如何保證Kafka不丟失消息?我哭了!』,這篇文章承接這個主題,來聊聊如何保證 RocketMQ 不丟失消息。java
一條消息從生產到被消費,將會經歷三個階段:git
以上任一階段均可能會丟失消息,咱們只要找到這三個階段丟失消息緣由,採用合理的辦法避免丟失,就能夠完全解決消息丟失的問題。github
生產者(Producer) 經過網絡發送消息給 Broker,當 Broker 收到以後,將會返回確認響應信息給 Producer。因此生產者只要接收到返回的確認響應,就表明消息在生產階段未丟失。面試
RocketMQ 發送消息示例代碼以下:apache
DefaultMQProducer mqProducer=new DefaultMQProducer("test"); // 設置 nameSpace 地址 mqProducer.setNamesrvAddr("namesrvAddr"); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); // 發送消息到一個Broker try { SendResult sendResult = mqProducer.send(msg); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
send
方法是一個同步操做,只要這個方法不拋出任何異常,就表明消息已經發送成功。網絡
消息發送成功僅表明消息已經到了 Broker 端,Broker 在不一樣配置下,可能會返回不一樣響應狀態:異步
SendStatus.SEND_OK
SendStatus.FLUSH_DISK_TIMEOUT
SendStatus.FLUSH_SLAVE_TIMEOUT
SendStatus.SLAVE_NOT_AVAILABLE
引用官方狀態說明:ide
上圖中不一樣 broker 端配置將會在下文詳細解釋
另外 RocketMQ 還提供異步的發送的方式,適合於鏈路耗時較長,對響應時間較爲敏感的業務場景。工具
DefaultMQProducer mqProducer = new DefaultMQProducer("test"); // 設置 nameSpace 地址 mqProducer.setNamesrvAddr("127.0.0.1:9876"); mqProducer.setRetryTimesWhenSendFailed(5); mqProducer.start(); Message msg = new Message("test_topic" /* Topic */, "Hello World".getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */ ); try { // 異步發送消息到,主線程不會被阻塞,馬上會返回 mqProducer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 消息發送成功, } @Override public void onException(Throwable e) { // 消息發送失敗,能夠持久化這條數據,後續進行補償處理 } }); } catch (RemotingException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); }
異步發送消息必定要注意重寫回調方法,在回調方法中檢查發送結果。性能
不論是同步仍是異步的方式,都會碰到網絡問題致使發送失敗的狀況。針對這種狀況,咱們能夠設置合理的重試次數,當出現網絡問題,能夠自動重試。設置方式以下:
// 同步發送消息重試次數,默認爲 2 mqProducer.setRetryTimesWhenSendFailed(3); // 異步發送消息重試次數,默認爲 2 mqProducer.setRetryTimesWhenSendAsyncFailed(3);
默認狀況下,消息只要到了 Broker 端,將會優先保存到內存中,而後馬上返回確認響應給生產者。隨後 Broker 按期批量的將一組消息從內存異步刷入磁盤。
這種方式減小 I/O 次數,能夠取得更好的性能,可是若是發生機器掉電,異常宕機等狀況,消息還未及時刷入磁盤,就會出現丟失消息的狀況。
若想保證 Broker 端不丟消息,保證消息的可靠性,咱們須要將消息保存機制修改成同步刷盤方式,即消息存儲磁盤成功,纔會返回響應。
修改 Broker 端配置以下:
## 默認狀況爲 ASYNC_FLUSH flushDiskType = SYNC_FLUSH
若 Broker 未在同步刷盤時間內(默認爲 5s)完成刷盤,將會返回 SendStatus.FLUSH_DISK_TIMEOUT
狀態給生產者。
集羣部署
爲了保證可用性,Broker 一般採用一主(master)多從(slave)部署方式。爲了保證消息不丟失,消息還須要複製到 slave 節點。
默認方式下,消息寫入 master 成功,就能夠返回確認響應給生產者,接着消息將會異步複製到 slave 節點。
注:master 配置:flushDiskType = SYNC_FLUSH
此時若 master 忽然宕機且不可恢復,那麼還未複製到 slave 的消息將會丟失。
爲了進一步提升消息的可靠性,咱們能夠採用同步的複製方式,master 節點將會同步等待 slave 節點複製完成,纔會返回確認響應。
異步複製與同步複製區別以下圖:
注: 你們不要被上圖誤導,broker master 只能配置一種複製方式,上圖只爲解釋同步複製的與異步複製的概念。
Broker master 節點 同步複製配置以下:
## 默認爲 ASYNC_MASTER brokerRole=SYNC_MASTER
若是 slave 節點未在指定時間內同步返回響應,生產者將會收到 SendStatus.FLUSH_SLAVE_TIMEOUT
返回狀態。
小結
結合生產階段與存儲階段,若須要嚴格保證消息不丟失,broker 須要採用以下配置:
## master 節點配置 flushDiskType = SYNC_FLUSH brokerRole=SYNC_MASTER ## slave 節點配置 brokerRole=slave flushDiskType = SYNC_FLUSH
同時這個過程咱們還須要生產者配合,判斷返回狀態是不是 SendStatus.SEND_OK
。如果其餘狀態,就須要考慮補償重試。
雖然上述配置提升消息的高可靠性,可是會下降性能,生產實踐中須要綜合選擇。
消費者從 broker 拉取消息,而後執行相應的業務邏輯。一旦執行成功,將會返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
狀態給 Broker。
若是 Broker 未收到消費確認響應或收到其餘狀態,消費者下次還會再次拉取到該條消息,進行重試。這樣的方式有效避免了消費者消費過程發生異常,或者消息在網絡傳輸中丟失的狀況。
消息消費的代碼以下:
// 實例化消費者 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer"); // 設置NameServer的地址 consumer.setNamesrvAddr("namesrvAddr"); // 訂閱一個或者多個Topic,以及Tag來過濾須要消費的消息 consumer.subscribe("test_topic", "*"); // 註冊回調實現類來處理從broker拉取回來的消息 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 執行業務邏輯 // 標記該消息已經被成功消費 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 啓動消費者實例 consumer.start();
以上消費消息過程的,咱們須要注意返回消息狀態。只有當業務邏輯真正執行成功,咱們才能返回 ConsumeConcurrentlyStatus.CONSUME_SUCCESS
。不然咱們須要返回 ConsumeConcurrentlyStatus.RECONSUME_LATER
,稍後再重試。
看完 RocketMQ 不丟消息處理辦法,回頭再看這篇 kafka,有沒有發現,二者解決思路是同樣的,區別就是參數配置不同而已。
因此下一次,面試官再問你 XX 消息隊列如何保證不丟消息?若是你沒用過這個消息隊列,也不要哭,微笑面對他,從容給他分析那幾步會丟失,而後大體解決思路。
最後咱們還能夠說出咱們的思考,雖然提升消息可靠性,可是可能致使消息重發,重複消費。因此對於消費客戶端,須要注意保證冪等性。
可是要注意了,這時面試官可能就會跟你的話題,讓你來聊聊如何保證冪等性,必定先想好再說哦。
什麼?你還不知道如何實現冪等?那就趕忙關注@程序通事,後面文章咱們就來聊聊冪等這個話題。
才疏學淺,不免會有紕漏,若是你發現了錯誤的地方,還請你留言給我指出來,我對其加以修改。
再次感謝您的閱讀,我是樓下小黑哥,一位還未禿頭的工具猿,下篇文章咱們再見~
歡迎關注個人公衆號:程序通事,得到平常乾貨推送。若是您對個人專題內容感興趣,也能夠關注個人博客: studyidea.cn