說在前面
管理請求 CLEAN_EXPIRED_CONSUMEQUEUE:清除過期的消費隊列
源碼解析
進入這個方法:org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueue
public RemotingCommand cleanExpiredConsumeQueue() { log.warn("invoke cleanExpiredConsumeQueue start."); final RemotingCommand response = RemotingCommand.createResponseCommand(null); // =》 brokerController.getMessageStore().cleanExpiredConsumerQueue(); log.warn("invoke cleanExpiredConsumeQueue end."); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法:org.apache.rocketmq.store.DefaultMessageStore#cleanExpiredConsumerQueue,清除過期的消費隊列
public void cleanExpiredConsumerQueue() { // 從commitLog中獲取最小的offset=》 long minCommitLogOffset = this.commitLog.getMinOffset(); Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); Iterator<Entry<Integer, ConsumeQueue>> itQT = queueTable.entrySet().iterator(); while (itQT.hasNext()) { Entry<Integer, ConsumeQueue> nextQT = itQT.next(); // 獲取消息隊列的lastOffset=》 long maxCLOffsetInConsumeQueue = nextQT.getValue().getLastOffset(); if (maxCLOffsetInConsumeQueue == -1) { log.warn("maybe ConsumeQueue was created just now. topic={} queueId={} maxPhysicOffset={} minLogicOffset={}.", nextQT.getValue().getTopic(), nextQT.getValue().getQueueId(), nextQT.getValue().getMaxPhysicOffset(), nextQT.getValue().getMinLogicOffset()); // 正常狀況下應該等於,若是是小於說明有的消息隊列已過時 } else if (maxCLOffsetInConsumeQueue < minCommitLogOffset) { log.info( "cleanExpiredConsumerQueue: {} {} consumer queue destroyed, minCommitLogOffset: {} maxCLOffsetInConsumeQueue: {}", topic, nextQT.getKey(), minCommitLogOffset, maxCLOffsetInConsumeQueue); // 按topic和queueId刪除topic和queue的offset=》 DefaultMessageStore.this.commitLog.removeQueueFromTopicQueueTable(nextQT.getValue().getTopic(), nextQT.getValue().getQueueId()); nextQT.getValue().destroy(); // 消息隊列銷燬了,消息隊列所在元素刪除 itQT.remove(); } } if (queueTable.isEmpty()) { log.info("cleanExpiredConsumerQueue: {},topic destroyed", topic); // topic銷燬了,消息隊列所在的元素刪除 it.remove(); } } } }
進入這個方法:org.apache.rocketmq.store.CommitLog#getMinOffset,獲取最小的offset
public long getMinOffset() { // 獲取第一個映射文件=》 MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); if (mappedFile != null) { if (mappedFile.isAvailable()) { // 獲取映射文件的起始偏移量 return mappedFile.getFileFromOffset(); } else { // 獲取下個文件的起始偏移量=》 return this.rollNextFile(mappedFile.getFileFromOffset()); } } return -1; }
進入到這個方法:org.apache.rocketmq.store.MappedFileQueue#getFirstMappedFile,獲取映射隊列中第一個映射文件
public MappedFile getFirstMappedFile() { MappedFile mappedFileFirst = null; if (!this.mappedFiles.isEmpty()) { try { mappedFileFirst = this.mappedFiles.get(0); } catch (IndexOutOfBoundsException e) { //ignore } catch (Exception e) { log.error("getFirstMappedFile has exception.", e); } } return mappedFileFirst; }
進入到這個方法:org.apache.rocketmq.store.CommitLog#rollNextFile,獲取到下個映射文件的起始偏移量
/**
 * Rounds {@code offset} up to the next multiple of the configured commit-log
 * mapped-file size. An offset already on a multiple advances by one full file size.
 *
 * @param offset an offset in the commit log
 * @return {@code offset + fileSize - offset % fileSize}
 */
public long rollNextFile(final long offset) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();

    // How far offset sits past the start of its own file-size-aligned slot.
    final long positionInFile = offset % fileSize;

    return offset + fileSize - positionInFile;
}
往上返回到這個方法:org.apache.rocketmq.store.ConsumeQueue#getLastOffset,獲取最後的offset
public long getLastOffset() { long lastOffset = -1; int logicFileSize = this.mappedFileSize; // 獲取映射文件隊列中最後一個映射文件=》 MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(); if (mappedFile != null) { // 有點疑問,這裏是獲取的commitOffset嗎 int position = mappedFile.getWrotePosition() - CQ_STORE_UNIT_SIZE; if (position < 0) position = 0; ByteBuffer byteBuffer = mappedFile.sliceByteBuffer(); byteBuffer.position(position); for (int i = 0; i < logicFileSize; i += CQ_STORE_UNIT_SIZE) { long offset = byteBuffer.getLong(); int size = byteBuffer.getInt(); byteBuffer.getLong(); if (offset >= 0 && size > 0) { lastOffset = offset + size; } else { break; } } } return lastOffset; }
進入到這個方法:org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile(),獲取映射隊列中最後的映射文件
public MappedFile getLastMappedFile() { MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getLastMappedFile has exception.", e); break; } } return mappedFileLast; }
往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId刪除消費隊列
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { String key = topic + "-" + queueId; // 這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現 // 應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低 synchronized (this) { this.topicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanExpiredConsumeQueue結束
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣