rocketmq源碼解析之管理請求清除無用的topic

說在前面apache

管理請求 CLEAN_UNUSED_TOPIC 清除無用的topic緩存

 

源碼解析微信

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic併發

public RemotingCommand cleanUnusedTopic() {
        log.warn("invoke cleanUnusedTopic start.");
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
//        =》
        brokerController.getMessageStore().cleanUnusedTopic(brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
        log.warn("invoke cleanUnusedTopic end.");
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入這個方法org.apache.rocketmq.store.DefaultMessageStore#cleanUnusedTopic清除無用的topicapp

@Override
    public int cleanUnusedTopic(Set<String> topics) {
//        遍歷緩存的消息隊列
        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
            String topic = next.getKey();
            if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                for (ConsumeQueue cq : queueTable.values()) {
//                    消費隊列銷燬=》
                    cq.destroy();
                    log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
                        cq.getTopic(),
                        cq.getQueueId()
                    );
//                    刪除消息隊列的offset=》
                    this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
                }
//                topic所在的消息隊列刪除完畢後消費隊列所在的集合元素刪除
                it.remove();
                log.info("cleanUnusedTopic: {},topic destroyed", topic);
            }
        }

        return 0;
    }

進入這個方法org.apache.rocketmq.store.ConsumeQueue#destroy銷燬消費隊列ide

public void destroy() {
        this.maxPhysicOffset = -1;
        this.minLogicOffset = 0;
//        映射文件隊列銷燬=》
        this.mappedFileQueue.destroy();
        if (isExtReadEnable()) {
//            消費隊列銷燬=》
            this.consumeQueueExt.destroy();
        }
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#destroy映射隊列文件銷燬性能

public void destroy() {
        for (MappedFile mf : this.mappedFiles) {
//        映射文件銷燬=》
            mf.destroy(1000 * 3);
        }
//        同步刪除映射文件隊列
        this.mappedFiles.clear();
        this.flushedWhere = 0;
        // delete parent directory 刪除父級文件夾
        File file = new File(storePath);
        if (file.isDirectory()) {
            file.delete();
        }
    }

進入這個方法org.apache.rocketmq.store.MappedFile#destroy映射文件銷燬this

public boolean destroy(final long intervalForcibly) {
//        =》
        this.shutdown(intervalForcibly);
        if (this.isCleanupOver()) {
            try {
//                關閉文件channel
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");
                long beginTime = System.currentTimeMillis();
//                刪除文件
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeEclipseTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }

往上返回到這個方法org.apache.rocketmq.store.ConsumeQueueExt#destroy消費隊列銷燬blog

public void destroy() {
    this.mappedFileQueue.destroy();
}

往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable按topic、queueId刪除消費隊列隊列

public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
        String key = topic + "-" + queueId;
//        這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現
//        應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低
        synchronized (this) {
            this.topicQueueTable.remove(key);
        }

        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cleanUnusedTopic結束

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索