說在前面apache
今天主要介紹DELETE_TOPIC_IN_BROKER的處理邏輯。緩存
源碼解析微信
進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#deleteTopic併發
private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); DeleteTopicRequestHeader requestHeader = (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class); log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel())); // 刪除topic=》 this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic()); // 刪除客戶端消息隊列中的無用的topic=》 this.brokerController.getMessageStore() .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet()); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入到這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#deleteTopicConfig 刪除內存和持久化的topicapp
public void deleteTopicConfig(final String topic) { // 先刪除緩存 TopicConfig old = this.topicConfigTable.remove(topic); if (old != null) { log.info("delete topic config OK, topic: {}", old); // 更新數據版本號 this.dataVersion.nextVersion(); // 刪除持久化文件中的topic=》 this.persist(); } else { log.warn("delete topic config failed, topic: {} not exists", topic); } }
進入到這個方法org.apache.rocketmq.store.DefaultMessageStore#cleanUnusedTopic 刪除無用的topicide
@Override public int cleanUnusedTopic(Set<String> topics) { // 遍歷緩存的消息隊列 Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next(); String topic = next.getKey(); if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) { ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue(); for (ConsumeQueue cq : queueTable.values()) { // 消費隊列銷燬=》 cq.destroy(); log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned", cq.getTopic(), cq.getQueueId() ); // 刪除消息隊列=》 this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId()); } // topic所在的消息隊列刪除完畢後消費隊列所在的集合元素刪除 it.remove(); log.info("cleanUnusedTopic: {},topic destroyed", topic); } } return 0; }
進入到這個方法org.apache.rocketmq.store.ConsumeQueue#destroy 消息隊列銷燬性能
public void destroy() { this.maxPhysicOffset = -1; this.minLogicOffset = 0; // 映射文件隊列銷燬=》 this.mappedFileQueue.destroy(); if (isExtReadEnable()) { // 消費隊列銷燬=》 this.consumeQueueExt.destroy(); } }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#destroy 映射文件隊列銷燬this
public void destroy() { for (MappedFile mf : this.mappedFiles) { // 映射文件銷燬=》 mf.destroy(1000 * 3); } // 同步刪除映射文件隊列 this.mappedFiles.clear(); this.flushedWhere = 0; // delete parent directory 刪除父級文件夾 File file = new File(storePath); if (file.isDirectory()) { file.delete(); } }
進入這個方法org.apache.rocketmq.store.MappedFile#destroy 刪除映射文件code
public boolean destroy(final long intervalForcibly) { // =》 this.shutdown(intervalForcibly); if (this.isCleanupOver()) { try { // 關閉文件channel this.fileChannel.close(); log.info("close file channel " + this.fileName + " OK"); long beginTime = System.currentTimeMillis(); // 刪除文件 boolean result = this.file.delete(); log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:" + this.getFlushedPosition() + ", " + UtilAll.computeEclipseTimeMilliseconds(beginTime)); } catch (Exception e) { log.warn("close file channel " + this.fileName + " Failed. ", e); } return true; } else { log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName + " Failed. cleanupOver: " + this.cleanupOver); } return false; }
進入這個方法org.apache.rocketmq.store.MappedFile#cleanup 清除資源blog
@Override public boolean cleanup(final long currentRef) { if (this.isAvailable()) { log.error("this file[REF:" + currentRef + "] " + this.fileName + " have not shutdown, stop unmapping."); return false; } if (this.isCleanupOver()) { log.error("this file[REF:" + currentRef + "] " + this.fileName + " have cleanup, do not do it again."); return true; } // 清除映射緩衝區=》 clean(this.mappedByteBuffer); // 添加映射文件所佔虛擬內存 TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1)); // 改變映射文件數量 TOTAL_MAPPED_FILES.decrementAndGet(); log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK"); return true; }
進入這個方法org.apache.rocketmq.store.MappedFile#clean 清除緩衝區
public static void clean(final ByteBuffer buffer) { // 緩衝區合法且不是直接緩衝區 if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0) return; // 執行刪除 invoke(invoke(viewed(buffer), "cleaner"), "clean"); }
往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable 從topic隊列信息中刪除隊列
public void removeQueueFromTopicQueueTable(final String topic, final int queueId) { String key = topic + "-" + queueId; // 這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現 // 應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低 synchronized (this) { this.topicQueueTable.remove(key); } log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId); }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#deleteTopic 解析完畢。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣