RocketMQ Source Code Analysis: The Admin Request for Getting the Minimum Offset

Preface

This post covers the admin request GET_MIN_OFFSET, which returns the minimum offset of a queue.

 

Source Code Walkthrough

Start at org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which handles the request and returns the minimum offset.

private RemotingCommand getMinOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
        final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
        final GetMinOffsetRequestHeader requestHeader =
            (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);
        // Look up the minimum offset by topic and queueId =>
        long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
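To make the flow easier to follow outside the broker, here is a minimal sketch of the same three steps: read the request header, ask the store for the minimum offset, and fill in the response. Every type in it (MinOffsetRequest, MinOffsetResponse, OffsetStore, MinOffsetHandler) is a hypothetical stand-in of my own, not a RocketMQ class, and the SUCCESS value is only illustrative.

```java
// Hypothetical stand-in for the decoded GetMinOffsetRequestHeader.
final class MinOffsetRequest {
    final String topic;
    final int queueId;

    MinOffsetRequest(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

// Hypothetical stand-in for the response command plus its header.
final class MinOffsetResponse {
    int code;
    long offset;
}

// Hypothetical stand-in for the part of the message store used here.
interface OffsetStore {
    long getMinOffsetInQueue(String topic, int queueId);
}

final class MinOffsetHandler {
    // Illustrative value only; the real handler uses ResponseCode.SUCCESS.
    static final int SUCCESS = 0;

    private final OffsetStore store;

    MinOffsetHandler(OffsetStore store) {
        this.store = store;
    }

    MinOffsetResponse handle(MinOffsetRequest request) {
        MinOffsetResponse response = new MinOffsetResponse();
        // The handler itself does no work beyond delegating to the store.
        response.offset = store.getMinOffsetInQueue(request.topic, request.queueId);
        response.code = SUCCESS;
        return response;
    }
}

final class MinOffsetHandlerDemo {
    public static void main(String[] args) {
        // A fake store that only knows one queue.
        OffsetStore store = (topic, queueId) ->
            "TopicTest".equals(topic) && queueId == 0 ? 42L : -1L;
        MinOffsetResponse response =
            new MinOffsetHandler(store).handle(new MinOffsetRequest("TopicTest", 0));
        System.out.println(response.code + " " + response.offset);
    }
}
```

The point of the sketch is that the processor is a thin adapter: all of the real logic lives behind the store call.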

Step into org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue, which looks up the minimum offset by topic and queueId.

public long getMinOffsetInQueue(String topic, int queueId) {
        // Find the consume queue by topic and queueId =>
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
            // Get the minimum offset in the queue
            return logic.getMinOffsetInQueue();
        }

        return -1;
    }
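The body of logic.getMinOffsetInQueue() is not shown in this post. As far as I recall from the RocketMQ source, a consume queue stores fixed-size entries and derives this value by dividing the byte position of its first valid entry by the entry size. The sketch below shows only that arithmetic. The class name, the field name, and the 20-byte entry size are my assumptions, so check them against the version you are reading.

```java
final class ConsumeQueueOffsetSketch {
    // Assumed entry size in bytes: 8 (commit log offset) + 4 (message size) + 8 (tag hash code).
    static final int UNIT_SIZE = 20;

    // Hypothetical field: byte position of the first entry that is still valid.
    private final long minLogicOffset;

    ConsumeQueueOffsetSketch(long minLogicOffset) {
        this.minLogicOffset = minLogicOffset;
    }

    // Convert the byte position into a logical entry index.
    long getMinOffsetInQueue() {
        return minLogicOffset / UNIT_SIZE;
    }

    public static void main(String[] args) {
        // Suppose the first 6,000,000 bytes of the queue have been cleaned up.
        ConsumeQueueOffsetSketch queue = new ConsumeQueueOffsetSketch(6_000_000L);
        System.out.println(queue.getMinOffsetInQueue()); // prints 300000
    }
}
```

Under these assumptions the minimum offset is a message index within the queue, not a byte position, which is why it grows as old queue files are deleted.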

Step into org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, which looks up the consume queue by topic and queueId. This method was covered in an earlier post.

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

        // Look up the consume queue by queue id
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                // Consume queue storage path: user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                // Each consume queue file holds 300,000 entries by default
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }
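The two putIfAbsent blocks in findConsumeQueue are a lock-free get-or-create over a two-level map: if two threads race to create the same entry, one of them wins and the other discards its freshly built object. Here is a self-contained sketch of that pattern, next to a computeIfAbsent variant for Java 8 and later. QueueRegistry and its nested Queue class are hypothetical names of mine, standing in for DefaultMessageStore and ConsumeQueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

final class QueueRegistry {
    // Hypothetical placeholder for ConsumeQueue.
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // topic -> (queueId -> queue), the same shape as consumeQueueTable.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
        new ConcurrentHashMap<>();

    // Same flow as findConsumeQueue: create optimistically, keep whichever instance won the race.
    Queue findWithPutIfAbsent(String topic, int queueId) {
        ConcurrentMap<Integer, Queue> map = table.get(topic);
        if (map == null) {
            ConcurrentMap<Integer, Queue> newMap = new ConcurrentHashMap<>(128);
            ConcurrentMap<Integer, Queue> oldMap = table.putIfAbsent(topic, newMap);
            map = (oldMap != null) ? oldMap : newMap;
        }
        Queue queue = map.get(queueId);
        if (queue == null) {
            Queue created = new Queue(topic, queueId);
            Queue old = map.putIfAbsent(queueId, created);
            queue = (old != null) ? old : created;
        }
        return queue;
    }

    // Shorter variant: ConcurrentHashMap applies the mapping function at most once per key.
    Queue findWithComputeIfAbsent(String topic, int queueId) {
        return table
            .computeIfAbsent(topic, t -> new ConcurrentHashMap<>(128))
            .computeIfAbsent(queueId, id -> new Queue(topic, id));
    }

    public static void main(String[] args) {
        QueueRegistry registry = new QueueRegistry();
        Queue first = registry.findWithPutIfAbsent("TopicTest", 0);
        Queue second = registry.findWithComputeIfAbsent("TopicTest", 0);
        System.out.println(first == second); // prints true
    }
}
```

One consequence worth noting: as written, findConsumeQueue never returns null, because a missing queue is created on the spot. The -1 branch in getMinOffsetInQueue is therefore a defensive check, and querying a topic or queueId that does not exist creates an empty consume queue as a side effect.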

Control then returns to org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which ends the request handling.

 

Closing Remarks

This analysis reflects only my own understanding and is for reference only.

 
