rocketmq源碼解析之NamesrvController啓動②mqclient admin請求處理刪除topic

說在前面apache

今天主要介紹DELETE_TOPIC_IN_BROKER的處理邏輯。緩存

 

源碼解析微信

進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#deleteTopic併發

private synchronized RemotingCommand deleteTopic(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        DeleteTopicRequestHeader requestHeader =
            (DeleteTopicRequestHeader) request.decodeCommandCustomHeader(DeleteTopicRequestHeader.class);
        log.info("deleteTopic called by {}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
//        刪除topic=》
        this.brokerController.getTopicConfigManager().deleteTopicConfig(requestHeader.getTopic());
//        刪除客戶端消息隊列中的無用的topic=》
        this.brokerController.getMessageStore()
            .cleanUnusedTopic(this.brokerController.getTopicConfigManager().getTopicConfigTable().keySet());
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入到這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#deleteTopicConfig 刪除內存和持久化的topicapp

public void deleteTopicConfig(final String topic) {
//        先刪除緩存
        TopicConfig old = this.topicConfigTable.remove(topic);
        if (old != null) {
            log.info("delete topic config OK, topic: {}", old);
//            更新數據版本號
            this.dataVersion.nextVersion();
//            刪除持久化文件中的topic=》
            this.persist();
        } else {
            log.warn("delete topic config failed, topic: {} not exists", topic);
        }
    }

進入到這個方法org.apache.rocketmq.store.DefaultMessageStore#cleanUnusedTopic 刪除無用的topicide

 

@Override
    public int cleanUnusedTopic(Set<String> topics) {
//        遍歷緩存的消息隊列
        Iterator<Entry<String, ConcurrentMap<Integer, ConsumeQueue>>> it = this.consumeQueueTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, ConsumeQueue>> next = it.next();
            String topic = next.getKey();
            if (!topics.contains(topic) && !topic.equals(ScheduleMessageService.SCHEDULE_TOPIC)) {
                ConcurrentMap<Integer, ConsumeQueue> queueTable = next.getValue();
                for (ConsumeQueue cq : queueTable.values()) {
//                    消費隊列銷燬=》
                    cq.destroy();
                    log.info("cleanUnusedTopic: {} {} ConsumeQueue cleaned",
                        cq.getTopic(),
                        cq.getQueueId()
                    );
//                    刪除消息隊列=》
                    this.commitLog.removeQueueFromTopicQueueTable(cq.getTopic(), cq.getQueueId());
                }
//                topic所在的消息隊列刪除完畢後消費隊列所在的集合元素刪除
                it.remove();
                log.info("cleanUnusedTopic: {},topic destroyed", topic);
            }
        }

        return 0;
    }

進入到這個方法org.apache.rocketmq.store.ConsumeQueue#destroy 消息隊列銷燬性能

public void destroy() {
        this.maxPhysicOffset = -1;
        this.minLogicOffset = 0;
//        映射文件隊列銷燬=》
        this.mappedFileQueue.destroy();
        if (isExtReadEnable()) {
//            消費隊列銷燬=》
            this.consumeQueueExt.destroy();
        }
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#destroy 映射文件隊列銷燬this

public void destroy() {
        for (MappedFile mf : this.mappedFiles) {
//        映射文件銷燬=》
            mf.destroy(1000 * 3);
        }
//        同步刪除映射文件隊列
        this.mappedFiles.clear();
        this.flushedWhere = 0;
        // delete parent directory 刪除父級文件夾
        File file = new File(storePath);
        if (file.isDirectory()) {
            file.delete();
        }
    }

進入這個方法org.apache.rocketmq.store.MappedFile#destroy 刪除映射文件code

public boolean destroy(final long intervalForcibly) {
//        =》
        this.shutdown(intervalForcibly);
        if (this.isCleanupOver()) {
            try {
//                關閉文件channel
                this.fileChannel.close();
                log.info("close file channel " + this.fileName + " OK");
                long beginTime = System.currentTimeMillis();
//                刪除文件
                boolean result = this.file.delete();
                log.info("delete file[REF:" + this.getRefCount() + "] " + this.fileName
                    + (result ? " OK, " : " Failed, ") + "W:" + this.getWrotePosition() + " M:"
                    + this.getFlushedPosition() + ", "
                    + UtilAll.computeEclipseTimeMilliseconds(beginTime));
            } catch (Exception e) {
                log.warn("close file channel " + this.fileName + " Failed. ", e);
            }

            return true;
        } else {
            log.warn("destroy mapped file[REF:" + this.getRefCount() + "] " + this.fileName
                + " Failed. cleanupOver: " + this.cleanupOver);
        }

        return false;
    }

進入這個方法org.apache.rocketmq.store.MappedFile#cleanup 清除資源blog

@Override
    public boolean cleanup(final long currentRef) {
        if (this.isAvailable()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have not shutdown, stop unmapping.");
            return false;
        }

        if (this.isCleanupOver()) {
            log.error("this file[REF:" + currentRef + "] " + this.fileName
                + " have cleanup, do not do it again.");
            return true;
        }

//        清除映射緩衝區=》
        clean(this.mappedByteBuffer);
//        添加映射文件所佔虛擬內存
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(this.fileSize * (-1));
//        改變映射文件數量
        TOTAL_MAPPED_FILES.decrementAndGet();
        log.info("unmap file[REF:" + currentRef + "] " + this.fileName + " OK");
        return true;
    }

進入這個方法org.apache.rocketmq.store.MappedFile#clean 清除緩衝區

public static void clean(final ByteBuffer buffer) {
//        緩衝區合法且不是直接緩衝區
        if (buffer == null || !buffer.isDirect() || buffer.capacity() == 0)
            return;
//        執行刪除
        invoke(invoke(viewed(buffer), "cleaner"), "clean");
    }

往上返回到這個方法org.apache.rocketmq.store.CommitLog#removeQueueFromTopicQueueTable 從topic隊列信息中刪除隊列

public void removeQueueFromTopicQueueTable(final String topic, final int queueId) {
        String key = topic + "-" + queueId;
//        這個地方同步用synchronized有點疑問,若是併發量比較小,synchronized性能也能夠的,可是併發量達到必定量級lock或者其餘無鎖實現
//        應該會好一點,難道消息隊列過時這種狀況出現過時未消費的機率較低
        synchronized (this) {
            this.topicQueueTable.remove(key);
        }

        log.info("removeQueueFromTopicQueueTable OK Topic: {} QueueId: {}", topic, queueId);
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#deleteTopic 解析完畢。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索