rocketmq源碼解析之管理請求獲取最大的offset

說在前面apache

本次繼續解析管理請求,GET_MAX_OFFSET 獲取最大的offset安全

 

源碼解析微信

進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset獲取最大的offset併發

private RemotingCommand getMaxOffset(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class);
        final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader();
        final GetMaxOffsetRequestHeader requestHeader =
            (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class);
//        根據topic和queueId獲取最大的offset =》
        long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入這個方法org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue 根據topic和queueId獲取最大的offsetapp

public long getMaxOffsetInQueue(String topic, int queueId) {
//        根據topic和queueId找到消費者隊列=》
        ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
        if (logic != null) {
//            獲取最大的offset =》
            long offset = logic.getMaxOffsetInQueue();
            return offset;
        }

//        若是不存在指定topic和queueId的消費隊列直接返回0
        return 0;
    }

進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue 根據topic和queueId查詢消費隊列this

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

//        按queue id查找消費者隊列
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
//                消費者隊列存儲地址 user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//                每一個文件存儲默認30W
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue 查詢最大的offset線程

public long getMaxOffsetInQueue() {
//        =》
        return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#getMaxOffsetcode

public long getMaxOffset() {
//        獲取存儲映射文件隊列中索引位置最大的映射文件
        MappedFile mappedFile = getLastMappedFile();
        if (mappedFile != null) {
//            映射文件的起始offset+映射文件的可讀取的索引位置
            return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();
        }
//        若是隊列中沒有存儲映射文件直接返回0
        return 0;
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile() 獲取映射文件集合中最後一個映射文件blog

public MappedFile getLastMappedFile() {
    MappedFile mappedFileLast = null;
    while (!this.mappedFiles.isEmpty()) {
        try {
            mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
            break;
        } catch (IndexOutOfBoundsException e) {
            //continue;
        } catch (Exception e) {
            log.error("getLastMappedFile has exception.", e);
            break;
        }
    }

    return mappedFileLast;
}

能夠看到映射文件集合是用CopyOnWriteArrayList實現索引

//    併發線程安全隊列存儲映射文件
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset結束

 

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索