RocketMQ Source Code Analysis: How the Broker Stores Messages in the CommitLog

Blog: blog.ljbmxsm.com
The source code below is based on RocketMQ 4.7.0.

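1 CommitLog format

2 CommitLog persistence process

Here we walk through the source code to see how data that a producer submits to the Broker ends up written to the CommitLog file on disk.

First, the BrokerStartup class in the broker module is the main entry point: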

public static void main(String[] args) {
    start(createBrokerController(args));
}

The BrokerController is created by calling createBrokerController. Inside createBrokerController there is the following code:

final BrokerController controller = new BrokerController(
                brokerConfig,
                nettyServerConfig,
                nettyClientConfig,
                messageStoreConfig);
            // remember all configs to prevent discard
            controller.getConfiguration().registerConfig(properties);
			
            boolean initResult = controller.initialize();

controller.initialize() initializes the BrokerController. During initialization it creates SendMessageProcessor, the class that handles the MQ messages sent by producers. The processor is registered inside controller.initialize() through the BrokerController.registerProcessor method.

public boolean initialize() throws CloneNotSupportedException {
    
    // code omitted
    this.registerProcessor();

    // code omitted
    
}


public void registerProcessor() {
        /** * SendMessageProcessor */
        SendMessageProcessor sendProcessor = new SendMessageProcessor(this);
        sendProcessor.registerSendMessageHook(sendMessageHookList);
        sendProcessor.registerConsumeMessageHook(consumeMessageHookList);

        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);
        this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);
 
    // remaining code omitted
 }

As the analysis above shows, incoming messages are handled mainly by SendMessageProcessor.

Next, look at the source of SendMessageProcessor:

public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
	
}

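It extends the abstract class AbstractSendMessageProcessor and implements the NettyRequestProcessor interface. Requests are handled by implementing NettyRequestProcessor.processRequest, which deals with the different kinds of messages. Here is the implementation of SendMessageProcessor.processRequest: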

@Override
    public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        RemotingCommand response = null;
        try {
            response = asyncProcessRequest(ctx, request).get();
        } catch (InterruptedException | ExecutionException e) {
            log.error("process SendMessage error, request : " + request.toString(), e);
        }
        return response;
    }
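
processRequest starts an asynchronous call and then blocks on the returned future. That pattern can be sketched in isolation as follows. This is a minimal illustration, not RocketMQ code: the class SyncOverAsyncSketch and the methods asyncHandle and handle are hypothetical names, and a String stands in for RemotingCommand.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

final class SyncOverAsyncSketch {

    // Hypothetical stand-in for asyncProcessRequest: the work runs on another thread.
    static CompletableFuture<String> asyncHandle(String request) {
        return CompletableFuture.supplyAsync(() -> "response-for-" + request);
    }

    // Same shape as processRequest: start the async call, then block on get().
    static String handle(String request) {
        try {
            return asyncHandle(request).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            return null;
        } catch (ExecutionException e) {
            return null; // the original logs the error and returns a null response
        }
    }

    public static void main(String[] args) {
        System.out.println(handle("send"));
    }
}
```

The caller sees an ordinary blocking method, while the implementation is free to complete the future from any thread.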

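processRequest takes two parameters:

  • ctx: the Netty channel handler context
  • request: of type RemotingCommand, RocketMQ's abstraction of its message protocol (see the article on RocketMQ module design)

It then calls asyncProcessRequest to obtain the result. Next, look at the implementation of that method.

Note: judging by its name, the method is asynchronous, but processRequest waits for the return value, so the overall call is synchronous. Many backend implementations work this way: they make an asynchronous call and then wait for the result to get synchronous behavior.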

public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
        final SendMessageContext mqtraceContext;
        switch (request.getCode()) {
                // This branch is taken only when a message failed consumption and is sent back to the Broker
            case RequestCode.CONSUMER_SEND_MSG_BACK:
                return this.asyncConsumerSendMsgBack(ctx, request);
            default:
                SendMessageRequestHeader requestHeader = parseRequestHeader(request);
                if (requestHeader == null) {
                    return CompletableFuture.completedFuture(null);
                }
                mqtraceContext = buildMsgContext(ctx, requestHeader);
                // Run the SendMessageHook callbacks
                this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
                if (requestHeader.isBatch()) {
                    // Batch message handling
                    return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
                } else {
                    // Single message handling
                    return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
                }
        }
    }

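The code above handles two broad categories of requests:

  • Messages that failed consumption: RequestCode.CONSUMER_SEND_MSG_BACK
  • Messages sent by producers, which fall into two kinds:
    • RequestCode.SEND_BATCH_MESSAGE: batch messages
    • RequestCode.SEND_MESSAGE_V2 and RequestCode.SEND_MESSAGE: single messages (they differ only in version)

So there are two main handlers: asyncSendMessage for single messages and asyncSendBatchMessage for batches. Only single-message storage is analyzed here (batches work on much the same principle).

asyncSendMessage first preprocesses the request and runs some preliminary checks: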

final RemotingCommand response = preSend(ctx, request, requestHeader);
        final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();

        if (response.getCode() != -1) {
            return CompletableFuture.completedFuture(response);
        }

        final byte[] body = request.getBody();

        int queueIdInt = requestHeader.getQueueId();
        TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());

        if (queueIdInt < 0) {
            queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
        }
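
The fallback for a negative queue ID can be sketched as below. This is only an illustration of the idea: QueueSelectionSketch and resolveQueueId are hypothetical names, and ThreadLocalRandom is used here as an assumption rather than whatever randomQueueId does internally.

```java
import java.util.concurrent.ThreadLocalRandom;

final class QueueSelectionSketch {

    // Returns the requested queue id, or a random id in [0, writeQueueNums)
    // when the producer did not choose one (negative id).
    static int resolveQueueId(int requestedQueueId, int writeQueueNums) {
        if (writeQueueNums <= 0) {
            throw new IllegalArgumentException("writeQueueNums must be positive");
        }
        if (requestedQueueId >= 0) {
            return requestedQueueId;
        }
        return ThreadLocalRandom.current().nextInt(writeQueueNums);
    }
}
```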

It then builds a MessageExtBrokerInner, the message object used inside the Broker. This step mainly copies request and Broker information into the message:

MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
        msgInner.setTopic(requestHeader.getTopic());
        msgInner.setQueueId(queueIdInt);

        if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig)) {
            return CompletableFuture.completedFuture(response);
        }

        msgInner.setBody(body);
        msgInner.setFlag(requestHeader.getFlag());
        MessageAccessor.setProperties(msgInner, MessageDecoder.string2messageProperties(requestHeader.getProperties()));
        msgInner.setPropertiesString(requestHeader.getProperties());
        msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
        msgInner.setBornHost(ctx.channel().remoteAddress());
        msgInner.setStoreHost(this.getStoreHost());
        msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());

Then the message is stored:

CompletableFuture<PutMessageResult> putMessageResult = null;
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
// Check whether this is a transactional message
if (transFlag != null && Boolean.parseBoolean(transFlag)) {
    if (this.brokerController.getBrokerConfig().isRejectTransactionMessage()) {
        response.setCode(ResponseCode.NO_PERMISSION);
        response.setRemark(
                "the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
                        + "] sending transaction message is forbidden");
        return CompletableFuture.completedFuture(response);
    }
    putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
} else {
    putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
}
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);

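There are two kinds of messages here:

  • Transactional messages (to be analyzed in a later article)
  • Normal messages

Normal messages are stored by calling MessageStore.asyncPutMessage, and the MessageStore implementation is DefaultMessageStore. Here is DefaultMessageStore.asyncPutMessage: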

@Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) {

        // Check the store status
        PutMessageStatus checkStoreStatus = this.checkStoreStatus();
        if (checkStoreStatus != PutMessageStatus.PUT_OK) {
            return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
        }

        // Validate the message topic and properties
        PutMessageStatus msgCheckStatus = this.checkMessage(msg);
        if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) {
            return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
        }

        long beginTime = this.getSystemClock().now();
        // Store the message
        CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

        putResultFuture.thenAccept((result) -> {
            long elapsedTime = this.getSystemClock().now() - beginTime;
            if (elapsedTime > 500) {
                log.warn("putMessage not in lock elapsed time(ms)={}, bodyLength={}", elapsedTime, msg.getBody().length);
            }
            this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

            if (null == result || !result.isOk()) {
                this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
            }
        });

        return putResultFuture;
    }
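
DefaultMessageStore.asyncPutMessage attaches a callback to the returned future so that latency and failures are recorded without blocking the caller. A minimal sketch of that pattern follows. It assumes a Boolean result in place of PutMessageResult, and PutStatsSketch with its counters is a hypothetical stand-in for StoreStatsService.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;

final class PutStatsSketch {
    // Hypothetical counters standing in for StoreStatsService.
    final AtomicLong failedTimes = new AtomicLong();
    final AtomicLong maxElapsedMillis = new AtomicLong();

    // Attaches a metrics callback and hands the same future back to the caller.
    CompletableFuture<Boolean> track(CompletableFuture<Boolean> putResult) {
        final long beginTime = System.currentTimeMillis();
        putResult.thenAccept(ok -> {
            long elapsed = System.currentTimeMillis() - beginTime;
            maxElapsedMillis.accumulateAndGet(elapsed, Math::max);
            if (ok == null || !ok) {
                failedTimes.incrementAndGet();
            }
        });
        return putResult;
    }
}
```

Because the callback is registered with thenAccept, it runs only when the future completes normally; an exceptional completion skips it.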

Data is persisted through CommitLog.asyncPutMessage, which does three main things:

  • Sets the CRC checksum of the message body, which is needed later when the data is read back.

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
            // Set the store timestamp
            msg.setStoreTimestamp(System.currentTimeMillis());
            // Set the CRC checksum of the body
            msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        
        // ... code omitted
    }
  • Handles delayed messages

    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
    
        // code omitted
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                    || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) {
                // Delay Delivery
                if (msg.getDelayTimeLevel() > 0) {
                    if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {
                        msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                    }
    
                    topic = ScheduleMessageService.SCHEDULE_TOPIC;
                    queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
    
                    // Backup real topic, queueId
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                    MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                    msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
    
                    msg.setTopic(topic);
                    msg.setQueueId(queueId);
                }
            }
        
        // code omitted
    }

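    This handles delayed messages: the delay level is capped at the maximum level, the real topic and queue ID are backed up in the message properties, and the message is redirected to the schedule topic with a queue ID derived from the delay level.

  • Processes the CommitLog data

    Message processing here is again split into three steps:

    1. Appending the message to the CommitLog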

      public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) {
           // code omitted
           putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
              try {
                  long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
                  this.beginTimeInLock = beginLockTimestamp;
      
                  // Here settings are stored timestamp, in order to ensure an orderly
                  // global
                  msg.setStoreTimestamp(beginLockTimestamp);
      
                  if (null == mappedFile || mappedFile.isFull()) {
                      mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
                  }
                  if (null == mappedFile) {
                      log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                      beginTimeInLock = 0;
                      return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
                  }
                  // Key step: append the message to the CommitLog
                  result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                  switch (result.getStatus()) {
                      case PUT_OK:
                          break;
                      case END_OF_FILE:
                          unlockMappedFile = mappedFile;
                          // Create a new file, re-write the message
                          mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                          if (null == mappedFile) {
                              // XXX: warn and notify me
                              log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                              beginTimeInLock = 0;
                              return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                          }
                          result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                          break;
                      case MESSAGE_SIZE_EXCEEDED:
                      case PROPERTIES_SIZE_EXCEEDED:
                          beginTimeInLock = 0;
                          return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                      case UNKNOWN_ERROR:
                          beginTimeInLock = 0;
                          return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                      default:
                          beginTimeInLock = 0;
                          return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                  }
      
                  elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
                  beginTimeInLock = 0;
              } finally {
                  putMessageLock.unlock();
              }
           // code omitted
       }
    2. Flushing the CommitLog to disk (synchronous or asynchronous)

      CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, putMessageResult, msg);
      
          public CompletableFuture<PutMessageStatus> submitFlushRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
              // Check the flush mode; the default is FlushDiskType.ASYNC_FLUSH (asynchronous flushing)
              if (FlushDiskType.SYNC_FLUSH == this.defaultMessageStore.getMessageStoreConfig().getFlushDiskType()) {
                  final GroupCommitService service = (GroupCommitService) this.flushCommitLogService;
                  if (messageExt.isWaitStoreMsgOK()) {
                      GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                              this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                      service.putRequest(request);
                      return request.future();
                  } else {
                      service.wakeup();
                      return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
                  }
              }
              // Asynchronous flush
              else {
                  if (!this.defaultMessageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {
                      flushCommitLogService.wakeup();
                  } else  {
                      commitLogService.wakeup();
                  }
                  return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
              }
          }

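      The flush mode can be set in the configuration file, and the default is asynchronous flushing. (According to the official documentation, synchronous flushing can be used to guard against message loss, but it reduces concurrency.)

    3. Submitting to the Slave for replication (synchronous or asynchronous)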

      CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, putMessageResult, msg);
      
      public CompletableFuture<PutMessageStatus> submitReplicaRequest(AppendMessageResult result, PutMessageResult putMessageResult, MessageExt messageExt) {
              // The default is BrokerRole.ASYNC_MASTER, so replication is asynchronous by default as well
              if (BrokerRole.SYNC_MASTER == this.defaultMessageStore.getMessageStoreConfig().getBrokerRole()) {
                  HAService service = this.defaultMessageStore.getHaService();
                  if (messageExt.isWaitStoreMsgOK()) {
                      if (service.isSlaveOK(result.getWroteBytes() + result.getWroteOffset())) {
                          GroupCommitRequest request = new GroupCommitRequest(result.getWroteOffset() + result.getWroteBytes(),
                                  this.defaultMessageStore.getMessageStoreConfig().getSyncFlushTimeout());
                          service.putRequest(request);
                          service.getWaitNotifyObject().wakeupAll();
                          return request.future();
                      }
                      else {
                          return CompletableFuture.completedFuture(PutMessageStatus.SLAVE_NOT_AVAILABLE);
                      }
                  }
              }
              return CompletableFuture.completedFuture(PutMessageStatus.PUT_OK);
          }
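
In the synchronous paths of steps 2 and 3, the writer queues a request carrying the offset it needs covered and gets back a future; a background service completes the future once the flush (or replication) has reached that offset. The sketch below illustrates only this request/future hand-off, under simplifying assumptions: GroupCommitSketch, submit and onFlushed are hypothetical names, there is no timeout handling, and the flush itself is simulated by calling onFlushed directly.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

final class GroupCommitSketch {
    enum Status { PUT_OK }

    // One waiting writer: completed when data up to nextOffset is durable.
    static final class Request {
        final long nextOffset;
        final CompletableFuture<Status> future = new CompletableFuture<>();

        Request(long nextOffset) {
            this.nextOffset = nextOffset;
        }
    }

    private final Queue<Request> pending = new ArrayDeque<>();

    // Called by the writer after appending: wroteOffset + wroteBytes is the
    // first offset after the message, as in the original code.
    synchronized CompletableFuture<Status> submit(long wroteOffset, int wroteBytes) {
        Request request = new Request(wroteOffset + wroteBytes);
        pending.add(request);
        return request.future;
    }

    // Called by the (simulated) flush thread after data up to flushedOffset is on disk.
    // Requests arrive in offset order, so only the head of the queue needs checking.
    synchronized void onFlushed(long flushedOffset) {
        while (!pending.isEmpty() && pending.peek().nextOffset <= flushedOffset) {
            pending.poll().future.complete(Status.PUT_OK);
        }
    }
}
```

One flush can release many waiting writers at once, which is the point of grouping the commits.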

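The official documentation has a diagram that illustrates these two modes (synchronous and asynchronous).

From the code analysis above, the message is persisted mainly by this line: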

result = mappedFile.appendMessage(msg, this.appendMessageCallback);

This call persists the message. In RocketMQ, all files are wrapped and handled through MappedFile. Next, look at the MappedFile.appendMessage method:

public AppendMessageResult appendMessage(final MessageExtBrokerInner msg, final AppendMessageCallback cb) {
        return appendMessagesInner(msg, cb);
    }

appendMessage calls the internal method MappedFile.appendMessagesInner.

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) {
        assert messageExt != null;
        assert cb != null;

        int currentPos = this.wrotePosition.get();
        // Compare the current write position with the file size
        if (currentPos < this.fileSize) {
            ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
            byteBuffer.position(currentPos);
            AppendMessageResult result;
            // Single and batch messages are handled separately here as well
            if (messageExt instanceof MessageExtBrokerInner) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
            } else if (messageExt instanceof MessageExtBatch) {
                result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
            } else {
                return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
            }
            this.wrotePosition.addAndGet(result.getWroteBytes());
            this.storeTimestamp = result.getStoreTimestamp();
            return result;
        }
        log.error("MappedFile.appendMessage return null, wrotePosition: {} fileSize: {}", currentPos, this.fileSize);
        return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
    }
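
The core idea of appendMessagesInner, writing at a tracked position inside a memory-mapped buffer, can be tried out with plain java.nio. The following is a simplified sketch and not the RocketMQ class: MappedAppendSketch, openTemp and append are hypothetical names, there is no writeBuffer or callback, and like the original it assumes the caller serializes writes (the original holds putMessageLock).

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.atomic.AtomicInteger;

final class MappedAppendSketch {
    private final int fileSize;
    private final MappedByteBuffer mapped;
    private final AtomicInteger wrotePosition = new AtomicInteger(0);

    MappedAppendSketch(Path file, int fileSize) {
        this.fileSize = fileSize;
        try (FileChannel channel = FileChannel.open(file,
                StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
            // The mapping stays valid after the channel is closed.
            this.mapped = channel.map(FileChannel.MapMode.READ_WRITE, 0, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Convenience factory for experiments: maps a fresh temporary file.
    static MappedAppendSketch openTemp(int fileSize) {
        try {
            Path file = Files.createTempFile("mapped-sketch", ".bin");
            file.toFile().deleteOnExit();
            return new MappedAppendSketch(file, fileSize);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Appends data at the current write position.
    // Returns the number of bytes written, or -1 if the data does not fit.
    int append(byte[] data) {
        int currentPos = wrotePosition.get();
        if (currentPos + data.length > fileSize) {
            return -1;
        }
        ByteBuffer slice = mapped.slice(); // independent position, shared content
        slice.position(currentPos);
        slice.put(data);
        wrotePosition.addAndGet(data.length);
        return data.length;
    }

    int wrotePosition() {
        return wrotePosition.get();
    }

    // Forces the mapped content to the storage device.
    void flush() {
        mapped.force();
    }
}
```

Writing through the mapping only changes memory pages; the data reaches the disk when the operating system writes the pages back or when force() is called, which is why flushing is a separate step.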

The code delegates the actual write to AppendMessageCallback.doAppend. The CommitLog class contains an inner class, DefaultAppendMessageCallback, that implements AppendMessageCallback:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank, final MessageExtBrokerInner msgInner) {
            // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

            // PHY OFFSET
            long wroteOffset = fileFromOffset + byteBuffer.position();

            int sysflag = msgInner.getSysFlag();

            int bornHostLength = (sysflag & MessageSysFlag.BORNHOST_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            int storeHostLength = (sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0 ? 4 + 4 : 16 + 4;
            ByteBuffer bornHostHolder = ByteBuffer.allocate(bornHostLength);
            ByteBuffer storeHostHolder = ByteBuffer.allocate(storeHostLength);

            this.resetByteBuffer(storeHostHolder, storeHostLength);
            String msgId;
            if ((sysflag & MessageSysFlag.STOREHOSTADDRESS_V6_FLAG) == 0) {
                msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            } else {
                msgId = MessageDecoder.createMessageId(this.msgIdV6Memory, msgInner.getStoreHostBytes(storeHostHolder), wroteOffset);
            }

            // Record ConsumeQueue information
            keyBuilder.setLength(0);
            keyBuilder.append(msgInner.getTopic());
            keyBuilder.append('-');
            keyBuilder.append(msgInner.getQueueId());
            String key = keyBuilder.toString();
            Long queueOffset = CommitLog.this.topicQueueTable.get(key);
            if (null == queueOffset) {
                queueOffset = 0L;
                CommitLog.this.topicQueueTable.put(key, queueOffset);
            }

            // Transaction messages that require special handling
            final int tranType = MessageSysFlag.getTransactionValue(msgInner.getSysFlag());
            switch (tranType) {
                // Prepared and Rollback messages are not consumed and will not enter the
                // consumer queue
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    queueOffset = 0L;
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                default:
                    break;
            }

            /** * Serialize message */
            final byte[] propertiesData =
                msgInner.getPropertiesString() == null ? null : msgInner.getPropertiesString().getBytes(MessageDecoder.CHARSET_UTF8);

            final int propertiesLength = propertiesData == null ? 0 : propertiesData.length;

            if (propertiesLength > Short.MAX_VALUE) {
                log.warn("putMessage message properties length too long. length={}", propertiesData.length);
                return new AppendMessageResult(AppendMessageStatus.PROPERTIES_SIZE_EXCEEDED);
            }

            final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
            final int topicLength = topicData.length;

            final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

            final int msgLen = calMsgLength(msgInner.getSysFlag(), bodyLength, topicLength, propertiesLength);

            // Exceeds the maximum message
            if (msgLen > this.maxMessageSize) {
                CommitLog.log.warn("message size exceeded, msg total size: " + msgLen + ", msg body size: " + bodyLength
                    + ", maxMessageSize: " + this.maxMessageSize);
                return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
            }

            // Determines whether there is sufficient free space
            if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

            // Initialization of storage space
            this.resetByteBuffer(msgStoreItemMemory, msgLen);
            // 1 TOTALSIZE
            this.msgStoreItemMemory.putInt(msgLen);
            // 2 MAGICCODE
            this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
            // 3 BODYCRC
            this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
            // 4 QUEUEID
            this.msgStoreItemMemory.putInt(msgInner.getQueueId());
            // 5 FLAG
            this.msgStoreItemMemory.putInt(msgInner.getFlag());
            // 6 QUEUEOFFSET
            this.msgStoreItemMemory.putLong(queueOffset);
            // 7 PHYSICALOFFSET
            this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
            // 8 SYSFLAG
            this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
            // 9 BORNTIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
            // 10 BORNHOST
            this.resetByteBuffer(bornHostHolder, bornHostLength);
            this.msgStoreItemMemory.put(msgInner.getBornHostBytes(bornHostHolder));
            // 11 STORETIMESTAMP
            this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
            // 12 STOREHOSTADDRESS
            this.resetByteBuffer(storeHostHolder, storeHostLength);
            this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(storeHostHolder));
            // 13 RECONSUMETIMES
            this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
            // 14 Prepared Transaction Offset
            this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
            // 15 BODY
            this.msgStoreItemMemory.putInt(bodyLength);
            if (bodyLength > 0)
                this.msgStoreItemMemory.put(msgInner.getBody());
            // 16 TOPIC
            this.msgStoreItemMemory.put((byte) topicLength);
            this.msgStoreItemMemory.put(topicData);
            // 17 PROPERTIES
            this.msgStoreItemMemory.putShort((short) propertiesLength);
            if (propertiesLength > 0)
                this.msgStoreItemMemory.put(propertiesData);

            final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
            // Write messages to the queue buffer
            byteBuffer.put(this.msgStoreItemMemory.array(), 0, msgLen);

            AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
                msgInner.getStoreTimestamp(), queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);

            switch (tranType) {
                case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
                case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                    break;
                case MessageSysFlag.TRANSACTION_NOT_TYPE:
                case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                    // The next update ConsumeQueue information
                    CommitLog.this.topicQueueTable.put(key, ++queueOffset);
                    break;
                default:
                    break;
            }
            return result;
        }

This method shows how a complete CommitLog record is assembled.
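
To make the length-prefixed layout easier to follow, here is a much reduced sketch that keeps only a few of the fields (total size, magic code, body CRC, body, topic, properties). It is an illustration under stated assumptions, not the real format: RecordLayoutSketch is a hypothetical class, MAGIC is an arbitrary placeholder value, and masking the CRC to a non-negative int is a choice made for this sketch.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

final class RecordLayoutSketch {
    static final int MAGIC = 0xCAFE; // placeholder, not the real MESSAGE_MAGIC_CODE

    // CRC32 of the body, masked to a non-negative int (a choice made for this sketch).
    static int crc32(byte[] data) {
        CRC32 crc = new CRC32();
        crc.update(data);
        return (int) (crc.getValue() & 0x7FFFFFFF);
    }

    // Layout: TOTALSIZE(4) MAGIC(4) BODYCRC(4) BODYLEN(4) BODY
    //         TOPICLEN(1) TOPIC PROPLEN(2) PROPERTIES
    static ByteBuffer encode(String topic, byte[] body, String properties) {
        byte[] topicData = topic.getBytes(StandardCharsets.UTF_8);
        byte[] propData = properties == null
                ? new byte[0] : properties.getBytes(StandardCharsets.UTF_8);
        if (topicData.length > Byte.MAX_VALUE) {
            throw new IllegalArgumentException("topic too long for a 1-byte length");
        }
        if (propData.length > Short.MAX_VALUE) {
            throw new IllegalArgumentException("properties too long for a 2-byte length");
        }
        int total = 4 + 4 + 4 + 4 + body.length + 1 + topicData.length + 2 + propData.length;

        ByteBuffer buf = ByteBuffer.allocate(total);
        buf.putInt(total);
        buf.putInt(MAGIC);
        buf.putInt(crc32(body));
        buf.putInt(body.length);
        buf.put(body);
        buf.put((byte) topicData.length);
        buf.put(topicData);
        buf.putShort((short) propData.length);
        buf.put(propData);
        buf.flip(); // switch from writing to reading
        return buf;
    }
}
```

Because the total size comes first, a reader can skip from one record to the next without parsing the fields in between, and the stored CRC lets it verify the body after reading.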

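3 Important classes in the CommitLog persistence process

The figure above shows the different layers of RocketMQ.

3.1 CommitLog

The class that abstracts and handles the CommitLog.

3.2 MappedFileQueue

A queue of mapped files that manages the sequence of memory-mapped files, for example the CommitLog files.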
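
One way to picture such a queue is as a lookup from a global offset to a file and a position inside that file. The sketch below assumes fixed-size files, each named after its starting offset zero-padded to 20 digits; MappedFileLookupSketch and its methods are hypothetical names for illustration.

```java
import java.util.Locale;

final class MappedFileLookupSketch {

    // Name of the file that contains the given global offset.
    static String fileNameFor(long offset, long fileSize) {
        long fileStart = offset - (offset % fileSize);
        return String.format(Locale.ROOT, "%020d", fileStart);
    }

    // Position of the given global offset inside its file.
    static int positionInFile(long offset, long fileSize) {
        return (int) (offset % fileSize);
    }

    public static void main(String[] args) {
        long oneGiB = 1024L * 1024 * 1024;
        System.out.println(fileNameFor(oneGiB + 5, oneGiB));
        System.out.println(positionInFile(oneGiB + 5, oneGiB));
    }
}
```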

3.3 MappedFile

Handles disk operations on large files.
