彥幀 關注html
2.0 2018.08.05 15:48* 字數 3451 閱讀 22742評論 4喜歡 34linux
消息隊列做爲高併發系統的核心組件之一,可以幫助業務系統解構提高開發效率和系統穩定性。主要具備如下優點:sql
目前主流的MQ主要是Rocketmq、kafka、Rabbitmq,Rocketmq相比於Rabbitmq、kafka具備主要優點特性有:
• 支持事務型消息(消息發送和DB操做保持兩方的最終一致性,rabbitmq和kafka不支持)
• 支持結合rocketmq的多個系統之間數據最終一致性(多方事務,二方事務是前提)
• 支持18個級別的延遲消息(rabbitmq和kafka不支持)
• 支持指定次數和時間間隔的失敗消息重發(kafka不支持,rabbitmq須要手動確認)
• 支持consumer端tag過濾,減小沒必要要的網絡傳輸(rabbitmq和kafka不支持)
• 支持重複消費(rabbitmq不支持,kafka支持)數據庫
Rocketmq、kafka、Rabbitmq的詳細對比,請參照下表格:segmentfault
image.png服務器
image.png網絡
Name Server是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。併發
Broker部署相對複雜,Broker分爲Master與Slave,一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave的對應關係經過指定相同的Broker Name,不一樣的Broker Id來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。異步
每一個Broker與Name Server集羣中的全部節點創建長鏈接,定時(每隔30s)註冊Topic信息到全部Name Server。Name Server定時(每隔10s)掃描全部存活broker的鏈接,若是Name Server超過2分鐘沒有收到心跳,則Name Server斷開與Broker的鏈接。分佈式
Producer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。
Producer每隔30s(由ClientConfig的pollNameServerInterval)從Name server獲取全部topic隊列的最新狀況,這意味着若是Broker不可用,Producer最多30s可以感知,在此期間內發往Broker的全部消息都會失敗。
Producer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向全部關聯的broker發送心跳,Broker每隔10s中掃描全部存活的鏈接,若是Broker在2分鐘內沒有收到心跳數據,則關閉與Producer的鏈接。
Consumer與Name Server集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。
Consumer每隔30s從Name server獲取topic的最新隊列狀況,這意味着Broker不可用時,Consumer最多最須要30s才能感知。
Consumer每隔30s(由ClientConfig中heartbeatBrokerInterval決定)向全部關聯的broker發送心跳,Broker每隔10s掃描全部存活的鏈接,若某個鏈接2分鐘內沒有發送心跳數據,則關閉鏈接;並向該Consumer Group的全部Consumer發出通知,Group內的Consumer從新分配隊列,而後繼續消費。
當Consumer獲得master宕機通知後,轉向slave消費,slave不能保證master的消息100%都同步過來了,所以會有少許的消息丟失。可是一旦master恢復,未同步過去的消息會被最終消費掉。
消費者對列是消費者鏈接以後(或者以前有鏈接過)才建立的。咱們將原生的消費者標識由 {IP}@{消費者group}擴展爲 {IP}@{消費者group}{topic}{tag},(例如xxx.xxx.xxx.xxx@mqtest_producer-group_2m2sTest_tag-zyk)。任何一個元素不一樣,都認爲是不一樣的消費端,每一個消費端會擁有一份本身消費對列(默認是broker對列數量*broker數量)。新掛載的消費者對列中擁有commitlog中的全部數據。
A(存在DB操做)、B(存在DB操做)兩方須要保證分佈式事務一致性,經過引入中間層MQ,A和MQ保持事務一致性(異常狀況下經過MQ反查A接口實現check),B和MQ保證事務一致(經過重試),從而達到最終事務一致性。
原理:大事務 = 小事務 + 異步
流程圖
image.png
上圖是RocketMQ提供的保證MQ消息、DB事務一致性的方案。
MQ消息、DB操做一致性方案:
1)發送消息到MQ服務器,此時消息狀態爲SEND_OK。此消息爲consumer不可見。
2)執行DB操做;DB執行成功Commit DB操做,DB執行失敗Rollback DB操做。
3)若是DB執行成功,回覆MQ服務器,將狀態爲COMMIT_MESSAGE;若是DB執行失敗,回覆MQ服務器,將狀態改成ROLLBACK_MESSAGE。注意此過程有可能失敗。
4)MQ內部提供一個名爲「事務狀態服務」的服務,此服務會檢查事務消息的狀態,若是發現消息未COMMIT,則經過Producer啓動時註冊的TransactionCheckListener來回調業務系統,業務系統在checkLocalTransactionState方法中檢查DB事務狀態,若是成功,則回覆COMMIT_MESSAGE,不然回覆ROLLBACK_MESSAGE。
說明:
上面以DB爲例,其實此處能夠是任何業務或者數據源。
以上SEND_OK、COMMIT_MESSAGE、ROLLBACK_MESSAGE均是client jar提供的狀態,在MQ服務器內部是一個數字。
TransactionCheckListener 是在消息的commit或者rollback消息丟失的狀況下才會回調(上圖中灰色部分)。這種消息丟失只存在於斷網或者rocketmq集羣掛了的狀況下。當rocketmq集羣掛了,若是採用異步刷盤,存在1s內數據丟失風險,異步刷盤場景下保障事務沒有意義。因此若是要核心業務用Rocketmq解決分佈式事務問題,建議選擇同步刷盤模式。
image.png
當須要保證多方(超過2方)的分佈式一致性,上面的兩方事務一致性(經過Rocketmq的事務性消息解決)已經沒法支持。這個時候須要引入TCC模式思想(Try-Confirm-Cancel,不清楚的自行百度)。
以上圖交易系統爲例:
1)交易系統建立訂單(往DB插入一條記錄),同時發送訂單建立消息。經過RocketMq事務性消息保證一致性
2)接着執行完成訂單所需的同步核心RPC服務(非核心的系統經過監聽MQ消息自行處理,處理結果不會影響交易狀態)。執行成功更改訂單狀態,同時發送MQ消息。
3)交易系統接受本身發送的訂單建立消息,經過定時調度系統建立延時回滾任務(或者使用RocketMq的重試功能,設置第二次發送時間爲定時任務的延遲建立時間。在非消息堵塞的狀況下,消息第一次到達延遲爲1ms左右,這時可能RPC還未執行完,訂單狀態還未設置爲完成,第二次消費時間能夠指定)。延遲任務先經過查詢訂單狀態判斷訂單是否完成,完成則不建立回滾任務,不然建立。 PS:多個RPC能夠建立一個回滾任務,經過一個消費組接受一次消息就能夠;也能夠經過建立多個消費組,一個消息消費屢次,每次消費建立一個RPC的回滾任務。 回滾任務失敗,經過MQ的重發來重試。
以上是交易系統和其餘系統之間保持最終一致性的解決方案。
1) 單機環境下的事務示意圖
以下爲A給B轉帳的例子。
步驟 | 動做 |
---|---|
1 | 鎖定A的帳戶 |
2 | 鎖定B的帳戶 |
3 | 檢查A帳戶是否有1元 |
4 | A的帳戶扣減1元 |
5 | 給B的帳戶加1元 |
6 | 解鎖B的帳戶 |
7 | 解鎖A的帳戶 |
以上過程在代碼層面甚至能夠簡化到在一個事物中執行兩條sql語句。
2) 分佈式環境下事務
和單機事務不一樣,A、B帳戶可能不在同一個DB中,此時沒法像在單機狀況下使用事物來實現。此時能夠經過一下方式實現,將轉帳操做分紅兩個操做。
a) A帳戶
步驟 | 動做 |
---|---|
1 | 鎖定A的帳戶 |
2 | 檢查A帳戶是否有1元 |
3 | A的帳戶扣減1元 |
4 | 解鎖A的帳戶 |
b) MQ消息
A帳戶數據發生變化時,發送MQ消息,MQ服務器將消息推送給轉帳系統,轉帳系統來給B帳號加錢。
c) B帳戶
步驟 | 動做 |
---|---|
1 | 鎖定B的帳戶 |
2 | 給B的帳戶加1元 |
3 | 解鎖B的帳戶 |
發送順序消息沒法利用集羣Fail Over特性消費順序消息的並行度依賴於隊列數量隊列熱點問題,個別隊列因爲哈希不均致使消息過多,消費速度跟不上,產生消息堆積問題遇到消息失敗的消息,沒法跳過,當前隊列消費暫停。
produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者註冊消息監聽器爲MessageListenerOrderly,這樣就能夠保證消費端只有一個線程去消費消息。
注意:把消息發到同一個隊列(queue),不是同一個topic,默認狀況下一個topic包括4個queue
能夠經過實現發送消息的對列選擇器方法,實現部分順序消息。
舉例:好比一個數據庫經過MQ來同步,只須要保證每一個表的數據是同步的就能夠。解析binlog,將表名做爲對列選擇器的參數,這樣就能夠保證每一個表的數據到同一個對列裏面,從而保證表數據的順序消費
一個應用盡量用一個Topic,消息子類型用tags來標識,tags能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用tags 在broker作消息過濾。
每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。服務器會爲每一個消息建立索引(哈希索引),應用能夠經過 topic,key來查詢這條消息內容,以及消息被誰消費。因爲是哈希索引,請務必保證key 儘量惟一,這樣能夠避免潛在的哈希衝突。
//訂單Id
String orderId= "20034568923546";
message.setKeys(orderId);
消息發送成功或者失敗,要打印消息日誌,務必要打印 send result 和key 字段。
send消息方法,只要不拋異常,就表明發送成功。可是發送成功會有多個狀態,在sendResult裏定義。
SEND_OK:消息發送成功
FLUSH_DISK_TIMEOUT:消息發送成功,可是服務器刷盤超時,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
FLUSH_SLAVE_TIMEOUT:消息發送成功,可是服務器同步到Slave時超時,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
SLAVE_NOT_AVAILABLE:消息發送成功,可是此時slave不可用,消息已經進入服務器隊列,只有此時服務器宕機,消息纔會丟失
RocketMQ使用的消息原語是At Least Once,因此consumer可能屢次收到同一個消息,此時務必作好冪等。
消費時記錄日誌,以便後續定位問題。
儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。
RocketMQ_design.pdf
RocketMQ_experience.pdf
分佈式開放消息系統(RocketMQ)的原理與實踐
http://www.jianshu.com/p/453c6e7ff81c
RocketMQ事務消費和順序消費詳解
http://www.cnblogs.com/520playboy/p/6750023.html
ZeroCopy
http://www.linuxjournal.com/article/6345
IO方式的性能數據