說在前面apache
管理請求 CLEAN_UNUSED_TOPIC 清除無用的topic緩存
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic併發
public RemotingCommand cleanUnusedTopic() { log.warn("invoke cleanUnusedTopic start."); final RemotingCommand response = RemotingCommand.createResponseCommand(null); // =》 brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); log.warn("invoke cleanUnusedTopic end."); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#cleanUnusedTopic清除無用的topicapp
@Override public int cleanUnusedTopic(Set<String> topics) { // 遍歷緩存的消息隊列 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { // 消費隊列銷燬=》 cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", cq.getTopic(), cq.getQueueId() ); // 刪除消息隊列的offset=》 this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); } // topic所在的消息隊列刪除完畢後消費隊列所在的集合元素刪除 it.remove(); log.info("cleanUnusedTopic: {},topic destroyed", topic); } } return 0; }
進入這個方法org.apache.rocketmq.store.ConsumeQueue#destroy銷燬消費隊列ide
public void destroy() { this.maxPhysicOffset = -1; this.minLogicOffset = 0; // 映射文件隊列銷燬=》 this.mappedFileQueue.destroy(); if (isExtReadEnable()) { // 消費隊列銷燬=》 this.consumeQueueExt.destroy(); } }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#destroy映射隊列文件銷燬性能
public void destroy() { for (MappedFile mf : this.mappedFiles) { // 映射文件銷燬=》 mf.destroy(1000 * 3); } // 同步刪除映射文件隊列 this.mappedFiles.clear(); this.flushedWhere = 0; // delete parent directory 刪除父級文件夾 File file = new File(storePath); if (file.isDirectory()) { file.delete(); } }
進入這個方法org.apache.rocketmq.store.MappedFile#destroy映射文件銷燬this
public boolean destroy(final long intervalForcibly) { // =》 this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { // 關閉文件channel this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 刪除文件 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeEclipseTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false; }
往上返回到這個方法org.apache.rocketmq.store.ConsumeQueueExt#destroy消費隊列銷燬blog
public void destroy() { this.mappedFileQueue.destroy(); }
往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId刪除消費隊列隊列
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { String key = topic + "-" + queueId; // 這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現 // 應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低 synchronized (this) { this.topicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic結束
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣