【RocketMQ源碼學習】- 5. 消息存儲機制

前言

面試官:你瞭解RocketMQ是如何存儲消息的嗎?
我:額,,,你等下,我看下這篇文字, (逃html

因爲這部份內容優勢多,因此請哥哥姐姐們自備茶水,歡迎留言!node

 

RocketMQ存儲設計是高可用和高性能的保證, 利用磁盤存儲來知足海量堆積能力。Kafka單機在topic數量在100+的時候,性能會降低不少,而RocketMQ可以在多個topic存在時,依然保持高性能面試

下面主要從存儲結構、存儲流程、存儲優化的技術來造成文字緩存

基於的版本是RocketMQ4.5.2數據結構

 

存儲架構圖

  1. 要發送的消息,會按順序寫入commitlog中,這裏全部topic和queue共享一個文件
  2. 存入commitlog後,因爲消息會按照topic緯度來消費,會異步構建consumeQueue(邏輯隊列)和index(索引文件),consumeQueue存儲消息的commitlogOffset/messageSize/tagHashCode, 方便定位commitlog中的消息實體。每一個 Topic下的每一個Message Queue都有一個對應的ConsumeQueue文件。索引文件(Index)提供消息檢索的能力,主要在問題排查和數據統計等場景應用
  3. 消費者會從consumeQueue取到msgOffset,方便快速取出消息 

好處

  1. CommitLog 順序寫 ,能夠大大提升寫人效率,提升堆積能力
  2. 雖然是隨機讀,可是利用操做系統的pagecache機制,能夠批量地從磁盤讀取,做爲cache存到內存中,加速後續的讀取速度
  3. 在實際狀況中,大部分的 ConsumeQueue可以被所有讀人內存,因此這個中間結構的操做速度很快, 能夠認爲是內存讀取的速度

消息文件存儲的結構設計

存儲的文件主要分爲:架構

  • commitlog: 存儲消息實體
  • consumequeue: 按Topic和隊列存儲消息的offset
  • index: index按key、tag、時間等存儲

commitlog(物理隊列)

文件地址:${user.home} \store${commitlog}${fileName}app

commitlog特色:異步

  • 存放該broke全部topic的消息
  • 默認1G大小
  • 以偏移量爲文件名,當一個文件寫滿時則建立新文件,這樣的設計主要是方便根據消息的物理偏移量,快速定位到消息所在的物理文件
  • 一個消息存儲單元是不定長的
  • 順序寫可是隨機讀

消息單元的存儲結構

下面的表格說明了,每一個消息體不是定長的,會存儲消息的哪些內容,包括物理偏移量、consumeQueue的偏移量、消息體等信息ide

順序 字段名 說明
1 totalSize(4Byte) 消息大小
2 magicCode(4) 設置爲daa320a7 (這個不太明白)
3 bodyCRC(4) 當broker重啓recover時會校驗
4 queueId(4) 消息對應的consumeQueueId
5 flag(4) rocketmq不作處理,只存儲後透傳
6 queueOffset(8) 消息在consumeQueue中的偏移量
7 physicalOffset(8) 消息在commitlog中的偏移量
8 sysFlg(4) 事務標示,NOT_TYPE/PREPARED_TYPE/COMMIT_TYPE/ROLLBACK_TYPE
9 bronTimestamp(8) 消息產生端(producer)的時間戳
10 bronHost(8) 消息產生端(producer)地址(address:port)
11 storeTimestamp(8) 消息在broker存儲時間
12 storeHostAddress(8) 消息存儲到broker的地址(address:port)
13 reconsumeTimes(4) 消息重試次數
14 preparedTransactionOffset(8) 事務消息的物理偏移量
15 bodyLength(4) 消息長度,最長不超過4MB
16 body(body length Bytes) 消息體內容
17 topicLength(1) 主題長度,最長不超過255Byte
18 topic(topic length Bytes) 主題內容
19 propertiesLength(2) 消息屬性長度,最長不超過65535Bytes
20 properties(properties length Bytes) 消息屬性內容

 

consumequeue文件(邏輯隊列)

文件地址:${user.home}\store\consumeQueue${topic}${queueId}${fileName}性能

consumequeue文件特色:

  • 按topic和queueId緯度分別存儲消息commitLogOffset、size、tagHashCode
  • 以偏移量爲文件名
  • 一個存儲單元是20個字節的定長的
  • 順序讀順序寫
  • 每一個ConsumeQueue文件大小約5.72M

每一個Topic下的每一個MessageQueue都有一個對應的ConsumeQueue文件
該結構對應於消費者邏輯隊列,爲何要將一個topic抽象出不少的queue呢?這樣的話,對集羣模式更有好處,可使多個消費者共同消費,而不用上鎖;

消息單元的存儲結構

順序 字段名 說明
1 offset(8) commitlog的偏移量
2 size(4) commitlog消息大小
3 tagHashCode tag的哈希值

 

index索引文件

文件地址:${user.home}\store\index${fileName}

index文件特色:

  • 以時間做爲文件名
  • 一個存儲單元是20個字節定長的

索引文件(Index)提供消息檢索的能力,主要在問題排查和數據統計等場景應用

存儲單元的結構

順序 字段名 說明
1 keyHash(4) key的結構是
2 phyOffset(8) commitLog真實的物理位移
3 timeOffset(4) 時間偏移量
4 slotValue(4) 下一個記錄的slot值

 

消息存儲流程

RocketMQ文件存儲模型層次結構

層次從上到下依次爲:

  1. 業務層
    • QueueMessageProcessor類
    • PullMessageProcessor類
    • SendMessageProcessor類
    • DefaultMessageStore類
  2. 存儲邏輯層
    • IndexService類
    • ConsumeQueue類
    • CommitLog類
    • IndexFile類
    • MappedFileQueue類
  3. 磁盤交互IO層
    • MappedFile類
    • MappedByteBuffer類
業務層 QueueMessageProcessor PullMessageProcessor
SendMessageProcessor
DefaultMessageStore
存儲邏輯層 IndexService ConsumeQueue CommitLog
IndexFile MappedFileQueue
磁盤交互IO層 MappedFile
MappedByteBuffer
Disk

 

寫commoitlog流程

1. DefaultMessageStore,入口方法是putMessage方法

RocketMQ 的存儲核心類爲 DefaultMessageStore,入口方法是putMessage方法

 1 // DefaultMessageStore#putMessage
 2 public PutMessageResult putMessage(MessageExtBrokerInner msg) {
 3     // 判斷該服務是否shutdown,不可用直接返回【代碼省略】
 4     // 判斷broke的角色,若是是從節點直接返回【代碼省略】
 5     // 判斷runningFlags是不是可寫狀態,不可寫直接返回,可寫把printTimes設爲0【代碼省略】
 6     // 判斷topic名字是否大於byte字節127, 大於則直接返回【代碼省略】
 7     // 判斷msg中properties屬性長度是否大於short最大長度32767,大於則直接返回【代碼省略】
 8 
 9     if (this.isOSPageCacheBusy()) { // 判斷操做系統頁寫入是否繁忙
10         return new PutMessageResult(PutMessageStatus.OS_PAGECACHE_BUSY, null);
11     }
12 
13     long beginTime = this.getSystemClock().now();
14     PutMessageResult result = this.commitLog.putMessage(msg);   // $2 查看下方代碼,寫msg核心
15 
16     long elapsedTime = this.getSystemClock().now() - beginTime;
17     if (elapsedTime > 500) {
18         log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
19     }
20     // 記錄寫commitlog時間,大於最大時間則設置爲這個最新的時間
21     this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
22 
23     if (null == result || !result.isOk()) {
24         // 記錄寫commitlog 失敗次數
25         this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
26     }
27 
28     return result;
29 }

$2 CommitLog#putMessage 將日誌寫入CommitLog 文件

public PutMessageResult putMessage(final MessageExtBrokerInner msg) {
    // Set the storage time
    msg.setStoreTimestamp(System.currentTimeMillis());
    // Set the message body BODY CRC (consider the most appropriate setting
    // on the client)
    msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
    // Back to Results
    AppendMessageResult result = null;

    StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

    String topic = msg.getTopic();
    int queueId = msg.getQueueId();

    final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());  // $1
    if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
        || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) { // $2
        // Delay Delivery
        if (msg.getDelayTimeLevel() > 0) {
            if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
            }

            topic = ScheduleMessageService.SCHEDULE_TOPIC;
            queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

            // Backup real topic, queueId
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
            MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
            msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

            msg.setTopic(topic);
            msg.setQueueId(queueId);
        }
    }

    long elapsedTimeInLock = 0;
    MappedFile unlockMappedFile = null;
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();   // $3

    putMessageLock.lock(); //spin or ReentrantLock ,depending on store config // $4
    try {
        long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
        this.beginTimeInLock = beginLockTimestamp;

        // Here settings are stored timestamp, in order to ensure an orderly
        // global
        msg.setStoreTimestamp(beginLockTimestamp);

        if (null == mappedFile || mappedFile.isFull()) {    // $5 
            mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
        }
        if (null == mappedFile) {
            log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
            beginTimeInLock = 0;
            return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null);
        }

        result = mappedFile.appendMessage(msg, this.appendMessageCallback); // $6
        switch (result.getStatus()) {   // $7
            case PUT_OK:
                break;
            case END_OF_FILE:   
                unlockMappedFile = mappedFile;
                // Create a new file, re-write the message
                mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                if (null == mappedFile) {   
                    // XXX: warn and notify me
                    log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                    beginTimeInLock = 0;
                    return new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result);
                }
                result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                break;
            case MESSAGE_SIZE_EXCEEDED:
            case PROPERTIES_SIZE_EXCEEDED:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result);
            case UNKNOWN_ERROR:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
            default:
                beginTimeInLock = 0;
                return new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result);
        }

        elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
        beginTimeInLock = 0;
    } finally {
        putMessageLock.unlock();
    }

    if (elapsedTimeInLock > 500) {
        log.warn("[NOTIFYME]putMessage in lock cost time(ms)={}, bodyLength={} AppendMessageResult={}", elapsedTimeInLock, msg.getBody().length, result);
    }

    if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) {
        this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
    }

    PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

    // Statistics 
    storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
    storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

    handleDiskFlush(result, putMessageResult, msg); // $8
    handleHA(result, putMessageResult, msg);        // $9

    return putMessageResult;
}
  1. $1 獲取消息的事務類型
  2. $2 對於事務消息中UNKNOW、COMMIT消息,處理topic和queueId, 同時備份real_topic,real_queueId
  3. $3 獲取最新的mappedFile文件,有可能爲空
  4. $4 給寫mappedFile加鎖(默認自旋鎖)
  5. $5 mappedFile爲空時建立mappedFile文件, 建立的mappedFile文件offset爲0
  6. $6 在mappedFile中append消息,下面具體說明
  7. $7 根據mappedFile寫消息的結果
    • ok, 直接break
    • 文件剩下的空間不夠寫了,從新建立一個mappedFile文件, 從新寫消息
    • msg大小,properties大小,未知錯誤,返回錯誤類型
  8. $8 執行刷盤
  9. $9 執行主從同步

3. $6 在mappedFile中append消息

mappedFile.appendMessage方法會調用this.appendMessagesInner方法

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();  // $1

    if (currentPos < this.fileSize) {
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice(); // $2
        byteBuffer.position(currentPos);
        AppendMessageResult result;
        if (messageExt instanceof MessageExtBrokerInner) {  // $3
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt); // $4
        } else if (messageExt instanceof MessageExtBatch) {
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
        } else {
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        }
        this.wrotePosition.addAndGet(result.getWroteBytes());   // $5
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    }
    log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
}
  1. $1 獲取當前寫入位置
  2. $2 建立寫緩存,放入文件的寫入位置
  3. $3 判斷是單條消息仍是批量消息
  4. $4 同步寫消息, fileSize-currentPos即爲該文件還剩下的空白大小
  5. $5 寫完消息,累加文件當前位置

4. $4 同步寫消息

代碼在CommitLog內部類 DefaultAppendMessageCallback中 

// CommitLog$DefaultAppendMessageCallback#doAppend
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) {
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    long wroteOffset = fileFromOffset + byteBuffer.position();  // $1
    this.resetByteBuffer(hostHolder, 8);    // $2
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    Long queueOffset = CommitLog.this.topicQueueTable.get(key); // $3
    if (null == queueOffset) {
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    }

    // Transaction messages that require special handling
    final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
    switch (tranType) {
        // Prepared and Rollback message is not consumed, will not enter the
        // consumer queuec
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:  // $4
            queueOffset = 0L;
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
        default:
            break;
    }

    // Serialize message        // $5
    final byte[] propertiesData =
        msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);
    final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;
    if (propertiesLength > Short.MAX_VALUE) {
        log.warn("putMessage message properties length too long. length={}", propertiesData.length);
        return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
    }

    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;
    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    if (msgLen > this.maxMessageSize) {
        CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
            + ", maxMessageSize: " + this.maxMessageSize);
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    }

    // Determines whether there is sufficient free space
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {  // $6
        this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
        
        this.msgStoreItemMemory.putInt(maxBlank);   // 1 TOTALSIZE
        this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE); // 2 MAGICCODE
        // 3 The remaining space may be any value
        // Here the length of the specially set maxBlank
        final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
        byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    }

    // $7 【代碼省略】
    
    if (propertiesLength > 0) this.msgStoreItemMemory.put(propertiesData);
    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);     // $8

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,    // $9
        msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

    switch (tranType) {
        case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
        case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
            break;
        case MessageSysFlag.TRANSACTION_NOT_TYPE:
        case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
            // The next update ConsumeQueue information
            CommitLog.this.topicQueueTable.put(key, ++queueOffset);
            break;
        default:
            break;
    }
    return result;
}
  1. $1 計算消息的物理偏移量=文件初始偏移量+byteBuffer開始的偏移量,文件初始偏移量跟commitlog文件名相同
  2. $2 在讀buffer以前,調用flip方法翻轉buffer(設置position爲0,limit設置爲8)
  3. $3 在topicQueueTable中緩存msg對應的offset
  4. $4 針對事務消息的prepare、rollback消息,因爲這個消息不須要對消費這可見,因此queueOffset=0,不記到consumerQueue
  5. $5 序列化properties,topic,計算消息最大值
  6. $6 若是消息長度+8大於MapperFile剩餘文件空間,則返回END_OF_FILE, 拋給上層,由CommitLog#putMessage這層從新建立文件,從新寫消息
  7. $7 根據commitlog的數據結構,構建commitlog數據,如TOTALSIZE,MAGICCODE 。。等等
  8. $8 把構建的this.msgStoreItemMemory寫到byteBuffer中(內存中)
  9. $9 生成返回值
  10. $10 針對提交事務消息,從新放入topicQueueTable ??? 

異步構建ConsumeQueue和Index文件流程

  1. ConsumeQueue和IndexFile何時創建的呢?
    – 在Broker啓動的時候,會啓動一個ReputMessageService線程服務, 會去設置consumeQueueTable內存中最大的偏移量
long maxPhysicalPosInLogicQueue = commitLog.getMinOffset();
for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {
    for (ConsumeQueue logic : maps.values()) {
        if (logic.getMaxPhysicOffset() > maxPhysicalPosInLogicQueue) {
            maxPhysicalPosInLogicQueue = logic.getMaxPhysicOffset();
        }
    }
}
if (maxPhysicalPosInLogicQueue < 0) {
    maxPhysicalPosInLogicQueue = 0;
}
if (maxPhysicalPosInLogicQueue < this.commitLog.getMinOffset()) {
    maxPhysicalPosInLogicQueue = this.commitLog.getMinOffset();
    log.warn("[TooSmallCqOffset] maxPhysicalPosInLogicQueue={} clMinOffset={}", maxPhysicalPosInLogicQueue, this.commitLog.getMinOffset());
}
this.reputMessageService.start();
  1. ReputMessageService線程每隔1ms執行doReput操做->根據CommitLog最新追加到的消息不斷生成:
  • 消息的offset到CommitQueue
  • 消息索引到IndexFile
  1. 下面查看下doReput方法具體執行
private void doReput() {
    if (this.reputFromOffset < DefaultMessageStore.this.commitLog.getMinOffset()) { // $1
        log.warn("The reputFromOffset={} is smaller than minPyOffset={}, this usually indicate that the dispatch behind too much and the commitlog has expired.",
            this.reputFromOffset, DefaultMessageStore.this.commitLog.getMinOffset());
        this.reputFromOffset = DefaultMessageStore.this.commitLog.getMinOffset();
    }
    for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) { // $2
        if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()
            && this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {   
            break;
        }
        SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);  // $3
        if (result != null) {
            try {
                this.reputFromOffset = result.getStartOffset(); // $4

                for (int readSize = 0; readSize < result.getSize() && doNext; ) {
                    DispatchRequest dispatchRequest =
                        DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false); // $5 構建dispatchRequest
                    int size = dispatchRequest.getBufferSize() == -1 ? dispatchRequest.getMsgSize() : dispatchRequest.getBufferSize();

                    if (dispatchRequest.isSuccess()) {
                        if (size > 0) {
                            DefaultMessageStore.this.doDispatch(dispatchRequest);   // $6 

                            if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() // 若是該broker是主broker,能夠推送消息到達conusmerQueue的消息,這裏用戶也客戶自定定推送的監聽
                                && DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {
                                DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),
                                    dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,
                                    dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),
                                    dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());
                            }

                            this.reputFromOffset += size;   // $7
                            readSize += size;
                            if (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();
                                DefaultMessageStore.this.storeStatsService
                                    .getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic())
                                    .addAndGet(dispatchRequest.getMsgSize());
                            }
                        } else if (size == 0) {
                            this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);
                            readSize = result.getSize();
                        }
                    } else if (!dispatchRequest.isSuccess()) {

                        if (size > 0) { // &8
                            log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);
                            this.reputFromOffset += size;
                        } else {
                            doNext = false;
                            // If user open the dledger pattern or the broker is master node,
                            // it will not ignore the exception and fix the reputFromOffset variable
                            if (DefaultMessageStore.this.getMessageStoreConfig().isEnableDLegerCommitLog() ||
                                DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {
                                log.error("[BUG]dispatch message to consume queue error, COMMITLOG OFFSET: {}",
                                    this.reputFromOffset);
                                this.reputFromOffset += result.getSize() - readSize;
                            }
                        }
                    }
                }
            } finally {
                result.release();
            }
        } else {
            doNext = false;
        }
    }
}
  • doReput流程:
    1. $1 若是reputFromOffset小於文件起始偏移量,則把reputFromOffset設置爲文件起始偏移量,出現的可能緣由:磁盤損壞,認爲人爲了文件等
    2. $2 由於reputFromOffset是consumeQueue中的偏移量,因此只要reputFromOffset小於commitlog最大偏移量,就會不斷的循環
    3. $3 根據offset獲取byteBuffer
    4. $4 更新reputFromOffset成byteBuffer中的offset
    5. $5 構建dispatchRequest
    6. $6 分別調用CommitLogDispatcherBuildConsumeQueue(構建消息消費隊列)和CommitLogDispatcherBuildIndex(構建索引文件)
    7. $7 讀完這條消息,更新reputFromOffset+=size,更新readSize+=size
    8. $8 不成功,若是這個消息的size不爲0,嘗試下一條
  1. 根據消息更新ConsumeQueue
    在doReput方法中$6中會更新consumeQueue, 消息消費隊列轉發的任務實現類爲:CommitLogDispatcherBuildConsumeQueue,內部實際調用的是putMessagePositionInfo方法

  Step1: 根據topicId和queueId獲取ConsumeQueue
  Step2: 將消息偏移量、消息size、tagHashCode(查看ConsumeQueue的數據結構)),把消息追加到ConsumeQueue的內存映射文件(mappedFile)中(不刷盤),consumeQueue默認異步刷盤

1 return mappedFile.appendMessage(this.byteBufferIndex.array());
  1. 根據消息更新Index索引文件
    Hash索引文件轉發任務實現類:CommitLogDispatcherBuildIndex

    若是messageIndexEnable設置爲true, 則轉發此任務,不然不轉發
    step1: 獲取indexFile, 若是indexFileList的內存中沒有indexFile,則根據路徑從新構建indexFile
    step2: 若是消息的惟一鍵不存在,則條件到放到indexFile中

說說存儲的類與文件

 DefaultMessageStore類核心屬性

上面說到DefaultMessageStore是存儲的業務層,putMessage是入口方法

  • messageStoreConfig
    • 存儲相關的配置,例如存儲路徑、commitLog文件大小,刷盤頻次等等。
  • CommitLog commitLog
    • comitLog 的核心處理類,消息存儲在 commitlog 文件中。
  • ConcurrentMap<String/* topic /, ConcurrentMap<Integer/ queueId */, ConsumeQueue>> consumeQueueTable
    • topic 的隊列信息。
  • FlushConsumeQueueService flushConsumeQueueService
    • ConsumeQueue 刷盤服務線程。
  • CleanCommitLogService cleanCommitLogService
    commitLog 過時文件刪除線程。
  • CleanConsumeQueueService cleanConsumeQueueService
    • consumeQueue 過時文件刪除線程。、
  • IndexService indexService
    • 索引服務。
  • AllocateMappedFileService allocateMappedFileService
    • MappedFile 分配線程,RocketMQ 使用內存映射處理 commitlog、consumeQueue文件。
  • ReputMessageService reputMessageService
    • reput 轉發線程(負責 Commitlog 轉發到 Consumequeue、Index文件)。
  • HAService haService
    • 主從同步實現服務。
  • ScheduleMessageService scheduleMessageService
    • 定時任務調度器,執行定時任務。
  • StoreStatsService storeStatsService
    • 存儲統計服務。
  • TransientStorePool transientStorePool
    • ByteBuffer 池
  • RunningFlags runningFlags
    • 存儲服務狀態。
  • BrokerStatsManager brokerStatsManager
    • Broker 統計服務。
  • MessageArrivingListener messageArrivingListener
    • 消息達到監聽器。
  • StoreCheckpoint storeCheckpoint
    • 刷盤檢測點。
  • LinkedList dispatcherList
    • 轉發 comitlog 日誌,主要是從 commitlog 轉發到 consumeQueue、index 文件。

從上面的屬性能夠觀察到有幾類屬性:

  • 服務類:如刷盤服務線程、刪除文件線程、索引服務、mappedFile分配線程、reput轉發線程、主從同步線程、定時任務服務、broker統計服務
  • 配置類:存儲設置類
  • 存儲信息類:commitLog、consumeQueueTable topic隊列信息、transientStorePool ByteBuffer池、刷盤檢測點、dispatcherList
  • 監聽器:消息達到監聽器

刷盤

這裏會另起一篇文字來講明

 

執行主從同步

這裏會另起一篇文字來講明

PageCache(頁緩存)與Mmap內存映射 

pageCache定義

Page cache 也叫頁緩衝或文件緩衝,是由好幾個磁盤塊構成,大小一般爲4k,在64位系統上爲8k,構成的幾個磁盤塊在物理磁盤上不必定連續,文件的組織單位爲一頁, 也就是一個page cache大小,文件讀取是由外存上不連續的幾個磁盤塊,到buffer cache,而後組成page cache,而後供給應用程序。

pageCache加載

操做系統操做I/O時,會先在pageCache中查找,若是未命中,則啓動磁盤I/O,並把磁盤文件中的數據加載到pageCache的一個空閒快中,而後在copy到用戶緩衝區 

pageCache預讀

對於每一個文件的第一個讀請求操做,系統在讀入所請求頁面的同時會順序讀入後面少數幾個頁面 

pageCache與RocketMQ的關聯

MQ讀取消息依賴系統PageCache,PageCache命中率越高,讀性能越高

ConsumeQueue邏輯消費隊列是順序讀取,在pageCache機制的預讀取做用下,ConsumeQueue的讀性能會比較高近乎內存,即便在有消息堆積狀況下也不會影響性能。

Mmap內存映射技術—MappedByteBuffer

另外,RocketMQ主要經過MappedByteBuffer對文件進行讀寫操做。其中,利用了NIO中的FileChannel模型直接將磁盤上的物理文件直接映射到用戶態的內存地址中(這種Mmap的方式減小了傳統IO將磁盤文件數據在操做系統內核地址空間的緩衝區和用戶應用程序地址空間的緩衝區之間來回進行拷貝的性能開銷),將對文件的操做轉化爲直接對內存地址進行操做,從而極大地提升了文件的讀寫效率 

使用mmap內存映射的限制

  • 每次只能映射1.5左右的文件至用戶態的虛擬內存,這也是爲什麼RocketMQ默認設置單個CommitLog日誌數據文件爲1G的緣由
  • MMAP 使用的是虛擬內存,和 PageCache 同樣是由操做系統來控制刷盤的,雖然能夠經過 force() 來手動控制,但這個時間把握很差,在小內存場景下會很使人頭疼。
  • 會存在內存佔用率較高和文件關閉不肯定性的問題

結語

參考:

歡迎關注個人公衆號

 

 


相關文章
相關標籤/搜索