「查缺補漏」鞏固你的RocketMQ知識體系

Windows安裝部署

下載

地址:[https://www.apache.org/dyn/cl...]java

選擇‘Binary’進行下載git

解壓已下載工程github

配置

新增系統變量
ROCKETMQ_HOME -> F:RocketMQrocketmq-4.5.2

JAVA_HOME -> F:Java_JDKJDK1.8算法

Path 系統變量新增:Maven/bin目錄數據庫

PS:RocketMQ 消息存儲在C:UsersAdministratorstore store目錄中 文件佔用較大,注意刪除沒必要要的內容apache

啓動

start mqnamesrv.cmd

start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true緩存

Rocket集成可視化監控插件

  1. 任意目錄(拉取項目,隨便哪裏都行)git clone https://github.com/apache/roc...
  2. 進入‘rocketmq-externalsrocketmq-consolesrcmainresources’文件夾,打開‘application.properties’進行配置
  3. 其實就是一個SpringBoot服務,肯定好端口,別重複便可

    server.port=8100服務器

    rocketmq.config.namesrvAddr=127.0.0.1:9876網絡

  4. 進入‘rocketmq-externalsrocketmq-console’文件夾

    執行‘mvn clean package -Dmaven.test.skip=true’,編譯生成target數據結構

    java -jar rocketmq-console-ng-1.0.1.jar

  5. 根據配置地址訪問: http://127.0.0.1:8100

Rocket可視化監控插件 增長Topic | 自動增長Topic(4.5.2版本)

4.5.2 版本 支持自動建立Topic

4.3.0 版本 必須經過監控程序配置Topic,不然執行程序報錯,沒有此路由

SpringBoot集成 RocketMQ

<!--RocketMQ-->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.2</version>
</dependency>

RocketMQ基本概念

<br/>

概覽

基於RocketMQ的分佈式系統,通常能夠分爲四個集羣:Name server、broker、producer、consumer

  1. name server

    • 提供輕量級的服務發現和路由服務;
    • 每一個節點都存放了所有的路由信息和對應的讀寫服務;
    • 存儲支持水平擴展
  2. broker

    • 提供知足TOPIC和QUEUE機制的消息存儲服務;
    • 有推和拉兩種模式;
    • 經過2或3拷貝實現高可用;
    • 提供上億消息的堆積能力;
    • 提供故障恢復、統計功能和告警功能;
  3. producer

    • 支持分佈式部署,經過負載平衡模塊給broker發消息
    • 支持快速失敗
    • 低延遲
  4. consumer

    1. 支持推和拉兩種模式
    2. 支持集羣消費和廣播消費

<br/>

核心模塊

<br/>

Name Server

提供Broker管理;Routing管理(路由管理)

NameServer,不少時候稱爲命名發現服務,其在RocketMQ中起着中轉承接的做用,是一個無狀態的服務,多個NameServer之間不通訊。任何Producer,Consumer,Broker與全部NameServer通訊,向NameServer請求或者發送數據。並且都是單向的,Producer和Consumer請求數據,Broker發送數據。正是由於這種單向的通訊,RocketMQ水平擴容變得很容易

  • 提供輕量級的服務發現和路由服務;
  • 每一個節點都存放了所有的路由信息和對應的讀寫服務;
  • 存儲支持水平擴展

總結:相比於ZooKeeper提供的分佈式鎖,發佈和訂閱,數據一致性,選舉等,在RocketMQ是不適用的,所以重寫了一套更加輕量級的發現服務,主要用以存儲 Broker相關信息以及當前Broker上的topic信息,路由信息等

Broker Server

提供Remoting Module、客戶端管理、存儲服務、HA服務(主從)、索引服務
  • 提供知足TOPIC和QUEUE機制的消息存儲服務;
  • 有推和拉兩種模式;
  • 經過2或3拷貝實現高可用;
  • 提供上億消息的堆積能力;
  • 提供故障恢復、統計功能和告警功能;

producer

  • 支持分佈式部署,經過負載平衡模塊給broker發消息
  • 支持快速失敗
  • 低延遲

consumer

  • 支持推和拉兩種模式
  • 支持集羣消費和廣播消費

<br/>

核心角色介紹

<br/>

生產者

生產者發送業務系統產生的消息給broker, RocketMQ提供了多種發送方式:同步的、異步的、單向的

<br/>

生產者組

具備相同角色的生產者被分到一組, 假如原始的生產者在事務後崩潰,broker會聯繫 同一輩子產者組中的不一樣生產者實例,繼續提交或回滾事務

<br/>

消費者

一個消費者從broker拉取信息,並將信息返還給應用。爲了咱們應用的正確性,提供了兩種消費者類型:

拉式消費者:拉式消費者從broker拉取消息,一旦一批消息被拉取,用戶應用系統將發起消費過程。

推式消費者:推式消費者,從另外一方面講,囊括了消息的拉取、消費過程,並保持了內部的其餘工做,留下了一個回調 接口給終端用戶去實現,實如今消息到達時要執行的內容。

<br/>

消費者組

具備相同角色的消費者被組在一塊兒,稱爲消費者組,它完成了負載均衡和容錯的目標

一個消費組中的消費者實例必須有肯定的相同的訂閱topic

<br/>

Topic(主題)

Topic是一個消息的目錄,在這個目錄中,生產者傳送消息,消費者拉取消息,能夠多個消費者訂閱同一個topic,一個生產者也能夠發送多個topic

PS:RocketMQ 基於發佈訂閱模式,發佈訂閱的核心即 Topic 主題

<br/>

Message(消息)

消息是被傳遞的信息。一個消息必須有一個Topic,它能夠理解爲信件上的地址。一個消息也能夠有一個可選的tag,和額外的key-value對。 例如:你能夠設置業務中的鍵到你的消息中,在broker服務中查找消息,以便在開發期間診斷問題

<br/>

消息隊列

Topic被分割成一個或多個消息隊列。隊列分爲3中角色:異步主、同步主、從。若是你不能容忍消息丟失,咱們建議你部署同步主,並加一個從隊列。 若是你容忍丟失,但你但願隊列老是可用,你能夠部署異步主和從隊列。若是你想最簡單,你只須要一個異步主,不須要從隊列。 消息保存磁盤的方式也有兩種,推薦使用的是異步保存,同步保存是昂貴的並會致使性能損失,若是你想要可靠性,咱們推薦你使用同步主+從的方式。

<br/>

Tag(標籤)

標籤,用另一個詞來講,就是子主題,爲用戶提供額外的靈活性。具備相同Topic的消息能夠有不一樣的tag。

<br/>

Broker(隊列)

Broker是RocketMQ的一個主要組件,它接收生產者發送的消息,存儲它們並準備處理消費者的拉取請求。它也存儲消息相關的元數據, 包括消費組,消費成功的偏移量,主題、隊列的信息。

<br/>

名稱服務

名稱服務主要提供路由信息。生產者/消費者客戶端尋找topic,並找到通訊的隊列列表。

<br/>

消息的順序

DefaultMQPushConsumer被使用,你就要決定消費消息時,是順序消費仍是同時消費

  • 順序消費

  順序消費消息的意思是 消息將按照生產者發送到隊列時的順序被消費掉。若是你被強制要求使用全局的順序,你要確保你的topic只有一個消息隊列。

若是指定順序消費,消息被同時消費的數量就是訂閱這個topic的消費組的數量。

  • 同時消費

  當同時消費消息時,消息同時消費的最大數量取決於消費客戶端指定的線程池的大小。

<br/>

最佳實踐

Producer最佳實踐
  1. 一個應用盡量用一個 Topic,消息子類型用 tags 來標識,tags 能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用 tags 在 broker 作消息過濾。
  2. 每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。因爲是哈希索引,請務必保證 key 儘量惟一,這樣能夠避免潛在的哈希衝突。

    消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。

  3. 對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
  4. 某些應用若是不關注消息是否發送成功,請直接使用sendOneWay方法發送消息。
Consumer最佳實踐
  1. 消費過程要作到冪等(即消費端去重)
  2. 儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。
  3. 優化每條消息消費過程

MQ核心問題

<br/>

1.消息隊列適合解決的問題

解決的核心問題主要是:異步、解耦、削峯

可是引入消息隊列也會有不少額外的問題,好比系統複雜性會大大增長,同時須要解決重複下發,重複消費,消費順序,消息丟失,重試機制等等問題,所以不能濫用,合適的場景用合適的技術

<br/>

2.消息模型:主題和隊列的區別

1、消息隊列的演進

一、初始階段

最初的消息隊列,就是一個嚴格意義上的隊列。隊列是一種數據結構,先進先出,在消息入隊出隊過程當中,保證這些消息嚴格有序。早期的消息隊列就是按照「隊列」的數據結構設計的

隊列模型:

生產者(Producer)發消息就是入隊操做,消費者(Consumer)收消息就是出隊也就是刪除操做,服務端存放消息的容器天然就稱爲「隊列」。

  • 若是有多個生產者往同一個隊列裏面發送消息,這個隊列中能夠消費到的消息,就是這些生產者生產的全部消息的合集。消息的順序就是這些生產者發送消息的天然順序
  • 若是有多個消費者接收同一個隊列的消息,這些消費者之間其實是競爭的關係,每一個消費者只能收到隊列中的一部分消息,也就是說任何一條消息只能被其中的一個消費者收到

二、發佈 - 訂閱模型階段

若是須要將一份消息數據分發給多個消費者,要求每一個消費者都能收到全量的消息,例如,對於一份訂單數據,風控系統、分析系統、支付系統等都須要接收消息。

這個時候,單個隊列就知足不了需求,一個可行的解決方式是,爲每一個消費者建立一個單獨的隊列,讓生產者發送多份。可是一樣的一份消息數據被複制到多個隊列中會浪費資源,更重要的是,生產者必須知道有多少個消費者。爲每一個消費者單獨發送一份消息,這實際上違背了消息隊列「解耦」這個設計初衷。

爲了解決這個問題,演化出了另一種消息模型:發佈 - 訂閱模型(Publish-Subscribe Pattern)

消息的發送方稱爲發佈者(Publisher),消息的接收方稱爲訂閱者(Subscriber),服務端存放消息的容器稱爲主題(Topic)。

  • 發佈者將消息發送到主題中,訂閱者在接收消息以前須要先「訂閱主題」。
  • 每份訂閱中,訂閱者均可以接收到主題的全部消息。

三、總結:

  • 在很長的一段時間,隊列模式和發佈 - 訂閱模式是並存的。
  • 有些消息隊列同時支持這兩種消息模型,好比 ActiveMQ。
  • 對比這兩種模型,生產者就是發佈者,消費者就是訂閱者,隊列就是主題,並無本質的區別。它們最大的區別是:一份消息數據能不能被消費屢次的問題
  • 實際上,在這種發佈 - 訂閱模型中,若是隻有一個訂閱者,那它和隊列模型就基本是同樣的了。也就是說,發佈 - 訂閱模型在功能層面上是能夠兼容隊列模型的。

2、RabbitMQ 的消息模型

少數依然堅持使用隊列模型的產品之一。

RabbitMQ 使用 Exchange 模塊解決多個消費者的問題。Exchange 位於生產者和隊列之間,生產者並不關心將消息發送給哪一個隊列,而是將消息發送給 Exchange,由 Exchange 上配置的策略來決定將消息投遞到哪些隊列中。

  • 同一份消息若是須要被多個消費者來消費,須要配置 Exchange 將消息發送到多個隊列,每一個隊列中都存放一份完整的消息數據,能夠爲一個消費者提供消費服務。

3、RocketMQ 的消息模型

RocketMQ 使用的消息模型是標準的發佈 - 訂閱模型。在 RocketMQ 也有隊列(Queue)這個概念。

消息隊列的消費機制:

幾乎全部的消息隊列產品都使用一種很是樸素的「請求 - 確認」機制,確保消息不會在傳遞過程當中因爲網絡或服務器故障丟失。

在生產端,生產者先將消息發送給服務端,也就是 Broker,服務端在收到消息並將消息寫入主題或者隊列中後,會給生產者發送確認的響應。若是生產者沒有收到服務端的確認或者收到失敗的響應,則會從新發送消息

在消費端,消費者在收到消息並完成本身的消費業務邏輯(好比,將數據保存到數據庫中)後,也會給服務端發送消費成功的確認,服務端只有收到消費確認後,才認爲一條消息被成功消費,不然它會給消費者從新發送這條消息,直到收到對應的消費成功確認。

這個確認機制很好地保證了消息傳遞過程當中的可靠性,可是,引入這個機制在消費端帶來了一個問題:爲了確保消息的有序性,在某一條消息被成功消費以前,下一條消息是不能被消費的,也就是說,每一個主題在任意時刻,至多隻能有一個消費者實例在進行消費,那就無法經過水平擴展消費者的數量來提高消費端整體的消費性能

爲了解決這個問題,RocketMQ 在主題下面增長了隊列的概念:

  • 每一個主題包含多個隊列,經過多個隊列來實現多實例並行生產和消費。須要注意的是,RocketMQ 只在隊列上保證消息的有序性,主題層面是沒法保證消息的嚴格順序的。
  • 生產者會往全部隊列發消息,但不是「同一條消息每一個隊列都發一次」,每條消息只會往某個隊列裏面發送一次
  • 一個消費組,每一個隊列上只能串行消費,多個隊列加一塊兒就是並行消費了,並行度就是隊列數量,隊列數量越多並行度越大,因此水平擴展能夠提高消費性能。
  • 每隊列每消費組維護一個消費位置(offset),記錄這個消費組在這個隊列上消費到哪兒了。
  • 訂閱者是經過消費組(Consumer Group)來體現的。每一個消費組都消費主題中一份完整的消息,不一樣消費組之間消費進度彼此不受影響,也就是說,一條消息被 Consumer Group1 消費過,也會再給 Consumer Group2 消費。
  • 消費組中包含多個消費者,同一個組內的消費者是競爭消費的關係,每一個消費者負責消費組內的一部分消息。若是一條消息被消費者 Consumer1 消費了,那同組的其餘消費者就不會再收到這條消息。
  • 因爲消息須要被不一樣的組進行屢次消費,因此消費完的消息並不會當即被刪除,這就須要 RocketMQ 爲每一個消費組在每一個隊列上維護一個消費位置(Consumer Offset),這個位置以前的消息都被消費過,以後的消息都沒有被消費過,每成功消費一條消息,消費位置就加一。咱們在使用消息隊列的時候,丟消息的緣由大可能是因爲消費位置處理不當致使的

4、Kafka 的消息模型

Kafka 的消息模型和 RocketMQ 是徹底同樣的,惟一的區別是,在 Kafka 中,隊列這個概念的名稱不同,Kafka 中對應的名稱是「分區(Partition)」,含義和功能是沒有任何區別的。

5、總結

  • 經常使用的消息隊列中,RabbitMQ 採用的是隊列模型,可是它同樣能夠實現發佈 - 訂閱的功能。RocketMQ 和 Kafka 採用的是發佈 - 訂閱模型,而且兩者的消息模型是基本一致的。

<br/>

3.消息丟失怎麼辦? 如何保證消息的可靠性傳輸?

首先如何驗證消息是否丟失?

  • 若是是 IT 基礎設施比較完善的公司,通常都有分佈式鏈路追蹤系統,使用相似的追蹤系統能夠很方便地追蹤每一條消息。
  • 若是沒有這樣的追蹤系統,咱們能夠利用消息隊列的有序性來驗證是否有消息丟失

即保證消息消費順序的狀況下,根據消息的序號,在消費段判斷是否連續

解決方案:

消息從生產到消費的過程當中,能夠劃分三個階段:

一、生產階段

消息隊列經過最經常使用的請求確認機制,來保證消息的可靠傳遞:當你代碼調用發消息方法時,消息隊列客戶端會把消息發送到Broker,Broker收到消息後,會給客戶端返回一個確認響應,代表消息已收到。客戶端收到響應後,完成了一次正常消息的發送。

有些消息隊列在長時間沒收到發送確認響應後,會自動重試,若是重試失敗,就會以返回值或者異常的方式告知用戶。在編寫發送消息的代碼時,須要注意,正確處理返回值或者捕獲異常,就能夠保證這個階段的消息不會丟失。

同步發送時,只要注意捕獲異常便可。

異步發送時,則須要在回調方法裏進行檢查。這個地方須要特別注意,不少丟消息的緣由就是,咱們使用了異步發送,卻沒有在回調中檢查發送結果。

二、存儲階段

在存儲階段正常狀況下,只要Broker在正常運行,就不會出現丟消息的問題;可是若是Broker出現故障,好比進程死掉或者服務器宕機,仍是可能會丟失消息的。

若是對消息的可靠性要求很是高,能夠經過配置Broker參數來避免由於宕機丟消息:

  • 對於單個節點的 Broker,須要配置 Broker 參數,在收到消息後,將消息寫入磁盤後再給 Producer 返回確認響應,這樣即便發生宕機,因爲消息已經被寫入磁盤,就不會丟失消息,恢復後還能夠繼續消費。例如,在 RocketMQ 中,須要將刷盤方式 flushDiskType 配置爲 SYNC_FLUSH 同步刷盤。
  • 對於 Broker 是由多個節點組成的集羣,須要將 Broker 集羣配置成:至少將消息發送到 2 個以上的節點,再給客戶端回覆發送確認響應。這樣當某個 Broker 宕機時,其餘的 Broker 能夠替代宕機的 Broker,也不會發生消息丟失。

三、消息階段

消費階段採用和生產階段相似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息後,執行用戶的消費業務邏輯,成功後,纔會給 Broker 發送消費確認響應。若是 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程當中丟失,也不會由於客戶端在執行消費邏輯中出錯致使丟失。

在編寫消費代碼時須要注意的是:不要在收到消息後就當即發送消費確認,而是應該在執行完全部消費業務邏輯以後,再發送消費確認

<br/>

4.處理消費過程當中的重複消息

在消息傳遞過程當中,若是出現傳遞失敗的狀況,發送方會執行重試,重試過程當中就有可能產生重複的消息。若是沒有對重複消息進行處理,就可能致使系統的數據出現錯誤。

好比,一個消費訂單消息,統計下單金額的微服務,若是沒有正確處理重複消息,那就會出現重複統計,致使統計結果錯誤。

1、消息重複的狀況必然存在

在MQTT協議中,給出了三種傳遞消息時可以提供的服務質量標準:

  • At most once:至多一次。最多會被送達一次,也就是說沒有消息可靠性保證,容許丟消息。通常都是一些對消息可靠性要求不高的監控場景使用,好比每分鐘上報一次機房溫度數據,能夠接受數據少許丟失。
  • At least once:至少一次。至少會被送達一次,也就是說不容許丟消息,可是容許有少許重複消息出現
  • Exactly once:剛好一次。只會被送達一次,不容許丟失也不容許重複,這個是最高等級。

這個服務質量標準不只適用於 MQTT,對全部的消息隊列都是適用的。經常使用的絕大部分消息隊列提供的服務質量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是說,消息隊列很難保證消息不重複。

注意:Kafka 支持的「Exactly once」和咱們剛剛提到的消息傳遞的服務質量標準「Exactly once」是不同的,它是 Kafka 提供的另一個特性,Kafka 中支持的事務也和咱們一般意義理解的事務有必定的差別。在 Kafka 中,事務和 Excactly once 主要是爲了配合流計算使用的特性。

2、用冪等性解決重複消息問題

冪等原本是一個數學上的概念,它的定義是:若是一個函數f(x)知足:f(f(x)) = f(x),則函數f(x)知足米冪等性。擴展到計算機領域,被用來描述一個操做、方法或者服務。

  • 一個冪等操做的特色是,其任意屢次執行所產生的影響均與一次執行的影響相同
  • 一個冪等方法,使用一樣的參數,對它進行屢次調用和一次調用,對系統產生的影響是同樣的。因此不用擔憂重複執行會對系統形成任何改變。

舉例:

一、在不考慮併發的狀況下,「將帳戶 X 的餘額設置爲 100 元」,執行一次後對系統的影響是,帳戶 X 的餘額變成了 100 元。只要提供的參數 100 元不變,那即便再執行多少次,帳戶 X 的餘額始終都是 100 元,不會變化,這個操做就是一個冪等的操做。

二、「將帳戶 X 的餘額加 100 元」,這個操做它就不是冪等的,每執行一次,帳戶餘額就會增長 100 元,執行屢次和執行一次對系統的影響(也就是帳戶的餘額)是不同的。

若是消費消息的業務邏輯具有冪等性,那就不用擔憂消息重複的問題,由於同一條消息,消費一次和消費屢次對系統的影響是徹底同樣的。消費屢次等於消費一次。從對系統的影響結果來講:At least once + 冪等消費 = Exactly once。

實現冪等操做最好的方式是,從業務邏輯設計上入手,將消費的業務邏輯設計成具有冪等性的操做

經常使用的設計冪等操做的方法

(1)利用數據庫的惟一約束實現冪等

上面提到的那個不具有冪等特性的轉帳的例子:將帳戶 X 的餘額加 100 元。在這個例子中,咱們能夠經過改造業務邏輯,讓它具有冪等性。

首先,咱們能夠限定,對於每一個轉帳單每一個帳戶只能夠執行一次變動操做,在分佈式系統中,這個限制實現的方法很是多,最簡單的是咱們在數據庫中建一張轉帳流水錶,這個表有三個字段:轉帳單 ID、帳戶 ID 和變動金額,而後給轉帳單 ID 和帳戶 ID 這兩個字段聯合起來建立一個惟一約束,這樣對於相同的轉帳單 ID 和帳戶 ID,表裏至多隻能存在一條記錄。

這樣,咱們消費消息的邏輯能夠變爲:「在轉帳流水錶中增長一條轉帳記錄,而後再根據轉帳記錄,異步操做更新用戶餘額便可。」在轉帳流水錶增長一條轉帳記錄這個操做中,因爲咱們在這個表中預先定義了「帳戶 ID 轉帳單 ID」的惟一約束,對於同一個轉帳單同一個帳戶只能插入一條記錄,後續重複的插入操做都會失敗,這樣就實現了一個冪等的操做。

基於這個思路,不光是可使用關係型數據庫,只要是支持相似「INSERT IF NOT EXIST」語義的存儲類系統均可以用於實現冪等,好比,你能夠用 Redis 的 SETNX 命令來替代數據庫中的惟一約束,來實現冪等消費。

(2)爲更新的數據設置前置條件

給數據變動設置一個前置條件,若是知足條件就更新數據,不然拒絕更新數據,在更新數據的時候,同時變動前置條件中須要判斷的數據。這樣,重複執行這個操做時,因爲第一次更新數據的時候已經變動了前置條件中須要判斷的數據,不知足前置條件,則不會重複執行更新數據操做。

好比,「將帳戶 X 的餘額增長 100 元」這個操做並不知足冪等性,咱們能夠把這個操做加上一個前置條件,變爲:「若是帳戶 X 當前的餘額爲 500 元,將餘額加 100 元」,這個操做就具有了冪等性。對應到消息隊列中的使用時,能夠在發消息時在消息體中帶上當前的餘額,在消費的時候進行判斷數據庫中,當前餘額是否與消息中的餘額相等,只有相等才執行變動操做。

可是,若是咱們要更新的數據不是數值,或者咱們要作一個比較複雜的更新操做怎麼辦?用什麼做爲前置判斷條件呢?更加通用的方法是,給你的數據增長一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,若是不一致就拒絕更新數據,更新數據的同時將版本號 +1,同樣能夠實現冪等更新。

(3)記錄並檢查操做

若是上面提到的兩種實現冪等方法都不能適用於你的場景,還有一種通用性最強,適用範圍最廣的實現冪等性方法:記錄並檢查操做,也稱爲「Token 機制或者 GUID(全局惟一 ID)機制」,實現的思路特別簡單:在執行數據更新操做以前,先檢查一下是否執行過這個更新操做。這種方法適用範圍最廣,可是實現難度和複雜度也比較高,通常不推薦使用

具體的實現方法是,在發送消息時,給每條消息指定一個全局惟一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,若是沒有消費過,才更新數據,而後將消費狀態置爲已消費

在分佈式系統中,這個方法實際上是很是難實現的。首先,給每一個消息指定一個全局惟一的 ID 就是一件不那麼簡單的事兒,方法有不少,但都不太好同時知足簡單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在「檢查消費狀態,而後更新數據而且設置消費狀態」中,三個操做必須做爲一組操做保證原子性,才能真正實現冪等,不然就會出現 Bug。

好比說,對於同一條消息:「全局 ID 爲 8,操做爲:給 ID 爲 666 帳戶增長 100 元」,有可能出現這樣的狀況:

  • t0 時刻:Consumer A 收到條消息,檢查消息執行狀態,發現消息未處理過,開始執行「帳戶增長 100 元」;
  • t1 時刻:Consumer B 收到條消息,檢查消息執行狀態,發現消息未處理過,由於這個時刻,Consumer A 還將來得及更新消息執行狀態。

這樣就會致使帳戶被錯誤地增長了兩次 100 元,這是一個在分佈式系統中很是容易犯的錯誤,必定要引覺得戒。對於這個問題,固然咱們能夠用事務來實現,也能夠用鎖來實現,可是在分佈式系統中,不管是分佈式事務仍是分佈式鎖都是比較難解決問題。

<br/>

5.利用事務消息實現分佈式事務

1、消息事務

其實不少場景下,咱們「發消息」這個過程,目的每每是通知另一個系統或者模塊去更新數據,消息隊列中的「事務」,主要解決消息生產者和消息消費者的數據一致性問題

用戶在電商APP上購物時,先把商品加到購物車裏,而後幾件商品一塊兒下單,最後支付,完成購物流程。

這個過程當中有一個須要用到消息隊列的步驟,訂單系統建立訂單後,發消息給購物車系統,將已下單的商品從購物車中刪除。由於從購物車刪除已下單商品這個步驟,並非用戶下單支付這個主要流程中必要的步驟,使用消息隊列來異步清理購物車是更加合理。

對於訂單系統,它建立訂單的過程實際執行了2個步驟的操做:

  • 在訂單庫中插入一條訂單數據,建立訂單;
  • 發消息給消息隊列,消息的內容就是剛剛建立的訂單

對於購物車系統:

  • 訂閱相應的主題,接收訂單建立的消息,而後清理購物車,在購物車中刪除訂單的商品。

在分佈式系統中,上面提到的步驟,任何一個都有可能失敗,若是不作任何處理,那就有可能出現訂單數據與購物車數據不一致的狀況,好比:

  • 建立了訂單,沒有清理購物車;
  • 訂單沒建立成功,購物車裏面的商品卻被清掉了。

因此咱們須要解決的問題爲:在上述任意步驟都有可能失敗的狀況下,還要保證訂單庫和購物車庫這兩個庫的數據一致性。

2、分佈式事務

分佈式事務就是要在分佈式系統中實現事務。在分佈式系統中,在保證可用性和不嚴重犧牲性能的前提下,光是要實現數據的一致性就已經很是困難了,顯然實現嚴格的分佈式事務是更加不可能完成的任務。因此目前你們所說的分佈式事務,更多狀況下,是在分佈式系統中事務的不完整實現,在不一樣的應用場景中,有不一樣的實現,目的都是經過一些妥協來解決實際問題。

常見的分佈式事務實現:

  • 2PC(Two-phase Commit,也叫二階段提交)
  • TCC(Try-Confirm-Cancel)
  • 事務消息

每一種實現都有其特定的使用場景,也有各自的問題,都不是完美的解決方案。

事務消息適用的場景主要是那些須要異步更新數據,而且對數據實時性要求不過高的場景。好比在建立訂單後,若是出現短暫的幾秒,購物車裏的商品沒有被及時狀況,也不是徹底不可接受的,只要最終購物車的數據和訂單數據保持一致就可。

3、消息隊列實現分佈式事務

事務消息須要消息隊列提供相應的功能才能實現,kafka和RocketMQ都提供了事務相關功能。

對於訂單系統:

  • 首先,訂單系統在消息隊列上開啓一個事務。
  • 而後訂單系統給消息服務器發送一個「半消息」,這個半消息不是說消息內容不完整,它包含的內容就是完整的消息內容,半消息和普通消息的惟一區別是,在事務提交以前,對於消費者來講,這個消息是不可見的。
  • 半消息發送成功後,訂單系統就能夠執行本地事務了,在訂單庫中建立一條訂單記錄,並提交訂單庫的數據庫事務。
  • 而後根據本地事務的執行結果決定提交或者回滾事務消息。若是訂單建立成功,那就提交事務消息,購物車系統就能夠消費到這條消息繼續後續的流程。若是訂單建立失敗,那就回滾事務消息,購物車系統就不會收到這條消息。這樣就基本實現了「要麼都成功,要麼都失敗」的一致性要求。

對於購物車系統:

  • 對於購物車系統收到訂單建立成功消息清理購物車這個操做來講,失敗的處理比較簡單,只要成功執行購物車清理後再提交消費確認便可,若是失敗,因爲沒有提交消費確認,消息隊列會自動重試

若是在第四步提交事務消息時失敗了怎麼辦?Kafka 和 RocketMQ 給出了 2 種不一樣的解決方案:

一、Kafka 的解決方案:

直接拋出異常,讓用戶自行處理。咱們能夠在業務代碼中反覆重試提交,直到提交成功,或者刪除以前建立的訂單進行補償。

二、RocketMQ 的解決方案:

在 RocketMQ 中的事務實現中,增長了事務反查的機制來解決事務消息提交失敗的問題。若是 Producer 也就是訂單系統,在提交或者回滾事務消息時發生網絡異常,RocketMQ 的 Broker 沒有收到提交或者回滾的請求,Broker 會按期去 Producer 上反查這個事務對應的本地事務的狀態,而後根據反查結果決定提交或者回滾這個事務。爲了支撐這個事務反查機制,咱們的業務代碼須要實現一個反查本地事務狀態的接口,告知 RocketMQ 本地事務是成功仍是失敗。

綜合上面講的通用事務消息的實現和 RocketMQ 的事務反查機制,使用 RocketMQ 事務消息功能實現分佈式事務的流程以下圖:

<br/>

6.消息隊列中的順序問題

當咱們說順序時,咱們在說什麼?

平常思惟中,順序大部分狀況會和時間關聯起來,即時間的前後表示事件的順序關係。

好比事件A發生在下午3點一刻,而事件B發生在下午4點,那麼咱們認爲事件A發生在事件B以前,他們的順序關係爲先A後B。

上面的例子之因此成立是由於他們有相同的參考系,即他們的時間是對應的同一個物理時鐘的時間。若是A發生的時間是北京時間,而B依賴的時間是東京時間,那麼先A後B的順序關係還成立嗎?

若是沒有一個絕對的時間參考,那麼A和B之間還有順序嗎,或者說怎麼判定A和B的順序?

顯而易見的,若是A、B兩個事件之間若是是有因果關係的,那麼A必定發生在B以前(來龍去脈,有因纔有果)。相反,在沒有一個絕對的時間的參考的狀況下,若A、B之間沒有因果關係,那麼A、B之間就沒有順序關係。

那麼,咱們在說順序時,其實說的是:

  • 有絕對時間參考的狀況下,事件的發生時間的關係;
  • 和沒有時間參考下的,一種由因果關係推斷出來的happening before的關係;

在分佈式環境中討論順序

當把順序放到分佈式環境(多線程、多進程均可以認爲是一個分佈式的環境)中去討論時:

  • 同一線程上的事件順序是肯定的,能夠認爲他們有相同的時間做爲參考
  • 不一樣線程間的順序只能經過因果關係去推斷

(點表示事件,波浪線箭頭表示事件間的消息)

上圖中,進程P中的事件順序爲p1->p2->p3->p4(時間推斷)。而由於p1給進程Q的q2發了消息,那麼p1必定在q2以前(因果推斷)。可是沒法肯定p1和q1之間的順序關係。

推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會透徹的分析分佈式系統中的順序問題。

消息中間件中的順序消息

什麼是順序消息

有了上述的基礎以後,咱們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。

順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發佈和消費的消息類型。順序消息由兩個部分組成:順序發佈和順序消費。

順序消息包含兩種類型:

分區順序:一個Partition內全部的消息按照先進先出的順序進行發佈和消費

全局順序:一個Topic內全部的消息按照先進先出的順序進行發佈和消費

這是阿里雲上對順序消息的定義,把順序消息拆分紅了順序發佈和順序消費。那麼多線程中發送消息算不算順序發佈?

如上一部分介紹的,多線程中若沒有因果關係則沒有順序。那麼用戶在多線程中去發消息就意味着用戶不關心那些在不一樣線程中被髮送的消息的順序。即多線程發送的消息,不一樣線程間的消息不是順序發佈的,同一線程的消息是順序發佈的。這是須要用戶本身去保障的。

而對於順序消費,則須要保證哪些來自同一個發送線程的消息在消費時是按照相同的順序被處理的(爲何不說他們應該在一個線程中被消費呢?)。

全局順序實際上是分區順序的一個特例,即便Topic只有一個分區(如下不在討論全局順序,由於全局順序將面臨性能的問題,並且絕大多數場景都不須要全局順序)。

如何保證順序

在MQ的模型中,順序須要由3個階段去保障:

  1. 消息被髮送時保持順序
  2. 消息被存儲時保持和發送的順序一致
  3. 消息被消費時保持和存儲的順序一致

發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中採用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被髮送出來的消息A和B,存儲時在空間上A必定在B以前。而消費保持和存儲一致則要求消息A、B到達Consumer以後必須按照先A後B的順序被處理。

以下圖所示:

對於兩個訂單的消息的原始數據:a一、b一、b二、a二、a三、b3(絕對時間下發生的順序):

  • 在發送時,a訂單的消息須要保持a一、a二、a3的順序,b訂單的消息也相同,可是a、b訂單之間的消息沒有順序關係,這意味着a、b訂單的消息能夠在不一樣的線程中被髮送出去
  • 在存儲時,須要分別保證a、b訂單的消息的順序,可是a、b訂單之間的消息的順序能夠不保證
    • a一、b一、b二、a二、a三、b3是能夠接受的
    • a一、a二、b一、b二、a三、b3也是能夠接受的
    • a一、a三、b一、b二、a二、b3是不能接受的
  • 消費時保證順序的簡單方式就是「什麼都不作」,不對收到的消息的順序進行調整,即只要一個分區的消息只由一個線程處理便可;固然,若是a、b在一個分區中,在收到消息後也能夠將他們拆分到不一樣線程中處理,不過要權衡一下收益

開源RocketMQ中順序的實現

上圖是RocketMQ順序消息原理的介紹,將不一樣訂單的消息路由到不一樣的分區中。文檔只是給出了Producer順序的處理,Consumer消費時經過一個分區只能有一個線程消費的方式來保證消息順序,具體實現以下。

Producer端

Producer端確保消息順序惟一要作的事情就是將消息路由到特定的分區,在RocketMQ中,經過MessageQueueSelector來實現分區的選擇。

  • List<MessageQueue> mqs:消息要發送的Topic下全部的分區
  • Message msg:消息對象
  • 額外的參數:用戶能夠傳遞本身的參數

好比以下實現就能夠保證相同的訂單的消息被路由到相同的分區:

long orderId = ((Order) object).getOrderId;
return mqs.get(orderId % mqs.size());

Consumer端

RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。

MQPullConsumer由用戶控制線程,主動從服務端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList天然和存儲順序一致,用戶須要再拿到這批消息後本身保證消費的順序。

對於PushConsumer,由用戶註冊MessageListener來消費消息,在客戶端中須要保證調用MessageListener時消息的順序性。RocketMQ中的實現以下:

  1. PullMessageService單線程的從Broker獲取消息
  2. PullMessageService將消息添加到ProcessQueue中(ProcessMessage是一個消息的緩存),以後提交一個消費任務到ConsumeMessageOrderService
  3. ConsumeMessageOrderService多線程執行,每一個線程在消費消息時須要拿到MessageQueue的鎖
  4. 拿到鎖以後從ProcessQueue中獲取消息

保證消費順序的核心思想是:

  • 獲取到消息後添加到ProcessQueue中,單線程執行,因此ProcessQueue中的消息是順序的
  • 提交的消費任務時提交的是「對某個MQ進行一次消費」,此次消費請求是從ProcessQueue中獲取消息消費,因此也是順序的(不管哪一個線程獲取到鎖,都是按照ProcessQueue中消息的順序進行消費)

順序和異常的關係

順序消息須要Producer和Consumer都保證順序。Producer須要保證消息被路由到正確的分區,消息須要保證每一個分區的數據只有一個線程消息,那麼就會有一些缺陷:

  • 發送順序消息沒法利用集羣的Failover特性,由於不能更換MessageQueue進行重試
  • 由於發送的路由策略致使的熱點問題,可能某一些MessageQueue的數據量特別大
  • 消費的並行讀依賴於分區數量
  • 消費失敗時沒法跳過

不能更換MessageQueue重試就須要MessageQueue有本身的副本,經過Raft、Paxos之類的算法保證有可用的副本,或者經過其餘高可用的存儲設備來存儲MessageQueue。

熱點問題好像沒有什麼好的解決辦法,只能經過拆分MessageQueue和優化路由方法來儘可能均衡的將消息分配到不一樣的MessageQueue。

消費並行度理論上不會有太大問題,由於MessageQueue的數量能夠調整。

消費失敗的沒法跳過是不可避免的,由於跳過可能致使後續的數據處理都是錯誤的。不過能夠提供一些策略,由用戶根據錯誤類型來決定是否跳過,而且提供重試隊列之類的功能,在跳過以後用戶能夠在「其餘」地方從新消費到這條消息。

<br/>

鳴謝

感謝極客時間所屬的《消息隊列高手課連接

<br/>

最後

本篇是一篇大合集,中間確定參考了許多其餘人的文章內容或圖片,但因爲時間比較久遠,當時並無一一記錄,爲此表示歉意,若是有做者發現了本身的文章或圖片,能夠私聊我,我會進行補充。

若是你發現寫的還不錯,能夠搜索公衆號「是Kerwin啊」,一塊兒進步!

也能夠查看Kerwin的GitHub主頁

相關文章
相關標籤/搜索