地址:[https://www.apache.org/dyn/cl...]java
選擇‘Binary’進行下載git
解壓已下載工程github
新增系統變量
ROCKETMQ_HOME -> F:RocketMQrocketmq-4.5.2JAVA_HOME -> F:Java_JDKJDK1.8算法
Path 系統變量新增:Maven/bin目錄數據庫
PS:RocketMQ 消息存儲在C:UsersAdministratorstore store目錄中
文件佔用較大,注意刪除沒必要要的內容
apache
start mqnamesrv.cmdstart mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true緩存
server.port=8100服務器
rocketmq.config.namesrvAddr=127.0.0.1:9876網絡
執行‘mvn clean package -Dmaven.test.skip=true’,編譯生成target數據結構
java -jar rocketmq-console-ng-1.0.1.jar
Rocket可視化監控插件 增長Topic | 自動增長Topic(4.5.2版本)
4.5.2 版本 支持自動建立Topic4.3.0 版本 必須經過監控程序配置Topic,不然執行程序報錯,沒有此路由
<!--RocketMQ--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.5.2</version> </dependency>
<br/>
基於RocketMQ的分佈式系統,通常能夠分爲四個集羣:Name server、broker、producer、consumer
name server
broker
producer
consumer
<br/>
<br/>
提供Broker管理;Routing管理(路由管理)
NameServer,不少時候稱爲命名發現服務,其在RocketMQ中起着中轉承接的做用,是一個無狀態的服務,多個NameServer之間不通訊。任何Producer,Consumer,Broker與全部NameServer通訊,向NameServer請求或者發送數據。並且都是單向的,Producer和Consumer請求數據,Broker發送數據。正是由於這種單向的通訊,RocketMQ水平擴容變得很容易
總結:相比於ZooKeeper提供的分佈式鎖,發佈和訂閱,數據一致性,選舉等,在RocketMQ是不適用的,所以重寫了一套更加輕量級的發現服務,主要用以存儲 Broker相關信息以及當前Broker上的topic信息,路由信息等
提供Remoting Module、客戶端管理、存儲服務、HA服務(主從)、索引服務
<br/>
<br/>
生產者發送業務系統產生的消息給broker, RocketMQ提供了多種發送方式:同步的、異步的、單向的
<br/>
具備相同角色的生產者被分到一組, 假如原始的生產者在事務後崩潰,broker會聯繫 同一輩子產者組中的不一樣生產者實例,繼續提交或回滾事務
<br/>
一個消費者從broker拉取信息,並將信息返還給應用。爲了咱們應用的正確性,提供了兩種消費者類型:
拉式消費者:拉式消費者從broker拉取消息,一旦一批消息被拉取,用戶應用系統將發起消費過程。
推式消費者:推式消費者,從另外一方面講,囊括了消息的拉取、消費過程,並保持了內部的其餘工做,留下了一個回調 接口給終端用戶去實現,實如今消息到達時要執行的內容。
<br/>
具備相同角色的消費者被組在一塊兒,稱爲消費者組,它完成了負載均衡和容錯的目標
一個消費組中的消費者實例必須有肯定的相同的訂閱topic
<br/>
Topic是一個消息的目錄,在這個目錄中,生產者傳送消息,消費者拉取消息,能夠多個消費者訂閱同一個topic,一個生產者也能夠發送多個topic
PS:RocketMQ 基於發佈訂閱模式,發佈訂閱的核心即 Topic 主題
<br/>
消息是被傳遞的信息。一個消息必須有一個Topic,它能夠理解爲信件上的地址。一個消息也能夠有一個可選的tag,和額外的key-value對。 例如:你能夠設置業務中的鍵到你的消息中,在broker服務中查找消息,以便在開發期間診斷問題
<br/>
Topic被分割成一個或多個消息隊列。隊列分爲3中角色:異步主、同步主、從。若是你不能容忍消息丟失,咱們建議你部署同步主,並加一個從隊列。 若是你容忍丟失,但你但願隊列老是可用,你能夠部署異步主和從隊列。若是你想最簡單,你只須要一個異步主,不須要從隊列。 消息保存磁盤的方式也有兩種,推薦使用的是異步保存,同步保存是昂貴的並會致使性能損失,若是你想要可靠性,咱們推薦你使用同步主+從的方式。
<br/>
標籤,用另一個詞來講,就是子主題,爲用戶提供額外的靈活性。具備相同Topic的消息能夠有不一樣的tag。
<br/>
Broker是RocketMQ的一個主要組件,它接收生產者發送的消息,存儲它們並準備處理消費者的拉取請求。它也存儲消息相關的元數據, 包括消費組,消費成功的偏移量,主題、隊列的信息。
<br/>
名稱服務主要提供路由信息。生產者/消費者客戶端尋找topic,並找到通訊的隊列列表。
<br/>
當DefaultMQPushConsumer
被使用,你就要決定消費消息時,是順序消費仍是同時消費
順序消費消息的意思是 消息將按照生產者發送到隊列時的順序被消費掉。若是你被強制要求使用全局的順序,你要確保你的topic只有一個消息隊列。
若是指定順序消費,消息被同時消費的數量就是訂閱這個topic的消費組的數量。
當同時消費消息時,消息同時消費的最大數量取決於消費客戶端指定的線程池的大小。
<br/>
消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
<br/>
解決的核心問題主要是:異步、解耦、削峯
可是引入消息隊列也會有不少額外的問題,好比系統複雜性會大大增長,同時須要解決重複下發,重複消費,消費順序,消息丟失,重試機制等等問題,所以不能濫用,合適的場景用合適的技術
<br/>
1、消息隊列的演進
一、初始階段
最初的消息隊列,就是一個嚴格意義上的隊列。隊列是一種數據結構,先進先出,在消息入隊出隊過程當中,保證這些消息嚴格有序。早期的消息隊列就是按照「隊列」的數據結構設計的。
隊列模型:
生產者(Producer)發消息就是入隊操做,消費者(Consumer)收消息就是出隊也就是刪除操做,服務端存放消息的容器天然就稱爲「隊列」。
二、發佈 - 訂閱模型階段
若是須要將一份消息數據分發給多個消費者,要求每一個消費者都能收到全量的消息,例如,對於一份訂單數據,風控系統、分析系統、支付系統等都須要接收消息。
這個時候,單個隊列就知足不了需求,一個可行的解決方式是,爲每一個消費者建立一個單獨的隊列,讓生產者發送多份。可是一樣的一份消息數據被複制到多個隊列中會浪費資源,更重要的是,生產者必須知道有多少個消費者。爲每一個消費者單獨發送一份消息,這實際上違背了消息隊列「解耦」這個設計初衷。
爲了解決這個問題,演化出了另一種消息模型:發佈 - 訂閱模型(Publish-Subscribe Pattern)
消息的發送方稱爲發佈者(Publisher),消息的接收方稱爲訂閱者(Subscriber),服務端存放消息的容器稱爲主題(Topic)。
三、總結:
2、RabbitMQ 的消息模型
少數依然堅持使用隊列模型的產品之一。
RabbitMQ 使用 Exchange 模塊解決多個消費者的問題。Exchange 位於生產者和隊列之間,生產者並不關心將消息發送給哪一個隊列,而是將消息發送給 Exchange,由 Exchange 上配置的策略來決定將消息投遞到哪些隊列中。
3、RocketMQ 的消息模型
RocketMQ 使用的消息模型是標準的發佈 - 訂閱模型。在 RocketMQ 也有隊列(Queue)這個概念。
消息隊列的消費機制:
幾乎全部的消息隊列產品都使用一種很是樸素的「請求 - 確認」機制,確保消息不會在傳遞過程當中因爲網絡或服務器故障丟失。
在生產端,生產者先將消息發送給服務端,也就是 Broker,服務端在收到消息並將消息寫入主題或者隊列中後,會給生產者發送確認的響應。若是生產者沒有收到服務端的確認或者收到失敗的響應,則會從新發送消息。
在消費端,消費者在收到消息並完成本身的消費業務邏輯(好比,將數據保存到數據庫中)後,也會給服務端發送消費成功的確認,服務端只有收到消費確認後,才認爲一條消息被成功消費,不然它會給消費者從新發送這條消息,直到收到對應的消費成功確認。
這個確認機制很好地保證了消息傳遞過程當中的可靠性,可是,引入這個機制在消費端帶來了一個問題:爲了確保消息的有序性,在某一條消息被成功消費以前,下一條消息是不能被消費的,也就是說,每一個主題在任意時刻,至多隻能有一個消費者實例在進行消費,那就無法經過水平擴展消費者的數量來提高消費端整體的消費性能。
爲了解決這個問題,RocketMQ 在主題下面增長了隊列的概念:
4、Kafka 的消息模型
Kafka 的消息模型和 RocketMQ 是徹底同樣的,惟一的區別是,在 Kafka 中,隊列這個概念的名稱不同,Kafka 中對應的名稱是「分區(Partition)」,含義和功能是沒有任何區別的。
5、總結
<br/>
首先如何驗證消息是否丟失?
即保證消息消費順序的狀況下,根據消息的序號,在消費段判斷是否連續
解決方案:
消息從生產到消費的過程當中,能夠劃分三個階段:
一、生產階段
消息隊列經過最經常使用的請求確認機制,來保證消息的可靠傳遞:當你代碼調用發消息方法時,消息隊列客戶端會把消息發送到Broker,Broker收到消息後,會給客戶端返回一個確認響應,代表消息已收到。客戶端收到響應後,完成了一次正常消息的發送。
有些消息隊列在長時間沒收到發送確認響應後,會自動重試,若是重試失敗,就會以返回值或者異常的方式告知用戶。在編寫發送消息的代碼時,須要注意,正確處理返回值或者捕獲異常,就能夠保證這個階段的消息不會丟失。
同步發送時,只要注意捕獲異常便可。
異步發送時,則須要在回調方法裏進行檢查。這個地方須要特別注意,不少丟消息的緣由就是,咱們使用了異步發送,卻沒有在回調中檢查發送結果。
二、存儲階段
在存儲階段正常狀況下,只要Broker在正常運行,就不會出現丟消息的問題;可是若是Broker出現故障,好比進程死掉或者服務器宕機,仍是可能會丟失消息的。
若是對消息的可靠性要求很是高,能夠經過配置Broker參數來避免由於宕機丟消息:
三、消息階段
消費階段採用和生產階段相似的確認機制來保證消息的可靠傳遞,客戶端從 Broker 拉取消息後,執行用戶的消費業務邏輯,成功後,纔會給 Broker 發送消費確認響應。若是 Broker 沒有收到消費確認響應,下次拉消息的時候還會返回同一條消息,確保消息不會在網絡傳輸過程當中丟失,也不會由於客戶端在執行消費邏輯中出錯致使丟失。
在編寫消費代碼時須要注意的是:不要在收到消息後就當即發送消費確認,而是應該在執行完全部消費業務邏輯以後,再發送消費確認。
<br/>
在消息傳遞過程當中,若是出現傳遞失敗的狀況,發送方會執行重試,重試過程當中就有可能產生重複的消息。若是沒有對重複消息進行處理,就可能致使系統的數據出現錯誤。
好比,一個消費訂單消息,統計下單金額的微服務,若是沒有正確處理重複消息,那就會出現重複統計,致使統計結果錯誤。
1、消息重複的狀況必然存在
在MQTT協議中,給出了三種傳遞消息時可以提供的服務質量標準:
這個服務質量標準不只適用於 MQTT,對全部的消息隊列都是適用的。經常使用的絕大部分消息隊列提供的服務質量都是 At least once,包括 RocketMQ、RabbitMQ 和 Kafka 。也就是說,消息隊列很難保證消息不重複。
注意:Kafka 支持的「Exactly once」和咱們剛剛提到的消息傳遞的服務質量標準「Exactly once」是不同的,它是 Kafka 提供的另一個特性,Kafka 中支持的事務也和咱們一般意義理解的事務有必定的差別。在 Kafka 中,事務和 Excactly once 主要是爲了配合流計算使用的特性。
2、用冪等性解決重複消息問題
冪等原本是一個數學上的概念,它的定義是:若是一個函數f(x)知足:f(f(x)) = f(x),則函數f(x)知足米冪等性。擴展到計算機領域,被用來描述一個操做、方法或者服務。
舉例:
一、在不考慮併發的狀況下,「將帳戶 X 的餘額設置爲 100 元」,執行一次後對系統的影響是,帳戶 X 的餘額變成了 100 元。只要提供的參數 100 元不變,那即便再執行多少次,帳戶 X 的餘額始終都是 100 元,不會變化,這個操做就是一個冪等的操做。
二、「將帳戶 X 的餘額加 100 元」,這個操做它就不是冪等的,每執行一次,帳戶餘額就會增長 100 元,執行屢次和執行一次對系統的影響(也就是帳戶的餘額)是不同的。
若是消費消息的業務邏輯具有冪等性,那就不用擔憂消息重複的問題,由於同一條消息,消費一次和消費屢次對系統的影響是徹底同樣的。消費屢次等於消費一次。從對系統的影響結果來講:At least once + 冪等消費 = Exactly once。
實現冪等操做最好的方式是,從業務邏輯設計上入手,將消費的業務邏輯設計成具有冪等性的操做。
經常使用的設計冪等操做的方法:
(1)利用數據庫的惟一約束實現冪等
上面提到的那個不具有冪等特性的轉帳的例子:將帳戶 X 的餘額加 100 元。在這個例子中,咱們能夠經過改造業務邏輯,讓它具有冪等性。
首先,咱們能夠限定,對於每一個轉帳單每一個帳戶只能夠執行一次變動操做,在分佈式系統中,這個限制實現的方法很是多,最簡單的是咱們在數據庫中建一張轉帳流水錶,這個表有三個字段:轉帳單 ID、帳戶 ID 和變動金額,而後給轉帳單 ID 和帳戶 ID 這兩個字段聯合起來建立一個惟一約束,這樣對於相同的轉帳單 ID 和帳戶 ID,表裏至多隻能存在一條記錄。
這樣,咱們消費消息的邏輯能夠變爲:「在轉帳流水錶中增長一條轉帳記錄,而後再根據轉帳記錄,異步操做更新用戶餘額便可。」在轉帳流水錶增長一條轉帳記錄這個操做中,因爲咱們在這個表中預先定義了「帳戶 ID 轉帳單 ID」的惟一約束,對於同一個轉帳單同一個帳戶只能插入一條記錄,後續重複的插入操做都會失敗,這樣就實現了一個冪等的操做。
基於這個思路,不光是可使用關係型數據庫,只要是支持相似「INSERT IF NOT EXIST」語義的存儲類系統均可以用於實現冪等,好比,你能夠用 Redis 的 SETNX 命令來替代數據庫中的惟一約束,來實現冪等消費。
(2)爲更新的數據設置前置條件
給數據變動設置一個前置條件,若是知足條件就更新數據,不然拒絕更新數據,在更新數據的時候,同時變動前置條件中須要判斷的數據。這樣,重複執行這個操做時,因爲第一次更新數據的時候已經變動了前置條件中須要判斷的數據,不知足前置條件,則不會重複執行更新數據操做。
好比,「將帳戶 X 的餘額增長 100 元」這個操做並不知足冪等性,咱們能夠把這個操做加上一個前置條件,變爲:「若是帳戶 X 當前的餘額爲 500 元,將餘額加 100 元」,這個操做就具有了冪等性。對應到消息隊列中的使用時,能夠在發消息時在消息體中帶上當前的餘額,在消費的時候進行判斷數據庫中,當前餘額是否與消息中的餘額相等,只有相等才執行變動操做。
可是,若是咱們要更新的數據不是數值,或者咱們要作一個比較複雜的更新操做怎麼辦?用什麼做爲前置判斷條件呢?更加通用的方法是,給你的數據增長一個版本號屬性,每次更數據前,比較當前數據的版本號是否和消息中的版本號一致,若是不一致就拒絕更新數據,更新數據的同時將版本號 +1,同樣能夠實現冪等更新。
(3)記錄並檢查操做
若是上面提到的兩種實現冪等方法都不能適用於你的場景,還有一種通用性最強,適用範圍最廣的實現冪等性方法:記錄並檢查操做,也稱爲「Token 機制或者 GUID(全局惟一 ID)機制」,實現的思路特別簡單:在執行數據更新操做以前,先檢查一下是否執行過這個更新操做。這種方法適用範圍最廣,可是實現難度和複雜度也比較高,通常不推薦使用。
具體的實現方法是,在發送消息時,給每條消息指定一個全局惟一的 ID,消費時,先根據這個 ID 檢查這條消息是否有被消費過,若是沒有消費過,才更新數據,而後將消費狀態置爲已消費。
在分佈式系統中,這個方法實際上是很是難實現的。首先,給每一個消息指定一個全局惟一的 ID 就是一件不那麼簡單的事兒,方法有不少,但都不太好同時知足簡單、高可用和高性能,或多或少都要有些犧牲。更加麻煩的是,在「檢查消費狀態,而後更新數據而且設置消費狀態」中,三個操做必須做爲一組操做保證原子性,才能真正實現冪等,不然就會出現 Bug。
好比說,對於同一條消息:「全局 ID 爲 8,操做爲:給 ID 爲 666 帳戶增長 100 元」,有可能出現這樣的狀況:
這樣就會致使帳戶被錯誤地增長了兩次 100 元,這是一個在分佈式系統中很是容易犯的錯誤,必定要引覺得戒。對於這個問題,固然咱們能夠用事務來實現,也能夠用鎖來實現,可是在分佈式系統中,不管是分佈式事務仍是分佈式鎖都是比較難解決問題。
<br/>
1、消息事務
其實不少場景下,咱們「發消息」這個過程,目的每每是通知另一個系統或者模塊去更新數據,消息隊列中的「事務」,主要解決消息生產者和消息消費者的數據一致性問題。
用戶在電商APP上購物時,先把商品加到購物車裏,而後幾件商品一塊兒下單,最後支付,完成購物流程。
這個過程當中有一個須要用到消息隊列的步驟,訂單系統建立訂單後,發消息給購物車系統,將已下單的商品從購物車中刪除。由於從購物車刪除已下單商品這個步驟,並非用戶下單支付這個主要流程中必要的步驟,使用消息隊列來異步清理購物車是更加合理。
對於訂單系統,它建立訂單的過程實際執行了2個步驟的操做:
對於購物車系統:
在分佈式系統中,上面提到的步驟,任何一個都有可能失敗,若是不作任何處理,那就有可能出現訂單數據與購物車數據不一致的狀況,好比:
因此咱們須要解決的問題爲:在上述任意步驟都有可能失敗的狀況下,還要保證訂單庫和購物車庫這兩個庫的數據一致性。
2、分佈式事務
分佈式事務就是要在分佈式系統中實現事務。在分佈式系統中,在保證可用性和不嚴重犧牲性能的前提下,光是要實現數據的一致性就已經很是困難了,顯然實現嚴格的分佈式事務是更加不可能完成的任務。因此目前你們所說的分佈式事務,更多狀況下,是在分佈式系統中事務的不完整實現,在不一樣的應用場景中,有不一樣的實現,目的都是經過一些妥協來解決實際問題。
常見的分佈式事務實現:
每一種實現都有其特定的使用場景,也有各自的問題,都不是完美的解決方案。
事務消息適用的場景主要是那些須要異步更新數據,而且對數據實時性要求不過高的場景。好比在建立訂單後,若是出現短暫的幾秒,購物車裏的商品沒有被及時狀況,也不是徹底不可接受的,只要最終購物車的數據和訂單數據保持一致就可。
3、消息隊列實現分佈式事務
事務消息須要消息隊列提供相應的功能才能實現,kafka和RocketMQ都提供了事務相關功能。
對於訂單系統:
對於購物車系統:
若是在第四步提交事務消息時失敗了怎麼辦?Kafka 和 RocketMQ 給出了 2 種不一樣的解決方案:
一、Kafka 的解決方案:
直接拋出異常,讓用戶自行處理。咱們能夠在業務代碼中反覆重試提交,直到提交成功,或者刪除以前建立的訂單進行補償。
二、RocketMQ 的解決方案:
在 RocketMQ 中的事務實現中,增長了事務反查的機制來解決事務消息提交失敗的問題。若是 Producer 也就是訂單系統,在提交或者回滾事務消息時發生網絡異常,RocketMQ 的 Broker 沒有收到提交或者回滾的請求,Broker 會按期去 Producer 上反查這個事務對應的本地事務的狀態,而後根據反查結果決定提交或者回滾這個事務。爲了支撐這個事務反查機制,咱們的業務代碼須要實現一個反查本地事務狀態的接口,告知 RocketMQ 本地事務是成功仍是失敗。
綜合上面講的通用事務消息的實現和 RocketMQ 的事務反查機制,使用 RocketMQ 事務消息功能實現分佈式事務的流程以下圖:
<br/>
當咱們說順序時,咱們在說什麼?
平常思惟中,順序大部分狀況會和時間關聯起來,即時間的前後表示事件的順序關係。
好比事件A發生在下午3點一刻,而事件B發生在下午4點,那麼咱們認爲事件A發生在事件B以前,他們的順序關係爲先A後B。
上面的例子之因此成立是由於他們有相同的參考系,即他們的時間是對應的同一個物理時鐘的時間。若是A發生的時間是北京時間,而B依賴的時間是東京時間,那麼先A後B的順序關係還成立嗎?
若是沒有一個絕對的時間參考,那麼A和B之間還有順序嗎,或者說怎麼判定A和B的順序?
顯而易見的,若是A、B兩個事件之間若是是有因果關係的,那麼A必定發生在B以前(來龍去脈,有因纔有果)。相反,在沒有一個絕對的時間的參考的狀況下,若A、B之間沒有因果關係,那麼A、B之間就沒有順序關係。
那麼,咱們在說順序時,其實說的是:
在分佈式環境中討論順序
當把順序放到分佈式環境(多線程、多進程均可以認爲是一個分佈式的環境)中去討論時:
(點表示事件,波浪線箭頭表示事件間的消息)
上圖中,進程P中的事件順序爲p1->p2->p3->p4(時間推斷)。而由於p1給進程Q的q2發了消息,那麼p1必定在q2以前(因果推斷)。可是沒法肯定p1和q1之間的順序關係。
推薦閱讀《Time, Clocks, and the Ordering of Events in a Distributed System》,會透徹的分析分佈式系統中的順序問題。
消息中間件中的順序消息
什麼是順序消息
有了上述的基礎以後,咱們回到本篇文章的主題中,聊一聊消息中間件中的順序消息。
順序消息(FIFO 消息)是 MQ 提供的一種嚴格按照順序進行發佈和消費的消息類型。順序消息由兩個部分組成:順序發佈和順序消費。順序消息包含兩種類型:
分區順序:一個Partition內全部的消息按照先進先出的順序進行發佈和消費
全局順序:一個Topic內全部的消息按照先進先出的順序進行發佈和消費
這是阿里雲上對順序消息的定義,把順序消息拆分紅了順序發佈和順序消費。那麼多線程中發送消息算不算順序發佈?
如上一部分介紹的,多線程中若沒有因果關係則沒有順序。那麼用戶在多線程中去發消息就意味着用戶不關心那些在不一樣線程中被髮送的消息的順序。即多線程發送的消息,不一樣線程間的消息不是順序發佈的,同一線程的消息是順序發佈的。這是須要用戶本身去保障的。
而對於順序消費,則須要保證哪些來自同一個發送線程的消息在消費時是按照相同的順序被處理的(爲何不說他們應該在一個線程中被消費呢?)。
全局順序實際上是分區順序的一個特例,即便Topic只有一個分區(如下不在討論全局順序,由於全局順序將面臨性能的問題,並且絕大多數場景都不須要全局順序)。
如何保證順序
在MQ的模型中,順序須要由3個階段去保障:
發送時保持順序意味着對於有順序要求的消息,用戶應該在同一個線程中採用同步的方式發送。存儲保持和發送的順序一致則要求在同一線程中被髮送出來的消息A和B,存儲時在空間上A必定在B以前。而消費保持和存儲一致則要求消息A、B到達Consumer以後必須按照先A後B的順序被處理。
以下圖所示:
對於兩個訂單的消息的原始數據:a一、b一、b二、a二、a三、b3(絕對時間下發生的順序):
開源RocketMQ中順序的實現
上圖是RocketMQ順序消息原理的介紹,將不一樣訂單的消息路由到不一樣的分區中。文檔只是給出了Producer順序的處理,Consumer消費時經過一個分區只能有一個線程消費的方式來保證消息順序,具體實現以下。
Producer端
Producer端確保消息順序惟一要作的事情就是將消息路由到特定的分區,在RocketMQ中,經過MessageQueueSelector來實現分區的選擇。
好比以下實現就能夠保證相同的訂單的消息被路由到相同的分區:
long orderId = ((Order) object).getOrderId; return mqs.get(orderId % mqs.size());
Consumer端
RocketMQ消費端有兩種類型:MQPullConsumer和MQPushConsumer。
MQPullConsumer由用戶控制線程,主動從服務端獲取消息,每次獲取到的是一個MessageQueue中的消息。PullResult中的List msgFoundList天然和存儲順序一致,用戶須要再拿到這批消息後本身保證消費的順序。
對於PushConsumer,由用戶註冊MessageListener來消費消息,在客戶端中須要保證調用MessageListener時消息的順序性。RocketMQ中的實現以下:
保證消費順序的核心思想是:
順序和異常的關係
順序消息須要Producer和Consumer都保證順序。Producer須要保證消息被路由到正確的分區,消息須要保證每一個分區的數據只有一個線程消息,那麼就會有一些缺陷:
不能更換MessageQueue重試就須要MessageQueue有本身的副本,經過Raft、Paxos之類的算法保證有可用的副本,或者經過其餘高可用的存儲設備來存儲MessageQueue。
熱點問題好像沒有什麼好的解決辦法,只能經過拆分MessageQueue和優化路由方法來儘可能均衡的將消息分配到不一樣的MessageQueue。
消費並行度理論上不會有太大問題,由於MessageQueue的數量能夠調整。
消費失敗的沒法跳過是不可避免的,由於跳過可能致使後續的數據處理都是錯誤的。不過能夠提供一些策略,由用戶根據錯誤類型來決定是否跳過,而且提供重試隊列之類的功能,在跳過以後用戶能夠在「其餘」地方從新消費到這條消息。
<br/>
感謝極客時間所屬的《消息隊列高手課》 連接
<br/>
本篇是一篇大合集,中間確定參考了許多其餘人的文章內容或圖片,但因爲時間比較久遠,當時並無一一記錄,爲此表示歉意,若是有做者發現了本身的文章或圖片,能夠私聊我,我會進行補充。
若是你發現寫的還不錯,能夠搜索公衆號「是Kerwin啊」,一塊兒進步!
也能夠查看Kerwin的GitHub主頁