系統學習消息隊列分享(六) 如何確保消息不會丟失?

對於剛剛接觸消息隊列的同窗,最常遇到的問題,也是最頭痛的問題就是丟消息了。對於大部分業務系統來講,丟消息意味着數據丟失,是徹底沒法接受的。數據庫

其實,如今主流的消息隊列產品都提供了很是完善的消息可靠性保證機制,徹底能夠作到在消息傳遞過程當中,即便發生網絡中斷或者硬件故障,也能確保消息的可靠傳遞,不丟消息。服務器

絕大部分丟消息的緣由都是因爲開發者不熟悉消息隊列,沒有正確使用和配置消息隊列致使的。雖然不一樣的消息隊列提供的 API 不同,相關的配置項也不一樣,可是在保證消息可靠傳遞這塊兒,它們的實現原理是同樣的。網絡

這節課咱們就來說一下,消息隊列是怎麼保證消息可靠傳遞的,這裏面的實現原理是怎麼樣的。當你熟知原理之後,不管你使用任何一種消息隊列,再簡單看一下它的 API 和相關配置項,就能很快知道該如何配置消息隊列,寫出可靠的代碼,避免消息丟失。異步

檢測消息丟失的方法

咱們說,用消息隊列最尷尬的狀況不是丟消息,而是消息丟了還不知道。通常而言,一個新的系統剛剛上線,各方面都不太穩定,須要一個磨合期,這個時候,特別須要監控到你的系統中是否有消息丟失的狀況。分佈式

若是是 IT 基礎設施比較完善的公司,通常都有分佈式鏈路追蹤系統,使用相似的追蹤系統能夠很方便地追蹤每一條消息。若是沒有這樣的追蹤系統,這裏我提供一個比較簡單的方法,來檢查是否有消息丟失的狀況。spa

咱們能夠利用消息隊列的有序性來驗證是否有消息丟失。原理很是簡單,在 Producer 端,咱們給每一個發出的消息附加一個連續遞增的序號,而後在 Consumer 端來檢查這個序號的連續性。日誌

若是沒有消息丟失,Consumer 收到消息的序號必然是連續遞增的,或者說收到的消息,其中的序號必然是上一條消息的序號 +1。若是檢測到序號不連續,那就是丟消息了。還能夠經過缺失的序號來肯定丟失的是哪條消息,方便進一步排查緣由。code

大多數消息隊列的客戶端都支持攔截器機制,你能夠利用這個攔截器機制,在 Producer 發送消息以前的攔截器中將序號注入到消息中,在 Consumer 收到消息的攔截器中檢測序號的連續性,這樣實現的好處是消息檢測的代碼不會侵入到你的業務代碼中,待你的系統穩定後,也方便將這部分檢測的邏輯關閉或者刪除。blog

若是是在一個分佈式系統中實現這個檢測方法,有幾個問題須要你注意。隊列

首先,像 Kafka 和 RocketMQ 這樣的消息隊列,它是不保證在 Topic 上的嚴格順序的,只能保證分區上的消息是有序的,因此咱們在發消息的時候必需要指定分區,而且,在每一個分區單獨檢測消息序號的連續性。

若是你的系統中 Producer 是多實例的,因爲並很差協調多個 Producer 之間的發送順序,因此也須要每一個 Producer 分別生成各自的消息序號,而且須要附加上 Producer 的標識,在 Consumer 端按照每一個 Producer 分別來檢測序號的連續性。

Consumer 實例的數量最好和分區數量一致,作到 Consumer 和分區一一對應,這樣會比較方便地在 Consumer 內檢測消息序號的連續性。

確保消息可靠傳遞

講完了檢測消息丟失的方法,接下來咱們一塊兒來看一下,整個消息從生產到消費的過程當中,哪些地方可能會致使丟消息,以及應該如何避免消息丟失。

你能夠看下這個圖,一條消息從生產到消費完成這個過程,能夠劃分三個階段,爲了方便描述,我給每一個階段分別起了個名字。

 

 

 

 

  • 生產階段: 在這個階段,從消息在 Producer 建立出來,通過網絡傳輸發送到 Broker 端。
  • 存儲階段: 在這個階段,消息在 Broker 端存儲,若是是集羣,消息會在這個階段被複制到其餘的副本上。
  • 消費階段: 在這個階段,Consumer 從 Broker 上拉取消息,通過網絡傳輸發送到 Consumer 上。

 

1. 生產階段

在生產階段,消息隊列經過最經常使用的請求確認機制,來保證消息的可靠傳遞:當你的代碼調用發消息方法時,消息隊列的客戶端會把消息發送到 Broker,Broker 收到消息後,會給客戶端返回一個確認響應,代表消息已經收到了。客戶端收到響應後,完成了一次正常消息的發送。

只要 Producer 收到了 Broker 的確認響應,就能夠保證消息在生產階段不會丟失。有些消息隊列在長時間沒收到發送確認響應後,會自動重試,若是重試再失敗,就會以返回值或者異常的方式告知用戶。

你在編寫發送消息代碼時,須要注意,正確處理返回值或者捕獲異常,就能夠保證這個階段的消息不會丟失。以 Kafka 爲例,咱們看一下如何可靠地發送消息:

同步發送時,只要注意捕獲異常便可。

try { RecordMetadata metadata = producer.send(record).get(); System.out.println(" 消息發送成功。"); } catch (Throwable e) { System.out.println(" 消息發送失敗!"); System.out.println(e); }

異步發送時,則須要在回調方法裏進行檢查。這個地方是須要特別注意的,不少丟消息的緣由就是,咱們使用了異步發送,卻沒有在回調中檢查發送結果。

producer.send(record, (metadata, exception) -> { if (metadata != null) { System.out.println(" 消息發送成功。"); } else { System.out.println(" 消息發送失敗!"); System.out.println(exception); } });

2. 存儲階段

在存儲階段正常狀況下,只要 Broker 在正常運行,就不會出現丟失消息的問題,可是若是 Broker 出現了故障,好比進程死掉了或者服務器宕機了,仍是可能會丟失消息的。

若是對消息的可靠性要求很是高,能夠經過配置 Broker 參數來避免由於宕機丟消息。

對於單個節點的 Broker,須要配置 Broker 參數,在收到消息後,將消息寫入磁盤後再給 Producer 返回確認響應,這樣即便發生宕機,因爲消息已經被寫入磁盤,就不會丟失消息,恢復後還能夠繼續消費。例如,在 RocketMQ 中,須要將刷盤方式 flushDiskType 配置爲 SYNC_FLUSH 同步刷盤。

若是是 Broker 是由多個節點組成的集羣,須要將 Broker 集羣配置成:至少將消息發送到 2 個以上的節點,再給客戶端回覆發送確認響應。這樣當某個 Broker 宕機時,其餘的 Broker 能夠替代宕機的 Broker,也不會發生消息丟失。後面我會專門安排一節課,來說解在集羣模式下,消息隊列是如何經過消息複製來確保消息的可靠性的。

3. 消費階段

消費階段採用和生產階段相似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息後,執行用戶的消費業務邏輯,成功後,纔會給 Broker 發送消費確認響應。若是 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程當中丟失,也不會由於客戶端在執行消費邏輯中出錯致使丟失。

你在編寫消費代碼時須要注意的是,不要在收到消息後就當即發送消費確認,而是應該在執行完全部消費業務邏輯以後,再發送消費確認。

一樣,咱們以用 Python 語言消費 RabbitMQ 消息爲例,來看一下如何實現一段可靠的消費代碼:

def callback(ch, method, properties, body): print(" [x] 收到消息 %r" % body) # 在這兒處理收到的消息 database.save(body) print(" [x] 消費完成 ") # 完成消費業務邏輯後發送消費確認響應 ch.basic_ack(delivery_tag = method.delivery_tag) channel.basic_consume(queue='hello', on_message_callback=callback)

你能夠看到,在消費的回調方法 callback 中,正確的順序是,先是把消息保存到數據庫中,而後再發送消費確認響應。這樣若是保存消息到數據庫失敗了,就不會執行消費確認的代碼,下次拉到的仍是這條消息,直到消費成功。

小結

這節課我帶你們分析了一條消息從發送到消費整個流程中,消息隊列是如何確保消息的可靠性,不會丟失的。這個過程能夠分爲分三個階段,每一個階段都須要正確的編寫代碼而且設置正確的配置項,才能配合消息隊列的可靠性機制,確保消息不會丟失。

  • 在生產階段,你須要捕獲消息發送的錯誤,並重發消息。
  • 在存儲階段,你能夠經過配置刷盤和複製相關的參數,讓消息寫入到多個副本的磁盤上,來確保消息不會由於某個 Broker 宕機或者磁盤損壞而丟失。
  • 在消費階段,你須要在處理徹底部消費業務邏輯以後,再發送消費確認。

你在理解了這幾個階段的原理後,若是再出現丟消息的狀況,應該能夠經過在代碼中加一些日誌的方式,很快定位到是哪一個階段出了問題,而後再進一步深刻分析,快速找到問題緣由。

相關文章
相關標籤/搜索