RocketMQ做爲阿里開源的一款高性能、高吞吐量的消息中間件,它是怎樣來解決這兩個問題的?RocketMQ 有哪些關鍵特性?其實現原理是怎樣的?html
消息有序指的是能夠按照消息的發送順序來消費。例如:一筆訂單產生了 3 條消息,分別是訂單建立、訂單付款、訂單完成。消費時,要按照順序依次消費纔有意義。與此同時多筆訂單之間又是能夠並行消費的。首先已兩筆消息來看以下示例:java
假如生產者產生了2條消息:M一、M2,要保證這兩條消息的順序,應該怎樣作?你腦中想到的多是這樣:git
若是M1發送到S1,M2發送到S2,若是要保證M1先於M2被消費,那麼須要M1到達消費端被消費後,通知S2,而後S2再將M2發送到消費端。github
這個模型存在的問題是,若是M1和M2分別發送到兩臺Server上,就不能保證M1先達到MQ集羣,也不能保證M1被先消費。換個角度看,若是M2先於M1達到MQ集羣,甚至M2被消費後,M1才達到消費端,這時消息也就亂序了,說明以上模型是不能保證消息的順序的。如何才能在MQ集羣保證消息的順序?一種簡單的方式就是將M一、M2發送到同一個Server上:算法
這樣能夠保證M1先於M2到達MQServer(生產者等待M1發送成功後再發送M2),根據先達到先被消費的原則,M1會先於M2被消費,這樣就保證了消息的順序。數據庫
但這個模型也僅僅是理論上能夠保證消息的順序,但在實際運用中你應該會遇到下面的問題:緩存
只要將消息從一臺服務器發往另外一臺服務器,就會存在網絡延遲問題。如上圖所示,若是發送M1耗時大於發送M2的耗時,那麼M2就仍將被先消費,仍然不能保證消息的順序。即便M1和M2同時到達消費端,因爲不清楚消費端1和消費端2的負載狀況,仍然有可能出現M2先於M1被消費的狀況。安全
那如何解決這個問題?將M1和M2發往同一個消費者,且發送M1後,須要消費端響應成功後才能發送M2。bash
聰明的你可能已經想到另外的問題:若是M1被髮送到消費端後,消費端1沒有響應,那是繼續發送M2呢,仍是從新發送M1?通常爲了保證消息必定被消費,確定會選擇重發M1到另一個消費端2,就以下圖所示。服務器
這樣的模型就嚴格保證消息的順序,細心的你仍然會發現問題,消費端1沒有響應Server時有兩種狀況,一種是M1確實沒有到達(數據在網絡傳送中丟失),另一種消費端已經消費M1且已經發送響應消息,只是MQ Server端沒有收到。若是是第二種狀況,重發M1,就會形成M1被重複消費。也就引入了咱們要說的第二個問題,消息重複問題,這個後文會詳細講解。
回過頭來看消息順序問題,嚴格的順序消息很是容易理解,也能夠經過文中所描述的方式來簡單處理。總結起來,要實現嚴格的順序消息,簡單且可行的辦法就是:
保證
生產者 - MQServer - 消費者
是一對一對一的關係
這樣的設計雖然簡單易行,但也會存在一些很嚴重的問題,好比:
- 並行度就會成爲消息系統的瓶頸(吞吐量不夠)
- 更多的異常處理,好比:只要消費端出現問題,就會致使整個處理流程阻塞,咱們不得不花費更多的精力來解決阻塞的問題。
但咱們的最終目標是要集羣的高容錯性和高吞吐量。這彷佛是一對不可調和的矛盾,那麼阿里是如何解決的?
世界上解決一個計算機問題最簡單的方法:「剛好」不須要解決它!—— 沈詢
有些問題,看起來很重要,但實際上咱們能夠經過合理的設計或者將問題分解來規避。若是硬要把時間花在解決問題自己,實際上不只效率低下,並且也是一種浪費。從這個角度來看消息的順序問題,咱們能夠得出兩個結論:
- 不關注亂序的應用實際大量存在
- 隊列無序並不意味着消息無序
因此從業務層面來保證消息的順序而不只僅是依賴於消息系統,是否是咱們應該尋求的一種更合理的方式?
最後咱們從源碼角度分析RocketMQ怎麼實現發送順序消息。
RocketMQ經過輪詢全部隊列的方式來肯定消息被髮送到哪個隊列(負載均衡策略)。好比下面的示例中,訂單號相同的消息會被前後發送到同一個隊列中:
// RocketMQ經過MessageQueueSelector中實現的算法來肯定消息發送到哪個隊列上
// RocketMQ默認提供了兩種MessageQueueSelector實現:隨機/Hash
// 固然你能夠根據業務實現本身的MessageQueueSelector來決定消息按照何種策略發送到消息隊列中
SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
Integer id = (Integer) arg;
int index = id % mqs.size();
return mqs.get(index);
}
}, orderId);
在獲取到路由信息之後,會根據MessageQueueSelector
實現的算法來選擇一個隊列,同一個OrderId獲取到的確定是同一個隊列。
private SendResult send() {
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok()) {
MessageQueue mq = null;
// 根據咱們的算法,選擇一個發送隊列
// 這裏的arg = orderId
mq = selector.select(topicPublishInfo.getMessageQueueList(), msg, arg);
if (mq != null) {
return this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
}
}
上面在解決消息順序問題時,引入了一個新的問題,就是消息重複。那麼RocketMQ是怎樣解決消息重複的問題呢?仍是「剛好」不解決。
形成消息重複的根本緣由是:網絡不可達。只要經過網絡交換數據,就沒法避免這個問題。因此解決這個問題的辦法就是繞過這個問題。那麼問題就變成了:若是消費端收到兩條同樣的消息,應該怎樣處理?
- 消費端處理消息的業務邏輯保持冪等性
- 保證每條消息都有惟一編號且保證消息處理成功與去重表的日誌同時出現
第1條很好理解,只要保持冪等性,無論來多少條重複消息,最後處理的結果都同樣。第2條原理就是利用一張日誌表來記錄已經處理成功的消息的ID,若是新到的消息ID已經在日誌表中,那麼就再也不處理這條消息。
第1條解決方案,很明顯應該在消費端實現,不屬於消息系統要實現的功能。第2條能夠消息系統實現,也能夠業務端實現。正常狀況下出現重複消息的機率其實很小,若是由消息系統來實現的話,確定會對消息系統的吞吐量和高可用有影響,因此最好仍是由業務端本身處理消息重複的問題,這也是RocketMQ不解決消息重複的問題的緣由。
RocketMQ不保證消息不重複,若是你的業務須要保證嚴格的不重複消息,須要你本身在業務端去重。
RocketMQ除了支持普通消息,順序消息,另外還支持事務消息。首先討論一下什麼是事務消息以及支持事務消息的必要性。咱們以一個轉賬的場景爲例來講明這個問題:Bob向Smith轉帳100塊。
在單機環境下,執行事務的狀況,大概是下面這個樣子:
當用戶增加到必定程度,Bob和Smith的帳戶及餘額信息已經不在同一臺服務器上了,那麼上面的流程就變成了這樣:
這時候你會發現,一樣是一個轉帳的業務,在集羣環境下,耗時竟然成倍的增加,這顯然是不可以接受的。那如何來規避這個問題?
大事務 = 小事務 + 異步
將大事務拆分紅多個小事務異步執行。這樣基本上可以將跨機事務的執行效率優化到與單機一致。轉帳的事務就能夠分解成以下兩個小事務:
圖中執行本地事務(Bob帳戶扣款)和發送異步消息應該保證同時成功或者同時失敗,也就是扣款成功了,發送消息必定要成功,若是扣款失敗了,就不能再發送消息。那問題是:咱們是先扣款仍是先發送消息呢?
首先看下先發送消息的狀況,大體的示意圖以下:
存在的問題是:若是消息發送成功,可是扣款失敗,消費端就會消費此消息,進而向Smith帳戶加錢。
先發消息不行,那就先扣款吧,大體的示意圖以下:
存在的問題跟上面相似:若是扣款成功,發送消息失敗,就會出現Bob扣錢了,可是Smith帳戶未加錢。
可能你們會有不少的方法來解決這個問題,好比:直接將發消息放到Bob扣款的事務中去,若是發送失敗,拋出異常,事務回滾。這樣的處理方式也符合「剛好」不須要解決的原則。
這裏須要說明一下:若是使用Spring來管理事物的話,大能夠將發送消息的邏輯放到本地事物中去,發送消息失敗拋出異常,Spring捕捉到異常後就會回滾此事物,以此來保證本地事物與發送消息的原子性。
RocketMQ支持事務消息,下面來看看RocketMQ是怎樣來實現的。
RocketMQ第一階段發送Prepared消息
時,會拿到消息的地址,第二階段執行本地事物,第三階段經過第一階段拿到的地址去訪問消息,並修改消息的狀態。
細心的你可能又發現問題了,若是確認消息發送失敗了怎麼辦?RocketMQ會按期掃描消息集羣中的事物消息,若是發現了Prepared消息
,它會向消息發送端(生產者)確認,Bob的錢究竟是減了仍是沒減呢?若是減了是回滾仍是繼續發送確認消息呢?RocketMQ會根據發送端設置的策略來決定是回滾仍是繼續發送確認消息。這樣就保證了消息發送與本地事務同時成功或同時失敗。
那咱們來看下RocketMQ源碼,是如何處理事務消息的。客戶端發送事務消息的部分(完整代碼請查看:rocketmq-example
工程下的com.alibaba.rocketmq.example.transaction.TransactionProducer
):
// =============================發送事務消息的一系列準備工做========================================
// 未決事務,MQ服務器回查客戶端
// 也就是上文所說的,當RocketMQ發現`Prepared消息`時,會根據這個Listener實現的策略來決斷事務
TransactionCheckListener transactionCheckListener = new TransactionCheckListenerImpl();
// 構造事務消息的生產者
TransactionMQProducer producer = new TransactionMQProducer("groupName");
// 設置事務決斷處理類
producer.setTransactionCheckListener(transactionCheckListener);
// 本地事務的處理邏輯,至關於示例中檢查Bob帳戶並扣錢的邏輯
TransactionExecuterImpl tranExecuter = new TransactionExecuterImpl();
producer.start()
// 構造MSG,省略構造參數
Message msg = new Message(......);
// 發送消息
SendResult sendResult = producer.sendMessageInTransaction(msg, tranExecuter, null);
producer.shutdown();
接着查看sendMessageInTransaction
方法的源碼,總共分爲3個階段:發送Prepared消息
、執行本地事務、發送確認消息。
// ================================事務消息的發送過程=============================================
public TransactionSendResult sendMessageInTransaction(.....) {
// 邏輯代碼,非實際代碼
// 1.發送消息
sendResult = this.send(msg);
// sendResult.getSendStatus() == SEND_OK
// 2.若是消息發送成功,處理與消息關聯的本地事務單元
LocalTransactionState localTransactionState = tranExecuter.executeLocalTransactionBranch(msg, arg);
// 3.結束事務
this.endTransaction(sendResult, localTransactionState, localException);
}
endTransaction
方法會將請求發往broker(mq server)
去更新事務消息的最終狀態:
sendResult
找到Prepared消息
,sendResult
包含事務消息的IDlocalTransaction
更新消息的最終狀態若是endTransaction
方法執行失敗,數據沒有發送到broker
,致使事務消息的 狀態更新失敗,broker
會有回查線程定時(默認1分鐘)掃描每一個存儲事務狀態的表格文件,若是是已經提交或者回滾的消息直接跳過,若是是prepared狀態
則會向Producer
發起CheckTransaction
請求,Producer
會調用DefaultMQProducerImpl.checkTransactionState()
方法來處理broker
的定時回調請求,而checkTransactionState
會調用咱們的事務設置的決斷方法來決定是回滾事務仍是繼續執行,最後調用endTransactionOneway
讓broker
來更新消息的最終狀態。
再回到轉帳的例子,若是Bob的帳戶的餘額已經減小,且消息已經發送成功,Smith端開始消費這條消息,這個時候就會出現消費失敗和消費超時兩個問題,解決超時問題的思路就是一直重試,直到消費端消費消息成功,整個過程當中有可能會出現消息重複的問題,按照前面的思路解決便可。
這樣基本上能夠解決消費端超時問題,可是若是消費失敗怎麼辦?阿里提供給咱們的解決方法是:人工解決。你們能夠考慮一下,按照事務的流程,由於某種緣由Smith加款失敗,那麼須要回滾整個流程。若是消息系統要實現這個回滾流程的話,系統複雜度將大大提高,且很容易出現Bug,估計出現Bug的機率會比消費失敗的機率大不少。這也是RocketMQ目前暫時沒有解決這個問題的緣由,在設計實現消息系統時,咱們須要衡量是否值得花這麼大的代價來解決這樣一個出現機率很是小的問題,這也是你們在解決疑難問題時須要多多思考的地方。
20160321補充:在3.2.6版本中移除了事務消息的實現,因此此版本不支持事務消息,具體狀況請參考rocketmq的issues:
https://github.com/alibaba/RocketMQ/issues/65
https://github.com/alibaba/RocketMQ/issues/138
https://github.com/alibaba/RocketMQ/issues/156
Producer
輪詢某topic下的全部隊列的方式來實現發送方的負載均衡,以下圖所示:
首先分析一下RocketMQ的客戶端發送消息的源碼:
// 構造Producer
DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName");
// 初始化Producer,整個應用生命週期內,只須要初始化1次
producer.start();
// 構造Message
Message msg = new Message("TopicTest1",// topic
"TagA",// tag:給消息打標籤,用於區分一類消息,可爲null
"OrderID188",// key:自定義Key,能夠用於去重,可爲null
("Hello MetaQ").getBytes());// body:消息內容
// 發送消息並返回結果
SendResult sendResult = producer.send(msg);
// 清理資源,關閉網絡鏈接,註銷本身
producer.shutdown();
在整個應用生命週期內,生產者須要調用一次start方法來初始化,初始化主要完成的任務有:
- 若是沒有指定
namesrv
地址,將會自動尋址- 啓動定時任務:更新namesrv地址、從namsrv更新topic路由信息、清理已經掛掉的broker、向全部broker發送心跳...
- 啓動負載均衡的服務
初始化完成後,開始發送消息,發送消息的主要代碼以下:
private SendResult sendDefaultImpl(Message msg,......) {
// 檢查Producer的狀態是不是RUNNING
this.makeSureStateOK();
// 檢查msg是否合法:是否爲null、topic,body是否爲空、body是否超長
Validators.checkMessage(msg, this.defaultMQProducer);
// 獲取topic路由信息
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
// 從路由信息中選擇一個消息隊列
MessageQueue mq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);
// 將消息發送到該隊列上去
sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);
}
代碼中須要關注的兩個方法tryToFindTopicPublishInfo
和selectOneMessageQueue
。前面說過在producer初始化時,會啓動定時任務獲取路由信息並更新到本地緩存,因此tryToFindTopicPublishInfo
會首先從緩存中獲取topic路由信息,若是沒有獲取到,則會本身去namesrv
獲取路由信息。selectOneMessageQueue
方法經過輪詢的方式,返回一個隊列,以達到負載均衡的目的。
若是Producer發送消息失敗,會自動重試,重試的策略:
RocketMQ的消息存儲是由consume queue
和commit log
配合完成的。
consume queue
是消息的邏輯隊列,至關於字典的目錄,用來指定消息在物理文件commit log
上的位置。
咱們能夠在配置中指定consumequeue
與commitlog
存儲的目錄
每一個topic
下的每一個queue
都有一個對應的consumequeue
文件,好比:
${rocketmq.home}/store/consumequeue/${topicName}/${queueId}/${fileName}
Consume Queue文件組織,如圖所示:
topic
和queueId
來組織文件,圖中TopicA有兩個隊列0,1,那麼TopicA和QueueId=0組成一個ConsumeQueue,TopicA和QueueId=1組成另外一個ConsumeQueue。GroupName
來分組重試隊列,若是消費端消費失敗,消息將被髮往重試隊列中,好比圖中的%RETRY%ConsumerGroupA
。GroupName
來分組死信隊列,若是消費端消費失敗,並重試指定次數後,仍然失敗,則發往死信隊列,好比圖中的%DLQ%ConsumerGroupA
。死信隊列(Dead Letter Queue)通常用於存放因爲某種緣由沒法傳遞的消息,好比處理失敗或者已通過期的消息。
Consume Queue中存儲單元是一個20字節定長的二進制數據,順序寫順序讀,以下圖所示:
CommitLog:消息存放的物理文件,每臺broker
上的commitlog
被本機全部的queue
共享,不作任何區分。
文件的默認位置以下,仍然可經過配置文件修改:
${user.home} \store\${commitlog}\${fileName}
CommitLog的消息存儲單元長度不固定,文件順序寫,隨機讀。消息的存儲結構以下表所示,按照編號順序以及編號對應的內容依次存儲。
消息存儲實現,比較複雜,也值得你們深刻了解,後面會單獨成文來分析(目前正在收集素材),這小節只以代碼說明一下具體的流程。
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
synchronized (this) {
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
// Here settings are stored timestamp, in order to ensure an orderly global
msg.setStoreTimestamp(beginLockTimestamp);
// MapedFile:操做物理文件在內存中的映射以及將內存數據持久化到物理文件中
MapedFile mapedFile = this.mapedFileQueue.getLastMapedFile();
// 將Message追加到文件commitlog
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:break;
case END_OF_FILE:
// Create a new file, re-write the message
mapedFile = this.mapedFileQueue.getLastMapedFile();
result = mapedFile.appendMessage(msg, this.appendMessageCallback);
break;
DispatchRequest dispatchRequest = new DispatchRequest(
topic,// 1
queueId,// 2
result.getWroteOffset(),// 3
result.getWroteBytes(),// 4
tagsCode,// 5
msg.getStoreTimestamp(),// 6
result.getLogicsOffset(),// 7
msg.getKeys(),// 8
/**
* Transaction
*/
msg.getSysFlag(),// 9
msg.getPreparedTransactionOffset());// 10
// 1.分發消息位置到ConsumeQueue
// 2.分發到IndexService創建索引
this.defaultMessageStore.putDispatchRequest(dispatchRequest);
}
若是一個消息包含key值的話,會使用IndexFile存儲消息索引,文件的內容結構如圖:
索引文件主要用於根據key來查詢消息的,流程主要是:
RocketMQ消息訂閱有兩種模式,一種是Push模式,即MQServer主動向消費端推送;另一種是Pull模式,即消費端在須要時,主動到MQServer拉取。但在具體實現時,Push和Pull模式都是採用消費端主動拉取的方式。
首先看下消費端的負載均衡:
消費端會經過RebalanceService線程,10秒鐘作一次基於topic下的全部隊列負載:
如同上圖所示:若是有 5 個隊列,2 個 consumer,那麼第一個 Consumer 消費 3 個隊列,第二 consumer 消費 2 個隊列。這裏採用的就是平均分配策略,它相似於分頁的過程,TOPIC下面的全部queue就是記錄,Consumer的個數就至關於總的頁數,那麼每頁有多少條記錄,就相似於某個Consumer會消費哪些隊列。
經過這樣的策略來達到大致上的平均消費,這樣的設計也能夠很方面的水平擴展Consumer來提升消費能力。
消費端的Push模式是經過長輪詢的模式來實現的,就如同下圖:
Consumer端每隔一段時間主動向broker發送拉消息請求,broker在收到Pull請求後,若是有消息就當即返回數據,Consumer端收到返回的消息後,再回調消費者設置的Listener方法。若是broker在收到Pull請求時,消息隊列裏沒有數據,broker端會阻塞請求直到有數據傳遞或超時才返回。
固然,Consumer端是經過一個線程將阻塞隊列LinkedBlockingQueue<PullRequest>
中的PullRequest
發送到broker拉取消息,以防止Consumer一致被阻塞。而Broker端,在接收到Consumer的PullRequest
時,若是發現沒有消息,就會把PullRequest
扔到ConcurrentHashMap中緩存起來。broker在啓動時,會啓動一個線程不停的從ConcurrentHashMap取出PullRequest
檢查,直到有數據返回。
前面的6個特性都是基本上都是點到爲止,想要深刻了解,還須要你們多多查看源碼,多多在實際中運用。固然除了已經提到的特性外,RocketMQ還支持:
其中涉及到的不少設計思路和解決方法都值得咱們深刻研究:
一、一個應用盡量用一個 Topic,消息子類型用 tags 來標識,tags 能夠由應用自由設置。只有發送消息設置了tags,消費方在訂閱消息時,才能夠利用 tags 在 broker 作消息過濾。
二、每一個消息在業務層面的惟一標識碼,要設置到 keys 字段,方便未來定位消息丟失問題。因爲是哈希索引,請務必保證 key 儘量惟一,這樣能夠避免潛在的哈希衝突。
三、消息發送成功或者失敗,要打印消息日誌,務必要打印 sendresult 和 key 字段。
四、對於消息不可丟失應用,務必要有消息重發機制。例如:消息發送失敗,存儲到數據庫,能有定時程序嘗試重發或者人工觸發重發。
五、某些應用若是不關注消息是否發送成功,請直接使用sendOneWay
方法發送消息。
一、消費過程要作到冪等(即消費端去重)
二、儘可能使用批量方式消費方式,能夠很大程度上提升消費吞吐量。
三、優化每條消息消費過程
線上應該關閉autoCreateTopicEnable
,即在配置文件中將其設置爲false
。
RocketMQ在發送消息時,會首先獲取路由信息。若是是新的消息,因爲MQServer上面尚未建立對應的Topic
,這個時候,若是上面的配置打開的話,會返回默認TOPIC的(RocketMQ會在每臺broker
上面建立名爲TBW102
的TOPIC)路由信息,而後Producer
會選擇一臺Broker
發送消息,選中的broker
在存儲消息時,發現消息的topic
尚未建立,就會自動建立topic
。後果就是:之後全部該TOPIC的消息,都將發送到這臺broker
上,達不到負載均衡的目的。
因此基於目前RocketMQ的設計,建議關閉自動建立TOPIC的功能,而後根據消息量的大小,手動建立TOPIC。
RocketMQ的設計假定:
每臺PC機器均可能宕機不可服務
任意集羣都有可能處理能力不足
最壞的狀況必定會發生
內網環境須要低延遲來提供最佳用戶體驗
RocketMQ的關鍵設計:
分佈式集羣化
強數據安全
海量數據堆積
毫秒級投遞延遲(推拉模式)
這是RocketMQ在設計時的假定前提以及須要到達的效果。我想這些假定適用於全部的系統設計。隨着咱們系統的服務的增多,每位開發者都要注意本身的程序是否存在單點故障,若是掛了應該怎麼恢復、能不能很好的水平擴展、對外的接口是否足夠高效、本身管理的數據是否足夠安全...... 多多規範本身的設計,才能開發出高效健壯的程序。