消息中間件學習-摘抄

Rocketmq原理&最佳實踐

96 彥幀 關注html

 2.0 2018.08.05 15:48* 字數 3451 閱讀 22742評論 4喜歡 34linux

1、 MQ背景&選型

消息隊列做爲高併發系統的核心組件之一,可以幫助業務系統解構提高開發效率和系統穩定性。主要具備如下優點:sql

  • 削峯填谷(主要解決瞬時寫壓力大於應用服務能力致使消息丟失、系統奔潰等問題)
  • 系統解耦(解決不一樣重要程度、不一樣能力級別系統之間依賴致使一死全死)
  • 提高性能(當存在一對多調用時,能夠發一條消息給消息系統,讓消息系統通知相關係統)
  • 蓄流壓測(線上有些鏈路很差壓測,能夠經過堆積必定量消息再放開來壓測)

目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比於Rabbitmq、kafka具備主要優點特性有:
• 支持事務型消息(消息發送和DB操做保持兩方的最終一致性,rabbitmq和kafka不支持)
• 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提)
• 支持18個級別的延遲消息(rabbitmq和kafka不支持)
• 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq須要手動確認)
• 支持consumer端tag過濾,減小沒必要要的網絡傳輸(rabbitmq和kafka不支持)
• 支持重複消費(rabbitmq不支持,kafka支持)數據庫

Rocketmq、kafka、Rabbitmq的詳細對比,請參照下表格:segmentfault

image.png服務器

2、RocketMQ集羣概述

1. RocketMQ集羣部署結構

image.png網絡

1) Name Server

Name Server是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。併發

2) Broker

Broker部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave的對應關係經過指定相同的Broker Name,不一樣的Broker Id來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。異步

每一個Broker與Name Server集羣中的全部節點創建長鏈接,定時(每隔30s)註冊Topic信息到全部Name Server。Name Server定時(每隔10s)掃描全部存活broker的鏈接,若是Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的鏈接。分佈式

3) Producer

Producer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。

Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取全部topic隊列的最新狀況,這意味着若是Broker不可用,Producer最多30s可以感知,在此期間內發往Broker的全部消息都會失敗。

Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向全部關聯的broker發送心跳,Broker每隔10s中掃描全部存活的鏈接,若是Broker在2分鐘內沒有收到心跳數據,則關閉與Producer的鏈接。

4) Consumer

Consumer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。

Consumer每隔30s從Name server獲取topic的最新隊列狀況,這意味着Broker不可用時,Consumer最多最須要30s才能感知。

Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向全部關聯的broker發送心跳,Broker每隔10s掃描全部存活的鏈接,若某個鏈接2分鐘內沒有發送心跳數據,則關閉鏈接;並向該Consumer Group的全部Consumer發出通知,Group內的Consumer從新分配隊列,而後繼續消費。

當Consumer獲得master宕機通知後,轉向slave消費,slave不能保證master的消息100%都同步過來了,所以會有少許的消息丟失。可是一旦master恢復,未同步過去的消息會被最終消費掉。

消費者對列是消費者鏈接以後(或者以前有鏈接過)才建立的。咱們將原生的消費者標識由 {IP}@{消費者group}擴展爲 {IP}@{消費者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一個元素不一樣,都認爲是不一樣的消費端,每一個消費端會擁有一份本身消費對列(默認是broker對列數量*broker數量)。新掛載的消費者對列中擁有commitlog中的全部數據。

若是有須要,能夠查看Rocketmq更多源碼解析

3、 Rocketmq如何支持分佈式事務消息

場景

A(存在DB操做)、B(存在DB操做)兩方須要保證分佈式事務一致性,經過引入中間層MQ,A和MQ保持事務一致性(異常狀況下經過MQ反查A接口實現check),B和MQ保證事務一致(經過重試),從而達到最終事務一致性。

原理:大事務 = 小事務 + 異步

1. MQ與DB一致性原理(兩方事務)

流程圖

image.png

上圖是RocketMQ提供的保證MQ消息、DB事務一致性的方案。

MQ消息、DB操做一致性方案:

1)發送消息到MQ服務器,此時消息狀態爲SEND_OK。此消息爲consumer不可見。

2)執行DB操做;DB執行成功Commit DB操做,DB執行失敗Rollback DB操做。

3)若是DB執行成功,回覆MQ服務器,將狀態爲COMMIT_MESSAGE;若是DB執行失敗,回覆MQ服務器,將狀態改成ROLLBACK_MESSAGE。注意此過程有可能失敗。

4)MQ內部提供一個名爲「事務狀態服務」的服務,此服務會檢查事務消息的狀態,若是發現消息未COMMIT,則經過Producer啓動時註冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀態,若是成功,則回覆COMMIT_MESSAGE,不然回覆ROLLBACK_MESSAGE。

說明:

上面以DB爲例,其實此處能夠是任何業務或者數據源。

以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態,在MQ服務器內部是一個數字。

TransactionCheckListener 是在消息的commit或者rollback消息丟失的狀況下才會回調(上圖中灰色部分)。這種消息丟失只存在於斷網或者rocketmq集羣掛了的狀況下。當rocketmq集羣掛了,若是採用異步刷盤,存在1s內數據丟失風險,異步刷盤場景下保障事務沒有意義。因此若是要核心業務用Rocketmq解決分佈式事務問題,建議選擇同步刷盤模式。

2. 多系統之間數據一致性(多方事務)

image.png

當須要保證多方(超過2方)的分佈式一致性,上面的兩方事務一致性(經過Rocketmq的事務性消息解決)已經沒法支持。這個時候須要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。

以上圖交易系統爲例:

1)交易系統建立訂單(往DB插入一條記錄),同時發送訂單建立消息。經過RocketMq事務性消息保證一致性

2)接着執行完成訂單所需的同步核心RPC服務(非核心的系統經過監聽MQ消息自行處理,處理結果不會影響交易狀態)。執行成功更改訂單狀態,同時發送MQ消息。

3)交易系統接受本身發送的訂單建立消息,經過定時調度系統建立延時回滾任務(或者使用RocketMq的重試功能,設置第二次發送時間爲定時任務的延遲建立時間。在非消息堵塞的狀況下,消息第一次到達延遲爲1ms左右,這時可能RPC還未執行完,訂單狀態還未設置爲完成,第二次消費時間能夠指定)。延遲任務先經過查詢訂單狀態判斷訂單是否完成,完成則不建立回滾任務,不然建立。 PS:多個RPC能夠建立一個回滾任務,經過一個消費組接受一次消息就能夠;也能夠經過建立多個消費組,一個消息消費屢次,每次消費建立一個RPC的回滾任務。 回滾任務失敗,經過MQ的重發來重試。

以上是交易系統和其餘系統之間保持最終一致性的解決方案。

3.案例分析

1) 單機環境下的事務示意圖

以下爲A給B轉帳的例子。

步驟 動做
1 鎖定A的帳戶
2 鎖定B的帳戶
3 檢查A帳戶是否有1元
4 A的帳戶扣減1元
5 給B的帳戶加1元
6 解鎖B的帳戶
7 解鎖A的帳戶

以上過程在代碼層面甚至能夠簡化到在一個事物中執行兩條sql語句。

2) 分佈式環境下事務

和單機事務不一樣,A、B帳戶可能不在同一個DB中,此時沒法像在單機狀況下使用事物來實現。此時能夠經過一下方式實現,將轉帳操做分紅兩個操做。

a) A帳戶

步驟 動做
1 鎖定A的帳戶
2 檢查A帳戶是否有1元
3 A的帳戶扣減1元
4 解鎖A的帳戶

b) MQ消息
A帳戶數據發生變化時,發送MQ消息,MQ服務器將消息推送給轉帳系統,轉帳系統來給B帳號加錢。

c) B帳戶

步驟 動做
1 鎖定B的帳戶
2 給B的帳戶加1元
3 解鎖B的帳戶

4、 順序消息

1. 順序消息缺陷

發送順序消息沒法利用集羣Fail Over特性消費順序消息的並行度依賴於隊列數量隊列熱點問題,個別隊列因爲哈希不均致使消息過多,消費速度跟不上,產生消息堆積問題遇到消息失敗的消息,沒法跳過,當前隊列消費暫停。

2. 原理

produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者註冊消息監聽器爲MessageListenerOrderly,這樣就能夠保證消費端只有一個線程去消費消息。

注意:把消息發到同一個隊列(queue),不是同一個topic,默認狀況下一個topic包括4個queue

3. 擴展

能夠經過實現發送消息的對列選擇器方法,實現部分順序消息。

舉例:好比一個數據庫經過MQ來同步,只須要保證每一個表的數據是同步的就能夠。解析binlog,將表名做爲對列選擇器的參數,這樣就能夠保證每一個表的數據到同一個對列裏面,從而保證表數據的順序消費

5、 最佳實踐

1. Producer

1) Topic

一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags 在broker作消息過濾。

2) key

每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。服務器會爲每一個消息建立索引(哈希索引),應用能夠經過 topic,key來查詢這條消息內容,以及消息被誰消費。因爲是哈希索引,請務必保證key 儘量惟一,這樣能夠避免潛在的哈希衝突。

//訂單Id

String orderId= "20034568923546";

message.setKeys(orderId);

3) 日誌

消息發送成功或者失敗,要打印消息日誌,務必要打印 send result 和key 字段。

4) send

send消息方法,只要不拋異常,就表明發送成功。可是發送成功會有多個狀態,在sendResult裏定義。

SEND_OK:消息發送成功

FLUSH_DISK_TIMEOUT:消息發送成功,可是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失

FLUSH_SLAVE_TIMEOUT:消息發送成功,可是服務器同步到Slave時超時,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失

SLAVE_NOT_AVAILABLE:消息發送成功,可是此時slave不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失

2. Consumer

1) 冪等

RocketMQ使用的消息原語是At Least Once,因此consumer可能屢次收到同一個消息,此時務必作好冪等。

2) 日誌

消費時記錄日誌,以便後續定位問題。

3) 批量消費

儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。

6、 參考資料

1. 文檔

RocketMQ_design.pdf
RocketMQ_experience.pdf

2. 博客

分佈式開放消息系統(RocketMQ)的原理與實踐

http://www.jianshu.com/p/453c6e7ff81c

RocketMQ事務消費和順序消費詳解

http://www.cnblogs.com/520playboy/p/6750023.html

ZeroCopy

http://www.linuxjournal.com/article/6345

IO方式的性能數據

http://stblog.baidu-tech.com/?p=851

相關文章
相關標籤/搜索