舒適提示:建議參考代碼RocketMQ4.4版本,4.5版本引入了多副本機制,實現了主從自動切換,本文並不關心主從切換功能。
主從同步基本實現過程以下圖所示:
RocketMQ 的主從同步機制以下:
A. 首先啓動Master並在指定端口監聽;
B. 客戶端啓動,主動鏈接Master,創建TCP鏈接;
C. 客戶端以每隔5s的間隔時間向服務端拉取消息,若是是第一次拉取的話,先獲取本地commitlog文件中最大的偏移量,以該偏移量向服務端拉取消息;
D. 服務端解析請求,並返回一批數據給客戶端;
E. 客戶端收到一批消息後,將消息寫入本地commitlog文件中,而後向Master彙報拉取進度,並更新下一次待拉取偏移量;
F. 而後重複第3步;java
RocketMQ主從同步一個重要的特徵:主從同步不具有主從切換功能,即當主節點宕機後,從不會接管消息發送,但能夠提供消息讀取。json
舒適提示:本文並不會詳細分析RocketMQ主從同步的實現細節,如你們對其感興趣,能夠查閱筆者所著的《RocketMQ技術內幕》或查看筆者博文: https://blog.csdn.net/prestig...
接下來帶着上述問題,一塊兒來探究其實現原理。緩存
RocketMQ的主從同步,在默認狀況下RocketMQ會優先選擇從主服務器進行拉取消息,並非一般意義的上的讀寫分離,那何時會從拉取呢?服務器
舒適提示:本節一樣不會詳細整個流程,只會點出其關鍵點,若是想詳細瞭解消息拉取、消息消費等核心流程,建議你們查閱筆者所著的《RocketMQ技術內幕》。
在RocketMQ中判斷是從主拉取,仍是從從拉取的核心代碼以下:
DefaultMessageStore#getMessage架構
long diff = maxOffsetPy - maxPhyOffsetPulling; // @1 long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); // @2 getResult.setSuggestPullingFromSlave(diff > memory); // @3
代碼@1:首先介紹一下幾個局部變量的含義:併發
當前最大的物理偏移量。返回的偏移量爲已存入到操做系統的PageCache中的內容。app
本次消息拉取最大物理偏移量,按照消息順序拉取的基本原則,能夠基本預測下次開始拉取的物理偏移量將大於該值,而且就在其附近。源碼分析
maxOffsetPy與maxPhyOffsetPulling之間的間隔,getMessage一般用於消息消費時,即這個間隔能夠理解爲目前未處理的消息總大小。this
代碼@2:獲取RocketMQ消息存儲在PageCache中的總大小,若是當RocketMQ容量超過該闊值,將會將被置換出內存,若是要訪問不在PageCache中的消息,則須要從磁盤讀取。spa
返回當前系統的總物理內存。參數
設置消息存儲在內存中的閥值,默認爲40。
結合代碼@2這兩個參數的含義,算出RocketMQ消息能映射到內存中最大值爲40% * (機器物理內存)。
代碼@3:設置下次拉起是否從從拉取標記,觸發下次從從服務器拉取的條件爲:當前全部可用消息數據(全部commitlog)文件的大小已經超過了其闊值,默認爲物理內存的40%。
那GetResult的suggestPullingFromSlave屬性在哪裏使用呢?
PullMessageProcessor#processRequest
if (getMessageResult.isSuggestPullingFromSlave()) { // @1 responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) { // @2 case ASYNC_MASTER: case SYNC_MASTER: break; case SLAVE: if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) { response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY); responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); } break; } if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) { // @3 // consume too slow ,redirect to another machine if (getMessageResult.isSuggestPullingFromSlave()) { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly()); } // consume ok else { responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId()); } } else { responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID); }
代碼@1:若是從commitlog文件查找消息時,發現消息堆積太多,默認超過物理內存的40%後,會建議從從服務器讀取。
代碼@2:若是當前服務器的角色爲從服務器:而且slaveReadEnable=true,則忽略代碼@1設置的值,下次拉取切換爲從主拉取。
代碼@3:若是slaveReadEnable=true(從容許讀),而且建議從從服務器讀取,則從消息消費組建議當消息消費緩慢時建議的拉取brokerId,由訂閱組配置屬性whichBrokerWhenConsumeSlowly決定;若是消息消費速度正常,則使用訂閱組建議的brokerId拉取消息進行消費,默認爲主服務器。若是不容許從可讀,則固定使用從主拉取。
舒適提示:請注意broker服務參數slaveReadEnable,與訂閱組配置信息:whichBrokerWhenConsumeSlowly、brokerId的值,在生產環境中,能夠經過updateSubGroup命令動態改變訂閱組的配置信息。
若是訂閱組的配置保持默認值的話,拉取消息請求發送到從服務器後,下一次消息拉取,不管是否開啓slaveReadEnable,下一次拉取,仍是會發往主服務器。
上面的步驟,在消息拉取命令的返回字段中,會將下次建議拉取Broker返回給客戶端,根據其值從指定的broker拉取。
消息拉取實現PullAPIWrapper在處理拉取結果時會將服務端建議的brokerId更新到broker拉取緩存表中。
在發起拉取請求以前,首先根據以下代碼,選擇待拉取消息的Broker。
從上面內容可知,主從同步引入的主要目的就是消息堆積的內容默認超過物理內存的40%,則消息讀取則由從服務器來接管,實現消息的讀寫分離,避免主服務IO抖動嚴重。那問題來了,主服務器宕機後,從服務器接管消息消費後,那消息消費進度存儲在哪裏?當主服務器恢復正常後,消息是從主服務器拉取仍是從從服務器拉取?主服務器如何得知最新的消息消費進度呢?
RocketMQ消息消費進度管理(集羣模式):
集羣模式下消息消費進度存儲文件位於服務端${ROCKETMQ_HOME}/store/config/consumerOffset.json。消息消費者從服務器拉取一批消息後提交到消費組特定的線程池中處理消息,當消息消費成功後會向Broker發送ACK消息,告知消費端已成功消費到哪條消息,Broker收到消息消費進度反饋後,首先存儲在內存中,而後定時持久化到consumeOffset.json文件中。備註:關於消息消費進度管理更多的實現細節,建議查閱筆者所著的《RocketMQ技術內幕》。
咱們先看一下客戶端向服務端反饋消息消費進度時如何選擇Broker。
由於主服務的brokerId爲0,默認狀況下當主服務器存活的時候,優先會選擇主服務器,只有當主服務器宕機的狀況下,纔會選擇從服務器。
既然集羣模式下消息消費進度存儲在Broker端,當主服務器正常時,消息消費進度文件存儲在主服務器,那提出以下兩個問題:
1)消息消費端在主服務器存活的狀況下,會優先向主服務器反饋消息消費進度,那從服務器是如何同步消息消費進度的。
2)當主服務器宕機後則消息消費端會向從服務器反饋消息消費進度,此時消息消費進度如何存儲,當主服務器恢復正常後,主服務器如何得知最新的消息消費進度。
爲了解開上述兩個疑問,咱們優先來看一下Broker服務器在收到提交消息消費進度反饋命令後的處理邏輯:
客戶端定時向Broker端發送更新消息消費進度的請求,其入口爲:RemoteBrokerOffsetStore#updateConsumeOffsetToBroker,該方法中一個很是關鍵的點是:選擇broker的邏輯,以下所示:
若是主服務器存活,則選擇主服務器,若是主服務器宕機,則選擇從服務器。也就是說,無論消息是從主服務器拉取的仍是從從服務器拉取的,提交消息消費進度請求,優先選擇主服務器。服務端就是接收其偏移量,更新到服務端的內存中,而後定時持久化到${ROCKETMQ_HOME}/store/config/consumerOffset.json。
通過上面的分析,咱們來討論一下這個場景:
消息消費者首先從主服務器拉取消息,並向其提交消息消費進度,若是當主服務器宕機後,從服務器會接管消息拉取服務,此時消息消費進度存儲在從服務器,主從服務器的消息消費進度會出現不一致?那當主服務器恢復正常後,二者之間的消息消費進度如何同步?
若是Broker角色爲從服務器,會經過定時任務調用syncAll,從主服務器定時同步topic路由信息、消息消費進度、延遲隊列處理進度、消費組訂閱信息。
那問題來了,若是主服務器啓動後,從服務器立刻從主服務器同步消息消息進度,那豈不是又要從新消費?
其實在絕大部分狀況下,就算從服務從主服務器同步了好久以前的消費進度,只要消息者沒有從新啓動,就不須要從新消費,在這種狀況下,RocketMQ提供了兩種機制來確保不丟失消息消費進度。
第一種,消息消費者在內存中存在最新的消息消費進度,繼續以該進度去服務器拉取消息後,消息處理完後,會定時向Broker服務器反饋消息消費進度,在上面也提到過,在反饋消息消費進度時,會優先選擇主服務器,此時主服務器的消息消費進度就立馬更新了,從服務器此時只需定時同步主服務器的消息消費進度便可。
第二種是,消息消費者在向主服務器拉取消息時,若是是是主服務器,在處理消息拉取時,也會更新消息消費進度。
主服務器在處理消息拉取命令時,會觸發消息消費進度的更新,其代碼入口爲:PullMessageProcessor#processRequest
boolean storeOffsetEnable = brokerAllowSuspend; // @1 storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag; storeOffsetEnable = storeOffsetEnable && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE; // @2 if (storeOffsetEnable) { this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset()); }
代碼@1:首先介紹幾個局部變量的含義:
若是Broker的角色爲主服務器,而且上面兩個變量都爲true,則首先使用commitOffset更新消息消費進度。
看到這裏,主從同步消息消費進度的相關問題,應該就有了答案了。
上述實現原理的講解有點枯燥無味,咱們先來回答以下幾個問題:
一、主,從服務器都在運行過程當中,消息消費者是從主拉取消息仍是從從拉取?
答:默認狀況下,RocketMQ消息消費者從主服務器拉取,當主服務器積壓的消息超過了物理內存的40%,則建議從從服務器拉取。但若是slaveReadEnable爲false,表示從服務器不可讀,從服務器也不會接管消息拉取。
二、當消息消費者向從服務器拉取消息後,會一直從從服務器拉取?
答:不是的。分以下狀況:
1)若是從服務器的slaveReadEnable設置爲false,則下次拉取,從主服務器拉取。
2)若是從服務器容許讀取而且從服務器積壓的消息未超過其物理內存的40%,下次拉取使用的Broker爲訂閱組的brokerId指定的Broker服務器,該值默認爲0,表明主服務器。
3)若是從服務器容許讀取而且從服務器積壓的消息超過了其物理內存的40%,下次拉取使用的Broker爲訂閱組的whichBrokerWhenConsumeSlowly指定的Broker服務器,該值默認爲1,表明從服務器。
三、主從服務消息消費進是如何同步的?
答:消息消費進度的同步時單向的,從服務器開啓一個定時任務,定時從主服務器同步消息消費進度;不管消息消費者是從主服務器拉的消息仍是從從服務器拉取的消息,在向Broker反饋消息消費進度時,優先向主服務器彙報;消息消費者向主服務器拉取消息時,若是消息消費者內存中存在消息消費進度時,主會嘗試跟新消息消費進度。
讀寫分離的正確使用姿式:
一、主從Broker服務器的slaveReadEnable設置爲true。
二、經過updateSubGroup命令更新消息組whichBrokerWhenConsumeSlowly、brokerId,特別是其brokerId不要設置爲0,否則從從服務器拉取一次後,下一次拉取就會從主去拉取。
做者介紹:
丁威,《RocketMQ技術內幕》做者,RocketMQ 社區佈道師,公衆號: 中間件興趣圈 維護者,目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。