2.1.1 CommitLog文件(物理隊列)
CommitLog是用於存儲真實的物理消息的結構,保存消息元數據,全部消息到達Broker後都會保存到commitLog文件,這裏須要強調的是全部topic的消息都會統一保存在commitLog中。
舉個例子:當前集羣有TopicA, TopicB,
這兩個Toipc的消息會按照消息到達的前後順序保存到同一個commitLog中,而不是每一個Topic有本身獨立的commitLog
onsumeQueue是邏輯隊列,僅僅存儲了CommitLog的位移而已,真實的存儲都在本結構中。
首先這裏會使用CommitLog.this.topicQueueTable.put(key, queueOffset),其中的key是 topic-queueId, queueOffset是當前這個key中的消息數,每增長一個消息增長一(不會自減);
這裏queueOffset的用途以下:每次用戶請求putMessage的時候,將queueOffset返回給客戶端使用,這裏的queueoffset表示邏輯上的隊列偏移。
消息存放物理文件,每臺broker上的commitLog被本機器全部queue共享不作區分
- commitlog文件的存儲地址:$HOME\store\commitlog\${fileName}
- 一個消息存儲單元長度是不定的,順序寫可是隨機讀
- 每一個commitLog文件的默認大小爲 1G =1024*1024*1024,滿1G以後會自動新建CommitLog文件作保存數據用
- commitlog的文件名fileName,名字長度爲20位,左邊補零,剩餘爲起始偏移量;好比00000000000000000000表明了第一個文件,起始偏移量爲0,文件大小爲1G=1073741824;當這個文件滿了,
第二個文件名字爲00000000001073741824,起始偏移量爲1073741824,以此類推,第三個文件名字爲00000000002147483648,起始偏移量爲2147483648消息存儲的時候會順序寫入文件,
當文件滿了,寫入下一個文件。
CommitLog的清理機制:
- 按時間清理,rocketmq默認會清理3天前的commitLog文件;
- 按磁盤水位清理:當磁盤使用量到達磁盤容量75%,開始清理最老的commitLog文件。
1)、CommitLog 文件生成規則
偏移量:每一個 CommitLog 文件的大小爲 1G,通常狀況下第一個 CommitLog 的起始偏移量爲 0,第二個 CommitLog 的起始偏移量爲 1073741824 (1G = 1073741824byte)。
2)、怎麼知道消息存儲在哪一個 CommitLog 文件上?
假設 1073742827 爲物理偏移量(物理偏移量也即全局偏移量),則其對應的相對偏移量爲 1003(1003 = 1073742827 - 1073741824),而且該偏移量位於第二個 CommitLog。
index 和 ComsumerQueue 中都有消息對應的物理偏移量,經過物理偏移量就能夠計算出該消息位於哪一個 CommitLog 文件上。
文件地址:${user.home} \store\${commitlog}\${fileName} 消息存儲結構: flag 這個標誌值rocketmq不作處理,只存儲後透傳 QUEUEOFFSET這個值是個自增值不是真正的consume queue的偏移量,能夠表明這個隊列中消息的個數,要經過這個值查找到consume queue中數據,QUEUEOFFSET * 20纔是偏移地址 PHYSICALOFFSET 表明消息在commitLog中的物理起始地址偏移量 SYSFLAG消息標誌,指明消息是事物事物狀態等等消息特徵 BORNTIMESTAMP 消息產生端(producer)的時間戳 BORNHOST 消息產生端(producer)地址(address:port) STORETIMESTAMP 消息在broker存儲時間 STOREHOSTADDRESS 消息存儲到broker的地址(address:port) RECONSUMETIMES消息被某個訂閱組從新消費了幾回(訂閱組之間獨立計數),由於重試消息發送到了topic名字爲%retry%groupName的隊列queueId=0的隊列中去了 Prepared Transaction Offset 表示是prepared狀態的事物消息
2.1.2 ConsumeQueue文件組織:
ConsumerQueue至關於CommitLog的索引文件,消費者消費時會先從ConsumerQueue中查找消息的在commitLog中的offset,再去CommitLog中找元數據。
若是某個消息只在CommitLog中有數據,沒在ConsumerQueue中, 則消費者沒法消費,Rocktet的事務消息就是這個原理
Consumequeue類對應的是每一個topic和queuId下面的全部文件,至關於字典的目錄用來指定消息在消息的真正的物理文件commitLog上的位置
每條數據的結構以下圖所示:
消息的起始物理偏移量physical offset(long 8字節)+消息大小size(int 4字節)+tagsCode(long 8字節)。
- 每一個topic下的每一個queue都有一個對應的consumequeue文件。
- 文件默認存儲路徑:${user.home} \store\consumequeue\${topicName}\${queueId}\${fileName}
- 每一個文件由30W條數據組成,每條數據的大小爲20個字節,從而每一個文件的默認大小爲600萬個字節(consume queue中存儲單元是一個20字節定長的數據)是順序寫順序讀
- commitLogOffset是指這條消息在commitLog文件實際偏移量
- size就是指消息大小
- 消息tag的哈希值
ConsumeQueue幾個重要的字段
private final String topic; private final int queueId;//隊列id private final ByteBuffer byteBufferIndex;// 寫索引時用到的ByteBuffer private long maxPhysicOffset = -1;// 最後一個消息對應的物理Offset
每一個cosumequeue文件的名稱fileName,名字長度爲20位,左邊補零,剩餘爲起始偏移量;
好比00000000000000000000表明了第一個文件,起始偏移量爲0,文件大小爲600W,
當第一個文件滿以後建立的第二個文件的名字爲00000000000006000000,起始偏移量爲6000000,以此類推,
第三個文件名字爲00000000000012000000,起始偏移量爲12000000,消息存儲的時候會順序寫入文件,當文件滿了,寫入下一個文件。
- topic queueId來組織的:好比TopicA配了讀寫隊列0、1,那麼TopicA和Queue=0組成一個ConsumeQueue, TopicA和Queue=1組成一個另外一個ConsumeQueue.
- 按消費端group分組重試隊列,若是消費端消費失敗,發送到retry消費隊列中
- 按消費端group分組死信隊列,若是消費端重試超過指定次數,發送死信隊列
- 每一個ConsumeQueue能夠由多個文件組成無限隊列被MapedFileQueue對象管理
2.1.3 MapedFile 是PageCache文件封裝
操做物理文件在內存中的映射以及將內存數據持久化到物理文件中,代碼中寫死了要求os系統的頁大小爲4k, 消息刷盤根據參數(commitLog默認至少刷4頁, consumeQueue默認至少刷2頁)才刷
如下io對象構建了物理文件映射內存的對象 FileChannel fileChannel = new RandomAccessFile(file,「rw」).getChannel(); MappedByteBuffer mappedByteBuffer=fileChannel.map(READE_WRITE,0,fileSize); 構建mapedFile對象須要兩個參數 fileSize: 映射的物理文件的大小 commitLog每一個文件的大小默認1G =1024*1024*1024 ConsumeQueue每一個文件默認存30W條 = 300000 *CQStoreUnitSize(每條大小) filename: filename文件名稱但不只僅是名稱還表示文件記錄的初始偏移量, 文件名實際上是個long類型的值
2.1.4 MapedFileQueue 存儲隊列,數據定時刪除,無限增加。
隊列有多個文件(MapedFile)組成,由集合對象List表示升序排列,前面講到文件名便是消息在此文件的中初始偏移量,排好序後組成了一個連續的消息隊
當消息到達broker時,須要獲取最新的MapedFile寫入數據,調用MapedFileQueue的getLastMapedFile獲取,此函數若是集合中一個也沒有建立一個,若是最後一個寫滿了也建立一個新的。 MapedFileQueue在獲取getLastMapedFile時,若是須要建立新的MapedFile會計算出下一個MapedFile文件地址,經過預分配服務AllocateMapedFileService異步預建立下一個MapedFile文件,這樣下次建立新文件請求就不要等待,由於建立文件特別是一個1G的文件仍是有點耗時的, getMinOffset獲取隊列消息最少偏移量,即第一個文件的文件起始偏移量 getMaxOffset獲取隊列目前寫到位置偏移量 getCommitWhere刷盤刷到哪裏了
2.1.5 消息存儲及消費過程
1)消息發送流程:
- Broker啓動時,向NameServer註冊信息
- 客戶端調用producer發送消息時,會先從NameServer獲取該topic的路由信息。消息頭code爲GET_ROUTEINFO_BY_TOPIC
- 從NameServer返回的路由信息,包括topic包含的隊列列表和broker列表
- Producer端根據查詢策略,選出其中一個隊列,用於後續存儲消息
- 每條消息會生成一個惟一id,添加到消息的屬性中。屬性的key爲UNIQ_KEY
- 對消息作一些特殊處理,好比:超過4M會對消息進行壓縮
- producer向Broker發送rpc請求,將消息保存到broker端。消息頭的code爲SEND_MESSAGE或SEND_MESSAGE_V2(配置文件設置了特殊標誌)
消息存儲流程
- Broker端收到消息後,將消息原始信息保存在CommitLog文件對應的MappedFile中,而後異步刷新到磁盤
- ReputMessageServie線程異步的將CommitLog中MappedFile中的消息保存到ConsumerQueue和IndexFile中
- ConsumerQueue和IndexFile只是原始文件的索引信息
1)消息消費過程:
如今咱們再來看 Broker 服務器端。首先咱們應該知道,消息往 Broker 存儲就是在向 CommitLog 消息文件中寫入數據的一個過程。
在 Broker 啓動過程當中,其會啓動一個叫作 ReputMessageService
的服務,這個服務每隔 1 秒會檢查一下這個 CommitLog 是否有新的數據寫入。
ReputMessageService
自身維護了一個偏移量 reputFromOffset
,用以對比和 CommitLog 文件中的消息總偏移量的差距。
當這兩個偏移量不一樣的時候,就表明有新的消息到來了,在有新的消息到來以後,doReput()
函數會取出新到來的全部消息,每一條消息都會封裝爲一個 DispatchRequest
請求,
進而將這條請求分發給不一樣的請求消費者,咱們在這篇文章中只會關注利用消息建立消費隊列的服務 CommitLogDispatcherBuildConsumeQueue,
CommitLogDispatcherBuildConsumeQueue
服務會根據這條請求按照不一樣的隊列 ID 建立不一樣的消費隊列文件,並在內存中維護一份消費隊列列表。
而後將 DispatchRequest
請求中這條消息的消息偏移量、消息大小以及消息在發送時候附帶的標籤的 Hash 值寫入到相應的消費隊列文件中去。
3)客戶端如何記錄本身所消費的隊列消費到哪裏了呢?
答案就是:消費隊列偏移量。
集羣模式:因爲每一個客戶端所消費的消息隊列不一樣,因此每一個消息隊列已經消費到哪裏的消費偏移量是記錄在 Broker 服務器端的。
廣播模式:因爲每一個客戶端分配消費這個話題的全部消息隊列,因此每一個消息隊列已經消費到哪裏的消費偏移量是記錄在客戶端本地的。
(1) 集羣模式
在集羣模式下,消費者客戶端在內存中維護了一個
offsetTable
表,一樣在 Broker 服務器端也維護了一個偏移量表,在消費者客戶端,RebalanceService
服務會定時地 (默認 20 秒) 從 Broker 服務器獲取當前客戶端所須要消費的消息隊列,並與當前消費者客戶端的消費隊列進行對比,看是否有變化。對於每一個消費隊列,會從 Broker 服務器查詢這個隊列當前的消費偏移量。而後根據這幾個消費隊列,建立對應的拉取請求PullRequest
準備從 Broker 服務器拉取消息,當從 Broker 服務器拉取下來消息之後,只有當用戶成功消費的時候,纔會更新本地的偏移量表。本地的偏移量表再經過定時服務每隔 5 秒同步到 Broker 服務器端,而維護在 Broker 服務器端的偏移量表也會每隔 5 秒鐘序列化到磁盤中(文件地址:${user.home} /store/config/consume/consumerOffset.json)保存的格式以下所示:
(2) 廣播模式
對於廣播模式而言,每一個消費隊列的偏移量確定不能存儲在 Broker 服務器端,由於多個消費者對於同一個隊列的消費可能不一致,偏移量會互相覆蓋掉。所以,在廣播模式下,每一個客戶端的消費偏移量是存儲在本地的,而後每隔 5 秒將內存中的
offsetTable
持久化到磁盤中。當首次從服務器獲取可消費隊列的時候,偏移量不像集羣模式下是從 Broker 服務器讀取的,而是直接從本地文件中讀取這裏提一下,在廣播模式下,消息隊列的偏移量默認放在用戶目錄下的
.rocketmq_offsets
目錄下存儲格式以下:
3. load、recover
Broker啓動的時候須要加載一系列的配置,啓動一系列的任務,主要分佈在BrokerController 的initialize()和start()方法中
1.加載topic配置 2.加載消費進度consumer offset 3.加載消費者訂閱關係consumer subscription 4.加載本地消息messageStore.load() Load 定時進度,Load commit log,commitLog其實調用存儲消費隊列mapedFileQueue.load()方法來加載的。 遍歷出${user.home} \store\${commitlog}目錄下全部commitLog文件,按文件名(文件名就是文件的初始偏移量)升序排一下, 每一個文件構建一個MapedFile對象, 在MapedFileQueue中用集合list把這些MapedFile文件組成一個邏輯上連續的隊列 Load consume Queue 遍歷${user.home} \store\consumequeue下的全部文件夾(每一個topic就是一個文件夾) 遍歷${user.home} \store\consumequeue\${topic}下的全部文件夾(每一個queueId就是一個文件夾) 遍歷${user.home} \store\consumequeue\${topic}\${queueId}下全部文件,根據topic, queueId, 文件來構建ConsueQueue對象 DefaultMessageStore中存儲結構Map<topic,Map<queueId, CosnueQueue>> 每一個Consumequeue利用MapedFileQueue把mapedFile組成一個邏輯上連續的隊列 加載事物模塊 加載存儲檢查點 加載${user.home} \store\checkpoint 這個文件存儲了3個long類型的值來記錄存儲模型最終一致的時間點,這個3個long的值爲 physicMsgTimestamp爲commitLog最後刷盤的時間 logicMsgTimestamp爲consumeQueue最終刷盤的時間 indexMsgTimestamp爲索引最終刷盤時間 checkpoint做用是當異常恢復時須要根據checkpoint點來恢復消息 加載索引服務indexService recover嘗試數據恢復 判斷是不是正常恢復,系統啓動的啓動存儲服務(DefaultMessageStore)的時候會建立一個臨時文件abort, 當系統正常關閉的時候會把這個文件刪掉,這個相似在Linux下打開vi編輯器生成那個臨時文件,全部當這個abort文件存在,系統認爲是異常恢復
1) 先按照正常流程恢復ConsumeQueue 什麼是恢復ConsumeQueue, 前面不是有步驟load了ConsumeQueue嗎,爲何還要恢復? 前面load步驟建立了MapedFile對象創建了文件的內存映射,可是數據是否正確,如今文件寫到哪了(wrotePosition),
Flush到了什麼位置(committedPosition)?恢復數據來幫我解決這些問題。 每一個ConsumeQueue的mapedFiles集合中,從倒數第三個文件開始恢復(爲何只恢復倒數三個文件,看消息消費程度),
由於consumequeue的存儲單元是20字節的定長數據,因此是依次分別取了 Offset long類型存儲了commitLog的數據偏移量 Size int類型存儲了在commitLog上消息大小 tagcode tag的哈希值 目前rocketmq判斷存儲的consumequeue數據是否有效的方式爲判斷offset>= 0 && size > 0 若是數據有效讀取下20個字節判斷是否有效 若是數據無效跳出循環,記錄此時有效數據的偏移量processOffset 若是讀到文件尾,讀取下一個文件 proccessOffset是有效數據的偏移量,獲取這個值的做用什麼? (1)proccessOffset後面的數據屬於髒數據,後面的文件要刪除掉 (2)設置proccessOffset所在文件MapedFile的wrotePosition和commitedPosition值,值爲 proccessOffset%mapedFileSize 2正常恢復commitLog文件 步驟跟流程恢復Consume Queue 判斷消息有效, 根據消息的存儲格式讀取消息到DispatchRequest對象,獲取消息大小值msgSize 大於 0 正常數據 等於-1 文件讀取錯誤 恢復結束 等於0 讀到文件末尾 3) 異常數據恢復,OSCRASH或者JVM CRASH或者機器掉電 當${user.home}\store\abort文件存在,表明異常恢復 讀取${user.home} \store\checkpoint獲取最終一致的時間點 判斷最終一致的點所在的文件是哪一個 從最新的mapedFile開始,獲取存儲的一條消息在broker的生成時間,大於checkpoint時間點的放棄找前一個文件,小於等於checkpoint時間點的說明checkpoint
在此mapedfile文件中 從checkpoint所在mapedFile開始恢復數據,它的總體過程跟正常恢復commitlog相似,最重要的區別在於 (1)讀取消息後派送到分發消息服務DispatchMessageService中,來重建ConsumeQueue以及索引 (2)根據恢復的物理offset,清除ConsumeQueue多餘的數據 4)恢復TopicQueueTable=Map<topic-queueid,offset> (1)恢復寫入消息時,消費記錄隊列的offset (2)恢復每一個隊列的最小offset 初始化通訊層 初始化線程池 註冊broker端處理器用來接收client請求後選擇處理器處理 啓動天天凌晨00:00:00統計消費量任務 啓動定時刷消費進度任務 啓動掃描數據被刪除了的topic,offset記錄也對應刪除任務 若是namesrv地址不是指定的,而是從靜態服務器取的,啓動定時向靜態服務器獲取namesrv地址的任務 若是broker是master,啓動任務打印slave落後master沒有同步的bytes 若是broker是slave,啓動任務定時到mastser同步配置信息
3. master slave
在broker啓動的時候BrokerController若是是slave,配置了master地址更新,沒有配置全部broker會想namesrv註冊,從namesrv獲取haServerAddr,而後更新到HAClient
當HAClient的MasterAddress不爲空的時候(由於broker master和slave都構建了HAClient)會主動鏈接master獲取SocketChannel Master監聽Slave請求的端口,默認爲服務端口+1
接收slave上傳的offset long類型 int pos = this.byteBufferRead.position() -(this.byteBufferRead.position() % 8)
//沒有理解意圖
long readOffset =this.byteBufferRead.getLong(pos - 8); this.processPostion = pos;
主從複製從哪裏開始複製:若是請求時0 ,從最後一個文件開始複製
Slave啓動的時候brokerController開啓定時任務定時拷貝master的配置信息
SlaveSynchronize類表明slave從master同步信息(非消息) syncTopicConfig 同步topic的配置信息 syncConsumerOffset 同步消費進度 syncDelayOffset 同步定時進度 syncSubcriptionGroupConfig 同步訂閱組配7F6E
HaService類實現了HA服務,負責同步雙寫,異步複製功能, 這個類master和slave的broker都會實例化,
Master經過AcceptSocketService監聽slave的鏈接,每一個masterslave鏈接都會構建一個HAConnection對象搭建他們之間的橋樑,對於一個master多slave部署結構的會有多個HAConnection實例,
Master構建HAConnection時會構建向slave寫入數據服務線程對象WriteSocketService對象和讀取Slave反饋服務線程對象ReadSocketService
WriteSocketService
向slave同步commitLog數據線程, slaveRequestOffset是每次slave同步完數據都會向master發送一個ack表示下次同步的數據的offset。 若是slave是第一次啓動的話slaveRequestOffset=0, master會從最近那個commitLog文件開始同步。(若是要把master上的全部commitLog文件同步到slave的話, 把masterOffset值賦爲minOffset)
向socket寫入同步數據: 傳輸數據協議<Phy Offset> <Body Size> <Body Data>
ReadSocketService:
4 ReadSocketService
讀取slave經過HAClient向master返回同步commitLog的物理偏移量phyOffset值 通知前端線程,若是是同步複製的話通知是否複製成功 Slave 經過HAClient創建與master的鏈接, 來定時彙報slave最大物理offset,默認5秒彙報一次也表明了跟master之間的心跳檢測 讀取master向slave寫入commitlog的數據, master向slave寫入數據的格式是
Slave初始化DefaultMessageStore時候會構建ReputMessageService服務線程並在啓動存儲服務的start方法中被啓動 ReputMessageService的做用是slave從物理隊列(由commitlog文件構成的MapedFileQueue)加載數據,並分發到各個邏輯隊列 HA同步複製, 當msg寫入master的commitlog文件後,判斷maser的角色若是是同步雙寫SYNC_MASTER, 等待master同步到slave在返回結果
5 HA異步複製
6.索引服務
1索引結構
IndexFile 存儲具體消息索引的文件,文件的內容結構如圖:
索引文件由索引文件頭IndexHeader, 槽位Slot和消息的索引內容三部分構成
IndexHeader:索引文件頭信息40個字節的數據組成
beginTimestamp 8位long類型,索引文件構建第一個索引的消息落在broker的時間 endTimestamp 8位long類型,索引文件構建最後一個索引消息落broker時間 beginPhyOffset 8位long類型,索引文件構建第一個索引的消息commitLog偏移量 endPhyOffset 8位long類型,索引文件構建最後一個索引消息commitLog偏移量 hashSlotCount 4位int類型,構建索引佔用的槽位數(這個值貌似沒有具體做用) indexCount 4位int類型,索引文件中構建的索引個數
槽位slot, 默認每一個文件配置的slot個數爲500萬個,每一個slot是4位的int類型數據
計算消息的對應的slotPos=Math.abs(keyHash)%hashSlotNum
消息在IndexFile中的偏移量absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos *HASH_SLOT_SIZE
Slot存儲的值爲消息個數索引
消息的索引內容是20位定長內容的數據
4位int值, 存儲的是key的hash值 8位long值 存儲的是消息在commitlog的物理偏移量phyOffset 4位int值 存儲了當前消息跟索引文件中第一個消息在broker落地的時間差 4位int值 若是存在hash衝突,存儲的是上一個消息的索引地址
7. 索引服務IndexService線程
1. 索引配置:hashSlotNum哈希槽位個數、indexNum存儲索引的最大個數、storePath索引文件indexFile存儲的路徑 2. Load broker啓動的時候加載本地IndexFile, 若是是異常啓動刪除以後storeCheckPoint文件,由於commitLog根據storeCheckPoint會重建以後的索引文件, 3. Run方法,任務從阻塞隊列中獲取請求構建索引 4. queryOffset 根據topic key 時間跨度來查詢消息 倒敘遍歷全部索引文件 每個indexfile存儲了第一個消息和最後一個消息的存儲時間,根據傳入時間範圍來判斷索引是否落在此索引文件
8. 構建索引服務
分發消息索引服務將消息位置分發到ConsumeQueue中後,加入IndexService的LinkedBlockingQueue隊列中,IndexService經過任務向隊列中獲取請求來構建索引 剔除commitType或者rollbackType消息,由於這兩種消息都有對應的preparedType的消息 構建索引key(topic + "#" + key) 根據key的hashcode計算槽位,即跟槽位最大值取餘數 計算槽位在indexfile的具體偏移量位置 根據槽位偏移量獲取存儲的上一個索引 計算消息跟文件頭存儲開始時間的時間差 根據消息頭記錄的存儲消息個數計算消息索引存儲的集體偏移量位置 寫入真正的索引,內容參考上面索引內容格式 將槽位中的更新爲此消息索引 更新索引頭文件信息
9. Broker與client(comsumer ,producer)之間的心跳,
一:Broker接收client心跳ClientManageProcessor處理client的心跳請求 1. 構建ClientChannelInfo對象 1) 持有channel對象,表示與客戶端的鏈接通道 2) ClientID表示客戶端 ….. 2. 每次心跳會更新ClientChannelInfo的時間戳,來表示client還活着 3. 註冊或者更新consumer的訂閱關係(是以group爲單位來組織的, group下可能有多個訂閱關係) 4. 註冊producer,其實就是發送producer的group(這個在事物消息中才有點做用) 二:ClientHouseKeepingService線程定時清除不活動的鏈接 1) ProducerManager.scanNotActiveChannel 默認兩分鐘producer沒有發送心跳清除 2) ConsumerManager.scanNotActiveChannel 默認兩份中Consumer沒有發送心跳清除
10. Broker與namesrv之間的心跳
1) namesrv接收borker心跳DefaultRequestProcessor的REGISTER_BROKE事件處理, (1) 註冊broker的topic信息 (2) 構建或者更新BrokerLiveInfo的時間戳 NamesrvController初始化時啓動線程定時調用RouteInfoManger的scanNotActiveBroker方法來定時不活動的broker(默認兩分鐘沒有向namesrv發送心跳更新時間戳的)