說在前面apache
broker啓動緩存
源碼解析微信
返回方法,取消註冊broker,org.apache.rocketmq.broker.BrokerController#unregisterBrokerAllapp
private void unregisterBrokerAll() {// =》this.brokerOuterAPI.unregisterBrokerAll(this.brokerConfig.getBrokerClusterName(),this.getBrokerAddr(),this.brokerConfig.getBrokerName(),this.brokerConfig.getBrokerId()); }
進入方法,org.apache.rocketmq.broker.out.BrokerOuterAPI#unregisterBrokerAll分佈式
public void unregisterBrokerAll(final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) {// 獲取namesrv地址列表List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();if (nameServerAddressList != null) {for (String namesrvAddr : nameServerAddressList) {try {// =》this.unregisterBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId);log.info("unregisterBroker OK, NamesrvAddr: {}", namesrvAddr);} catch (Exception e) {log.warn("unregisterBroker Exception, {}", namesrvAddr, e);}}} }
進入方法,org.apache.rocketmq.broker.out.BrokerOuterAPI#unregisterBrokeride
public void unregisterBroker(final String namesrvAddr,final String clusterName,final String brokerAddr,final String brokerName,final long brokerId) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {UnRegisterBrokerRequestHeader requestHeader = new UnRegisterBrokerRequestHeader();requestHeader.setBrokerAddr(brokerAddr);requestHeader.setBrokerId(brokerId);requestHeader.setBrokerName(brokerName);requestHeader.setClusterName(clusterName);RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UNREGISTER_BROKER, requestHeader);// 同步執行RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, 3000);assert response != null;switch (response.getCode()) {case ResponseCode.SUCCESS: {return;}default:break;}throw new MQBrokerException(response.getCode(), response.getRemark()); }
進入方法,org.apache.rocketmq.remoting.netty.NettyRemotingClient#invokeSync介紹過了。ui
返回方法,org.apache.rocketmq.broker.BrokerStartup#startthis
public void start() throws Exception {if (this.messageStore != null) {// 消息存儲服務啓動=》this.messageStore.start();}if (this.remotingServer != null) {this.remotingServer.start();}if (this.fastRemotingServer != null) {this.fastRemotingServer.start();}if (this.fileWatchService != null) {this.fileWatchService.start();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.start();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.start();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.start();}if (this.filterServerManager != null) {this.filterServerManager.start();}// 註冊broker=》this.registerBrokerAll(true, false, true);this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {@Overridepublic void run() {try {// =》BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister());} catch (Throwable e) {log.error("registerBrokerAll Exception", e);}}// 10s註冊一次}, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS);if (this.brokerStatsManager != null) {this.brokerStatsManager.start();}if (this.brokerFastFailure != null) {// broker快速失敗服務=》this.brokerFastFailure.start();}if (BrokerRole.SLAVE != messageStoreConfig.getBrokerRole()) {if (this.transactionalMessageCheckService != null) {log.info("Start transaction service!");// 分佈式消息事務服務啓動=》this.transactionalMessageCheckService.start();}} }
進入方法,消息存儲服務啓動,org.apache.rocketmq.store.DefaultMessageStore#startspa
public void start() throws Exception {lock = lockFile.getChannel().tryLock(0, 1, false);if (lock == null || lock.isShared() || !lock.isValid()) {throw new RuntimeException("Lock failed,MQ already started");}lockFile.getChannel().write(ByteBuffer.wrap("lock".getBytes()));lockFile.getChannel().force(true);// 刷新消費隊列啓動=》this.flushConsumeQueueService.start();// commitLog服務啓動=》this.commitLog.start();this.storeStatsService.start();if (this.scheduleMessageService != null && SLAVE != messageStoreConfig.getBrokerRole()) {// 調度消息服務啓動=》this.scheduleMessageService.start();}if (this.getMessageStoreConfig().isDuplicationEnable()) {this.reputMessageService.setReputFromOffset(this.commitLog.getConfirmOffset());} else {// 從commitLog中獲取消息隊列最大offset=》this.reputMessageService.setReputFromOffset(this.commitLog.getMaxOffset());}// 存儲消息服務啓動this.reputMessageService.start();// ha服務啓動=》this.haService.start();this.createTempFile();// 添加調度服務=》this.addScheduleTask();this.shutdown = false; }
進入方法,刷新消費隊列啓動,org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#run.net
public void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {// 刷新消息隊列的頻次1sint interval = DefaultMessageStore.this.getMessageStoreConfig().getFlushIntervalConsumeQueue();this.waitForRunning(interval);// =》this.doFlush(1);} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}this.doFlush(RETRY_TIMES_OVER);DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
進入方法,org.apache.rocketmq.store.DefaultMessageStore.FlushConsumeQueueService#doFlush
private void doFlush(int retryTimes) {// 刷新消息隊列頁數2int flushConsumeQueueLeastPages = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueLeastPages();if (retryTimes == RETRY_TIMES_OVER) {flushConsumeQueueLeastPages = 0;}long logicsMsgTimestamp = 0;// 刷新消息隊列最大頻次60sint flushConsumeQueueThoroughInterval = DefaultMessageStore.this.getMessageStoreConfig().getFlushConsumeQueueThoroughInterval();long currentTimeMillis = System.currentTimeMillis();if (currentTimeMillis >= (this.lastFlushTimestamp + flushConsumeQueueThoroughInterval)) {this.lastFlushTimestamp = currentTimeMillis;flushConsumeQueueLeastPages = 0;logicsMsgTimestamp = DefaultMessageStore.this.getStoreCheckpoint().getLogicsMsgTimestamp();}ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue cq : maps.values()) {boolean result = false;for (int i = 0; i < retryTimes && !result; i++) {// =》result = cq.flush(flushConsumeQueueLeastPages);}}}if (0 == flushConsumeQueueLeastPages) {if (logicsMsgTimestamp > 0) {DefaultMessageStore.this.getStoreCheckpoint().setLogicsMsgTimestamp(logicsMsgTimestamp);}DefaultMessageStore.this.getStoreCheckpoint().flush();} }
進入方法,org.apache.rocketmq.store.ConsumeQueue#flush
public boolean flush(final int flushLeastPages) {// 隊列映射文件刷新=》boolean result = this.mappedFileQueue.flush(flushLeastPages);if (isExtReadEnable()) {result = result & this.consumeQueueExt.flush(flushLeastPages);}return result; }
進入方法,隊列映射文件刷新,org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {boolean result = true;// 根據offset找到映射文件=》MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();// =》int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result; }
進入方法,根據offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null; }
返回方法,消息隊列刷新,org.apache.rocketmq.store.MappedFileQueue#flush
public boolean flush(final int flushLeastPages) {boolean result = true;// 根據offset找到映射文件=》MappedFile mappedFile = this.findMappedFileByOffset(this.flushedWhere, this.flushedWhere == 0);if (mappedFile != null) {long tmpTimeStamp = mappedFile.getStoreTimestamp();// =》int offset = mappedFile.flush(flushLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.flushedWhere;this.flushedWhere = where;if (0 == flushLeastPages) {this.storeTimestamp = tmpTimeStamp;}}return result; }
進入方法,根據offset找到映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介紹過了。
返回方法,commitLog服務啓動,org.apache.rocketmq.store.CommitLog.CommitRealTimeService#run
@Overridepublic void run() {CommitLog.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {// 刷新commitLog頻次200msint interval = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitIntervalCommitLog();// 提交數據頁數4int commitDataLeastPages = CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogLeastPages();// 提交commitLog最大頻次200msint commitDataThoroughInterval =CommitLog.this.defaultMessageStore.getMessageStoreConfig().getCommitCommitLogThoroughInterval();long begin = System.currentTimeMillis();if (begin >= (this.lastCommitTimestamp + commitDataThoroughInterval)) {this.lastCommitTimestamp = begin;commitDataLeastPages = 0;}try {// 提交數據=》boolean result = CommitLog.this.mappedFileQueue.commit(commitDataLeastPages);long end = System.currentTimeMillis();if (!result) {this.lastCommitTimestamp = end; // result = false means some data committed.//now wake up flush thread.flushCommitLogService.wakeup();}if (end - begin > 500) {log.info("Commit data to file costs {} ms", end - begin);}this.waitForRunning(interval);} catch (Throwable e) {CommitLog.log.error(this.getServiceName() + " service has exception. ", e);}}boolean result = false;for (int i = 0; i < RETRY_TIMES_OVER && !result; i++) {result = CommitLog.this.mappedFileQueue.commit(0);CommitLog.log.info(this.getServiceName() + " service shutdown, retry " + (i + 1) + " times " + (result ? "OK" : "Not OK"));}CommitLog.log.info(this.getServiceName() + " service end");} }
進入方法,提交數據,org.apache.rocketmq.store.MappedFileQueue#commit
public boolean commit(final int commitLeastPages) {boolean result = true;// commitOffset找到mappedFile=》MappedFile mappedFile = this.findMappedFileByOffset(this.committedWhere, this.committedWhere == 0);if (mappedFile != null) {int offset = mappedFile.commit(commitLeastPages);long where = mappedFile.getFileFromOffset() + offset;result = where == this.committedWhere;this.committedWhere = where;}return result; }
進入方法,commitOffset找到mappedFile,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介紹過了。
返回方法,調度消息服務啓動,org.apache.rocketmq.store.schedule.ScheduleMessageService#start
public void start() {for (Map.Entry<Integer, Long> entry : this.delayLevelTable.entrySet()) {Integer level = entry.getKey();Long timeDelay = entry.getValue();Long offset = this.offsetTable.get(level);if (null == offset) {offset = 0L;}if (timeDelay != null) {// 這裏用timer,爲何不用調度線程池呢this.timer.schedule(new DeliverDelayedMessageTimerTask(level, offset), FIRST_DELAY_TIME);}}this.timer.scheduleAtFixedRate(new TimerTask() {@Overridepublic void run() {try {// 消息持久化=》ScheduleMessageService.this.persist();} catch (Throwable e) {log.error("scheduleAtFixedRate flush exception", e);}}}, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval()); }
返回方法,存儲消息服務啓動,org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#run
@Overridepublic void run() {DefaultMessageStore.log.info(this.getServiceName() + " service started");while (!this.isStopped()) {try {Thread.sleep(1);// =》this.doReput();} catch (Exception e) {DefaultMessageStore.log.warn(this.getServiceName() + " service has exception. ", e);}}DefaultMessageStore.log.info(this.getServiceName() + " service end"); }
進入方法,org.apache.rocketmq.store.DefaultMessageStore.ReputMessageService#doReput
private void doReput() {for (boolean doNext = true; this.isCommitLogAvailable() && doNext; ) {if (DefaultMessageStore.this.getMessageStoreConfig().isDuplicationEnable()&& this.reputFromOffset >= DefaultMessageStore.this.getConfirmOffset()) {break;}// 獲取消息數據=》SelectMappedBufferResult result = DefaultMessageStore.this.commitLog.getData(reputFromOffset);if (result != null) {try {this.reputFromOffset = result.getStartOffset();for (int readSize = 0; readSize < result.getSize() && doNext; ) {DispatchRequest dispatchRequest =DefaultMessageStore.this.commitLog.checkMessageAndReturnSize(result.getByteBuffer(), false, false);int size = dispatchRequest.getMsgSize();if (dispatchRequest.isSuccess()) {if (size > 0) {// 存儲服務轉發請求=》DefaultMessageStore.this.doDispatch(dispatchRequest);// broker角色是master長輪詢模式if (BrokerRole.SLAVE != DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole()&& DefaultMessageStore.this.brokerConfig.isLongPollingEnable()) {// 消息到達通知監聽器=》DefaultMessageStore.this.messageArrivingListener.arriving(dispatchRequest.getTopic(),dispatchRequest.getQueueId(), dispatchRequest.getConsumeQueueOffset() + 1,dispatchRequest.getTagsCode(), dispatchRequest.getStoreTimestamp(),dispatchRequest.getBitMap(), dispatchRequest.getPropertiesMap());}this.reputFromOffset += size;readSize += size;// 若是broker是slaveif (DefaultMessageStore.this.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE) {DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicTimesTotal(dispatchRequest.getTopic()).incrementAndGet();DefaultMessageStore.this.storeStatsService.getSinglePutMessageTopicSizeTotal(dispatchRequest.getTopic()).addAndGet(dispatchRequest.getMsgSize());}} else if (size == 0) {this.reputFromOffset = DefaultMessageStore.this.commitLog.rollNextFile(this.reputFromOffset);readSize = result.getSize();}} else if (!dispatchRequest.isSuccess()) {if (size > 0) {log.error("[BUG]read total count not equals msg total size. reputFromOffset={}", reputFromOffset);this.reputFromOffset += size;} else {doNext = false;if (DefaultMessageStore.this.brokerConfig.getBrokerId() == MixAll.MASTER_ID) {log.error("[BUG]the master dispatch message to consume queue error, COMMITLOG OFFSET: {}",this.reputFromOffset);this.reputFromOffset += result.getSize() - readSize;}}}}} finally {result.release();}} else {doNext = false;}} }
進入方法,獲取消息數據,org.apache.rocketmq.store.CommitLog#getData(long, boolean)
public SelectMappedBufferResult getData(final long offset, final boolean returnFirstOnNotFound) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();// 按offset查詢映射文件=》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, returnFirstOnNotFound);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);// 查詢消息所在映射bufferSelectMappedBufferResult result = mappedFile.selectMappedBuffer(pos);return result;}return null; }
進入方法,按offset查詢映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)介紹過了。
進入方法,存儲服務轉發請求,org.apache.rocketmq.store.DefaultMessageStore#doDispatch
public void doDispatch(DispatchRequest req) {for (CommitLogDispatcher dispatcher : this.dispatcherList) {// =》dispatcher.dispatch(req);} }
進入方法,org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildConsumeQueue#dispatch
@Overridepublic void dispatch(DispatchRequest request) {// 獲取事務類型=》final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());switch (tranType) {case MessageSysFlag.TRANSACTION_NOT_TYPE:case MessageSysFlag.TRANSACTION_COMMIT_TYPE:// 構建存放消息位置信息=》DefaultMessageStore.this.putMessagePositionInfo(request);break;case MessageSysFlag.TRANSACTION_PREPARED_TYPE:case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:break;} }
進入方法,構建存放消息位置信息,org.apache.rocketmq.store.DefaultMessageStore#putMessagePositionInfo
public void putMessagePositionInfo(DispatchRequest dispatchRequest) {// 按topic、queueId查詢到消息隊列=》ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());// 包裝消息存儲信息=》cq.putMessagePositionInfoWrapper(dispatchRequest); }
進入方法,按topic、queueId查詢到消息隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic; }
返回方法,包裝消息存儲信息,org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfoWrapper
public void putMessagePositionInfoWrapper(DispatchRequest request) {final int maxRetries = 30;boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();for (int i = 0; i < maxRetries && canWrite; i++) {long tagsCode = request.getTagsCode();// 消息擴展服務是否開啓=》if (isExtWriteEnable()) {ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();cqExtUnit.setFilterBitMap(request.getBitMap());cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());cqExtUnit.setTagsCode(request.getTagsCode());long extAddr = this.consumeQueueExt.put(cqExtUnit);if (isExtAddr(extAddr)) {tagsCode = extAddr;} else {log.warn("Save consume queue extend fail, So just save tagsCode! {}, topic:{}, queueId:{}, offset:{}", cqExtUnit,topic, queueId, request.getCommitLogOffset());}}// 組裝消息存儲位置信息=》boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());if (result) {this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());return;} else {// XXX: warn and notify melog.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()+ " failed, retry " + i + " times");try {Thread.sleep(1000);} catch (InterruptedException e) {log.warn("", e);}}}// XXX: warn and notify melog.error("[BUG]consume queue can not write, {} {}", this.topic, this.queueId);this.defaultMessageStore.getRunningFlags().makeLogicsQueueError(); }
進入方法,組裝消息存儲位置信息,org.apache.rocketmq.store.ConsumeQueue#putMessagePositionInfo
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,final long cqOffset) {if (offset <= this.maxPhysicOffset) {return true;}this.byteBufferIndex.flip();this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);this.byteBufferIndex.putLong(offset);this.byteBufferIndex.putInt(size);this.byteBufferIndex.putLong(tagsCode);final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;// 從映射文件隊列中獲取最後一個映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);if (mappedFile != null) {// 映射文是第一個建立、consumerOffset不是0,映射文件寫位置是0if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) {this.minLogicOffset = expectLogicOffset;this.mappedFileQueue.setFlushedWhere(expectLogicOffset);this.mappedFileQueue.setCommittedWhere(expectLogicOffset);// 填充文件=》this.fillPreBlank(mappedFile, expectLogicOffset);log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "+ mappedFile.getWrotePosition());}// consumerOffset不是0if (cqOffset != 0) {long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();if (expectLogicOffset < currentLogicOffset) {log.warn("Build consume queue repeatedly, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);return true;}if (expectLogicOffset != currentLogicOffset) {LOG_ERROR.warn("[BUG]logic queue order maybe wrong, expectLogicOffset: {} currentLogicOffset: {} Topic: {} QID: {} Diff: {}",expectLogicOffset,currentLogicOffset,this.topic,this.queueId,expectLogicOffset - currentLogicOffset);}}this.maxPhysicOffset = offset;// 消息寫入映射文件=》return mappedFile.appendMessage(this.byteBufferIndex.array());}return false; }
進入方法,從映射文件隊列中獲取最後一個映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(long, boolean)
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) {long createOffset = -1;// 獲取映射文件隊列中最後一個映射文件MappedFile mappedFileLast = getLastMappedFile();if (mappedFileLast == null) {createOffset = startOffset - (startOffset % this.mappedFileSize);}if (mappedFileLast != null && mappedFileLast.isFull()) {// 建立的offset=最後映射文件的開始offset+映射文件的大小createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;}// 建立文件的offset不是-1且須要建立映射文件if (createOffset != -1 && needCreate) {// 下個文件存儲路徑 System.getProperty("user.home") + File.separator + "store"// + File.separator + "commitlog",根據offset建立文件名String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);// 下下個文件存儲路經String nextNextFilePath = this.storePath + File.separator+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);MappedFile mappedFile = null;if (this.allocateMappedFileService != null) {// 處理請求返回映射文件=》mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,nextNextFilePath, this.mappedFileSize);} else {try {// 建立映射文件=》mappedFile = new MappedFile(nextFilePath, this.mappedFileSize);} catch (IOException e) {log.error("create mappedFile exception", e);}}if (mappedFile != null) {if (this.mappedFiles.isEmpty()) {mappedFile.setFirstCreateInQueue(true);}this.mappedFiles.add(mappedFile);}return mappedFile;}return mappedFileLast; }
進入方法,處理請求返回映射文件,org.apache.rocketmq.store.AllocateMappedFileService#putRequestAndReturnMappedFile
public MappedFile putRequestAndReturnMappedFile(String nextFilePath, String nextNextFilePath, int fileSize) {int canSubmitRequests = 2;// 是否瞬間持久化if (this.messageStore.getMessageStoreConfig().isTransientStorePoolEnable()) {// 若是broker是master,buffer不夠用瞬間失敗if (this.messageStore.getMessageStoreConfig().isFastFailIfNoBufferInStorePool()&& BrokerRole.SLAVE != this.messageStore.getMessageStoreConfig().getBrokerRole()) { //if broker is slave, don't fast fail even no buffer in poolcanSubmitRequests = this.messageStore.getTransientStorePool().remainBufferNumbs() - this.requestQueue.size();}}AllocateRequest nextReq = new AllocateRequest(nextFilePath, fileSize);// 緩存存儲請求boolean nextPutOK = this.requestTable.putIfAbsent(nextFilePath, nextReq) == null;if (nextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so create mapped file error, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextFilePath);return null;}// 下一個請求添加到優先級阻塞隊列中boolean offerOK = this.requestQueue.offer(nextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}canSubmitRequests--;}AllocateRequest nextNextReq = new AllocateRequest(nextNextFilePath, fileSize);// 緩存下下個請求boolean nextNextPutOK = this.requestTable.putIfAbsent(nextNextFilePath, nextNextReq) == null;if (nextNextPutOK) {if (canSubmitRequests <= 0) {log.warn("[NOTIFYME]TransientStorePool is not enough, so skip preallocate mapped file, " +"RequestQueueSize : {}, StorePoolSize: {}", this.requestQueue.size(), this.messageStore.getTransientStorePool().remainBufferNumbs());this.requestTable.remove(nextNextFilePath);} else {// 下下個請求加入優先級阻塞隊列boolean offerOK = this.requestQueue.offer(nextNextReq);if (!offerOK) {log.warn("never expected here, add a request to preallocate queue failed");}}}if (hasException) {log.warn(this.getServiceName() + " service has exception. so return null");return null;}AllocateRequest result = this.requestTable.get(nextFilePath);try {if (result != null) {// 同步等待boolean waitOK = result.getCountDownLatch().await(waitTimeOut, TimeUnit.MILLISECONDS);if (!waitOK) {log.warn("create mmap timeout " + result.getFilePath() + " " + result.getFileSize());return null;} else {this.requestTable.remove(nextFilePath);return result.getMappedFile();}} else {log.error("find preallocate mmap failed, this never happen");}} catch (InterruptedException e) {log.warn(this.getServiceName() + " service has exception. ", e);}return null; }
返回方法,org.apache.rocketmq.store.DefaultMessageStore.CommitLogDispatcherBuildIndex#dispatch
@Overridepublic void dispatch(DispatchRequest request) {if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) {// 構建索引=》DefaultMessageStore.this.indexService.buildIndex(request);} }
接下篇。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣