分佈式開放消息系統(RocketMQ)的原理與實踐
分佈式消息系統做爲實現分佈式系統可擴展、可伸縮性的關鍵組件,須要具備高吞吐量、高可用等特色。而談到消息系統的設計,就回避不了兩個問題:java
- 消息的順序問題
- 消息的重複問題
RocketMQ做爲阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?mysql
關鍵特性以及其實現原理git
1、順序消息github
消息有序指的是一類消息消費時,能按照發送的順序來消費。例如:一個訂單產生了 3 條消息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照這個順序消費纔有意義。但同時訂單之間又是能夠並行消費的。算法
假如生產者產生了2條消息:M一、M2,要保證這兩條消息的順序,應該怎樣作?你腦中想到的多是這樣:sql
你可能會採用這種方式保證消息順序數據庫
M1發送到S1後,M2發送到S2,若是要保證M1先於M2被消費,那麼須要M1到達消費端後,通知S2,而後S2再將M2發送到消費端。緩存
這個模型存在的問題是,若是M1和M2分別發送到兩臺Server上,就不能保證M1先達到,也就不能保證M1被先消費,那麼就須要在MQ Server集羣維護消息的順序。那麼如何解決?一種簡單的方式就是將M一、M2發送到同一個Server上:安全
保證消息順序,你改進後的方法服務器
這樣能夠保證M1先於M2到達MQServer(客戶端等待M1成功後再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了消息的順序。
這個模型,理論上能夠保證消息的順序,但在實際運用中你應該會遇到下面的問題:
網絡延遲問題
只要將消息從一臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,若是發送M1耗時大於發送M2的耗時,那麼M2就先被消費,仍然不能保證消息的順序。即便M1和M2同時到達消費端,因爲不清楚消費端1和消費端2的負載狀況,仍然有可能出現M2先於M1被消費。如何解決這個問題?將M1和M2發往同一個消費者便可,且發送M1後,須要消費端響應成功後才能發送M2。
但又會引入另一個問題,若是發送M1後,消費端1沒有響應,那是繼續發送M2呢,仍是從新發送M1?通常爲了保證消息必定被消費,確定會選擇重發M1到另一個消費端2,就以下圖所示。
保證消息順序的正確姿式
這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種狀況,一種是M1確實沒有到達,另一種狀況是消費端1已經響應,可是Server端沒有收到。若是是第二種狀況,重發M1,就會形成M1被重複消費。也就是咱們後面要說的第二個問題,消息重複問題。
回過頭來看消息順序問題,嚴格的順序消息很是容易理解,並且處理問題也比較容易,要實現嚴格的順序消息,簡單且可行的辦法就是:
保證
生產者 - MQServer - 消費者
是一對一對一的關係
可是這樣設計,並行度就成爲了消息系統的瓶頸(吞吐量不夠),也會致使更多的異常處理,好比:只要消費端出現問題,就會致使整個處理流程阻塞,咱們不得不花費更多的精力來解決阻塞的問題。
但咱們的最終目標是要集羣的高容錯性和高吞吐量。這彷佛是一對不可調和的矛盾,那麼阿里是如何解決的?
世界上解決一個計算機問題最簡單的方法:「剛好」不須要解決它!
有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決它們身上,其實是浪費的,效率低下的。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:
一、不關注亂序的應用實際大量存在
二、隊列無序並不意味着消息無序
最後咱們從源碼角度分析RocketMQ怎麼實現發送順序消息。
通常消息是經過輪詢全部隊列來發送的(負載均衡策略),順序消息能夠根據業務,好比說訂單號相同的消息發送到同一個隊列。下面的示例中,OrderId相同的消息,會發送到同一個隊列:
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash SendResult sendResult = producer.send(msg, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, orderId);
在獲取到路由信息之後,會根據MessageQueueSelector
實現的算法來選擇一個隊列,同一個OrderId獲取到的隊列是同一個隊列。
private SendResult send() { // 獲取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); if (topicPublishInfo != null && topicPublishInfo.ok()) { MessageQueue mq = null; // 根據咱們的算法,選擇一個發送隊列 // 這裏的arg = orderId mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg); if (mq != null) { return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); } } }
2、消息重複
上面在解決消息順序問題時,引入了一個新的問題,就是消息重複。那麼RocketMQ是怎樣解決消息重複的問題呢?仍是「剛好」不解決。
形成消息的重複的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。因此解決這個問題的辦法就是不解決,轉而繞過這個問題。那麼問題就變成了:若是消費端收到兩條同樣的消息,應該怎樣處理?
一、消費端處理消息的業務邏輯保持冪等性
二、保證每條消息都有惟一編號且保證消息處理成功與去重表的日誌同時出現
第1條很好理解,只要保持冪等性,無論來多少條重複消息,最後處理的結果都同樣。第2條原理就是利用一張日誌表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日誌表中,那麼就再也不處理這條消息。
咱們能夠看到第1條的解決方式,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條能夠消息系統實現,也能夠業務端實現。正常狀況下出現重複消息的機率不必定大,且由消息系統實現的話,確定會對消息系統的吞吐量和高可用有影響,因此最好仍是由業務端本身處理消息重複的問題,這也是RocketMQ不解決消息重複的問題的緣由。
RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。
3、事務消息
RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什麼是事務消息以及支持事務消息的必要性。咱們以一個轉賬的場景爲例來講明這個問題:Bob向Smith轉帳100塊。
在單機環境下,執行事務的狀況,大概是下面這個樣子:
單機環境下轉帳事務示意圖
當用戶增加到必定程度,Bob和Smith的帳戶及餘額信息已經不在同一臺服務器上了,那麼上面的流程就變成了這樣:
集羣環境下轉帳事務示意圖
這時候你會發現,一樣是一個轉帳的業務,在集羣環境下,耗時竟然成倍的增加,這顯然是不可以接受的。那咱們如何來規避這個問題?
大事務 = 小事務 + 異步
將大事務拆分紅多個小事務異步執行。這樣基本上可以將跨機事務的執行效率優化到與單機一致。轉帳的事務就能夠分解成以下兩個小事務:
小事務+異步消息
圖中執行本地事務(Bob帳戶扣款)和發送異步消息應該保持同時成功或者失敗中,也就是扣款成功了,發送消息必定要成功,若是扣款失敗了,就不能再發送消息。那問題是:咱們是先扣款仍是先發送消息呢?
首先咱們看下,先發送消息,大體的示意圖以下:
事務消息:先發送消息
存在的問題是:若是消息發送成功,可是扣款失敗,消費端就會消費此消息,進而向Smith帳戶加錢。
先發消息不行,那咱們就先扣款唄,大體的示意圖以下:
事務消息-先扣款
存在的問題跟上面相似:若是扣款成功,發送消息失敗,就會出現Bob扣錢了,可是Smith帳戶未加錢。
可能你們會有不少的方法來解決這個問題,好比:直接將發消息放到Bob扣款的事務中去,若是發送失敗,拋出異常,事務回滾。這樣的處理方式也符合「剛好」不須要解決的原則。RocketMQ支持事務消息,下面咱們來看看RocketMQ是怎樣來實現的。
RocketMQ實現發送事務消息
RocketMQ第一階段發送Prepared消息
時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改狀態。細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,這時候發現了Prepared消息
,它會向消息發送者確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
那咱們來看下RocketMQ源碼,是否是這樣來處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example
工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer
):
// 未決事務,MQ服務器回查客戶端 // 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務 TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); // 構造事務消息的生產者 TransactionMQProducer producer = new TransactionMQProducer("groupName"); // 設置事務決斷處理類 producer.setTransactionCheckListener(transactionCheckListener); // 本地事務的處理邏輯,至關於示例中檢查Bob帳戶並扣錢的邏輯 TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); producer.start() // 構造MSG,省略構造參數 Message msg = new Message(......); // 發送消息 SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); producer.shutdown();
接着查看sendMessageInTransaction
方法的源碼,總共分爲3個階段:發送Prepared消息
、執行本地事務、發送確認消息。
public TransactionSendResult sendMessageInTransaction(.....) { // 邏輯代碼,非實際代碼 // 1.發送消息 sendResult = this.send(msg); // sendResult.getSendStatus() == SEND_OK // 2.若是消息發送成功,處理與消息關聯的本地事務單元 LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg); // 3.結束事務 this.endTransaction(sendResult, localTransactionState, localException); }
endTransaction
方法會將請求發往broker(mq server)
去更新事物消息的最終狀態:
- 根據
sendResult
找到Prepared消息
- 根據
localTransaction
更新消息的最終狀態
若是endTransaction
方法執行失敗,致使數據沒有發送到broker
,broker
會有回查線程定時(默認1分鐘)掃描每一個存儲事務狀態的表格文件,若是是已經提交或者回滾的消息直接跳過,若是是prepared狀態
則會向Producer
發起CheckTransaction
請求,Producer
會調用DefaultMQProducerImpl.checkTransactionState()
方法來處理broker
的定時回調請求,而checkTransactionState
會調用咱們的事務設置的決斷方法,最後調用endTransactionOneway
讓broker
來更新消息的最終狀態。
再回到轉帳的例子,若是Bob的帳戶的餘額已經減小,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題?解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程當中有可能會出現消息重複的問題,按照前面的思路解決便可。
消費事務消息
這樣基本上能夠解決超時問題,可是若是消費失敗怎麼辦?阿里提供給咱們的解決方法是:人工解決。你們能夠考慮一下,按照事務的流程,由於某種緣由Smith加款失敗,須要回滾整個流程。若是消息系統要實現這個回滾流程的話,系統複雜度將大大提高,且很容易出現Bug,估計出現Bug的機率會比消費失敗的機率大不少。咱們須要衡量是否值得花這麼大的代價來解決這樣一個出現機率很是小的問題,這也是你們在解決疑難問題時須要多多思考的地方。
20160321補充:在3.2.6版本中移除了事務消息的實現,因此此版本不支持事務消息,具體狀況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
4、Producer如何發送消息
Producer
輪詢某topic下的全部隊列的方式來實現發送方的負載均衡,以下圖所示:
producer發送消息負載均衡
首先分析一下RocketMQ的客戶端發送消息的源碼:
// 構造Producer DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName"); // 初始化Producer,整個應用生命週期內,只須要初始化1次 producer.start(); // 構造Message Message msg = new Message("TopicTest1",// topic "TagA",// tag:給消息打標籤,用於區分一類消息,可爲null "OrderID188",// key:自定義Key,能夠用於去重,可爲null ("Hello MetaQ").getBytes());// body:消息內容 // 發送消息並返回結果 SendResult sendResult = producer.send(msg); // 清理資源,關閉網絡鏈接,註銷本身 producer.shutdown();
在整個應用生命週期內,生產者須要調用一次start方法來初始化,初始化主要完成的任務有:
- 若是沒有指定
namesrv
地址,將會自動尋址- 啓動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向全部broker發送心跳...
- 啓動負載均衡的服務
初始化完成後,開始發送消息,發送消息的主要代碼以下:
private SendResult sendDefaultImpl(Message msg,......) { // 檢查Producer的狀態是不是RUNNING this.makeSureStateOK(); // 檢查msg是否合法:是否爲null、topic,body是否爲空、body是否超長 Validators.checkMessage(msg, this.defaultMQProducer); // 獲取topic路由信息 TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic()); // 從路由信息中選擇一個消息隊列 MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName); // 將消息發送到該隊列上去 sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout); }
代碼中須要關注的兩個方法tryToFindTopicPublishInfo
和selectOneMessageQueue
。前面說過在producer初始化時,會啓動定時任務獲取路由信息並更新到本地緩存,因此tryToFindTopicPublishInfo
會首先從緩存中獲取topic路由信息,若是沒有獲取到,則會本身去namesrv
獲取路由信息。selectOneMessageQueue
方法經過輪詢的方式,返回一個隊列,以達到負載均衡的目的。
若是Producer發送消息失敗,會自動重試,重試的策略:
- 重試次數 < retryTimesWhenSendFailed(可配置)
- 總的耗時(包含重試n次的耗時) < sendMsgTimeout(發送消息時傳入的參數)
- 同時知足上面兩個條件後,Producer會選擇另一個隊列發送消息
5、消息存儲
RocketMQ的消息存儲是由consume queue
和commit log
配合完成的。
一、Consume Queue
consume queue是消息的邏輯隊列,至關於字典的目錄,用來指定消息在物理文件commit log
上的位置。
咱們能夠在配置中指定consume queue與commitlog存儲的目錄
每一個topic
下的每一個queue
都有一個對應的consumequeue
文件,好比:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件組織,如圖所示:
Consume Queue文件組織示意圖
- 根據
topic
和queueId
來組織文件,圖中TopicA有兩個隊列0,1,那麼TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另外一個ConsumeQueue。 - 按照消費端的
GroupName
來分組重試隊列,若是消費端消費失敗,消息將被髮往重試隊列中,好比圖中的%RETRY%ConsumerGroupA
。 - 按照消費端的
GroupName
來分組死信隊列,若是消費端消費失敗,並重試指定次數後,仍然失敗,則發往死信隊列,好比圖中的%DLQ%ConsumerGroupA
。
死信隊列(Dead Letter Queue)通常用於存放因爲某種緣由沒法傳遞的消息,好比處理失敗或者已通過期的消息。
Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,以下圖所示:
consumequeue文件存儲單元格式
- CommitLog Offset是指這條消息在Commit Log文件中的實際偏移量
- Size存儲中消息的大小
- Message Tag HashCode存儲消息的Tag的哈希值:主要用於訂閱時消息過濾(訂閱時若是指定了Tag,會根據HashCode來快速查找到訂閱的消息)
二、Commit Log
CommitLog:消息存放的物理文件,每臺broker
上的commitlog
被本機全部的queue
共享,不作任何區分。
文件的默認位置以下,仍然可經過配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,按照編號順序以及編號對應的內容依次存儲。
Commit Log存儲單元結構圖
三、消息存儲實現
消息存儲實現,比較複雜,也值得你們深刻了解,後面會單獨成文來分析,這小節只以代碼說明一下具體的流程。
// Set the storage time msg.setStoreTimestamp(System.currentTimeMillis()); // Set the message body BODY CRC (consider the most appropriate setting msg.setBodyCRC(UtilAll.crc32(msg.getBody())); StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService(); synchronized (this) { long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now(); // Here settings are stored timestamp, in order to ensure an orderly global msg.setStoreTimestamp(beginLockTimestamp); // MapedFile:操做物理文件在內存中的映射以及將內存數據持久化到物理文件中 MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile(); // 將Message追加到文件commitlog result = mapedFile.appendMessage(msg, this.appendMessageCallback); switch (result.getStatus()) { case PUT_OK:break; case END_OF_FILE: // Create a new file, re-write the message mapedFile = this.mapedFileQueue.getLastMapedFile(); result = mapedFile.appendMessage(msg, this.appendMessageCallback); break; DispatchRequest dispatchRequest = new DispatchRequest( topic,// 1 queueId,// 2 result.getWroteOffset(),// 3 result.getWroteBytes(),// 4 tagsCode,// 5 msg.getStoreTimestamp(),// 6 result.getLogicsOffset(),// 7 msg.getKeys(),// 8 /** * Transaction */ msg.getSysFlag(),// 9 msg.getPreparedTransactionOffset());// 10 // 1.分發消息位置到ConsumeQueue // 2.分發到IndexService創建索引 this.defaultMessageStore.putDispatchRequest(dispatchRequest); }
四、消息的索引文件
若是一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:
消息索引
索引文件主要用於根據key來查詢消息的,流程主要是:
- 根據查詢的 key 的 hashcode%slotNum 獲得具體的槽的位置(slotNum 是一個索引文件裏面包含的最大槽的數目,例如圖中所示 slotNum=5000000)
- 根據 slotValue(slot 位置對應的值)查找到索引項列表的最後一項(倒序排列,slotValue 老是指向最新的一個索引項)
- 遍歷索引項列表返回查詢時間範圍內的結果集(默認一次最大返回的 32 條記錄)
6、消息訂閱
RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另一種是Pull模式,即消費端在須要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是採用消費端主動拉取的方式。
首先看下消費端的負載均衡:
消費端負載均衡
消費端會經過RebalanceService線程,10秒鐘作一次基於topic下的全部隊列負載:
- 遍歷Consumer下的全部topic,而後根據topic訂閱全部的消息
- 獲取同一topic和Consumer Group下的全部Consumer
- 而後根據具體的分配策略來分配消費隊列,分配的策略包含:平均分配、消費端配置等
如同上圖所示:若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。這裏採用的就是平均分配策略,它相似於咱們的分頁,TOPIC下面的全部queue就是記錄,Consumer的個數就至關於總的頁數,那麼每頁有多少條記錄,就相似於某個Consumer會消費哪些隊列。
經過這樣的策略來達到大致上的平均消費,這樣的設計也能夠很方面的水平擴展Consumer來提升消費能力。
消費端的Push模式是經過長輪詢的模式來實現的,就如同下圖:
Push模式示意圖
Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求後,若是有消息就當即返回數據,Consumer端收到返回的消息後,再回調消費者設置的Listener方法。若是broker在收到Pull請求時,消息隊列裏沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。
固然,Consumer端是經過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>
中的PullRequest
發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest
時,若是發現沒有消息,就會把PullRequest
扔到ConcurrentHashMap中緩存起來。broker在啓動時,會啓動一個線程不停的從ConcurrentHashMap取出PullRequest
檢查,直到有數據返回。
7、RocketMQ的其餘特性
前面的6個特性都是基本上都是點到爲止,想要深刻了解,還須要你們多多查看源碼,多多在實際中運用。固然除了已經提到的特性外,RocketMQ還支持:
- 定時消息
- 消息的刷盤策略
- 主動同步策略:同步雙寫、異步複製
- 海量消息堆積能力
- 高效通訊
- .......
其中涉及到的不少設計思路和解決方法都值得咱們深刻研究:
- 消息的存儲設計:既要知足海量消息的堆積能力,又要知足極快的查詢效率,還要保證寫入的效率。
- 高效的通訊組件設計:高吞吐量,毫秒級的消息投遞能力都離不開高效的通訊。
- .......
RocketMQ最佳實踐
1、Producer最佳實踐
一、一個應用盡量用一個 Topic,消息子類型用 tags 來標識,tags 能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用 tags 在 broker 作消息過濾。
二、每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。因爲是哈希索引,請務必保證 key 儘量惟一,這樣能夠避免潛在的哈希衝突。
三、消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
四、對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
五、某些應用若是不關注消息是否發送成功,請直接使用sendOneWay
方法發送消息。
2、Consumer最佳實踐
一、消費過程要作到冪等(即消費端去重)
二、儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。
三、優化每條消息消費過程
3、其餘配置
線上應該關閉autoCreateTopicEnable
,即在配置文件中將其設置爲false
。
RocketMQ在發送消息時,會首先獲取路由信息。若是是新的消息,因爲MQServer上面尚未建立對應的Topic
,這個時候,若是上面的配置打開的話,會返回默認TOPIC的(RocketMQ會在每臺broker
上面建立名爲TBW102
的TOPIC)路由信息,而後Producer
會選擇一臺Broker
發送消息,選中的broker
在存儲消息時,發現消息的topic
尚未建立,就會自動建立topic
。後果就是:之後全部該TOPIC的消息,都將發送到這臺broker
上,達不到負載均衡的目的。
因此基於目前RocketMQ的設計,建議關閉自動建立TOPIC的功能,而後根據消息量的大小,手動建立TOPIC。
RocketMQ設計相關
RocketMQ的設計假定:
每臺PC機器均可能宕機不可服務
任意集羣都有可能處理能力不足
最壞的狀況必定會發生
內網環境須要低延遲來提供最佳用戶體驗
RocketMQ的關鍵設計:
分佈式集羣化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)
這是RocketMQ在設計時的假定前提以及須要到達的效果。我想這些假定適用於全部的系統設計。隨着咱們系統的服務的增多,每位開發者都要注意本身的程序是否存在單點故障,若是掛了應該怎麼恢復、能不能很好的水平擴展、對外的接口是否足夠高效、本身管理的數據是否足夠安全...... 多多規範本身的設計,才能開發出高效健壯的程序。
附錄:RocketMQ涉及到的幾個專業術語和總體架構介紹
1、RocketMQ中的專業術語
Topictopic
表示消息的第一級類型,好比一個電商系統的消息能夠分爲:交易消息、物流消息...... 一條消息必須有一個Topic
。
TagTag
表示消息的第二級類型,好比交易消息又能夠分爲:交易建立消息,交易完成消息..... 一條消息能夠沒有Tag
。RocketMQ提供2級消息分類,方便你們靈活控制。
Queue
一個topic
下,咱們能夠設置多個queue(消息隊列)
。當咱們發送消息時,須要要指定該消息的topic
。RocketMQ會輪詢該topic
下的全部隊列,將消息發送出去。
Producer 與 Producer GroupProducer
表示消息隊列的生產者。消息隊列的本質就是實現了publish-subscribe模式,生產者生產消息,消費者消費消息。因此這裏的Producer
就是用來生產和發送消息的,通常指業務系統。
Producer Group
是一類Producer
的集合名稱,這類Producer
一般發送一類消息,且發送邏輯一致。
Consumer 與 Consumer Group
消息消費者,通常由後臺系統異步消費消息。
Push Consumer
Consumer 的一種,應用一般向 Consumer 對象註冊一個 Listener 接口,一旦收到消息,Consumer 對象馬上回調 Listener 接口方法。Pull Consumer
Consumer 的一種,應用一般主動調用 Consumer 的拉消息方法從 Broker 拉消息,主動權由應用控制。
Consumer Group
是一類Consumer
的集合名稱,這類Consumer
一般消費一類消息,且消費邏輯一致。
Broker
消息的中轉者,負責存儲和轉發消息。能夠理解爲消息隊列服務器,提供了消息的接收、存儲、拉取和轉發服務。broker
是RocketMQ的核心,它不不能掛的,因此須要保證broker
的高可用。
廣播消費
一條消息被多個Consumer
消費,即便這些Consumer
屬於同一個Consumer Group
,消息也會被Consumer Group
中的每一個Consumer
都消費一次。在廣播消費中的Consumer Group
概念能夠認爲在消息劃分方面無心義。
集羣消費
一個Consumer Group
中的Consumer
實例平均分攤消費消息。例如某個Topic
有 9 條消息,其中一個Consumer Group
有 3 個實例(多是 3 個進程,或者 3 臺機器),那麼每一個實例只消費其中的 3 條消息。
NameServerNameServer
即名稱服務,兩個功能:
- 接收
broker
的請求,註冊broker
的路由信息 - 接口
client
的請求,根據某個topic
獲取其到broker
的路由信息NameServer
沒有狀態,能夠橫向擴展。每一個broker
在啓動的時候會到NameServer
註冊;Producer
在發送消息前會根據topic
到NameServer
獲取路由(到broker
)信息;Consumer
也會定時獲取topic
路由信息。
2、RocketMQ Overview
rocketmq overview
Producer
向一些隊列輪流發送消息,隊列集合稱爲Topic
,Consumer
若是作廣播消費,則一個consumer
實例消費這個Topic
對應的全部隊列;若是作集羣消費,則多個Consumer
實例平均消費這個Topic
對應的隊列集合。
再看下RocketMQ物理部署結構圖:
RocketMQ網絡部署圖
RocketMQ網絡部署特色:
-
Name Server
是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。- Broker部署相對複雜,Broker分爲
Master
與Slave
,一個Master
能夠對應多個Slave
,可是一個Slave
只能對應一個Master
,Master
與Slave
的對應關係經過指定相同的BrokerName
,不一樣的BrokerId
來定義,BrokerId=0
表示Master
,非0表示Slave
。Master
也能夠部署多個。每一個Broker
與Name Server
集羣中的全部節點創建長鏈接,定時註冊Topic
信息到全部Name Server
。 Producer
與Name Server
集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server
取Topic路由信息,並向提供Topic 服務的Master創建長鏈接,且定時向Master發送心跳。Producer 徹底無狀態,可集羣部署。Consumer
與Name Server
集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server
取Topic 路由信息,並向提供Topic
服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer
既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。
RocketMQ事務消費和順序消費詳解
1、RocketMq有3中消息類型
1.普通消費
2. 順序消費
3.事務消費
- 順序消費場景
在網購的時候,咱們須要下單,那麼下單須要假若有三個順序,第1、建立訂單 ,第二:訂單付款,第三:訂單完成。也就是這個三個環節要有順序,這個訂單纔有意義。RocketMQ能夠保證順序消費。
- rocketMq實現順序消費的原理
produce在發送消息的時候,把消息發到同一個隊列(queue)中,消費者註冊消息監聽器爲MessageListenerOrderly,這樣就能夠保證消費端只有一個線程去消費消息
注意:是把把消息發到同一個隊列(queue),不是同一個topic,默認狀況下一個topic包括4個queue
單個節點(Producer端1個、Consumer端1個)
一、Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,發送順序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
二、Consumer.java
package order; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeOrderlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerOrderly; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * 順序消息消費,帶事務方式(應用可控制Offset何時提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
結果以下圖所示:
這個五條數據被順序消費了
- 多個節點(Producer端1個、Consumer端2個)
Producer.java
package order; import java.util.List; import com.alibaba.rocketmq.client.exception.MQBrokerException; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.DefaultMQProducer; import com.alibaba.rocketmq.client.producer.MessageQueueSelector; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.common.message.Message; import com.alibaba.rocketmq.common.message.MessageQueue; import com.alibaba.rocketmq.remoting.exception.RemotingException; /** * Producer,發送順序消息 */ public class Producer { public static void main(String[] args) { try { DefaultMQProducer producer = new DefaultMQProducer("order_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", // "TagE" }; for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_1", "KEY" + i, ("order_1 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 0); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_2", "KEY" + i, ("order_2 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 1); System.out.println(sendResult); } for (int i = 1; i <= 5; i++) { Message msg = new Message("TopicOrderTest", "order_3", "KEY" + i, ("order_3 " + i).getBytes()); SendResult sendResult = producer.send(msg, new MessageQueueSelector() { public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { Integer id = (Integer) arg; int index = id % mqs.size(); return mqs.get(index); } }, 2); System.out.println(sendResult); } producer.shutdown(); } catch (MQClientException e) { e.printStackTrace(); } catch (RemotingException e) { e.printStackTrace(); } catch (MQBrokerException e) { e.printStackTrace(); } catch (InterruptedException e) { e.printStackTrace(); } } }
Consumer1.java
/** * 順序消息消費,帶事務方式(應用可控制Offset何時提交) */ public class Consumer1 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到 *,第二個線程沒法訪問這個隊列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer1 Started."); } }
Consumer2.java
/** * 順序消息消費,帶事務方式(應用可控制Offset何時提交) */ public class Consumer2 { public static void main(String[] args) throws MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicOrderTest", "*"); /** * 實現了MessageListenerOrderly表示一個隊列只會被一個線程取到 *,第二個線程沒法訪問這個隊列 */ consumer.registerMessageListener(new MessageListenerOrderly() { AtomicLong consumeTimes = new AtomicLong(0); public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { // 設置自動提交 context.setAutoCommit(true); for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } try { TimeUnit.SECONDS.sleep(5L); } catch (InterruptedException e) { e.printStackTrace(); } ; return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.out.println("Consumer2 Started."); } }
先啓動Consumer1和Consumer2,而後啓動Producer,Producer會發送15條消息
Consumer1消費狀況如圖,都按照順序執行了
Consumer2消費狀況如圖,都按照順序執行了
2、事務消費
這裏說的主要是分佈式事物。下面的例子的數據庫分別安裝在不一樣的節點上。
事物消費須要先說說什麼是事務。好比說:咱們跨行轉帳,從工商銀行轉到建設銀行,也就是我從工商銀行扣除1000元以後,個人建設銀行也必須加1000元。這樣才能保證數據的一致性。假如工商銀行轉1000元以後,建設銀行的服務器忽然宕機,那麼我扣除了1000,可是並無在建設銀行給我加1000,就出現了數據的不一致。所以加1000和減1000才行,減1000和減1000必須一塊兒成功,一塊兒失敗。
再好比,咱們進行網購的時候,咱們下單以後,訂單提交成功,倉庫商品的數量必須減一。可是訂單多是一個數據庫,倉庫數量可能又是在另個數據庫裏面。有可能訂單提交成功以後,倉庫數量服務器忽然宕機。這樣也出現了數據不一致的問題。
使用消息隊列來解決分佈式事物:
如今咱們去外面飯店吃飯,不少時候都不會直接給了錢以後直接在付款的窗口遞飯菜,而是付款以後他會給你一張小票,你拿着這個小票去出飯的窗口取飯。這裏和咱們的系統相似,提升了吞吐量。即便你到第二個窗口,師傅告訴你已經沒飯了,你能夠拿着這個憑證去退款,即便中途因爲出了意外你沒法到達窗口進行取飯,可是隻要憑證還在,能夠將錢退給你。這樣就保證了數據的一致性。
如何保證憑證(消息)有2種方法:
一、在工商銀行扣款的時候,餘額表扣除1000,同時記錄日誌,並且這2個表是在同一個數據庫實例中,可使用本地事物解決。而後咱們通知建設銀行須要加1000給該用戶,建設銀行收到以後給我返回已經加了1000給用戶的確認信息以後,我再標記日誌表裏面的日誌爲已經完成。
二、經過消息中間件
原文地址:http://www.jianshu.com/p/453c6e7ff81c
RocketMQ第一階段發送Prepared消息時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改消息的狀態。
細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,若是發現了Prepared消息,它會向消息發送端(生產者)確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
例子:
Consumer.java
package transaction; import java.util.List; import com.alibaba.rocketmq.client.consumer.DefaultMQPushConsumer; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext; import com.alibaba.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus; import com.alibaba.rocketmq.client.consumer.listener.MessageListenerConcurrently; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.common.consumer.ConsumeFromWhere; import com.alibaba.rocketmq.common.message.MessageExt; /** * Consumer,訂閱消息 */ public class Consumer { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("transaction_Consumer"); consumer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); consumer.setConsumeMessageBatchMaxSize(10); /** * 設置Consumer第一次啓動是從隊列頭部開始消費仍是隊列尾部開始消費<br> * 若是非第一次啓動,那麼按照上次消費的位置繼續消費 */ consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTransactionTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(msg + ",內容:" + new String(msg.getBody())); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER;// 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;// 成功 } }); consumer.start(); System.out.println("transaction_Consumer Started."); } }
Producer.java
package transaction; import com.alibaba.rocketmq.client.exception.MQClientException; import com.alibaba.rocketmq.client.producer.SendResult; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.client.producer.TransactionMQProducer; import com.alibaba.rocketmq.common.message.Message; /** * 發送事務消息例子 * */ public class Producer { public static void main(String[] args) throws MQClientException, InterruptedException { TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl(); TransactionMQProducer producer = new TransactionMQProducer("transaction_Producer"); producer.setNamesrvAddr("192.168.100.145:9876;192.168.100.146:9876;192.168.100.149:9876;192.168.100.239:9876"); // 事務回查最小併發數 producer.setCheckThreadPoolMinSize(2); // 事務回查最大併發數 producer.setCheckThreadPoolMaxSize(2); // 隊列數 producer.setCheckRequestHoldMax(2000); producer.setTransactionCheckListener(transactionCheckListener); producer.start(); // String[] tags = new String[] { "TagA", "TagB", "TagC", "TagD", "TagE" // }; TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl(); for (int i = 1; i <= 2; i++) { try { Message msg = new Message("TopicTransactionTest", "transaction" + i, "KEY" + i, ("Hello RocketMQ " + i).getBytes()); SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null); System.out.println(sendResult); Thread.sleep(10); } catch (MQClientException e) { e.printStackTrace(); } } for (int i = 0; i < 100000; i++) { Thread.sleep(1000); } producer.shutdown(); } }
TransactionExecuterImpl .java --執行本地事務
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionExecuter; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.common.message.Message; /** * 執行本地事務 */ public class TransactionExecuterImpl implements LocalTransactionExecuter { // private AtomicInteger transactionIndex = new AtomicInteger(1); public LocalTransactionState executeLocalTransactionBranch(final Message msg, final Object arg) { System.out.println("執行本地事務msg = " + new String(msg.getBody())); System.out.println("執行本地事務arg = " + arg); String tags = msg.getTags(); if (tags.equals("transaction2")) { System.out.println("======個人操做============,失敗了 -進行ROLLBACK"); return LocalTransactionState.ROLLBACK_MESSAGE; } return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
TransactionCheckListenerImpl--未決事務,服務器回查客戶端(目前已經被閹割啦)
package transaction; import com.alibaba.rocketmq.client.producer.LocalTransactionState; import com.alibaba.rocketmq.client.producer.TransactionCheckListener; import com.alibaba.rocketmq.common.message.MessageExt; /** * 未決事務,服務器回查客戶端 */ public class TransactionCheckListenerImpl implements TransactionCheckListener { // private AtomicInteger transactionIndex = new AtomicInteger(0); //在這裏,咱們能夠根據由MQ回傳的key去數據庫查詢,這條數據究竟是成功了仍是失敗了。 public LocalTransactionState checkLocalTransactionState(MessageExt msg) { System.out.println("未決事務,服務器回查客戶端msg =" + new String(msg.getBody().toString())); // return LocalTransactionState.ROLLBACK_MESSAGE; return LocalTransactionState.COMMIT_MESSAGE; // return LocalTransactionState.UNKNOW; } }
producer端:發送數據到MQ,而且處理本地事物。這裏模擬了一個成功一個失敗。Consumer只會接收到本地事物成功的數據。第二個數據失敗了,不會被消費。
Consumer只會接收到一個,第二個數據不會被接收到
分佈式消息隊列RocketMQ&Kafka -- 消息的「順序消費」
在說到消息中間件的時候,咱們一般都會談到一個特性:消息的順序消費問題。這個問題看起來很簡單:Producer發送消息1, 2, 3。。。 Consumer按1, 2, 3。。。順序消費。
但實際狀況倒是:不管RocketMQ,仍是Kafka,缺省都不保證消息的嚴格有序消費!
這個特性看起來很簡單,但爲何缺省他們都不保證呢?
「嚴格的順序消費」有多麼困難
下面就從3個方面來分析一下,對於一個消息中間件來講,」嚴格的順序消費」有多麼困難,或者說不可能。
發送端
發送端不能異步發送,異步發送在發送失敗的狀況下,就沒辦法保證消息順序。
好比你連續發了1,2,3。 過了一會,返回結果1失敗,2, 3成功。你把1再從新發送1遍,這個時候順序就亂掉了。
存儲端
對於存儲端,要保證消息順序,會有如下幾個問題:
(1)消息不能分區。也就是1個topic,只能有1個隊列。在Kafka中,它叫作partition;在RocketMQ中,它叫作queue。 若是你有多個隊列,那同1個topic的消息,會分散到多個分區裏面,天然不能保證順序。
(2)即便只有1個隊列的狀況下,會有第2個問題。該機器掛了以後,可否切換到其餘機器?也就是高可用問題。
好比你當前的機器掛了,上面還有消息沒有消費完。此時切換到其餘機器,可用性保證了。但消息順序就亂掉了。
要想保證,一方面要同步複製,不能異步複製;另1方面得保證,切機器以前,掛掉的機器上面,全部消息必須消費完了,不能有殘留。很明顯,這個很難!!!
接收端
對於接收端,不能並行消費,也即不能開多線程或者多個客戶端消費同1個隊列。
總結
從上面的分析能夠看出,要保證消息的嚴格有序,有多麼困難!
發送端和接收端的問題,還好解決一點,限制異步發送,限制並行消費。但對於存儲端,機器掛了以後,切換的問題,就很難解決了。
你切換了,可能消息就會亂;你不切換,那就暫時不可用。這2者之間,就須要權衡了。
業務須要全局有序嗎?
經過上面分析能夠看出,要保證一個topic內部,消息嚴格的有序,是很困難的,或者說條件是很苛刻的。
那怎麼辦呢?咱們必定要使出全部力氣、用盡全部辦法,來保證消息的嚴格有序嗎?
這裏就須要從另一個角度去考慮這個問題:業務角度。正如在下面這篇博客中所說的:
http://www.jianshu.com/p/453c6e7ff81c
實際狀況中:
(1)不關注順序的業務大量存在;
(2) 隊列無序不表明消息無序。
第(2)條的意思是說:咱們不保證隊列的全局有序,但能夠保證消息的局部有序。
舉個例子:保證來自同1個order id的消息,是有序的!
下面就看一下在Kafka和RocketMQ中,分別是如何對待這個問題的:
Kafka中:發送1條消息的時候,能夠指定(topic, partition, key) 3個參數。partiton和key是可選的。
若是你指定了partition,那就是全部消息發往同1個partition,就是有序的。而且在消費端,Kafka保證,1個partition只能被1個consumer消費。
或者你指定key(好比order id),具備同1個key的全部消息,會發往同1個partition。也是有序的。
RocketMQ: RocketMQ在Kafka的基礎上,把這個限制更放寬了一步。只指定(topic, key),不指定具體發往哪一個隊列。也就是說,它更加不但願業務方,非要去要一個全局的嚴格有序。
消費模式
1、集羣消費
以前的博客中,啓動的都是單個Consumer,若是啓動多個呢?
RocketMQ-集羣消費
其實,對於RocketMQ而言,經過ConsumeGroup的機制,實現了自然的消息負載均衡!通俗點來講,RocketMQ中的消息經過ConsumeGroup實現了將消息分發到C1/C2/C3/……的機制,這意味着咱們將很是方便的經過加機器來實現水平擴展!
咱們考慮一下這種狀況:好比C2發生了重啓,一條消息發往C3進行消費,可是這條消息的處理須要0.1S,而此時C2恰好完成重啓,那麼C2是否可能會收到這條消息呢?答案是確定的,也就是consume broker的重啓,或者水平擴容,或者不遵照先訂閱後生產消息,均可能致使消息的重複消費!關於去重的話題會在後續中予以介紹!
至於消息分發到C1/C2/C3,其實也是能夠設置策略的:
RocketMQ-消息負載策略
使用哪一種策略,只須要實例化對應的對象便可,如:
AllocateMessageQueueStrategy aqs = new AllocateMessageQueueAveragelyByCircle(); consumer.setAllocateMessageQueueStrategy(aqs);
上面內容,實際上是一種消費模式——集羣消費。
RocketMQ的消費模式有2種,查看一下源碼:
public enum MessageModel { /** * broadcast */ BROADCASTING, /** * clustering */ CLUSTERING; }
在默認狀況下,就是集羣消費(CLUSTERING),也就是上面說起的消息的負載均衡消費。另外一種消費模式,是廣播消費(BROADCASTING)。
2、廣播消費
廣播消費,相似於ActiveMQ中的發佈訂閱模式,消息會發給Consume Group中的每個消費者進行消費。
RocketMQ-廣播消費模式設置
/** * Consumer,訂閱消息 */ public class Consumer2 { public static void main(String[] args) throws InterruptedException, MQClientException { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("group_name"); consumer.setNamesrvAddr("192.168.2.222:9876;192.168.2.223:9876"); consumer.setConsumeMessageBatchMaxSize(10); // 設置爲廣播消費模式 consumer.setMessageModel(MessageModel.BROADCASTING); consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET); consumer.subscribe("TopicTest", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { try { for (MessageExt msg : msgs) { System.out.println(" Receive New Messages: " + msg); } } catch (Exception e) { e.printStackTrace(); return ConsumeConcurrentlyStatus.RECONSUME_LATER; // 重試 } return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; // 成功 } }); consumer.start(); System.out.println("Consumer Started."); } }
內容補充
《RocketMQ(三)——HelloWorld》那篇博客的最後提到了單批次消息消費數量 ,本文既然提到了集羣消費,那就針對這兩個內容再進行一次補充吧。
若是咱們有2臺節點,Producerw往MQ上寫入20條數據 其中Consumer1中拉取了12條 。Consumer2中拉取了8 條,這種狀況下,假如Consumer1宕機,那麼咱們消費數據的時候,只能消費到Consumer2中的8條,Consumer1中的12條已經持久化了。須要Consumer1恢復以後這12條數據才能繼續被消費。其實這種先啓動producer往MQ上寫數據,而後再啓動Consumer的狀況原本就是違規操做,正確的狀況應該是先啓動Consumer後再啓動producer。
集羣消費和廣播消費
基本概念
MQ 是基於發佈訂閱模型的消息系統。在 MQ 消息系統中消息的訂閱方訂閱關注的 Topic,以獲取並消費消息。因爲訂閱方應用通常是分佈式系統,以集羣方式部署有多臺機器。所以 MQ 約定如下概念。
集羣:MQ 約定使用相同 Consumer ID 的訂閱者屬於同一個集羣,同一個集羣下的訂閱者消費邏輯必須徹底一致(包括 Tag 的使用),這些訂閱者在邏輯上能夠認爲是一個消費節點。
集羣消費:當使用集羣消費模式時,MQ 認爲任意一條消息只須要被集羣內的任意一個消費者處理便可。
廣播消費:當使用廣播消費模式時,MQ 會將每條消息推送給集羣內全部註冊過的客戶端,保證消息至少被每臺機器消費一次。
場景對比
集羣消費模式:
適用場景&注意事項
- 消費端集羣化部署,每條消息只須要被處理一次。
- 因爲消費進度在服務端維護,可靠性更高。
- 集羣消費模式下,每一條消息都只會被分發到一臺機器上處理,若是須要被集羣下的每一臺機器都處理,請使用廣播模式。
- 集羣消費模式下,不保證消息的每一次失敗重投等邏輯都能路由到同一臺機器上,所以處理消息時不該該作任何肯定性假設。
廣播消費模式:
適用場景&注意事項
- 每條消息都須要被相同邏輯的多臺機器處理。
- 消費進度在客戶端維護,出現重複的機率稍大於集羣模式。
- 廣播模式下,MQ 保證每條消息至少被每臺客戶端消費一次,可是並不會對消費失敗的消息進行失敗重投,所以業務方須要關注消費失敗的狀況。
- 廣播模式下,第一次啓動時默認從最新消息消費,客戶端的消費進度是被持久化在客戶端本地的隱藏文件中,所以不建議刪除該隱藏文件,不然會丟失部分消息。
- 廣播模式下,每條消息都會被大量的客戶端重複處理,所以推薦儘量使用集羣模式。
- 目前僅 Java 客戶端支持廣播模式。
- 廣播模式下服務端不維護消費進度,因此服務端不提供堆積查詢和報警功能。
使用集羣模式模擬廣播:
適用場景&注意事項
- 每條消息都須要被多臺機器處理,每臺機器的邏輯能夠相同也能夠不同。
- 消費進度在服務端維護,可靠性高於廣播模式。