Interviewer: Do you know how RocketMQ stores messages?
Me: Er... hold on a second, let me read this article. (runs away)
This part covers a lot of ground, so dear readers, please grab some tea. Comments are welcome!
RocketMQ's storage design is the foundation of its high availability and high performance; it relies on disk storage to support massive message accumulation. Kafka's single-node performance drops noticeably once the topic count grows past roughly 100, while RocketMQ maintains high performance even with many topics.
This article walks through the storage structure, the storage flow, and the storage optimization techniques.
The code below is based on RocketMQ 4.5.2.
The stored files fall into the following main categories:
File location: ${user.home}\store\commitlog\${fileName}
commitlog characteristics:
The table below shows what each entry stores; entries are variable-length and include the physical offset, the consumeQueue offset, the message body, and more.
Order | Field | Description |
---|---|---|
1 | totalSize(4Byte) | total size of the entry |
2 | magicCode(4) | set to daa320a7, a magic number marking a valid message entry |
3 | bodyCRC(4) | verified when the broker restarts and recovers |
4 | queueId(4) | the consumeQueue id the message belongs to |
5 | flag(4) | not processed by rocketmq; stored and passed through as-is |
6 | queueOffset(8) | offset of the message within the consumeQueue |
7 | physicalOffset(8) | offset of the message within the commitlog |
8 | sysFlag(4) | transaction flag: NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE |
9 | bornTimestamp(8) | timestamp on the producer side |
10 | bornHost(8) | producer address (address:port) |
11 | storeTimestamp(8) | time the message was stored on the broker |
12 | storeHostAddress(8) | address of the broker storing the message (address:port) |
13 | reconsumeTimes(4) | number of retries for the message |
14 | preparedTransactionOffset(8) | physical offset of the prepared transaction message |
15 | bodyLength(4) | message body length, at most 4MB |
16 | body(body length Bytes) | message body |
17 | topicLength(1) | topic length, at most 255 Bytes |
18 | topic(topic length Bytes) | topic |
19 | propertiesLength(2) | message properties length, at most 65535 Bytes |
20 | properties(properties length Bytes) | message properties |
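The table maps directly onto a length formula: 91 fixed bytes plus the three variable parts. Here is a minimal, self-contained sketch of it; the field widths come from the table above, and the method mirrors the `calMsgLength` helper in `CommitLog` (the class name here is mine):

```java
// Sketch: compute the total on-disk size of one commitlog entry from the
// variable parts, using the fixed field widths from the table above.
public class CommitLogEntrySize {
    static int calMsgLength(int bodyLength, int topicLength, int propertiesLength) {
        return 4      // 1  totalSize
             + 4      // 2  magicCode
             + 4      // 3  bodyCRC
             + 4      // 4  queueId
             + 4      // 5  flag
             + 8      // 6  queueOffset
             + 8      // 7  physicalOffset
             + 4      // 8  sysFlag
             + 8      // 9  bornTimestamp
             + 8      // 10 bornHost
             + 8      // 11 storeTimestamp
             + 8      // 12 storeHostAddress
             + 4      // 13 reconsumeTimes
             + 8      // 14 preparedTransactionOffset
             + 4 + bodyLength        // 15/16 bodyLength + body
             + 1 + topicLength       // 17/18 topicLength + topic
             + 2 + propertiesLength; // 19/20 propertiesLength + properties
    }

    public static void main(String[] args) {
        // 91 fixed bytes + 10-byte body + 5-byte topic + no properties
        System.out.println(calMsgLength(10, 5, 0)); // 106
    }
}
```

This also explains the limits in the table: `bodyLength` is a 4-byte int, `topicLength` a single byte (max 255), and `propertiesLength` a 2-byte short (max 65535).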
File location: ${user.home}\store\consumequeue\${topic}\${queueId}\${fileName}
consumequeue file characteristics:
Each MessageQueue of each Topic has its own ConsumeQueue file.
This structure corresponds to the consumer's logical queue. Why abstract one topic into many queues? It benefits clustered consumption: multiple consumers can consume in parallel, each from its own queue, without any locking.
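The lock-free parallelism can be sketched as a split of queue ids among the consumers of a group, so no two consumers ever touch the same queue. This is purely illustrative (class and method names are mine); RocketMQ's real rebalance strategies live behind the `AllocateMessageQueueStrategy` interface and differ in detail:

```java
// Sketch: assign each consumer in a group a disjoint subset of the topic's
// queues, so consumption is parallel without locks. Illustrative only.
import java.util.ArrayList;
import java.util.List;

public class QueueAllocation {
    // round-robin style split of queueIds among consumers
    static List<Integer> allocate(int queueCount, int consumerCount, int consumerIndex) {
        List<Integer> mine = new ArrayList<>();
        for (int q = 0; q < queueCount; q++) {
            if (q % consumerCount == consumerIndex) {
                mine.add(q);
            }
        }
        return mine;
    }

    public static void main(String[] args) {
        // 4 queues, 2 consumers: consumer 0 owns queues 0 and 2, consumer 1 owns 1 and 3
        System.out.println(allocate(4, 2, 0)); // [0, 2]
        System.out.println(allocate(4, 2, 1)); // [1, 3]
    }
}
```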
Order | Field | Description |
---|---|---|
1 | offset(8) | offset of the message in the commitlog |
2 | size(4) | size of the message in the commitlog |
3 | tagHashCode(8) | hash of the message tag |
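Unlike commitlog entries, consumequeue entries are fixed at 20 bytes (8 + 4 + 8), which can be sketched with a plain `ByteBuffer`. Class and method names here are mine, not RocketMQ's:

```java
// Sketch: encode/decode one fixed-size (20-byte) consumequeue entry,
// following the three-field layout in the table above.
import java.nio.ByteBuffer;

public class CqEntry {
    static final int CQ_STORE_UNIT_SIZE = 20; // 8 + 4 + 8

    static ByteBuffer encode(long commitLogOffset, int size, long tagsHashCode) {
        ByteBuffer buf = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
        buf.putLong(commitLogOffset); // where the full message lives in the commitlog
        buf.putInt(size);             // how many bytes to read there
        buf.putLong(tagsHashCode);    // lets consumers filter by tag without reading the commitlog
        buf.flip();
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer e = encode(1_048_576L, 208, "TagA".hashCode());
        System.out.println(e.getLong()); // 1048576
        System.out.println(e.getInt());  // 208
    }
}
```

The entry is deliberately tiny: the consumequeue is only an index, and the broker dereferences `offset`/`size` into the commitlog to fetch the actual message.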
File location: ${user.home}\store\index\${fileName}
index file characteristics:
The index file provides message lookup by key, used mainly for troubleshooting and statistics.
Order | Field | Description |
---|---|---|
1 | keyHash(4) | hash of the index key (built as topic#key) |
2 | phyOffset(8) | the real physical offset in the commitLog |
3 | timeOffset(4) | time offset relative to the first message in the file |
4 | slotValue(4) | position of the next record in the same hash slot (entries in a slot form a linked list) |
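A sketch of how a key maps to a hash slot, assuming the `topic#key` key format; the slot count is an arbitrary example, and the real logic (including collision chaining through `slotValue`) lives in `IndexFile`:

```java
// Sketch: hash an index key to a non-negative slot position, as the
// paragraph above describes. Illustrative only, not the IndexFile code.
public class IndexSlot {
    static int keyHash(String topic, String key) {
        int h = (topic + "#" + key).hashCode();
        int positive = Math.abs(h);
        return positive < 0 ? 0 : positive; // guard Integer.MIN_VALUE, keep it non-negative
    }

    static int slotPos(String topic, String key, int slotNum) {
        return keyHash(topic, key) % slotNum;
    }

    public static void main(String[] args) {
        int slotNum = 5_000_000; // example value; the real default is configured in MessageStoreConfig
        System.out.println(slotPos("TopicTest", "ORDER_20240101", slotNum));
    }
}
```

Two different keys can land in the same slot; that is why each entry carries a `slotValue` link, so a lookup walks the chain and double-checks `keyHash` and `timeOffset` per candidate.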
The layers, from top to bottom, are:

- Business layer: QueueMessageProcessor, PullMessageProcessor, SendMessageProcessor, DefaultMessageStore
- Storage logic layer: IndexService, ConsumeQueue, CommitLog, IndexFile, MappedFileQueue
- Disk I/O layer: MappedFile, MappedByteBuffer
- Disk
The core storage class in RocketMQ is DefaultMessageStore, and its entry point is the putMessage method.
```java
// DefaultMessageStore#putMessage
public PutMessageResult putMessage(MessageExtBrokerInner msg) {
    // Return immediately if the service has been shut down [code omitted]
    // Return immediately if this broker's role is slave [code omitted]
    // Return immediately if runningFlags is not writable; if writable, reset printTimes to 0 [code omitted]
    // Return immediately if the topic name is longer than 127 bytes [code omitted]
    // Return immediately if the properties of msg exceed Short.MAX_VALUE (32767) [code omitted]

    if (this.isOSPageCacheBusy()) { // is the OS page cache busy writing?
        return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
    }

    long beginTime = this.getSystemClock().now();
    PutMessageResult result = this.commitLog.putMessage(msg); // $2 the core write, see the code below

    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) {
        log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
    }
    // record the commitlog write time, keeping the maximum seen so far
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) {
        // count commitlog write failures
        this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
    }

    return result;
}
```
```java
// CommitLog#putMessage
public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag()); // $1
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long elapsedTimeInLock = 0;

    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); // $3

    putMessageLock.lock(); // spin or ReentrantLock, depending on store config // $4
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) { // $5
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
        switch (result.getStatus()) { // $7
            case PUT_OK:
                break;
            case END_OF_FILE:
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    handleDiskFlush(result, putMessageResult, msg); // $8
    handleHA(result, putMessageResult, msg); // $9

    return putMessageResult;
}
```
mappedFile.appendMessage delegates to this.appendMessagesInner:
```java
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get(); // $1

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) { // $3
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBrokerInner) messageExt); // $4
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos,
                (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes()); // $5
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
```
The code lives in CommitLog's inner class DefaultAppendMessageCallback:
```java
// CommitLog$DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position(); // $1

    this.resetByteBuffer(hostHolder, 8); // $2
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consume queue
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE: // $4
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    // Serialize message // $5
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) { // $6
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(maxBlank);
        // 2 MAGICCODE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId,
            msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // $7 [code omitted]
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen); // $8

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId, // $9
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
```
```java
// Excerpt: find the max physical offset already dispatched to the consume
// queues, then start the reput service that dispatches from there
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    for (ConsumeQueue logic : maps.values()) {
        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
        }
    }
}
if (maxPhysicalPosInLogicQueue < 0) {
    maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
    maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
this.reputMessageService.start();
```
```java
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { // $1
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { // $2

        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {
            break;
        }

        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset); // $3
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset(); // $4

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // $5 build the DispatchRequest
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            DefaultMessageStore.this.doDispatch(dispatchRequest); // $6

                            // If this broker is the master, messages arriving at the consumeQueue can be pushed
                            // to long-polling clients via the user-registered arriving listener
                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }

                            this.reputFromOffset += size; // $7
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) { // $8
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
```
Step 1: look up the ConsumeQueue by topic and queueId.
Step 2: append the message's commitlog offset, size, and tagHashCode (see the ConsumeQueue data structure above) to the ConsumeQueue's memory-mapped file (mappedFile) without flushing; the consumeQueue is flushed asynchronously by default.
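Because every entry is exactly 20 bytes, locating an entry from a logical consume offset is pure arithmetic. A sketch (the 300,000-entries-per-file constant matches the default ConsumeQueue mapped-file size, but treat both constants as example values; class and method names are mine):

```java
// Sketch: translate a logical consumequeue offset into (mapped file, position).
// Fixed 20-byte entries make this simple multiplication and division.
public class CqLocate {
    static final int UNIT = 20;                   // bytes per consumequeue entry
    static final int FILE_SIZE = 300_000 * UNIT;  // 6,000,000 bytes per mapped file

    static long bytePosition(long logicalOffset) {
        return logicalOffset * UNIT;              // global byte position across all files
    }

    static long fileIndex(long logicalOffset) {
        return bytePosition(logicalOffset) / FILE_SIZE; // which mapped file holds it
    }

    static long positionInFile(long logicalOffset) {
        return bytePosition(logicalOffset) % FILE_SIZE; // offset inside that file
    }

    public static void main(String[] args) {
        // logical offset 300,001 lands 20 bytes into the second file
        System.out.println(fileIndex(300_001));      // 1
        System.out.println(positionInFile(300_001)); // 20
    }
}
```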
```java
return mappedFile.appendMessage(this.byteBufferIndex.array());
```
If messageIndexEnable is set to true, this dispatch task is forwarded to the index service; otherwise it is skipped.
Step 1: get the current indexFile; if none is held in the indexFileList in memory, rebuild one from its file path.
Step 2: if the message's unique key is not yet present, add it to the indexFile.
As noted above, DefaultMessageStore is the storage business layer and putMessage is its entry point.
From its fields you can see several groups of properties.
These will each be covered in a separate article.
Page cache, also called the page buffer or file buffer, is built from several disk blocks; a page is typically 4 KB (8 KB on some 64-bit systems), and the blocks backing a page need not be contiguous on the physical disk. Files are organized in units of one page: a read gathers several possibly non-contiguous disk blocks into the buffer cache, assembles them into a page-cache page, and hands that to the application.
When the OS performs I/O, it first looks in the page cache; on a miss it issues disk I/O, loads the data from the file into a free block of the page cache, and then copies it to the user buffer.
For the first read request against a file, the system reads in the requested page plus a few of the pages that follow it (readahead).
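The lookup-miss-copy read path just described can be modeled in a few lines. This is a toy in-process model only (the real page cache lives in the kernel); all names are mine:

```java
// Toy model of the page-cache read path: check the cache first, "load from
// disk" on a miss, then copy into the user buffer. Purely illustrative.
import java.util.HashMap;
import java.util.Map;

public class PageCacheModel {
    static final int PAGE_SIZE = 4096;
    final Map<Long, byte[]> cache = new HashMap<>(); // pageNo -> cached page
    int misses = 0;

    byte[] readPage(long pageNo) {
        byte[] page = cache.get(pageNo);
        if (page == null) {          // miss: issue "disk I/O" into a free page
            misses++;
            page = loadFromDisk(pageNo);
            cache.put(pageNo, page);
        }
        return page.clone();         // copy out to the user buffer
    }

    byte[] loadFromDisk(long pageNo) {
        byte[] page = new byte[PAGE_SIZE]; // stand-in for a real disk read
        page[0] = (byte) pageNo;
        return page;
    }

    public static void main(String[] args) {
        PageCacheModel pc = new PageCacheModel();
        pc.readPage(7); // first read: miss, loads the page
        pc.readPage(7); // second read: served from cache
        System.out.println(pc.misses); // 1
    }
}
```

Readahead would simply pre-populate `cache` with the next few page numbers on the first miss, which is why sequential readers hit the cache almost every time.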
MQ reads rely on the system page cache: the higher the page-cache hit ratio, the better the read performance.
The ConsumeQueue logical queue is read sequentially, so with page-cache readahead its read performance approaches that of memory, even when messages are piling up.
In addition, RocketMQ reads and writes files mainly through MappedByteBuffer: using NIO's FileChannel, the physical file on disk is mapped directly into user-space memory (this mmap approach avoids the copies that traditional I/O makes back and forth between the kernel-space buffer and the user-space buffer), turning file operations into plain memory operations and greatly improving read/write efficiency.
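The MappedByteBuffer technique can be reduced to a toy write/read round trip; file name and mapping size here are arbitrary, and the class is mine, not RocketMQ code:

```java
// Sketch: map a file into memory with FileChannel, write and read through
// the mapping; no read()/write() syscall copies on the hot path.
import java.io.RandomAccessFile;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

public class MmapDemo {
    static String roundTrip() throws Exception {
        Path file = Files.createTempFile("mmap-demo", ".bin");
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "rw");
             FileChannel ch = raf.getChannel()) {
            // Map 1 KB of the file straight into user-space memory:
            // the put below is a plain memory store
            MappedByteBuffer mbb = ch.map(FileChannel.MapMode.READ_WRITE, 0, 1024);
            mbb.put("hello".getBytes(StandardCharsets.UTF_8));
            mbb.force(); // flush the dirty page to disk (RocketMQ batches this asynchronously by default)

            byte[] back = new byte[5];
            mbb.position(0);
            mbb.get(back); // read back through the same mapping
            return new String(back, StandardCharsets.UTF_8);
        } finally {
            Files.deleteIfExists(file);
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip()); // hello
    }
}
```

This is also why commitlog and consumequeue files are fixed-size: a fixed-size file can be mapped once and appended to by bumping a write position, exactly what MappedFile does.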