rocketmq源碼解析broker啓動②

說在前面

broker啓動

 

源碼解析

進入方法,加載消費隊列,org.apache.rocketmq.store.DefaultMessageStore#loadConsumeQueue




private boolean loadConsumeQueue() {//        System.getProperty("user.home") + File.separator + "store" File.separator + "consumequeue"File dirLogic = new File(StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()));File[] fileTopicList = dirLogic.listFiles();if (fileTopicList != null) {for (File fileTopic : fileTopicList) {//                文件名是topic名String topic = fileTopic.getName();File[] fileQueueIdList = fileTopic.listFiles();if (fileQueueIdList != null) {for (File fileQueueId : fileQueueIdList) {int queueId;try {queueId = Integer.parseInt(fileQueueId.getName());} catch (NumberFormatException e) {continue;}ConsumeQueue logic = new ConsumeQueue(topic,queueId,StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                            消費隊列文件默認大小30wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),//                            存儲消息隊列=》this);this.putConsumeQueue(topic, queueId, logic);//                        消息隊列加載=》if (!logic.load()) {return false;}}}}}log.info("load logics queue all over, OK");return true;    }

進入方法,存儲消息隊列,org.apache.rocketmq.store.DefaultMessageStore#putConsumeQueue

/**
 * Stores a consume queue in {@code consumeQueueTable} under {@code topic -> queueId}.
 *
 * <p>When the topic has no entry yet, a new per-topic map is created, filled and
 * then published into the table.
 *
 * @param topic        topic the queue belongs to
 * @param queueId      id of the queue within the topic
 * @param consumeQueue queue instance to register
 */
private void putConsumeQueue(final String topic, final int queueId, final ConsumeQueue consumeQueue) {
    ConcurrentMap<Integer/* queueId */, ConsumeQueue> queuesOfTopic = this.consumeQueueTable.get(topic);
    if (queuesOfTopic != null) {
        queuesOfTopic.put(queueId, consumeQueue);
        return;
    }

    // First queue seen for this topic: populate the map before publishing it.
    queuesOfTopic = new ConcurrentHashMap<Integer/* queueId */, ConsumeQueue>();
    queuesOfTopic.put(queueId, consumeQueue);
    this.consumeQueueTable.put(topic, queuesOfTopic);
}

返回方法,消息隊列加載,org.apache.rocketmq.store.ConsumeQueue#load

public boolean load() {//        =》boolean result = this.mappedFileQueue.load();log.info("load consume queue " + this.topic + "-" + this.queueId + " " + (result ? "OK" : "Failed"));if (isExtReadEnable()) {//            消息隊列擴展加載=》result &= this.consumeQueueExt.load();}return result;    }

進入方法,org.apache.rocketmq.store.MappedFileQueue#load介紹過了。

返回方法,消息隊列擴展加載,org.apache.rocketmq.store.ConsumeQueueExt#load

/**
 * Loads the mapped files backing the consume-queue extension and logs the outcome.
 *
 * @return the result reported by the underlying mapped file queue
 */
public boolean load() {
    final boolean loaded = this.mappedFileQueue.load();
    final String status = loaded ? "OK" : "Failed";
    log.info("load consume queue extend" + this.topic + "-" + this.queueId + " " + status);
    return loaded;
}

進入方法,org.apache.rocketmq.store.MappedFileQueue#load介紹過了。

進入方法,恢復,org.apache.rocketmq.store.DefaultMessageStore#recover


private void recover(final boolean lastExitOK) {//        恢復消息隊列=》this.recoverConsumeQueue();if (lastExitOK) {//            正常恢復commitLog=》this.commitLog.recoverNormally();} else {//            異常恢復commitLog=》this.commitLog.recoverAbnormally();}//        恢復topicQueue信息=》this.recoverTopicQueueTable();    }

進入方法,恢復消息隊列,org.apache.rocketmq.store.DefaultMessageStore#recoverConsumeQueue

private void recoverConsumeQueue() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {//                恢復單個消息隊列=》logic.recover();}}    }

進入方法,恢復單個消息隊列,org.apache.rocketmq.store.ConsumeQueue#recover







public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {int index = mappedFiles.size() - 3;if (index < 0)index = 0;int mappedFileSizeLogics = this.mappedFileSize;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();//            映射文件處理的offsetlong processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;long maxExtAddr = 1;while (true) {for (int i = 0; i < mappedFileSizeLogics; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (offset >= 0 && size > 0) {mappedFileOffset = i + CQ_STORE_UNIT_SIZE;this.maxPhysicOffset = offset;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}} else {log.info("recover current consume queue file over,  " + mappedFile.getFileName() + " "+ offset + " " + size + " " + tagsCode);break;}}if (mappedFileOffset == mappedFileSizeLogics) {index++;if (index >= mappedFiles.size()) {log.info("recover last consume queue file over, last mapped file "+ mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next consume queue file, " + mappedFile.getFileName());}} else {log.info("recover current consume queue queue over " + mappedFile.getFileName() + " "+ (processOffset + mappedFileOffset));break;}}processOffset += mappedFileOffset;//            設置刷新的offset位置this.mappedFileQueue.setFlushedWhere(processOffset);//            設置commit的offset位置this.mappedFileQueue.setCommittedWhere(processOffset);//            按處理offset刪除髒數據文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);if (isExtReadEnable()) {//                恢復擴展消息=》this.consumeQueueExt.recover();log.info("Truncate consume queue extend file by max {}", maxExtAddr);//                
映射文件隊列刪除最大offset的髒數據文件=》this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}    }

進入方法,按處理offset刪除髒數據文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles


public void truncateDirtyFiles(long offset) {List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();for (MappedFile file : this.mappedFiles) {//            文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//            文件尾offset大於處理的offsetif (fileTailOffset > offset) {//                處理offset大於文件開始的offsetif (offset >= file.getFileFromOffset()) {//                    設置映射文件寫的位置file.setWrotePosition((int) (offset % this.mappedFileSize));//                    設置文件commit的位置file.setCommittedPosition((int) (offset % this.mappedFileSize));//                    設置文件刷新的位置file.setFlushedPosition((int) (offset % this.mappedFileSize));} else {//                    文件銷燬=》file.destroy(1000);willRemoveFiles.add(file);}}}//        刪除映射文件隊列中的文件=》this.deleteExpiredFile(willRemoveFiles);    }

進入方法,刪除映射文件隊列中的文件,org.apache.rocketmq.store.MappedFileQueue#deleteExpiredFile



/**
 * Removes the given files from {@code mappedFiles}.
 *
 * <p>Entries of {@code files} that are not present in {@code mappedFiles} are
 * first dropped from {@code files} itself (the argument is mutated). If
 * {@code removeAll} reports that nothing changed, an error is logged; this also
 * happens when every candidate was filtered out beforehand.
 *
 * @param files files to remove; modified in place
 */
void deleteExpiredFile(List<MappedFile> files) {
    if (files.isEmpty()) {
        return;
    }

    for (Iterator<MappedFile> it = files.iterator(); it.hasNext();) {
        MappedFile candidate = it.next();
        if (this.mappedFiles.contains(candidate)) {
            continue;
        }
        it.remove();
        log.info("This mappedFile {} is not contained by mappedFiles, so skip it.", candidate.getFileName());
    }

    try {
        boolean changed = this.mappedFiles.removeAll(files);
        if (!changed) {
            log.error("deleteExpiredFile remove failed.");
        }
    } catch (Exception e) {
        log.error("deleteExpiredFile has exception.", e);
    }
}

返回方法,恢復擴展消息,org.apache.rocketmq.store.ConsumeQueueExt#recover






public void recover() {final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (mappedFiles == null || mappedFiles.isEmpty()) {return;}// load all files, consume queue will truncate extend files.int index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;CqExtUnit extUnit = new CqExtUnit();while (true) {extUnit.readBySkip(byteBuffer);// check whether write sth.if (extUnit.getSize() > 0) {mappedFileOffset += extUnit.getSize();continue;}index++;if (index < mappedFiles.size()) {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("Recover next consume queue extend file, " + mappedFile.getFileName());continue;}log.info("All files of consume queue extend has been recovered over, last mapped file "+ mappedFile.getFileName());break;}processOffset += mappedFileOffset;//       映射文件隊列設置刷新的offset位置this.mappedFileQueue.setFlushedWhere(processOffset);//        映射文件隊列設置offset commit的位置this.mappedFileQueue.setCommittedWhere(processOffset);//        映射文件隊列按處理offset刪除髒數據文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);    }

進入方法,映射文件隊列按處理offset刪除髒數據文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介紹過了。

返回方法,映射文件隊列刪除最大offset的髒數據文件,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMaxAddress




public void truncateByMaxAddress(final long maxAddress) {if (!isExtAddr(maxAddress)) {return;}log.info("Truncate consume queue ext by max {}.", maxAddress);CqExtUnit cqExtUnit = get(maxAddress);if (cqExtUnit == null) {log.error("[BUG] address {} of consume queue extend not found!", maxAddress);return;}final long realOffset = unDecorate(maxAddress);//        刪除髒數據文件=》this.mappedFileQueue.truncateDirtyFiles(realOffset + cqExtUnit.getSize());    }

進入方法,刪除髒數據文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介紹過了。

返回方法,正常恢復commitLog,org.apache.rocketmq.store.CommitLog#recoverNormally


public void recoverNormally() {boolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Began to recover from the last third fileint index = mappedFiles.size() - 3;if (index < 0)index = 0;MappedFile mappedFile = mappedFiles.get(index);ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {//                建立轉發請求DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (dispatchRequest.isSuccess() && size > 0) {mappedFileOffset += size;}// Come the end of the file, switch to the next file Since the// return 0 representatives met last hole,// this can not be included in truncate offsetelse if (dispatchRequest.isSuccess() && size == 0) {index++;if (index >= mappedFiles.size()) {// Current branch can not happenlog.info("recover last 3 physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}// Intermediate file read errorelse if (!dispatchRequest.isSuccess()) {log.info("recover physics file end, " + mappedFile.getFileName());break;}}processOffset += mappedFileOffset;//            設置刷新offsetthis.mappedFileQueue.setFlushedWhere(processOffset);//            設置commit offsetthis.mappedFileQueue.setCommittedWhere(processOffset);//            刪除髒數據文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);}    }

進入方法,刪除髒數據文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介紹過了。

返回方法,異常恢復commitLog,org.apache.rocketmq.store.CommitLog#recoverAbnormally






public void recoverAbnormally() {// recover by the minimum time stampboolean checkCRCOnRecover = this.defaultMessageStore.getMessageStoreConfig().isCheckCRCOnRecover();final List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();if (!mappedFiles.isEmpty()) {// Looking beginning to recover from which file 從最後一個文件開始恢復int index = mappedFiles.size() - 1;MappedFile mappedFile = null;for (; index >= 0; index--) {mappedFile = mappedFiles.get(index);//                是否從最後一個文件恢復=》if (this.isMappedFileMatchedRecover(mappedFile)) {log.info("recover from this mapped file " + mappedFile.getFileName());break;}}if (index < 0) {index = 0;mappedFile = mappedFiles.get(index);}ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();long processOffset = mappedFile.getFileFromOffset();long mappedFileOffset = 0;while (true) {DispatchRequest dispatchRequest = this.checkMessageAndReturnSize(byteBuffer, checkCRCOnRecover);int size = dispatchRequest.getMsgSize();// Normal dataif (size > 0) {mappedFileOffset += size;if (this.defaultMessageStore.getMessageStoreConfig().isDuplicationEnable()) {if (dispatchRequest.getCommitLogOffset() < this.defaultMessageStore.getConfirmOffset()) {//                            消息存儲轉發=》this.defaultMessageStore.doDispatch(dispatchRequest);}} else {//                        =》this.defaultMessageStore.doDispatch(dispatchRequest);}}// Intermediate file read errorelse if (size == -1) {log.info("recover physics file end, " + mappedFile.getFileName());break;}// Come the end of the file, switch to the next file// Since the return 0 representatives met last hole, this can// not be included in truncate offsetelse if (size == 0) {index++;if (index >= mappedFiles.size()) {// The current branch under normal circumstances should// not happenlog.info("recover physics file over, last mapped file " + mappedFile.getFileName());break;} else {mappedFile = mappedFiles.get(index);byteBuffer = mappedFile.sliceByteBuffer();processOffset = 
mappedFile.getFileFromOffset();mappedFileOffset = 0;log.info("recover next physics file, " + mappedFile.getFileName());}}}processOffset += mappedFileOffset;//            設置刷新offset位置this.mappedFileQueue.setFlushedWhere(processOffset);//            設置commitOffsetthis.mappedFileQueue.setCommittedWhere(processOffset);//            刪除髒數據文件=》this.mappedFileQueue.truncateDirtyFiles(processOffset);// Clear ConsumeQueue redundant data 清除消息隊列冗餘數據=》this.defaultMessageStore.truncateDirtyLogicFiles(processOffset);}// Commitlog case files are deletedelse {this.mappedFileQueue.setFlushedWhere(0);this.mappedFileQueue.setCommittedWhere(0);//            銷燬消息隊列=》this.defaultMessageStore.destroyLogics();}    }

進入方法,是否從最後一個文件恢復,org.apache.rocketmq.store.CommitLog#isMappedFileMatchedRecover




private boolean isMappedFileMatchedRecover(final MappedFile mappedFile) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();int magicCode = byteBuffer.getInt(MessageDecoder.MESSAGE_MAGIC_CODE_POSTION);if (magicCode != MESSAGE_MAGIC_CODE) {return false;}//        消息存儲時間long storeTimestamp = byteBuffer.getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);if (0 == storeTimestamp) {return false;}if (this.defaultMessageStore.getMessageStoreConfig().isMessageIndexEnable()&& this.defaultMessageStore.getMessageStoreConfig().isMessageIndexSafe()) {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestampIndex()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}} else {if (storeTimestamp <= this.defaultMessageStore.getStoreCheckpoint().getMinTimestamp()) {log.info("find check timestamp, {} {}",storeTimestamp,UtilAll.timeMillisToHumanString(storeTimestamp));return true;}}return false;    }

進入方法,刪除髒數據文件,org.apache.rocketmq.store.MappedFileQueue#truncateDirtyFiles介紹過了。

返回方法,清除消息隊列冗餘數據,org.apache.rocketmq.store.DefaultMessageStore#truncateDirtyLogicFiles

public void truncateDirtyLogicFiles(long phyOffset) {ConcurrentMap<String, ConcurrentMap<Integer, ConsumeQueue>> tables = DefaultMessageStore.this.consumeQueueTable;for (ConcurrentMap<Integer, ConsumeQueue> maps : tables.values()) {for (ConsumeQueue logic : maps.values()) {//                =》logic.truncateDirtyLogicFiles(phyOffset);}}    }

進入方法,org.apache.rocketmq.store.ConsumeQueue#truncateDirtyLogicFiles

















public void truncateDirtyLogicFiles(long phyOffet) {int logicFileSize = this.mappedFileSize;this.maxPhysicOffset = phyOffet - 1;long maxExtAddr = 1;while (true) {//            獲取映射隊列中最後的映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();mappedFile.setWrotePosition(0);mappedFile.setCommittedPosition(0);mappedFile.setFlushedPosition(0);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = byteBuffer.getInt();long tagsCode = byteBuffer.getLong();if (0 == i) {if (offset >= phyOffet) {this.mappedFileQueue.deleteLastMappedFile();break;} else {int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;// This maybe not take effect, when not every consume queue has extend file.if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}}} else {if (offset >= 0 && size > 0) {if (offset >= phyOffet) {return;}int pos = i + CQ_STORE_UNIT_SIZE;mappedFile.setWrotePosition(pos);mappedFile.setCommittedPosition(pos);mappedFile.setFlushedPosition(pos);this.maxPhysicOffset = offset;if (isExtAddr(tagsCode)) {maxExtAddr = tagsCode;}if (pos == logicFileSize) {return;}} else {return;}}}} else {break;}}if (isExtReadEnable()) {//            刪除最大位置的消息隊列=》this.consumeQueueExt.truncateByMaxAddress(maxExtAddr);}}public long getLastOffset() {long lastOffset = -1;int logicFileSize = this.mappedFileSize;//        獲取映射文件隊列中最後一個映射文件=》MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();if (mappedFile != null) {//            有點疑問,這裏是獲取的commitOffset嗎int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE;if (position < 0)position = 0;ByteBuffer byteBuffer = mappedFile.sliceByteBuffer();byteBuffer.position(position);for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) {long offset = byteBuffer.getLong();int size = 
byteBuffer.getInt();byteBuffer.getLong();if (offset >= 0 && size > 0) {lastOffset = offset + size;} else {break;}}}return lastOffset;    }

返回方法,銷燬消息隊列,org.apache.rocketmq.store.DefaultMessageStore#destroyLogics

public void destroyLogics() {for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {//                =》logic.destroy();}}    }

進入方法,org.apache.rocketmq.store.ConsumeQueue#destroy

public void destroy() {this.maxPhysicOffset = -1;this.minLogicOffset = 0;//        映射文件隊列銷燬=》this.mappedFileQueue.destroy();if (isExtReadEnable()) {//            消費隊列銷燬=》this.consumeQueueExt.destroy();}    }

進入方法,映射文件隊列銷燬,org.apache.rocketmq.store.MappedFileQueue#destroy

public void destroy() {for (MappedFile mf : this.mappedFiles) {//        映射文件銷燬=》mf.destroy(1000 * 3);}//        同步刪除映射文件隊列this.mappedFiles.clear();this.flushedWhere = 0;// delete parent directory 刪除父級文件夾File file = new File(storePath);if (file.isDirectory()) {file.delete();}    }

返回方法,恢復topicQueue信息,org.apache.rocketmq.store.DefaultMessageStore#recoverTopicQueueTable

private void recoverTopicQueueTable() {HashMap<String/* topic-queueid */, Long/* offset */> table = new HashMap<String, Long>(1024);//        獲取最小的物理offset=》long minPhyOffset = this.commitLog.getMinOffset();for (ConcurrentMap<Integer, ConsumeQueue> maps : this.consumeQueueTable.values()) {for (ConsumeQueue logic : maps.values()) {String key = logic.getTopic() + "-" + logic.getQueueId();//                設置消息隊列的最大offset=》table.put(key, logic.getMaxOffsetInQueue());//               設置正確的最小的物理offset=》logic.correctMinOffset(minPhyOffset);}}this.commitLog.setTopicQueueTable(table);    }

進入方法,獲取最小的物理offset,org.apache.rocketmq.store.CommitLog#getMinOffset

public long getMinOffset() {//        獲取第一個映射文件=》MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();if (mappedFile != null) {if (mappedFile.isAvailable()) {//                獲取映射文件的起始偏移量return mappedFile.getFileFromOffset();} else {//                獲取下個文件的起始偏移量=》return this.rollNextFile(mappedFile.getFileFromOffset());}}return -1;    }

進入方法,獲取下個文件的起始偏移量,org.apache.rocketmq.store.CommitLog#rollNextFile

/**
 * Computes the start offset of the commit-log file that follows the one
 * containing {@code offset}.
 *
 * @param offset any offset inside a commit-log file
 * @return {@code offset} rounded up to the next multiple of the configured
 *         commit-log file size (an exact multiple advances by one full file)
 */
public long rollNextFile(final long offset) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    final long positionInFile = offset % fileSize;
    return offset - positionInFile + fileSize;
}

進入方法,設置消息隊列的最大offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue

public long getMaxOffsetInQueue() {//        =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;    }

進入方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset

public long getMaxOffset() {//        獲取存儲映射文件隊列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {//            映射文件的起始offset+映射文件的可讀取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}//        若是隊列中沒有存儲映射文件直接返回0return 0;    }

進入方法,獲取存儲映射文件隊列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()


public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;    }

返回方法,設置正確的最小的物理offset,org.apache.rocketmq.store.ConsumeQueue#correctMinOffset


public void correctMinOffset(long phyMinOffset) {//        找到隊列中一個映射文件=》MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile();long minExtAddr = 1;if (mappedFile != null) {//           獲取映射文件的bufferSelectMappedBufferResult result = mappedFile.selectMappedBuffer(0);if (result != null) {try {for (int i = 0; i < result.getSize(); i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = result.getByteBuffer().getLong();result.getByteBuffer().getInt();long tagsCode = result.getByteBuffer().getLong();if (offsetPy >= phyMinOffset) {this.minLogicOffset = result.getMappedFile().getFileFromOffset() + i;log.info("Compute logical min offset: {}, topic: {}, queueId: {}",this.getMinOffsetInQueue(), this.topic, this.queueId);// This maybe not take effect, when not every consume queue has extend file.if (isExtAddr(tagsCode)) {minExtAddr = tagsCode;}break;}}} catch (Exception e) {log.error("Exception thrown when correctMinOffset", e);} finally {result.release();}}}if (isExtReadEnable()) {//            刪除最小位置的擴展消息=》this.consumeQueueExt.truncateByMinAddress(minExtAddr);}    }

進入方法,刪除最小位置的擴展消息,org.apache.rocketmq.store.ConsumeQueueExt#truncateByMinAddress






public void truncateByMinAddress(final long minAddress) {if (!isExtAddr(minAddress)) {return;}log.info("Truncate consume queue ext by min {}.", minAddress);List<MappedFile> willRemoveFiles = new ArrayList<MappedFile>();List<MappedFile> mappedFiles = this.mappedFileQueue.getMappedFiles();final long realOffset = unDecorate(minAddress);for (MappedFile file : mappedFiles) {//            文件尾offsetlong fileTailOffset = file.getFileFromOffset() + this.mappedFileSize;//            文件尾offset小於真實的offsetif (fileTailOffset < realOffset) {log.info("Destroy consume queue ext by min: file={}, fileTailOffset={}, minOffset={}", file.getFileName(),fileTailOffset, realOffset);//                文件銷燬=》if (file.destroy(1000)) {willRemoveFiles.add(file);}}}//        刪除指望的文件=》this.mappedFileQueue.deleteExpiredFile(willRemoveFiles);    }

返回方法,註冊處理器,org.apache.rocketmq.broker.BrokerController#registerProcessor









public void registerProcessor() {/*** SendMessageProcessor*///        發送消息處理器=》SendMessageProcessor sendProcessor = new SendMessageProcessor(this);//        發送消息鉤子方法實現sendProcessor.registerSendMessageHook(sendMessageHookList);//      消費消息鉤子方法實現sendProcessor.registerConsumeMessageHook(consumeMessageHookList);//        註冊處理器,這些處理器都是保存在hashMap中=》this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.remotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_MESSAGE_V2, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.SEND_BATCH_MESSAGE, sendProcessor, this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CONSUMER_SEND_MSG_BACK, sendProcessor, this.sendMessageExecutor);/*** PullMessageProcessor*///        拉取消息處理器=》this.remotingServer.registerProcessor(RequestCode.PULL_MESSAGE, this.pullMessageProcessor, this.pullMessageExecutor);//        註冊客戶端消息鉤子方法this.pullMessageProcessor.registerConsumeMessageHook(consumeMessageHookList);/*** QueryMessageProcessor*///        查詢消息處理器NettyRequestProcessor queryProcessor = new QueryMessageProcessor(this);this.remotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, this.queryMessageExecutor);this.remotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_MESSAGE, queryProcessor, 
this.queryMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.VIEW_MESSAGE_BY_ID, queryProcessor, this.queryMessageExecutor);/*** ClientManageProcessor*///        client管理處理器ClientManageProcessor clientProcessor = new ClientManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.remotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.remotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.HEART_BEAT, clientProcessor, this.heartbeatExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UNREGISTER_CLIENT, clientProcessor, this.clientManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.CHECK_CLIENT_CONFIG, clientProcessor, this.clientManageExecutor);/*** ConsumerManageProcessor*///        消費者管理處理器ConsumerManageProcessor consumerManageProcessor = new ConsumerManageProcessor(this);this.remotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.remotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.GET_CONSUMER_LIST_BY_GROUP, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.UPDATE_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.QUERY_CONSUMER_OFFSET, consumerManageProcessor, this.consumerManageExecutor);/*** EndTransactionProcessor*///        事務管理處理器this.remotingServer.registerProcessor(RequestCode.END_TRANSACTION, new 
EndTransactionProcessor(this), this.sendMessageExecutor);this.fastRemotingServer.registerProcessor(RequestCode.END_TRANSACTION, new EndTransactionProcessor(this), this.sendMessageExecutor);/*** Default*///        broker管理服務處理器AdminBrokerProcessor adminProcessor = new AdminBrokerProcessor(this);//        註冊默認的處理器this.remotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);this.fastRemotingServer.registerDefaultProcessor(adminProcessor, this.adminBrokerExecutor);    }

各個處理器前面都介紹過了。

返回方法,broker保護機制,org.apache.rocketmq.broker.BrokerController#protectBroker

public void protectBroker() {//        消費緩慢時是否禁止消費if (this.brokerConfig.isDisableConsumeIfConsumerReadSlowly()) {final Iterator<Map.Entry<String, MomentStatsItem>> it = this.brokerStatsManager.getMomentStatsItemSetFallSize().getStatsItemTable().entrySet().iterator();while (it.hasNext()) {final Map.Entry<String, MomentStatsItem> next = it.next();final long fallBehindBytes = next.getValue().getValue().get();//                若是大於消費閾值if (fallBehindBytes > this.brokerConfig.getConsumerFallbehindThreshold()) {final String[] split = next.getValue().getStatsKey().split("@");final String group = split[2];LOG_PROTECTION.info("[PROTECT_BROKER] the consumer[{}] consume slowly, {} bytes, disable it", group, fallBehindBytes);//                    禁止消費=》this.subscriptionGroupManager.disableConsume(group);}}}    }

進入方法,禁止消費,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#disableConsume

public void disableConsume(final String groupName) {//        消費組訂閱信息SubscriptionGroupConfig old = this.subscriptionGroupTable.get(groupName);if (old != null) {//            設置禁止消費old.setConsumeEnable(false);this.dataVersion.nextVersion();}    }

返回方法,初始化分佈式消息事務,org.apache.rocketmq.broker.BrokerController#initialTransaction

private void initialTransaction() {//        加載分佈式消息事務服務this.transactionalMessageService = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_SERVICE_ID, TransactionalMessageService.class);if (null == this.transactionalMessageService) {this.transactionalMessageService = new TransactionalMessageServiceImpl(new TransactionalMessageBridge(this, this.getMessageStore()));log.warn("Load default transaction message hook service: {}", TransactionalMessageServiceImpl.class.getSimpleName());}//        加載分佈式消息事務檢查監聽器this.transactionalMessageCheckListener = ServiceProvider.loadClass(ServiceProvider.TRANSACTION_LISTENER_ID, AbstractTransactionalMessageCheckListener.class);if (null == this.transactionalMessageCheckListener) {this.transactionalMessageCheckListener = new DefaultTransactionalMessageCheckListener();log.warn("Load default discard message hook service: {}", DefaultTransactionalMessageCheckListener.class.getSimpleName());}this.transactionalMessageCheckListener.setBrokerController(this);this.transactionalMessageCheckService = new TransactionalMessageCheckService(this);    }

返回方法,初始化失敗,關閉控制器,org.apache.rocketmq.broker.BrokerController#shutdown




















public void shutdown() {if (this.brokerStatsManager != null) {this.brokerStatsManager.shutdown();}if (this.clientHousekeepingService != null) {this.clientHousekeepingService.shutdown();}if (this.pullRequestHoldService != null) {this.pullRequestHoldService.shutdown();}if (this.remotingServer != null) {this.remotingServer.shutdown();}if (this.fastRemotingServer != null) {this.fastRemotingServer.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();}if (this.messageStore != null) {//            消息存儲服務關閉=》this.messageStore.shutdown();}this.scheduledExecutorService.shutdown();try {this.scheduledExecutorService.awaitTermination(5000, TimeUnit.MILLISECONDS);} catch (InterruptedException e) {}//        取消註冊broker=》this.unregisterBrokerAll();if (this.sendMessageExecutor != null) {this.sendMessageExecutor.shutdown();}if (this.pullMessageExecutor != null) {this.pullMessageExecutor.shutdown();}if (this.adminBrokerExecutor != null) {this.adminBrokerExecutor.shutdown();}if (this.brokerOuterAPI != null) {this.brokerOuterAPI.shutdown();}//        消費者offset持久化=》this.consumerOffsetManager.persist();if (this.filterServerManager != null) {this.filterServerManager.shutdown();}if (this.brokerFastFailure != null) {this.brokerFastFailure.shutdown();}if (this.consumerFilterManager != null) {//            消費者過濾信息持久化=》this.consumerFilterManager.persist();}if (this.clientManageExecutor != null) {this.clientManageExecutor.shutdown();}if (this.queryMessageExecutor != null) {this.queryMessageExecutor.shutdown();}if (this.consumerManageExecutor != null) {this.consumerManageExecutor.shutdown();}if (this.fileWatchService != null) {this.fileWatchService.shutdown();}    }

接下篇。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索