消息中間件-消息的可靠性傳遞

前言

消息中間件的可靠性消息傳遞,是消息中間件領域很是重要的方案落實問題(在這以前的MQ理論,MQ選型是抽象層次更高的問題,這裏不談)。數據庫

而且這個問題與平常開發是存在較大的關聯的。能夠這麼說,凡是使用了MQ的,機會都要考慮這個問題。固然也有一些原始數據採集,日誌數據收集等應用場景對此沒有太高要求。可是大多數的業務場景,對此仍是有着較高要求的。好比訂單系統,支付系統,消息系統等,你弄丟一條消息,嘿嘿。api

網上對於這方面的博客,大多從單一MQ,或者乾脆就是在論述MQ。我不喜歡這樣的論述,這樣的論述太過侷限,也過於拖沓。服務器

此次,主要從理論方面論證消息的可靠性傳遞的落實。具體技術,都是依據這些理論的,具體實現都差很少。不過爲了便於你們理解,我在文中會以RabbitMq,Kafka這兩個主流MQ稍做舉例。網絡

在平常開發中,我更傾向於在具體開發前,先整理思路,走通理論,再開始編碼。畢竟,若是連理論都走不一樣,還談什麼編碼。併發

另外,我按照消息可靠性層次逐步推動,造成相應的目錄,但願你們喜歡(由於我認爲,相較網上這方面現有博客的目錄,這樣的目錄更合理,更人性化)。框架

概述

這裏簡單談一些有關消息可靠性傳遞的理論。異步

消息傳遞次數

消息在消息系統(生產者+MQ+消費者),其消費的次數,無非一下三種狀況:分佈式

  • 最多一次
  • 最少一次
  • 很少很多一次

消息可靠性層次

這也表明着消息系統的消息可靠性的三個層次:ide

  • 最多一次:上游服務的消息發出了,至於下游能不能收到服務,就無論了。結果就是下游服務,可能根本就沒有接收到消息。
  • 最少一次:上游服務的消息發出了,並經過某些機制,確保下游服務必定收到了該消息。可是收到了幾回,就無論了。結果就是下游服務,可能屢次收到同一條消息。
  • 很少很多一次:上游服務的消息發出了,並確保下游服務必定收到了消息。下游服務經過某些機制,確保屢次收到該消息與單次收到該消息,對其系統狀態的影響是相同的。

方案落實

實現上述三個層次,須要逐步從三個方面考慮:工具

  • 最多一次:會用消息隊列便可,只要確保消息的連通性便可
  • 最少一次:經過MQ提供的確認機制,確保消息的傳遞
  • 很少很多一次:經過外部應用程序,確保消息的單次消費與屢次消費對系統狀態影響是一致的

上述三個層次,對系統的性能損耗,系統複雜度等都是逐步上升的。

固然,咱們首先,須要瞭解這三個層次分別如何實現。
再在實際開發中,根據須要,靈活選取合適方案。

最多一次的消息傳遞

這個方案是最簡單的,只要確保消息系統的正確運做,以及系統的連通性便可。在正常狀況下,能夠保證絕大部分數據的可靠性傳遞。可是仍舊存在極小數據的丟失,而且數據的丟失會由於消息隊列的選擇,以及消息併發量,而受到影響。

優勢

  • 實現簡單。只要搭建對應的MQ服務器,寫出對應的生產者與消費者,以及相應配置,便可正常工做。

缺點

  • 沒法保證數據的可靠性,會存在必定的數據丟失狀況,尤爲是在併發量較大時

實際應用

能夠應用於日誌上傳這樣對消息可靠性要求低的應用場景。

總結

若是數據量不大的狀況下,推薦使用RabbitMQ,其消息可靠性在地數據量下,是最可靠的。可是在達到萬級併發時,會存在消息丟失,丟失的比例能夠達到千分之一。

若是數據量較大的狀況下,要麼採用集羣。要麼就採用Kafk(Kafka可支持十萬級併發)

通常來講,這種消息可靠性多見於項目初建,或相似日誌採集,原始數據採集這樣的特定場景。

最少一次的消息傳遞

這個方案開始利用MQ提供的特定機制,來提升消息傳遞的可靠性。

優勢

  • 不錯的消息可靠性。確保不會出現消息丟失的狀況
  • 實現並不複雜。只須要合理使用MQ的API,設置合理參數(如重試次數)便可

缺點

  • 會出現消息重複消費的狀況
  • 參數的設置須要合理。如重試次數,通常設置爲5次,也可根據狀況,進行調整
  • 資源佔用的提高。如帶寬(每次消息成功生產,消費都須要返回一條數據進行確認)等

方案落實

該方案的實現組成,由如下三個方面構成:

  • 消息的可靠生產
  • 消息的可靠存儲
  • 消息的可靠消費

經過以上三個方面的落實,確保可消息必定被下游服務消費。

消息的可靠生產

消息的可靠生產,是經過回調確認機制,確保消息必定被消息服務器接收。

消息生產,發送給消息服務器後,消息服務器會返回一個確認信息,表示數據正常接收。

若是生產者在必定時間內沒有接收到確認信息,就會觸發重試機制,進行消息的重發。

如RabbitMq的comfirm機制,Kafka的acks機制等。

RabbitMq的confirm機制存在三個模式:

  • 普通模式:channel.waitForConfirms()
  • 批量模式:channel.waitForConfirmsOrDie()
  • 異步模式:channel.addConfirmListener()

這三個模式,看名稱就能夠知道具體做用了。

至於Kafka的acks機制,一樣存在三個模式:

  • acks = 0 :不須要Kafka的任何Partition確認,即確認發送成功(這個之確保消息發送出去了,並不保證消息服務器是否成功接收)
  • acks = 1 :(默認)須要Kafka的Partition Leader確認,即被Kafka的一個Partition(Leader)接收。可是這樣依舊存在極小機率的消息丟失,即Partition Leader獲取了對應消息,並給了acks確認回覆。可是在其餘Partition同步前,Partition Leader宕機,數據丟失。那麼這就形成了消息丟失。
  • acks = all :須要Kafka對應ISR中的所有Partition確認,才確認消息發送成功(固然,這裏假定Kafka是多節點集羣,若是隻有一個分區,那就毫無心義了)。

說到這裏,簡單說一下,上述的操做可能形成消息的重複生產。

最簡單的例子,消息成功發送,可是對應的消息確認信息因爲網絡波動而丟失。那麼生產者就會重複發送該消息,因此消息服務器接收到了兩條相同消息,故產生了消息的重複生產。

另外,上述的重試,都是存在響應時長判斷(超出1min,就認爲數據丟失),以及重試次數限制(超過5次,就不進行重試。不然,大量重試數據可能會拖垮整個服務)。

消息的可靠存儲

消息的可靠存儲,是確保消息在消息服務器通過,或者說堆積時不會由於宕機,網絡等情況,丟失消息。

網上不少博客在論述消息的可靠性傳遞時,經常把這點遺漏。由於他們理所固然地認爲消息隊列已經經過集羣等實現了消息隊列服務的可用性,故消息的可靠性存儲也就實現了。

可是這裏存在兩個問題。第一,可靠性不等於可用性。第二,消息的可靠存儲,做爲消息可靠性傳遞的一部分,是不可缺失的。

可用性:確保服務的可用。即對應的服務,能夠提供服務。

可靠性:確保服務的正確。即對應的服務,提供的是正確的服務。

區別:我瀏覽淘寶,淘寶頁面打不開,這就涉及了可用性問題(可用性計算公式:可用時間/所有時長*100%)。而我瀏覽淘寶,查詢訂單,給我顯示的是別人的訂單,這就涉及了可靠性問題。

另外這裏再糾正一點,可靠性並不依賴於可用性。即便我打不開淘寶頁面,我也不能說淘寶提供訂單查詢就有問題(只是若是沒有了可用性,談論可靠性是很是沒有意義的。畢竟都用不了了,誰還關心其內容是否正確呢,都看不到)

消息隊列的可用性,是經過多個節點構成集羣,避免單點故障,從而提高可用性。

消息隊列的可靠存儲,是經過備份實現(這裏不糾結備份如何確保正確)的。如RabbitMq集羣的MemNode與DiskNode,又或者Kafka的replication機制等。

消息的可靠消費

消息的可靠消費,就是確保消息被消費者獲取,並被成功消費。避免因爲消息丟失,或者消費者宕機而形成消息消費不成功,最終形成消息的丟失(由於RabbitMq服務器在認爲消息被成功消費後,將對應數據刪除或標記爲「已消費」)。

至於消息的可靠消費,核心理念仍是重試,重試,再重試。不過具體的實現就八仙過海,各顯神通了。

這裏分別說一下RabbitMq,Kafka,Rocket三者對於可靠消費的處理:

RabbitMq

提供ack機制。默認是auto,直接在拿到消息時,直接ack。確保了消息到達了消費者,可是沒法解決消費者消費失敗這樣的問題。

實際開發中,爲了確保消息的可靠消費,通常會設置爲munal,只有在程序正確運行後,纔會調用對應api,表示消息正確消費。

Kafka

因爲Kafka的消息是落地到硬盤文件的,並且Kafka的消息分發方式是pull的,因此消息的拉取是經過offset機制去確認對應位置消息的。

固然,Kafka的offset默認是自動提交的(可經過nable_auto_commit與auto_commit_interval_ms控制)。

因此消費者調用服務失敗等緣由,能夠經過手動offset提交,來實現對數據的重複消費(甚至是歷史數據的消費),也就能夠在消費失敗時對同一消息進行再消費。

若是是消費者宕機等緣由,因爲Kafka服務器沒有收到對應的offset提交,因此認爲那條消息沒有被消費成功,故返回的依舊是那條消息。

RocketMq

其實RocketMq的處理有些相似Kafka確認機制+RabbitMq死信隊列的感受。

首先,消費者從RocketMq拉取消息,若是成功消費,就返回確認消息。

若是未成功消費,就嘗試從新消費。

嘗試消費必定次數後(如5次),就會將該消息發送之RocketMq中的重試隊列。

若是遇到消費者宕機的狀況,RocketMq會認爲該消息未成功消費,會被其餘消費者繼續消費。

其實在RabbitMq的可靠性消費時,咱們也會將屢次消費失敗的數據保存下來,便於後期修復等。不過保存的方式由不少種,日誌,數據庫,消息隊列等。而RocketMq則給出了具體的落實方案。

上述的操做,可能形成消息的重複消費。

最簡單的例子,消息成功被消費者消費,可是消費者還沒來得及發送確認信息,就宕機了。

消息隊列因爲沒有收到確認消息,認爲該條消息還沒有被消息,就將該消息交由其餘消費者繼續消費。

很少很多一次的消息傳遞

這個方案,就是經過MQ之外的應用程序,來進行擴展,最終達到消息準確消費的目的。

那麼爲何不將這個功能,囊括在MQ中呢?

我的認爲有四個方面的考慮:

  • 消息中間件,應該明確其功能域,而消息生產與消息消費每每涉及業務,因此避免與業務的耦合。因此消息中間件只完善了可靠存儲。
  • 準確消費,每每涉及MQ之外的部分,須要其餘部分的配合。就相似與XA接口同樣。這樣會帶來編碼的約束,系統的耦合性等。
  • 準確消費的實現能夠經過一個工具,模塊去實現,可是不應硬編碼。畢竟現有的處理方案並不必定就是最優解(尤爲是在調控中心,TCC框架展示的如今)。
  • 性能影響。爲了一個不通用的功能,會帶來消息中間件的性能大幅降低

優點

  • 確保消息被準確消費(很少很多一次)

缺點

  • 實現複雜(生產者與消費者都須要創建對應數據庫)
  • 須要創建對應規範(可是通用規範肯定後,實現就會變得快速)
  • 資源佔用的提高。如帶寬(每次消息成功生產,消費都須要返回一條數據進行確認)等

存在的問題

消息存儲部分的準確存儲,不應咱們來操心,因此只闡述消息生產與消息消費兩個部分。

消息的重複生產

  • 消息發給了消息隊列服務器,消息隊列服務器的確認信息因爲網絡波動等,沒有及時到達生產者
  • 消息發送給了消息隊列服務器,生產者在接收消息前,宕機
  • 消息發送給了消息隊列服務器,生產者在接收消息後,還沒來得及進行確認邏輯,宕機

綜上來看,就是消息發出後,到生產者消息確認信息的處理之間,出現各類意外,致使重複生產。

消息的重複消費

  • 消息已經被消費,消費者還沒來得及發送確認信息,就宕機了
  • 消息已經被消費,消費者發出確認信息,確認信息因爲網絡波動等,沒有及時到達消息隊列服務器
  • 消息已經被消費,消費者發出確認信息,消息隊列服務器對應實例在接收到確認信息前,宕機
  • 消息已經被消費,消費者發出確認信息,消息隊列服務器接受到了確認信息,還沒來得及進行確認邏輯,宕機

綜上來看,就是消息已經被消費後,到消息隊列服務器進行確認消息處理之間,出現各類意外,致使重複消費。

解決方案

解決方案:messageId+冪等

準確來講,解決方案的核心是冪等,而messageId是做爲輔助手段的。

冪等

冪等,簡單說明一下,就是屢次操做與單次操做對系統狀態的影響是一致的。

i = 1;

就是冪等操做,由於不管進行幾回,i的值都沒有變化。

i++;

則不是冪等操做,由於i的值與執行次數息息相關。

故經過冪等操做來確保同一條消息,不被執行屢次。

messageId

可是,消費者如何肯定是否爲同一條消息呢?

有的消息體存在惟一性字段,如orderId等。但有的消息並無這樣的惟一性字段。

因此須要一個專門的字段,來表示惟一性,而且與業務消息解耦。這就是messageId。

既能夠採用消息體的惟一性字段(能夠是單一字段,也能夠是組合字段),也能夠經過特定方式生成對應標識。

具體的生成狀況,就不在這裏贅述了。

方案落實

先來一張大圖(這種事情,圖片展現最直觀了),展現一下流程:

消息中間件-消息的可靠性傳遞

(圖片是絕對清晰的。看不清圖片的朋友,請將圖片在新頁面打開,或下載。說實話,來到新公司,首先提高的就是畫圖能力。囧)

簡單說一下流程,你們能夠對照着上圖,看一下:

生產者到消息中間件服務器

  1. 生產者根據須要發送的消息,生成對應messageId。並封裝對應message至生產者數據庫(該操做應該利用事務性,確保生產者事件處理與message保存至數據庫的原子性),同時標註message狀態爲sending(發送中狀態)
  2. 將對應message發送給消息隊列服務器
  3. 若是沒有收到生產確認信息,則從新發送message(若是這個時候遇到生產者實例宕機,也不用擔憂。由於後續會有補償程序,進行補償重發操做)
  4. 當收到消息中間件服務器的消息生產確認消息(即肯定消息已經達到消息中間件服務器),將數據庫中對應message的狀態修改成sended(已發送狀態)

上述中提到的補償機制,實際上是相似事務中的一個操做。經過一個定時任務,定時巡檢數據庫處於sending狀態的message,並經過生產者極性發送(因此message通常都保存source,target等信息)。

之因此會有sending狀態的message,就是由於存在生產者消息發送出去了,還沒收到生產確認信息,結果生產者實例本身宕機的狀況。

至於補償機制的定時任務,是一個很是簡單的實現,這裏就再也不贅述了。

消息中間件到消費者

這裏進行的操做是針對非冪等的操做。

若是是冪等操做,則能夠直接進行。畢竟屢次執行與單次執行對數據庫的影響是一致的。

可是注意冪等操做在部分場景下無效的問題(時間影響上),如「餘額 = 1k」的操做對於數據庫而言是冪等的,可是在兩次「餘額 = 1k」操做間,有一個「餘額 = 2k」的操做,則會發生問題(丟失了「餘額 = 2k」操做)。固然,這種相似ABA問題,徹底能夠引入版本號,來進行解決。

綜上,仍是推薦採用如下解決方法,流程較爲簡單:

  1. 消費者獲取數據
  2. 消費者判斷數據庫是否有對應message
  3. 若是存在對應message,則放棄執行(由於這是一個重複操做)
  4. 若是不存在,則進行相關消息處理。並經過事務控制,在消費者數據庫中添加message(確保消息的處理與數據庫添加message是原子操做)

至此,消息的準確傳遞就完成了。

總結

消息可靠性傳遞的發展過程,也體現了人們對消息中間件功能的一步步追求,更是體現了工程師們解決問題的思路。

不少時候,咱們會遇到不少問題,甚至使人感到雜亂不堪,無從下手。這個時候,最好的辦法就是靜下心來,對它們進行劃分(按照重要程度,緊迫度,實現難度),再進行一個長期規劃,一步步來解決。每每這個時候,動動筆,在筆記上列下清單,會是一個不錯的辦法。

其中消息的準確傳遞,涉及一些事務相關的內容。也許有人已經聯想到,消息隊列是否能夠做爲分佈式事務的一種手段呢?我會在以後的博客中,來闡述分佈式事務這一重要主題。

相關文章
相關標籤/搜索