RocketMQ 解析

一、RocketMQ網絡部署圖

 

 

  • RocketMQ的幾個關鍵角色和配置
  • Producer

兩種消息發送方式:前端

 

1.同步發送:發送成功後繼續執行代碼邏輯。java

 

2.異步發送:發送後,無論成功失敗執行代碼邏輯。成功後調用回調方法。apache

 

  • Broker

兩種刷盤方式,flushDiskType配置,SYNC_FLUSH,ASYNC_FLUSH。json

 

1.同步刷盤方式:消息寫入磁盤後再返回成功狀態。bash

 

2.異步刷盤方式:消息寫入內存後就返回成功狀態。服務器

 

兩種複製方式,表示消息從Master複製到Slave的方式,brokerRole,ASYNC_MASTER(異步master),SYNC_MASTER(同步master),SLAVE(slave)。網絡

 

1.同步複製方式:等Master和Slave都寫入成功後才返回寫入成功。app

 

2.異步複製方式:Master寫入成功後就返回寫入成功。負載均衡

 

  • Consumer

三種消費方式 :dom

 

1.Push(服務端主動推送消息),RocketMQ服務器收到消息後自動調用消費者函數來處理消息,自動維護Offset。支持兩種消息模式,Clustering模式,同一個ConsumerGroup的每一個Consumer消費訂閱消息的一部份內容,broadcasting模式,同一個ConsumerGroup的每一個Consumer都消費所訂閱的的所有消息。

 

2.Pull (客戶端主動拉取消息),Client端循環從Server端拉取消息。須要客戶端本身維護Offset。

 

3.長輪詢消費方式,Client發送消息請求,Server端接受請求,若是發現Server隊列裏沒有新消息,Server端不當即返回,而是持有這個請求一段時間(經過設置超時時間來實現),在這段時間內輪詢Server隊列內是否有新的消息,若是有新消息,就利用現有的鏈接返回消息給消費者;若是這段時間內沒有新消息進入隊列,則返回空。

 

深刻了解了上面三個角色,咱們來總結下雙master,雙slave模式下的整個發送,消費流程。生產者發送消息,消息會負載均衡到兩個Master上,若是master的刷盤方式是同步刷盤方式,複製方式是同步複製方式,須要消息寫到master和slave的硬盤上後,服務器纔會放回發送消息成功。消息存儲到服務器後,消費者根據本身的消費方式來消費消息,若是是Push,消息到達服務器後立刻推送消息到消費者,若是是pull,消費拉取消息後再消費。

 

 

1.1 RocketMQ網絡部署特色:

  • NameServer是一個幾乎無狀態節點,可集羣部署,節點之間無任何信息同步。相對來講,nameserver的穩定性很是高。緣由有二:

1)nameserver互相獨立,彼此沒有通訊關係,單臺nameserver掛掉,不影響其餘nameserver,即便所有掛掉,也不影響業務系統使用。無狀態
2)nameserver不會有頻繁的讀寫,因此性能開銷很是小,穩定性很高。
  • Broker部署相對複雜,Broker四種集羣方式

Broker分爲Master與Slave(Slave不可寫,但可讀,相似於MySQL的主備方式),一個Master能夠對應多個Slave,可是一個Slave只能對應一個Master,Master與Slave的對應關係經過指定相同的BrokerName,不一樣的BrokerId來定義,BrokerId爲0表示Master,非0表示Slave。Master也能夠部署多個。每一個Broker與NameServer集羣中的全部節點創建長鏈接,定時註冊Topic信息到全部 NameServer。

1)單個master:這是一種風險比較大的集羣方式,由於一旦Borker重啓或宕機期間,將會致使這個服務不可用,所以是不建議線上環境去使用的。
2)多個master:
一個集羣所有都是Master,沒有Slave。

  優勢:配置簡單,單個Master宕機或者是重啓維護對應用沒有什麼影響的,在磁盤配置爲RAID10時,即便機器宕機不可恢復的狀況下,消息也不會丟失(異步刷盤丟失少許消息,同步刷盤則是一條都不會丟失),性能最高

  缺點:當單個Broker宕機期間,這臺機器上未被消費的消息在機器恢復以前不可訂閱,消息的實時性會受到影響

3)多master多salve異步複製,每一個Master配置一個Slave,有多對的Master-Slave,HA採用的是異步複製方式,主備有短暫的消息延遲,毫秒級別的(Master收到消息以後馬上嚮應用返回成功標識,同時向Slave寫入消息)。

  優勢:即便是磁盤損壞了,消息丟失的很是少,且消息實時性不會受到影響,由於Master宕機以後,消費者仍然能夠從Slave消費,此過程對應用透明,不須要人工干預,性能同多個Master模式機會同樣。

  缺點:Master宕機,磁盤損壞的狀況下,會丟失少許的消息

4)多master多salve同步雙寫,每一個Master配置一個Slave,有多對的Master-Slave,HA採用的是同步雙寫模式,主備都寫成功,纔會嚮應用返回成功。

  優勢:數據與服務都無單點,Master宕機的狀況下,消息無延遲,服務可用性與數據可用性都很是高

  缺點:性能比異步複製模式略低,大約低10%左右,發送單個Master的RT會略高,目前主機宕機後,Slave不能自動切換爲主機,後續會支持自動切換功能。

  • Producer與NameServer集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master創建長鏈接,且定時向Master發送心跳。Producer徹底無狀態,可集羣部署。
  • Consumer與NameServer集羣中的其中一個節點(隨機選擇)創建長鏈接,按期從Name Server取Topic路由信息,並向提供Topic服務的Master、Slave創建長鏈接,且定時向Master、Slave發送心跳。Consumer既能夠從Master訂閱消息,也能夠從Slave訂閱消息,訂閱規則由Broker配置決定。

1.2 安裝及使用步驟:

一、下載並安裝
  根目錄執行 mvn clean package -Dmaven.test.skip=true或mvn -Preplease-all -DskipTests clean install -U
  並在distribution/target/apache-rocketmq 目錄下找到打好的包.解壓至指定目錄
 
二、rocketmq的啓動
  啓動namesrv服務:nohup sh bin/mqnamesrv &  查看日誌:tail -f ~/logs/rocketmqlogs/namesrv.log
  啓動broker服務:nohup sh bin/mqbroker &   查看日誌:tail -f ~/logs/rocketmqlogs/broker.log
  nohup sh mqbroker -n localhost:9876 autoCreateTopicEnable=true & 默認自動建立topic,不然報錯no route info of this topic
 
三、rocketmq服務關閉
  關閉namesrv服務:sh bin/mqshutdown namesrv
  關閉broker服務 :sh bin/mqshutdown broker

 2.  RocketMQ配置文件 - broker

衆所周知,RocketMQ有多種集羣部署方式,它們的配置文件也是分開的,在安裝包conf目錄下有官方自帶配置文件模版自上而下依次是:兩主兩從異步,兩主兩從同步,兩主

broker.conf,這個是至關於配置的簡單模板,另外其餘的配置也能夠到上面說的2m-2s那些目錄中去參考一下,這個broker.conf是不能直接使用的,由於broker啓動的時候用-c參數傳入配置文件,這裏只認識*.properties的配置文件,因此這裏應該分別執行:

cp broker.conf broker1.properties
cp broker1.properties broker2.properties

以下:

說明:

  • 2m-noslave: 多Master模式
  • 2m-2s-sync: 多Master多Slave模式,同步雙寫
  • 2m-2s-async:多Master多Slave模式,異步複製

其中namesrvAddr:主機地址,brokerClusterName:集羣名稱,brokerName :分片名稱 ,deleteWhen=04:刪除文件時間點,默認是凌晨4點
,fileReservedTime=120:文件保留時間,默認48小時,brokerId:分片id編號 ;brokerRole分片角色。

注意:其中主從之間的分片名稱相同。主從區分是brokerId 主 0,從 1。brokerRole 主MASTER從SLAVE。

 

  • RocketMQ默認提供的配置文件都是最基本的,不少配置都是默認值,生產環境中須要根據實際狀況進行修改。
  • #所屬集羣名字 brokerClusterName=rocketmq-cluster
  • #broker名字,注意此處不一樣的配置文件填寫的不同 brokerName=broker-a|broker-b
  • #0表示Master,>0表示Slave brokerId=0 #nameServer地址,分號分割 namesrvAddr=192.168.1.101:9876;192.168.1.102:9876
  • #在發送消息時,自動建立服務器不存在的topic,默認建立的隊列數 defaultTopicQueueNums=4
  • #是否容許 Broker 自動建立Topic,建議線下開啓,線上關閉 autoCreateTopicEnable=true
  • #是否容許 Broker 自動建立訂閱組,建議線下開啓,線上關閉 autoCreateSubscriptionGroup=true
  • #Broker 對外服務的監聽端口 listenPort=10911
  • #刪除文件時間點,默認凌晨 4點 deleteWhen=04
  • #文件保留時間,默認 48 小時 fileReservedTime=120
  • #commitLog每一個文件的大小默認1G mapedFileSizeCommitLog=1073741824
  • #ConsumeQueue每一個文件默認存30W條,根據業務狀況調整 mapedFileSizeConsumeQueue=300000
  • #destroyMapedFileIntervalForcibly=120000
  • #redeleteHangedFileInterval=120000
  • #檢測物理文件磁盤空間 diskMaxUsedSpaceRatio=88
  • #存儲路徑 storePathRootDir=/usr/local/alibaba-rocketmq/store
  • #commitLog 存儲路徑 storePathCommitLog=/usr/local/alibaba-rocketmq/store/commitlog
  • #消費隊列存儲路徑存儲路徑 storePathConsumeQueue=/usr/local/alibaba-rocketmq/store/consumequeue
  • #消息索引存儲路徑 storePathIndex=/usr/local/alibaba-rocketmq/store/index
  • #checkpoint 文件存儲路徑 storeCheckpoint=/usr/local/alibaba-rocketmq/store/checkpoint
  • #abort 文件存儲路徑 abortFile=/usr/local/alibaba-rocketmq/store/abort
  • #限制的消息大小 maxMessageSize=65536
  • #flushCommitLogLeastPages=4
  • #flushConsumeQueueLeastPages=2
  • #flushCommitLogThoroughInterval=10000
  • #flushConsumeQueueThoroughInterval=60000
  • #Broker 的角色

    - ASYNC_MASTER 異步複製Master

    - SYNC_MASTER 同步雙寫Master

    - SLAVE brokerRole=ASYNC_MASTER

  • 刷盤方式 #- ASYNC_FLUSH
  • 異步刷盤 #- SYNC_FLUSH
  • 同步刷盤 flushDiskType=ASYNC_FLUSH
  • #checkTransactionMessageEnable=false
  • #發消息線程池數量 #sendMessageThreadPoolNums=128
  • #拉消息線程池數量 #pullMessageThreadPoolNums=128

 

  • Broker向namesrv註冊
1.      獲取namesrv的地址列表(是亂序的)

2.      遍歷向每一個namesrv註冊topic的配置信息topicconfig
 

  •  Topic在broker文件上的存儲json格式

  Namesrv接收Broker註冊的topic信息, namesrv只存內存,可是broker有任務定時推送

  1.   接收數據向RouteInfoManager註冊。

    Broker初始化加載本地配置,配置信息是以json格式存儲在本地, rocketmq強依賴fastjson做轉換, RocketMq經過ConfigMananger來管理配置加載以及持久化

 

複製代碼
 
1.      加載topic配置${user.home}/store/config/topics.json
{
"dataVersion":{
           "counter":2,
           "timestatmp":1393729865073
},
"topicConfigTable":{
           //根據consumer的group生成的重試topic
           "%RETRY% group_name":{
                    "perm":6,
                    "readQueueNums":1,
                    "topicFilterType":"SINGLE_TAG",
                    "topicName":"%RETRY%group_name",
                    "writeQueueNums":1
           },

        "TopicTest":{
            "perm":6, // 100讀權限 , 10寫權限 6是110讀寫權限 "readQueueNums":8, "topicFilterType":"SINGLE_TAG", "topicName":"TopicTest", "writeQueueNums":8 } } }

2.加載消費進度偏移量  ${user.home}/store/config/consumerOffset.json
{

"offsetTable":{
       //重試隊列消費進度爲零
      "%RETRY% group_name@group_name":{0:0}, 
      //分組名group_name消費topic爲TopicTest的進度爲:
      // 隊列queue=0  消費進度23
      // 隊列 queue=2  消費進度爲22  等等…

       "TopicTest@ group_name":{0:23,1:23,2:22,3:22,4:21,5:18,6:18,7:18}
}
}
 
3. 加載消費者訂閱關係 ${user.home}/store/config/subscriptionGroup.json
{

         "dataVersion":{
                   "counter":1,
                   "timestatmp":1393641744664
         },
         "group_name":{
                            "brokerId":0,  //0表明這臺broker機器爲master,若要設爲slave值大於0
                            "consumeBroadcastEnable":true,
                            "consumeEnable":true,
                            "consumeFromMinEnable":true,
                            "groupName":"group_name",
                            "retryMaxTimes":5,
                            "retryQueueNums":1,
                            "whichBrokerWhenConsumeSlowly":1
                   }
         }
}
 
複製代碼

 

 2.1 broker的消息存儲

 存儲特色:

如上圖所示:
(1)消息主體以及元數據都存儲在**CommitLog**當中
(2)Consume Queue至關於kafka中的partition,是一個邏輯隊列,存儲了這個Queue在CommiLog中的起始offset,log大小和MessageTag的hashCode。 (3)每次讀取消息隊列先讀取consumerQueue,而後再經過consumerQueue去commitLog中拿到消息主體。

Rocketmq的消息的存儲是由consume queue和 commitLog 配合完成的。

ConsumeQueue是定長的結構,每1條記錄固定的20個字節。

Consumer消費消息的時候,要讀2次:先讀ConsumeQueue獲得offset,再讀CommitLog獲得消息內容。

2.1.1  CommitLog文件(物理隊列

CommitLog是用於存儲真實的物理消息的結構,保存消息元數據,全部消息到達Broker後都會保存到commitLog文件,這裏須要強調的是全部topic的消息都會統一保存在commitLog中。
舉個例子:當前集羣有TopicA, TopicB,
這兩個Toipc的消息會按照消息到達的前後順序保存到同一個commitLog中,而不是每一個Topic有本身獨立的commitLog
onsumeQueue是邏輯隊列,僅僅存儲了CommitLog的位移而已,真實的存儲都在本結構中。
首先這裏會使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是當前這個key中的消息數,每增長一個消息增長一(不會自減);
這裏queueOffset的用途以下:每次用戶請求putMessage的時候,將queueOffset返回給客戶端使用,這裏的queueoffset表示邏輯上的隊列偏移。

消息存放物理文件,每臺broker上的commitLog被本機器全部queue共享不作區分
  • commitlog文件的存儲地址:$HOME\store\commitlog\${fileName}
  • 一個消息存儲單元長度是不定的,順序寫可是隨機讀
  • 每一個commitLog文件的默認大小爲 1G =1024*1024*1024,滿1G以後會自動新建CommitLog文件作保存數據用
  • commitlog的文件名fileName,名字長度爲20位,左邊補零,剩餘爲起始偏移量;好比00000000000000000000表明了第一個文件,起始偏移量爲0,文件大小爲1G=1073741824;當這個文件滿了,

    第二個文件名字爲00000000001073741824,起始偏移量爲1073741824,以此類推,第三個文件名字爲00000000002147483648,起始偏移量爲2147483648消息存儲的時候會順序寫入文件,

    當文件滿了,寫入下一個文件。

 

  CommitLog的清理機制:

 

  • 按時間清理,rocketmq默認會清理3天前的commitLog文件;
  • 按磁盤水位清理:當磁盤使用量到達磁盤容量75%,開始清理最老的commitLog文件。

 

 
     

1)、CommitLog 文件生成規則

偏移量:每一個 CommitLog 文件的大小爲 1G,通常狀況下第一個 CommitLog 的起始偏移量爲 0,第二個 CommitLog 的起始偏移量爲 1073741824 (1G = 1073741824byte)。

2)、怎麼知道消息存儲在哪一個 CommitLog 文件上?

假設 1073742827 爲物理偏移量(物理偏移量也即全局偏移量),則其對應的相對偏移量爲 1003(1003 = 1073742827 - 1073741824),而且該偏移量位於第二個 CommitLog。

index 和 ComsumerQueue 中都有消息對應的物理偏移量,經過物理偏移量就能夠計算出該消息位於哪一個 CommitLog 文件上。

複製代碼
文件地址:${user.home} \store\${commitlog}\${fileName}

消息存儲結構:
  flag  這個標誌值rocketmq不作處理,只存儲後透傳
  QUEUEOFFSET這個值是個自增值不是真正的consume queue的偏移量,能夠表明這個隊列中消息的個數,要經過這個值查找到consume queue中數據,QUEUEOFFSET * 20纔是偏移地址
  PHYSICALOFFSET 表明消息在commitLog中的物理起始地址偏移量
  SYSFLAG消息標誌,指明消息是事物事物狀態等等消息特徵
  BORNTIMESTAMP 消息產生端(producer)的時間戳
  BORNHOST     消息產生端(producer)地址(address:port)
  STORETIMESTAMP 消息在broker存儲時間
  STOREHOSTADDRESS 消息存儲到broker的地址(address:port)
  RECONSUMETIMES消息被某個訂閱組從新消費了幾回(訂閱組之間獨立計數),由於重試消息發送到了topic名字爲%retry%groupName的隊列queueId=0的隊列中去了
  Prepared Transaction Offset 表示是prepared狀態的事物消息
複製代碼

2.1.2 ConsumeQueue文件組織:

ConsumerQueue至關於CommitLog的索引文件,消費者消費時會先從ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元數據。

若是某個消息只在CommitLog中有數據,沒在ConsumerQueue中, 則消費者沒法消費,Rocktet的事務消息就是這個原理

Consumequeue類對應的是每一個topic和queuId下面的全部文件,至關於字典的目錄用來指定消息在消息的真正的物理文件commitLog上的位置

每條數據的結構以下圖所示:

消息的起始物理偏移量physical offset(long 8字節)+消息大小size(int 4字節)+tagsCode(long 8字節)。

  • 每一個topic下的每一個queue都有一個對應的consumequeue文件。
  • 文件默認存儲路徑:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}
  • 每一個文件由30W條數據組成,每條數據的大小爲20個字節,從而每一個文件的默認大小爲600萬個字節(consume queue中存儲單元是一個20字節定長的數據)是順序寫順序讀
  • commitLogOffset是指這條消息在commitLog文件實際偏移量
  • size就是指消息大小
  • 消息tag的哈希值

 

ConsumeQueue幾個重要的字段
  private final String topic; private final int queueId;//隊列id private final ByteBuffer byteBufferIndex;// 寫索引時用到的ByteBuffer private long maxPhysicOffset = -1;// 最後一個消息對應的物理Offset

 

每一個cosumequeue文件的名稱fileName,名字長度爲20位,左邊補零,剩餘爲起始偏移量;

好比00000000000000000000表明了第一個文件,起始偏移量爲0,文件大小爲600W,

當第一個文件滿以後建立的第二個文件的名字爲00000000000006000000,起始偏移量爲6000000,以此類推,

第三個文件名字爲00000000000012000000,起始偏移量爲12000000,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。

  • topic queueId來組織的:好比TopicA配了讀寫隊列0、1,那麼TopicA和Queue=0組成一個ConsumeQueue, TopicA和Queue=1組成一個另外一個ConsumeQueue.
  • 按消費端group分組重試隊列,若是消費端消費失敗,發送到retry消費隊列中
  • 按消費端group分組死信隊列,若是消費端重試超過指定次數,發送死信隊列
  • 每一個ConsumeQueue能夠由多個文件組成無限隊列被MapedFileQueue對象管理

 2.1.3  MapedFile 是PageCache文件封裝

操做物理文件在內存中的映射以及將內存數據持久化到物理文件中,代碼中寫死了要求os系統的頁大小爲4k, 消息刷盤根據參數(commitLog默認至少刷4頁, consumeQueue默認至少刷2頁)才刷 

如下io對象構建了物理文件映射內存的對象
FileChannel fileChannel = new RandomAccessFile(file,「rw」).getChannel();
MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize);
構建mapedFile對象須要兩個參數
fileSize: 映射的物理文件的大小
commitLog每一個文件的大小默認1G =1024*1024*1024
ConsumeQueue每一個文件默認存30W條 = 300000 *CQStoreUnitSize(每條大小)
filename: filename文件名稱但不只僅是名稱還表示文件記錄的初始偏移量, 文件名實際上是個long類型的值

2.1.4  MapedFileQueue 存儲隊列,數據定時刪除,無限增加。

隊列有多個文件(MapedFile)組成,由集合對象List表示升序排列,前面講到文件名便是消息在此文件的中初始偏移量,排好序後組成了一個連續的消息隊     

 當消息到達broker時,須要獲取最新的MapedFile寫入數據,調用MapedFileQueue的getLastMapedFile獲取,此函數若是集合中一個也沒有建立一個,若是最後一個寫滿了也建立一個新的。
 MapedFileQueue在獲取getLastMapedFile時,若是須要建立新的MapedFile會計算出下一個MapedFile文件地址,經過預分配服務AllocateMapedFileService異步預建立下一個MapedFile文件,這樣下次建立新文件請求就不要等待,由於建立文件特別是一個1G的文件仍是有點耗時的,
 getMinOffset獲取隊列消息最少偏移量,即第一個文件的文件起始偏移量
 getMaxOffset獲取隊列目前寫到位置偏移量
 getCommitWhere刷盤刷到哪裏了

 

2.1.5 消息存儲及消費過程

1)消息發送流程:

  • Broker啓動時,向NameServer註冊信息
  • 客戶端調用producer發送消息時,會先從NameServer獲取該topic的路由信息。消息頭code爲GET_ROUTEINFO_BY_TOPIC
  • 從NameServer返回的路由信息,包括topic包含的隊列列表和broker列表
  • Producer端根據查詢策略,選出其中一個隊列,用於後續存儲消息
  • 每條消息會生成一個惟一id,添加到消息的屬性中。屬性的key爲UNIQ_KEY
  • 對消息作一些特殊處理,好比:超過4M會對消息進行壓縮
  • producer向Broker發送rpc請求,將消息保存到broker端。消息頭的code爲SEND_MESSAGE或SEND_MESSAGE_V2(配置文件設置了特殊標誌)

消息存儲流程

  • Broker端收到消息後,將消息原始信息保存在CommitLog文件對應的MappedFile中,而後異步刷新到磁盤
  • ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
  • ConsumerQueue和IndexFile只是原始文件的索引信息

1)消息消費過程:

如今咱們再來看 Broker 服務器端。首先咱們應該知道,消息往 Broker 存儲就是在向 CommitLog 消息文件中寫入數據的一個過程。

在 Broker 啓動過程當中,其會啓動一個叫作 ReputMessageService 的服務,這個服務每隔 1 秒會檢查一下這個 CommitLog 是否有新的數據寫入。

ReputMessageService 自身維護了一個偏移量 reputFromOffset,用以對比和 CommitLog 文件中的消息總偏移量的差距。

當這兩個偏移量不一樣的時候,就表明有新的消息到來了,在有新的消息到來以後,doReput() 函數會取出新到來的全部消息,每一條消息都會封裝爲一個 DispatchRequest 請求,

進而將這條請求分發給不一樣的請求消費者,咱們在這篇文章中只會關注利用消息建立消費隊列的服務 CommitLogDispatcherBuildConsumeQueue,

CommitLogDispatcherBuildConsumeQueue 服務會根據這條請求按照不一樣的隊列 ID 建立不一樣的消費隊列文件,並在內存中維護一份消費隊列列表。

而後將 DispatchRequest 請求中這條消息的消息偏移量、消息大小以及消息在發送時候附帶的標籤的 Hash 值寫入到相應的消費隊列文件中去。

3)客戶端如何記錄本身所消費的隊列消費到哪裏了呢?

答案就是:消費隊列偏移量

集羣模式:因爲每一個客戶端所消費的消息隊列不一樣,因此每一個消息隊列已經消費到哪裏的消費偏移量是記錄在 Broker 服務器端的。

廣播模式:因爲每一個客戶端分配消費這個話題的全部消息隊列,因此每一個消息隊列已經消費到哪裏的消費偏移量是記錄在客戶端本地的。

(1) 集羣模式

在集羣模式下,消費者客戶端在內存中維護了一個 offsetTable 表,一樣在 Broker 服務器端也維護了一個偏移量表,在消費者客戶端,RebalanceService 服務會定時地 (默認 20 秒) 從 Broker 服務器獲取當前客戶端所須要消費的消息隊列,並與當前消費者客戶端的消費隊列進行對比,看是否有變化。對於每一個消費隊列,會從 Broker 服務器查詢這個隊列當前的消費偏移量。而後根據這幾個消費隊列,建立對應的拉取請求 PullRequest 準備從 Broker 服務器拉取消息,當從 Broker 服務器拉取下來消息之後,只有當用戶成功消費的時候,纔會更新本地的偏移量表。本地的偏移量表再經過定時服務每隔 5 秒同步到 Broker 服務器端,而維護在 Broker 服務器端的偏移量表也會每隔 5 秒鐘序列化到磁盤中(文件地址:${user.home} /store/config/consume/consumerOffset.json)

保存的格式以下所示:

broker_offset_table

 

(2) 廣播模式

對於廣播模式而言,每一個消費隊列的偏移量確定不能存儲在 Broker 服務器端,由於多個消費者對於同一個隊列的消費可能不一致,偏移量會互相覆蓋掉。所以,在廣播模式下,每一個客戶端的消費偏移量是存儲在本地的,而後每隔 5 秒將內存中的 offsetTable 持久化到磁盤中。當首次從服務器獲取可消費隊列的時候,偏移量不像集羣模式下是從 Broker 服務器讀取的,而是直接從本地文件中讀取

這裏提一下,在廣播模式下,消息隊列的偏移量默認放在用戶目錄下的 .rocketmq_offsets 目錄下

存儲格式以下:

broadcasting_offset_table_persist

 

3. load、recover

Broker啓動的時候須要加載一系列的配置,啓動一系列的任務,主要分佈在BrokerController 的initialize()和start()方法中

複製代碼
1.加載topic配置
2.加載消費進度consumer offset
3.加載消費者訂閱關係consumer subscription
4.加載本地消息messageStore.load()
  Load 定時進度,Load commit log,commitLog其實調用存儲消費隊列mapedFileQueue.load()方法來加載的。
  遍歷出${user.home} \store\${commitlog}目錄下全部commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每一個文件構建一個MapedFile對象, 在MapedFileQueue中用集合list把這些MapedFile文件組成一個邏輯上連續的隊列

  Load consume Queue
  遍歷${user.home} \store\consumequeue下的全部文件夾(每一個topic就是一個文件夾)
  遍歷${user.home} \store\consumequeue\${topic}下的全部文件夾(每一個queueId就是一個文件夾)
  遍歷${user.home} \store\consumequeue\${topic}\${queueId}下全部文件,根據topic, queueId, 文件來構建ConsueQueue對象
  DefaultMessageStore中存儲結構Map<topic,Map<queueId, CosnueQueue>>
  每一個Consumequeue利用MapedFileQueue把mapedFile組成一個邏輯上連續的隊列

  加載事物模塊
  加載存儲檢查點
  加載${user.home} \store\checkpoint 這個文件存儲了3個long類型的值來記錄存儲模型最終一致的時間點,這個3個long的值爲
  physicMsgTimestamp爲commitLog最後刷盤的時間
  logicMsgTimestamp爲consumeQueue最終刷盤的時間
  indexMsgTimestamp爲索引最終刷盤時間
  checkpoint做用是當異常恢復時須要根據checkpoint點來恢復消息

  加載索引服務indexService
  recover嘗試數據恢復
  判斷是不是正常恢復,系統啓動的啓動存儲服務(DefaultMessageStore)的時候會建立一個臨時文件abort, 當系統正常關閉的時候會把這個文件刪掉,這個相似在Linux下打開vi編輯器生成那個臨時文件,全部當這個abort文件存在,系統認爲是異常恢復 
複製代碼

 

複製代碼
1)  先按照正常流程恢復ConsumeQueue

什麼是恢復ConsumeQueue, 前面不是有步驟load了ConsumeQueue嗎,爲何還要恢復?

前面load步驟建立了MapedFile對象創建了文件的內存映射,可是數據是否正確,如今文件寫到哪了(wrotePosition),
Flush到了什麼位置(committedPosition)?恢復數據來幫我解決這些問題。 每一個ConsumeQueue的mapedFiles集合中,從倒數第三個文件開始恢復(爲何只恢復倒數三個文件,看消息消費程度),
由於consumequeue的存儲單元是20字節的定長數據,因此是依次分別取了 Offset long類型存儲了commitLog的數據偏移量 Size int類型存儲了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判斷存儲的consumequeue數據是否有效的方式爲判斷offset>= 0 && size > 0 若是數據有效讀取下20個字節判斷是否有效 若是數據無效跳出循環,記錄此時有效數據的偏移量processOffset 若是讀到文件尾,讀取下一個文件 proccessOffset是有效數據的偏移量,獲取這個值的做用什麼? (1)proccessOffset後面的數據屬於髒數據,後面的文件要刪除掉 (2)設置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值爲 proccessOffset%mapedFileSize 2正常恢復commitLog文件 步驟跟流程恢復Consume Queue 判斷消息有效, 根據消息的存儲格式讀取消息到DispatchRequest對象,獲取消息大小值msgSize   大於 0 正常數據   等於-1 文件讀取錯誤 恢復結束   等於0 讀到文件末尾 3) 異常數據恢復,OSCRASH或者JVM CRASH或者機器掉電 當${user.home}\store\abort文件存在,表明異常恢復 讀取${user.home} \store\checkpoint獲取最終一致的時間點 判斷最終一致的點所在的文件是哪一個 從最新的mapedFile開始,獲取存儲的一條消息在broker的生成時間,大於checkpoint時間點的放棄找前一個文件,小於等於checkpoint時間點的說明checkpoint
在此mapedfile文件中 從checkpoint所在mapedFile開始恢復數據,它的總體過程跟正常恢復commitlog相似,最重要的區別在於 (1)讀取消息後派送到分發消息服務DispatchMessageService中,來重建ConsumeQueue以及索引 (2)根據恢復的物理offset,清除ConsumeQueue多餘的數據 4)恢復TopicQueueTable=Map<topic-queueid,offset> (1)恢復寫入消息時,消費記錄隊列的offset (2)恢復每一個隊列的最小offset 初始化通訊層 初始化線程池 註冊broker端處理器用來接收client請求後選擇處理器處理 啓動天天凌晨00:00:00統計消費量任務 啓動定時刷消費進度任務 啓動掃描數據被刪除了的topic,offset記錄也對應刪除任務 若是namesrv地址不是指定的,而是從靜態服務器取的,啓動定時向靜態服務器獲取namesrv地址的任務 若是broker是master,啓動任務打印slave落後master沒有同步的bytes 若是broker是slave,啓動任務定時到mastser同步配置信息
複製代碼

 

3. master slave

在broker啓動的時候BrokerController若是是slave,配置了master地址更新,沒有配置全部broker會想namesrv註冊,從namesrv獲取haServerAddr,而後更新到HAClient

當HAClient的MasterAddress不爲空的時候(由於broker  master和slave都構建了HAClient)會主動鏈接master獲取SocketChannel Master監聽Slave請求的端口,默認爲服務端口+1

接收slave上傳的offset long類型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8) 

//沒有理解意圖

 long readOffset =this.byteBufferRead.getLong(pos - 8);  this.processPostion = pos;

主從複製從哪裏開始複製:若是請求時0 ,從最後一個文件開始複製

Slave啓動的時候brokerController開啓定時任務定時拷貝master的配置信息

複製代碼
SlaveSynchronize類表明slave從master同步信息(非消息)

         syncTopicConfig       同步topic的配置信息

         syncConsumerOffset       同步消費進度

         syncDelayOffset                同步定時進度

         syncSubcriptionGroupConfig  同步訂閱組配7F6E 
複製代碼

HaService類實現了HA服務,負責同步雙寫,異步複製功能, 這個類master和slave的broker都會實例化,

Master經過AcceptSocketService監聽slave的鏈接,每一個masterslave鏈接都會構建一個HAConnection對象搭建他們之間的橋樑,對於一個master多slave部署結構的會有多個HAConnection實例,

Master構建HAConnection時會構建向slave寫入數據服務線程對象WriteSocketService對象和讀取Slave反饋服務線程對象ReadSocketService

WriteSocketService

向slave同步commitLog數據線程,

slaveRequestOffset是每次slave同步完數據都會向master發送一個ack表示下次同步的數據的offset。

若是slave是第一次啓動的話slaveRequestOffset=0, master會從最近那個commitLog文件開始同步。(若是要把master上的全部commitLog文件同步到slave的話, 把masterOffset值賦爲minOffset)

向socket寫入同步數據: 傳輸數據協議<Phy Offset> <Body Size> <Body Data>

ReadSocketService:

4 ReadSocketService

複製代碼
   讀取slave經過HAClient向master返回同步commitLog的物理偏移量phyOffset值

         通知前端線程,若是是同步複製的話通知是否複製成功

 

Slave 經過HAClient創建與master的鏈接, 

來定時彙報slave最大物理offset,默認5秒彙報一次也表明了跟master之間的心跳檢測

讀取master向slave寫入commitlog的數據, master向slave寫入數據的格式是
複製代碼

複製代碼
Slave初始化DefaultMessageStore時候會構建ReputMessageService服務線程並在啓動存儲服務的start方法中被啓動

ReputMessageService的做用是slave從物理隊列(由commitlog文件構成的MapedFileQueue)加載數據,並分發到各個邏輯隊列

HA同步複製, 當msg寫入master的commitlog文件後,判斷maser的角色若是是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果
複製代碼

5 HA異步複製

6.索引服務

1索引結構

IndexFile 存儲具體消息索引的文件,文件的內容結構如圖:

 

索引文件由索引文件頭IndexHeader, 槽位Slot和消息的索引內容三部分構成

 

IndexHeader:索引文件頭信息40個字節的數據組成

 

複製代碼
 beginTimestamp    8位long類型,索引文件構建第一個索引的消息落在broker的時間

endTimestamp         8位long類型,索引文件構建最後一個索引消息落broker時間

beginPhyOffset         8位long類型,索引文件構建第一個索引的消息commitLog偏移量

endPhyOffset            8位long類型,索引文件構建最後一個索引消息commitLog偏移量

hashSlotCount    4位int類型,構建索引佔用的槽位數(這個值貌似沒有具體做用)

indexCount                4位int類型,索引文件中構建的索引個數
複製代碼

 

槽位slot, 默認每一個文件配置的slot個數爲500萬個,每一個slot是4位的int類型數據

計算消息的對應的slotPos=Math.abs(keyHash)%hashSlotNum

消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE

Slot存儲的值爲消息個數索引

 

消息的索引內容是20位定長內容的數據

       

複製代碼
 4位int值, 存儲的是key的hash值

         8位long值     存儲的是消息在commitlog的物理偏移量phyOffset

         4位int值        存儲了當前消息跟索引文件中第一個消息在broker落地的時間差

         4位int值        若是存在hash衝突,存儲的是上一個消息的索引地址
複製代碼

 

7. 索引服務IndexService線程

複製代碼
1.      索引配置:hashSlotNum哈希槽位個數、indexNum存儲索引的最大個數、storePath索引文件indexFile存儲的路徑

2.      Load broker啓動的時候加載本地IndexFile,

若是是異常啓動刪除以後storeCheckPoint文件,由於commitLog根據storeCheckPoint會重建以後的索引文件,

3.      Run方法,任務從阻塞隊列中獲取請求構建索引

4.      queryOffset 根據topic key 時間跨度來查詢消息

倒敘遍歷全部索引文件

每個indexfile存儲了第一個消息和最後一個消息的存儲時間,根據傳入時間範圍來判斷索引是否落在此索引文件
複製代碼

 

8. 構建索引服務

複製代碼
分發消息索引服務將消息位置分發到ConsumeQueue中後,加入IndexService的LinkedBlockingQueue隊列中,IndexService經過任務向隊列中獲取請求來構建索引

剔除commitType或者rollbackType消息,由於這兩種消息都有對應的preparedType的消息

構建索引key(topic + "#" + key)

根據key的hashcode計算槽位,即跟槽位最大值取餘數

計算槽位在indexfile的具體偏移量位置

根據槽位偏移量獲取存儲的上一個索引

計算消息跟文件頭存儲開始時間的時間差

根據消息頭記錄的存儲消息個數計算消息索引存儲的集體偏移量位置

寫入真正的索引,內容參考上面索引內容格式

將槽位中的更新爲此消息索引

更新索引頭文件信息
複製代碼

 

 

 

9.  Broker與client(comsumer ,producer)之間的心跳

複製代碼
一:Broker接收client心跳ClientManageProcessor處理client的心跳請求

1.      構建ClientChannelInfo對象

1)  持有channel對象,表示與客戶端的鏈接通道

2)  ClientID表示客戶端

…..

2.      每次心跳會更新ClientChannelInfo的時間戳,來表示client還活着

3.      註冊或者更新consumer的訂閱關係(是以group爲單位來組織的, group下可能有多個訂閱關係)

4.      註冊producer,其實就是發送producer的group(這個在事物消息中才有點做用)

二:ClientHouseKeepingService線程定時清除不活動的鏈接

1)  ProducerManager.scanNotActiveChannel    默認兩分鐘producer沒有發送心跳清除

2)  ConsumerManager.scanNotActiveChannel   默認兩份中Consumer沒有發送心跳清除
複製代碼

 

10. Broker與namesrv之間的心跳

複製代碼
1)  namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件處理,

(1)      註冊broker的topic信息

(2)      構建或者更新BrokerLiveInfo的時間戳

NamesrvController初始化時啓動線程定時調用RouteInfoManger的scanNotActiveBroker方法來定時不活動的broker(默認兩分鐘沒有向namesrv發送心跳更新時間戳的) 
複製代碼
相關文章
相關標籤/搜索