消息中間件,說是一個通訊組件也沒有錯,由於它的本職工做是作消息的傳遞。然而要作到高效的消息傳遞,很重要的一點是數據結構,數據結構設計的好壞,必定程度上決定了該消息組件的性能以及能力上限。算法
消息中間件實現起來天然是很難的,但咱們能夠從某些角度,簡單了說說實現思路。apache
它的最基本的兩個功能接口爲:接收消息的發送(produce), 消息的消費(consume). 就像一個郵遞員同樣,通過它與不通過它實質性的東西沒有變化,它只是一箇中介(其餘功能效應,我們拋卻不說)。數組
爲了實現這兩個基本的接口,咱們就得實現兩個最基本的能力:消息的存儲和查詢。存儲便是接收發送過來的消息,查詢則包括業務查詢與系統自行查詢推送。安全
咱們先來看第一個點:消息的存儲。數據結構
直接基於內存的消息組件,能夠作到很是高效的傳遞,基本上此時的消息中間件就是由幾個內存隊列組成,只要保證這幾個隊列的安全性和實時性,就能夠工做得很好了。然而基於內存則必然意味着能力有限或者成本至關高,因此這樣的設計適用範圍得結合業務現狀作下比對。app
另外一個就是基於磁盤的消息組件,磁盤每每意味着更大的存儲空間,或者某種程度上意味着無限的存儲空間,由於畢竟全部的大數據都是存放在磁盤上的,前提是系統須要協調好各磁盤間的數據關係。然而,磁盤也意味着性能的降低,數據存放起來更麻煩。但rocketmq藉助於操做系統的pagecache和mmap以及順序寫機制,在讀寫性能方面已經很是優化。因此,更重要的是如何設計好磁盤的數據據結構。負載均衡
而後是第二個點:消息的查詢。ide
具體如何查詢,則必然依賴於如何存儲,與上面的原理相似,沒必要細說。但通常會有兩種消費模型:推送消息模型和拉取消費模型。便是消息中間件主動向消費者推送消息,或者是消費者主動查詢消息中間件。兩者也各有優劣,推送模型通常能夠體現出更強的實時性以及保持比較小的server端存儲空間佔用,可是也帶來了很是大的複雜度,它須要處理各類消費異常、重試、負載均衡、上下線,這不是件小事。而拉取模型則會對消息中間件減輕許多工做,主要是省去了異常、重試、負載均衡類的工做,將這些工做轉嫁到消費者客戶端上。但與此同時,也會對消息中間件提出更多要求,即要求可以保留足夠長時間的數據,以便全部合法的消費者均可以進行消費。而對於客戶端,則也須要中間件提供相應的便利,以即可以實現客戶端的基本訴求,好比消費組管理,上下線管理以及最基本的高效查詢能力。性能
很明顯,rocketmq的初衷就是要應對大數據的消息傳遞,因此其必然是基於磁盤的存儲。而其性能如上節所述,其利用操做系統的pagecache和mmap機制,讀寫性能很是好,另外他使用順序寫機制,使普通磁盤也能體現出很是高的性能。大數據
可是,以上幾項,只是爲高性能提供了必要的前提。但具體如何利用,還須要從重設計。畢竟,快不是目的,實現需求才是意義。
rocketmq中主要有四種存儲文件:commitlog 數據文件, consumequeue 消費隊列文件, index 索引文件, 元數據信息文件。最後一個元數據信息文件比較簡單,因其數據量小,方便操做。但針對前三個文件,都會涉及大量的數據問題,因此必然好詳細設計其結構。
從整體上來講,rocketmq都聽從定長數據結構存儲,定長的最大好處就在於能夠快速定位位置,這是其高性能的出發點。定長模型。
從核心上來講,commitlog文件保存了全部原始數據,全部數據想要獲取,都能從或也只能從commitlog文件中獲取,因爲commitlog文件保持了順序寫的特性,因此其性能很是高。而因數據只有一份,因此也就從根本上保證了數據一致性。
而根據各業務場景,衍生出了consumequeue和index文件,即 consumequeue 文件是爲了消費者可以快速獲取到相應消息而設計,而index文件則爲了可以快速搜索到消息而設計。從功能上說,consumequeue和index文件都是索引文件,只是索引的維度不一樣。consumequeue 是以topic和queueId維度進行劃分的索引,而index 則是以時間和key做爲劃分的索引。有了這兩個索引以後,就能夠爲各自的業務場景,提供高性能的服務了。具體其如何實現索引,咱們稍後再講!
commitlog vs consumequeue 的存儲模型以下:
直接順序寫的形式存儲,每一個文件設定固定大小,默認是1G即: 1073741824 bytes. 寫滿一個文件後,新開一個文件寫入。文件名就是其存儲的起始消息偏移量。
官方描述以下:
CommitLog:消息主體以及元數據的存儲主體,存儲Producer端寫入的消息主體內容,消息內容不是定長的。單個文件大小默認1G ,文件名長度爲20位,左邊補零,剩餘爲起始偏移量,好比00000000000000000000表明了第一個文件,起始偏移量爲0,文件大小爲1G=1073741824;當第一個文件寫滿了,第二個文件爲00000000001073741824,起始偏移量爲1073741824,以此類推。消息主要是順序寫入日誌文件,當文件滿了,寫入下一個文件;
當給定一個偏移量,要查找某條消息時,只需在全部的commitlog文件中,根據其名字便可知道偏移的數據信息是否存在其中,即至關於可基於文件實現一個二分查找,實際上rocketmq實現得更簡潔,直接一次性查找便可定位:
// org.apache.rocketmq.store.CommitLog#getData public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog(); // 1. 先在全部commitlog文件中查找到對應所在的 commitlog 分片文件 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound); if (mappedFile != null) { // 再從該分片文件中,移動餘數的大小偏移,便可定位到要查找的消息記錄了 int pos = (int) (offset % mappedFileSize); SelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos); return result; } return null; } // 查找偏移所在commitlog文件的實現方式: // org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) // firstMappedFile.getFileFromOffset() / this.mappedFileSize 表明了第一條記錄所處的文件位置編號 // offset / this.mappedFileSize 表明當前offset所處的文件編號 // 那麼,兩個編號相減就是當前offset對應的文件編號,由於第一個文件編號的相對位置是0 // 但有個前提:就是每一個文件存儲的大小必須是真實的對應的 offset 大小之差,而實際上consumeQueue根本沒法肯定它存了多少offset // 也就是說,只要文件定長,offset用於定位 commitlog文件就是合理的 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 因此,此處能夠找到 commitlog 文件對應的 mappedFile targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是快速查找失敗,則退回到遍歷方式, 使用O(n)的複雜度再查找一次 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } }
定位到具體的消息記錄位置後,如何知道要讀多少數據呢?這實際上在commitlog的數據第1個字節中標明,只需讀出便可知道。
具體commitlog的存儲實現以下:
// org.apache.rocketmq.store.CommitLog.DefaultAppendMessageCallback#doAppend ... // Initialization of storage space this.resetByteBuffer(msgStoreItemMemory, msgLen); // 1 TOTALSIZE, 首先將消息大小寫入 this.msgStoreItemMemory.putInt(msgLen); // 2 MAGICCODE this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE); // 3 BODYCRC this.msgStoreItemMemory.putInt(msgInner.getBodyCRC()); // 4 QUEUEID this.msgStoreItemMemory.putInt(msgInner.getQueueId()); // 5 FLAG this.msgStoreItemMemory.putInt(msgInner.getFlag()); // 6 QUEUEOFFSET this.msgStoreItemMemory.putLong(queueOffset); // 7 PHYSICALOFFSET this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position()); // 8 SYSFLAG this.msgStoreItemMemory.putInt(msgInner.getSysFlag()); // 9 BORNTIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp()); // 10 BORNHOST this.resetByteBuffer(bornHostHolder, bornHostLength); this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder)); // 11 STORETIMESTAMP this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp()); // 12 STOREHOSTADDRESS this.resetByteBuffer(storeHostHolder, storeHostLength); this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder)); // 13 RECONSUMETIMES this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes()); // 14 Prepared Transaction Offset this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset()); // 15 BODY this.msgStoreItemMemory.putInt(bodyLength); if (bodyLength > 0) this.msgStoreItemMemory.put(msgInner.getBody()); // 16 TOPIC this.msgStoreItemMemory.put((byte) topicLength); this.msgStoreItemMemory.put(topicData); // 17 PROPERTIES this.msgStoreItemMemory.putShort((short) propertiesLength); if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData); final long beginTimeMills = CommitLog.this.defaultMessageStore.now(); // Write messages to the queue buffer byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); ...
能夠看出,commitlog的存儲仍是比較簡單的,由於其主要就是負責將接收到的全部消息,依次寫入同一文件中。由於專注因此專業。
consumequeue做爲消費者的重要依據,一樣起着很是重要的做用。消費者在進行消費時,會使用一些偏移量做爲依據(拉取模型實現)。而這些個偏移量,實際上就是指的consumequeue的偏移量(注意不是commitlog的偏移量)。這樣作有什麼好處呢?首先,consumequeue做爲索引文件,它被要求要有很是高的查詢性能,因此越簡單越好。最好是可以一次性定位到數據!
若是想一次性定位數據,那麼惟一的辦法是直接使用commitlog的offset。但這會帶來一個最大的問題,就是當我當前消息消費拉取完成後,下一條消息在哪裏呢?若是單靠commitlog文件,那麼,它必然須要將下一條消息讀入,而後再根據topic斷定是否是須要的數據。如此一來,就必然存在大量的commitlog文件的io問題了。因此,這看起來是很是快速的一個解決方案,最終又變成了很是費力的方案了。
而使用commitlog文件的offset,則好了許多。由於consumequeue的文件存儲格式是一條消息佔20字節,即定長。根據這20字節,你能夠找到commitlog的offset. 而由於consumequeue自己就是按照topic/queueId進行劃分的,因此,本次消費完成後,下一次消費的數據一定就在consumequeue的下一位置。如此簡單快速搞得定了。具體consume的存儲格式,如官方描述:
ConsumeQueue:消息消費隊列,引入的目的主要是提升消息消費的性能,因爲RocketMQ是基於主題topic的訂閱模式,消息消費是針對主題進行的,若是要遍歷commitlog文件中根據topic檢索消息是很是低效的。Consumer便可根據ConsumeQueue來查找待消費的消息。其中,ConsumeQueue(邏輯消費隊列)做爲消費消息的索引,保存了指定Topic下的隊列消息在CommitLog中的起始物理偏移量offset,消息大小size和消息Tag的HashCode值。consumequeue文件能夠當作是基於topic的commitlog索引文件,故consumequeue文件夾的組織方式以下:topic/queue/file三層組織結構,具體存儲路徑爲:$HOME/store/consumequeue/{topic}/{queueId}/{fileName}。一樣consumequeue文件採起定長設計,每個條目共20個字節,分別爲8字節的commitlog物理偏移量、4字節的消息長度、8字節tag hashcode,單個文件由30W個條目組成,能夠像數組同樣隨機訪問每個條目,每一個ConsumeQueue文件大小約5.72M;
其中fileName也是以偏移量做爲命名依據,由於這樣才能根據offset快速查找到數據所在的分片文件。
其存儲實現以下:
// org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode, final long cqOffset) { if (offset + size <= this.maxPhysicOffset) { log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset={} phyOffset={}", maxPhysicOffset, offset); return true; } // 依次寫入 offset + size + tagsCode this.byteBufferIndex.flip(); this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE); this.byteBufferIndex.putLong(offset); this.byteBufferIndex.putInt(size); this.byteBufferIndex.putLong(tagsCode); final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE; MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset); if (mappedFile != null) { if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) { this.minLogicOffset = expectLogicOffset; this.mappedFileQueue.setFlushedWhere(expectLogicOffset); this.mappedFileQueue.setCommittedWhere(expectLogicOffset); this.fillPreBlank(mappedFile, expectLogicOffset); log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " " + mappedFile.getWrotePosition()); } if (cqOffset != 0) { long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset(); if (expectLogicOffset < currentLogicOffset) { log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset); return true; } if (expectLogicOffset != currentLogicOffset) { LOG_ERROR.warn( "[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}", expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset ); } } this.maxPhysicOffset = offset + size; // 將buffer寫入 consumequeue 的 mappedFile 中 return mappedFile.appendMessage(this.byteBufferIndex.array()); } return false; } 當須要進行查找進,也就會根據offset, 定位到某個 consumequeue 文件,而後再根據偏移餘數信息,再找到對應記錄,取出20字節,便是 commitlog信息。此處實現與 commitlog 的offset查找實現一模一樣。 // 查找索引所在文件的實現,以下: // org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; // 給到客戶端的偏移量是除以 20 以後的,也就是說 若是上一次的偏移量是 1, 那麼下一次的偏移量應該是2 // 一次性消費多條記錄另算, 自行加減 long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { // 委託給mappedFileQueue進行查找到單個具體的consumequeue文件 // 根據 offset 和規範的命名,能夠快速定位分片文件,如上 commitlog 的查找實現 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { // 再根據剩餘的偏移量,直接相似於數組下標的形式,一次性定位到具體的數據記錄 SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; }
若是想一次性消費多條消息,則只須要依次從查找到索引記錄開始,依次讀取多條,而後同理回查commitlog便可。即consumequeue的連續,成就了commitlog的不連續。以下消息拉取實現:
// org.apache.rocketmq.store.DefaultMessageStore#getMessage // 其中 bufferConsumeQueue 是剛剛查找出的consumequeue的起始消費位置 // 基於此文件迭代,完成多消息記錄消費 ... long nextPhyFileStartOffset = Long.MIN_VALUE; long maxPhyOffsetPulling = 0; int i = 0; final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE); final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded(); ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit(); for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) { // 依次取出commitlog的偏移量,數據大小,hashCode // 一次循環便是取走一條記錄,屢次循環則依次往下讀取 long offsetPy = bufferConsumeQueue.getByteBuffer().getLong(); int sizePy = bufferConsumeQueue.getByteBuffer().getInt(); long tagsCode = bufferConsumeQueue.getByteBuffer().getLong(); maxPhyOffsetPulling = offsetPy; if (nextPhyFileStartOffset != Long.MIN_VALUE) { if (offsetPy < nextPhyFileStartOffset) continue; } boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy); if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(), isInDisk)) { break; } boolean extRet = false, isTagsCodeLegal = true; if (consumeQueue.isExtAddr(tagsCode)) { extRet = consumeQueue.getExt(tagsCode, cqExtUnit); if (extRet) { tagsCode = cqExtUnit.getTagsCode(); } else { // can't find ext content.Client will filter messages by tag also. log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}", tagsCode, offsetPy, sizePy, topic, group); isTagsCodeLegal = false; } } if (messageFilter != null && !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } continue; } SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy); if (null == selectResult) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.MESSAGE_WAS_REMOVING; } nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy); continue; } if (messageFilter != null && !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) { if (getResult.getBufferTotalSize() == 0) { status = GetMessageStatus.NO_MATCHED_MESSAGE; } // release... selectResult.release(); continue; } this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet(); getResult.addMessage(selectResult); status = GetMessageStatus.FOUND; nextPhyFileStartOffset = Long.MIN_VALUE; } if (diskFallRecorded) { long fallBehind = maxOffsetPy - maxPhyOffsetPulling; brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind); } // 分配下一次讀取的offset偏移信息,一樣要除以單條索引大小 nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE); long diff = maxOffsetPy - maxPhyOffsetPulling; long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE * (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0)); getResult.setSuggestPullingFromSlave(diff > memory); ...
以上即理論的實現,無須多言。
index文件是爲搜索場景而生的,若是沒有搜索業務需求,則這個實現是意義不大的。通常這種搜索,主要用於後臺查詢驗證類使用,或者有其餘同的有妙用,不得而知。總之,一切爲搜索。它更多的須要藉助於時間限定,以key或者id進行查詢。
官方描述以下:
IndexFile(索引文件)提供了一種能夠經過key或時間區間來查詢消息的方法。Index文件的存儲位置是:$HOME \store\index\${fileName},文件名fileName是以建立時的時間戳命名的,固定的單個IndexFile文件大小約爲400M,一個IndexFile能夠保存 2000W個索引,IndexFile的底層存儲設計爲在文件系統中實現HashMap結構,故rocketmq的索引文件其底層實現爲hash索引。
IndexFile索引文件爲用戶提供經過「按照Message Key查詢消息」的消息索引查詢服務,IndexFile文件的存儲位置是:$HOME\store\index\${fileName},文件名fileName是以建立時的時間戳命名的,文件大小是固定的,等於40+500W\*4+2000W\*20= 420000040個字節大小。若是消息的properties中設置了UNIQ_KEY這個屬性,就用 topic + 「#」 + UNIQ_KEY的value做爲 key 來作寫入操做。若是消息設置了KEYS屬性(多個KEY以空格分隔),也會用 topic + 「#」 + KEY 來作索引。
其中的索引數據包含了Key Hash/CommitLog Offset/Timestamp/NextIndex offset 這四個字段,一共20 Byte。NextIndex offset 即前面讀出來的 slotValue,若是有 hash衝突,就能夠用這個字段將全部衝突的索引用鏈表的方式串起來了。Timestamp記錄的是消息storeTimestamp之間的差,並非一個絕對的時間。整個Index File的結構如圖,40 Byte 的Header用於保存一些總的統計信息,4\*500W的 Slot Table並不保存真正的索引數據,而是保存每一個槽位對應的單向鏈表的頭。20\*2000W 是真正的索引數據,即一個 Index File 能夠保存 2000W個索引。
具體結構圖以下:
那麼,若是要查找一個key, 應當如何查找呢?rocketmq會根據時間段找到一個index索引分版,而後再根據key作hash獲得一個值,而後定位到 slotValue . 而後再從slotValue去取出索引數據的地址,找到索引數據,而後再回查 commitlog 文件。從而獲得具體的消息數據。也就是,至關於搜索經歷了四級查詢: 索引分片文件查詢 -> slotValue 查詢 -> 索引數據查詢 -> commitlog 查詢 。
具體查找實現以下:
// org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); final QueryMessageResponseHeader responseHeader = (QueryMessageResponseHeader) response.readCustomHeader(); final QueryMessageRequestHeader requestHeader = (QueryMessageRequestHeader) request .decodeCommandCustomHeader(QueryMessageRequestHeader.class); response.setOpaque(request.getOpaque()); String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG); if (isUniqueKey != null && isUniqueKey.equals("true")) { requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum()); } // 從索引文件中查詢消息 final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp()); assert queryMessageResult != null; responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp()); if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS); response.setRemark(null); try { FileRegion fileRegion = new QueryMessageTransfer(response.encodeHeader(queryMessageResult .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { log.error("transfer query message by page cache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); queryMessageResult.release(); } return null; } response.setCode(ResponseCode.QUERY_NOT_FOUND); response.setRemark("can not find message, maybe time range not correct"); return response; } // org.apache.rocketmq.store.DefaultMessageStore#queryMessage @Override public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) { QueryMessageResult queryMessageResult = new QueryMessageResult(); long lastQueryMsgTime = end; for (int i = 0; i < 3; i++) { // 委託給 indexService 搜索記錄, 時間是必備參數 QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime); if (queryOffsetResult.getPhyOffsets().isEmpty()) { break; } Collections.sort(queryOffsetResult.getPhyOffsets()); queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset()); queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp()); for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) { long offset = queryOffsetResult.getPhyOffsets().get(m); try { boolean match = true; MessageExt msg = this.lookMessageByOffset(offset); if (0 == m) { lastQueryMsgTime = msg.getStoreTimestamp(); } if (match) { SelectMappedBufferResult result = this.commitLog.getData(offset, false); if (result != null) { int size = result.getByteBuffer().getInt(0); result.getByteBuffer().limit(size); result.setSize(size); queryMessageResult.addMessage(result); } } else { log.warn("queryMessage hash duplicate, {} {}", topic, key); } } catch (Exception e) { log.error("queryMessage exception", e); } } if (queryMessageResult.getBufferTotalSize() > 0) { break; } if (lastQueryMsgTime < begin) { break; } } return queryMessageResult; } public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) { List<Long> phyOffsets = new ArrayList<Long>(maxNum); long indexLastUpdateTimestamp = 0; long indexLastUpdatePhyoffset = 0; maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch()); try { this.readWriteLock.readLock().lock(); if (!this.indexFileList.isEmpty()) { //從最後一個索引文件,依次搜索 for (int i = this.indexFileList.size(); i > 0; i--) { IndexFile f = this.indexFileList.get(i - 1); boolean lastFile = i == this.indexFileList.size(); if (lastFile) { indexLastUpdateTimestamp = f.getEndTimestamp(); indexLastUpdatePhyoffset = f.getEndPhyOffset(); } // 斷定該時間段是否數據是否在該索引文件中 if (f.isTimeMatched(begin, end)) { // 構建出 key的hash, 而後查找 slotValue, 而後得以索引數據, 而後將offset放入 phyOffsets 中 f.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile); } if (f.getBeginTimestamp() < begin) { break; } if (phyOffsets.size() >= maxNum) { break; } } } } catch (Exception e) { log.error("queryMsg exception", e); } finally { this.readWriteLock.readLock().unlock(); } return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); } // org.apache.rocketmq.store.index.IndexFile#selectPhyOffset public void selectPhyOffset(final List<Long> phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { // 超出搜索範圍,不處理 } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; // 依次讀出 keyHash+offset+timeDiff+nextOffset int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; // 根據文件名可獲得索引寫入時間 long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
看起來挺費勁,但真正處理起來性能還好,雖然沒有consumequeue高效,但有mmap和pagecache的加持,效率仍是扛扛的。並且,搜索相對慢一些,用戶也是能夠接受的嘛。畢竟這只是一個附加功能,並不是核心所在。
而索引文件並無使用什麼高效的搜索算法,而是簡單從最後一個文件遍歷完成,由於時間戳不必定老是有規律的,與其隨意查找,還不如直接線性查找。另外,實際上對於索引重建問題,搜索可能不必定會有效。不過,咱們能夠經過擴大搜索時間範圍的方式,老是可以找到存在的數據。並且因其使用hash索引實現,性能仍是不錯的。
另外,index索引文件與commitlog和consumequeue有一個不同的地方,就是它不能進行順序寫,由於hash存儲,寫必定是任意的。且其slotValue以一些統計信息可能隨時發生變化,這也給順序寫帶來了不可解決的問題。
其具體寫索引過程以下:
// org.apache.rocketmq.store.index.IndexFile#putKey public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { // 先嚐試拉取slot對應的數據 // 若是爲0則說明是第一次寫入, 不然爲當前的索引條數 int slotValue = this.mappedByteBuffer.getInt(absSlotPos); if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } // 直接計算出本次存儲的索引記錄位置 // 因索引條數只會依次增長,故索引數據將表現爲順序寫樣子,主要是保證了數據不會寫衝突了 int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; // 按協議寫入內容便可 this.mappedByteBuffer.putInt(absIndexPos, keyHash); this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // 寫入slotValue爲當前可知的索引記錄條數 // 即每次寫入索引以後,若是存在hash衝突,那麼它會寫入自身的位置 // 而此時 slotValue 一定存在一個值,那就是上一個發生衝突的索引,從而造成天然的鏈表 // 查找數據時,只需根據slotValue便可以找到上一個寫入的索引,這設計妙哉! // 作了2點關鍵性保證:1. 數據自增不衝突; 2. hash衝突自刷新; 磁盤版的hash結構已然造成 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount()); if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } if (invalidIndex == slotValue) { this.indexHeader.incHashSlotCount(); } this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; }
rocketmq 巧妙地使用了自增結構和hash slot, 完美實現一個磁盤版的hash索引。相信這也會給咱們平時的工做帶來一些提示。
以上就是本文對rocketmq的存儲模型設計的解析了,經過這些解析,相信你們對其工做原理也會有質的理解。存儲其實是目前咱們的許多的系統中的很是核心部分,由於大部分的業務幾乎都是在存儲以前作一些簡單的計算。
很顯然業務很重要,但有了存儲的底子,還何愁業務實現難?