rocketmq源碼解析之NamesrvController啓動②mqclient admin請求處理之按時間查詢offset

說在前面

繼續解析管理請求之 SEARCH_OFFSET_BY_TIMESTAMP:按時間查詢 offset。

 

源碼解析

進入這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#searchOffsetByTimestamp:按時間查詢 offset

/**
 * Handles the SEARCH_OFFSET_BY_TIMESTAMP admin request.
 *
 * <p>Decodes a {@code SearchOffsetRequestHeader} from the request, asks the broker's message
 * store for the queue offset matching the requested topic, queue id and timestamp, and returns
 * that offset in a {@code SearchOffsetResponseHeader} with a SUCCESS code and no remark.
 *
 * @param ctx channel context of the caller (not read by this method)
 * @param request the incoming command carrying the search header
 * @return the response command holding the resolved offset
 * @throws RemotingCommandException presumably raised when the custom header cannot be decoded
 */
private RemotingCommand searchOffsetByTimestamp(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand reply = RemotingCommand.createResponseCommand(SearchOffsetResponseHeader.class);
        final SearchOffsetResponseHeader replyHeader = (SearchOffsetResponseHeader) reply.readCustomHeader();
        final SearchOffsetRequestHeader query =
            (SearchOffsetRequestHeader) request.decodeCommandCustomHeader(SearchOffsetRequestHeader.class);

        // Resolve the offset by timestamp through the message store.
        final long foundOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(
            query.getTopic(),
            query.getQueueId(),
            query.getTimestamp());

        replyHeader.setOffset(foundOffset);
        reply.setCode(ResponseCode.SUCCESS);
        reply.setRemark(null);
        return reply;
    }

進入到這個方法 org.apache.rocketmq.store.DefaultMessageStore#getOffsetInQueueByTime:按時間查詢 offset

/**
 * Looks up the consume-queue offset that corresponds to a timestamp.
 *
 * @param topic topic whose consume queue is searched
 * @param queueId id of the queue within the topic
 * @param timestamp the time to search for
 * @return the offset reported by the consume queue, or {@code 0} when no consume queue is found
 */
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) {
        // Resolve the consume queue for this topic / queue id.
        final ConsumeQueue consumeQueue = this.findConsumeQueue(topic, queueId);
        if (consumeQueue == null) {
            return 0;
        }

        // Delegate the time-based search to the consume queue itself.
        return consumeQueue.getOffsetInQueueByTime(timestamp);
    }

進入這個方法 org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue:查詢消費隊列

/**
 * Returns the consume queue for a topic / queue id, creating and registering it when absent.
 *
 * <p>Both the per-topic map and the queue itself are published with {@code putIfAbsent}; when
 * another entry is already present, that existing entry is returned instead of the new one.
 *
 * @param topic topic name used as the key of {@code consumeQueueTable}
 * @param queueId queue id used as the key of the per-topic map
 * @return the registered consume queue for the given topic and queue id
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
        ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
        if (queuesOfTopic == null) {
            final ConcurrentMap<Integer, ConsumeQueue> freshMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            final ConcurrentMap<Integer, ConsumeQueue> existingMap = consumeQueueTable.putIfAbsent(topic, freshMap);
            queuesOfTopic = existingMap != null ? existingMap : freshMap;
        }

        // Look the consume queue up by queue id.
        final ConsumeQueue known = queuesOfTopic.get(queueId);
        if (known != null) {
            return known;
        }

        final ConsumeQueue created = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage directory (original note: user.home/store/consumequeue).
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // Configured consume-queue file size (original note: 300,000 entries per file by default).
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        final ConsumeQueue existing = queuesOfTopic.putIfAbsent(queueId, created);
        return existing != null ? existing : created;
    }

往上返回到這個方法 org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime:按時間查詢 offset

/**
 * Searches this consume queue for the entry whose commit-log store time best matches
 * {@code timestamp} and returns that entry's index in the queue.
 *
 * <p>The mapped file is chosen by {@code mappedFileQueue.getMappedFileByTime(timestamp)}. Its
 * buffer is then binary-searched in steps of {@code CQ_STORE_UNIT_SIZE} bytes; each probed unit
 * yields a physical offset (long) and a size (int), which are used to read the store time from
 * the commit log. Units whose physical offset is below the store's minimum physical offset are
 * treated as "too far left" without a store-time lookup.
 *
 * @param timestamp the store time to search for
 * @return {@code (fileFromOffset + byteOffsetInFile) / CQ_STORE_UNIT_SIZE} for the chosen entry;
 *         {@code 0} when no mapped file or buffer is available, or when a store time lookup
 *         returns a negative value
 */
public long getOffsetInQueueByTime(final long timestamp) {
        // Pick the mapped file by time (the file exposes its last-modified time) =>
        MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
        if (mappedFile != null) {
            long offset = 0;
            // Start the search at minLogicOffset when it lies inside this file, otherwise at byte 0.
            int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
            int high = 0;
            // -1 is the "not set" sentinel for every offset / store-time tracker below.
            int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
            long leftIndexValue = -1L, rightIndexValue = -1L;
            // Minimum physical offset of the store =>
            long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
            // Select the file's buffer starting at position 0 =>
            SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
            if (null != sbr) {
                ByteBuffer byteBuffer = sbr.getByteBuffer();
                // Byte position of the last whole unit in the buffer.
                high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
                try {
                    while (high >= low) {
                        // Midpoint rounded down to a unit boundary.
                        midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                        byteBuffer.position(midOffset);
                        long phyOffset = byteBuffer.getLong();
                        int size = byteBuffer.getInt();
                        if (phyOffset < minPhysicOffset) {
                            // Entry is below the minimum physical offset: move right.
                            // leftOffset is updated here but leftIndexValue is not.
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            continue;
                        }

                        // Read the store time from the commit log by physical offset =>
                        long storeTime =
                            this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                        if (storeTime < 0) {
                            // Lookup failed; the finally block below still releases sbr.
                            return 0;
                        } else if (storeTime == timestamp) {
                            // Exact match.
                            targetOffset = midOffset;
                            break;
                        } else if (storeTime > timestamp) {
                            // Probed entry is newer: remember it as the right neighbour, search left.
                            high = midOffset - CQ_STORE_UNIT_SIZE;
                            rightOffset = midOffset;
                            rightIndexValue = storeTime;
                        } else {
                            // Probed entry is older: remember it as the left neighbour, search right.
                            low = midOffset + CQ_STORE_UNIT_SIZE;
                            leftOffset = midOffset;
                            leftIndexValue = storeTime;
                        }
                    }

                    if (targetOffset != -1) {

                        offset = targetOffset;
                    } else {
                        if (leftIndexValue == -1) {
                            // No older entry with a store time was seen: use the right neighbour.
                            // NOTE(review): if every probe took the minPhysicOffset skip, rightOffset
                            // is still -1 here and offset becomes -1 — confirm this is intended.

                            offset = rightOffset;
                        } else if (rightIndexValue == -1) {
                            // No newer entry was seen: use the left neighbour.

                            offset = leftOffset;
                        } else {
                            // Both neighbours known: take the one closer in time; ties go to the left.
                            offset =
                                Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp
                                    - rightIndexValue) ? rightOffset : leftOffset;
                        }
                    }

                    // Convert the byte position (file base + in-file offset) into an entry index.
                    return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
                } finally {
                    // Release the selected mapped buffer =>
                    sbr.release();
                }
            }
        }
        return 0;
    }

進入這個方法 org.apache.rocketmq.store.MappedFileQueue#getMappedFileByTime:按時間查詢映射文件

/**
 * Returns the first mapped file whose last-modified timestamp is at or after {@code timestamp}.
 *
 * @param timestamp the time to compare each file's last-modified timestamp against
 * @return the first matching file in queue order; the last file when none matches;
 *         {@code null} when {@code copyMappedFiles(0)} returns {@code null}
 */
public MappedFile getMappedFileByTime(final long timestamp) {
        final Object[] snapshot = this.copyMappedFiles(0);
        if (snapshot == null) {
            return null;
        }

        for (final Object entry : snapshot) {
            final MappedFile candidate = (MappedFile) entry;
            // Accept the first file last modified at or after the requested time.
            if (candidate.getLastModifiedTimestamp() >= timestamp) {
                return candidate;
            }
        }

        // Nothing matched: fall back to the last file of the snapshot.
        return (MappedFile) snapshot[snapshot.length - 1];
    }

往上返回進入到這個方法 org.apache.rocketmq.store.DefaultMessageStore#getMinPhyOffset:獲取最小的物理 offset

/**
 * Returns the minimum physical offset of this store, which is the commit log's minimum offset.
 *
 * @return the value of {@code commitLog.getMinOffset()}
 */
@Override
    public long getMinPhyOffset() {
        // The minimum physical offset is whatever the commit log reports as its minimum.
        final long commitLogMinOffset = this.commitLog.getMinOffset();
        return commitLogMinOffset;
    }

進入這個方法 org.apache.rocketmq.store.CommitLog#getMinOffset:獲取 commitLog 的最小偏移量

/**
 * Returns the minimum offset of the commit log.
 *
 * @return the first mapped file's starting offset when that file is available; the result of
 *         {@code rollNextFile} on that starting offset when it is not; {@code -1} when the
 *         queue has no first mapped file
 */
public long getMinOffset() {
        final MappedFile head = this.mappedFileQueue.getFirstMappedFile();
        if (head == null) {
            return -1;
        }

        if (!head.isAvailable()) {
            // First file is not available: use the starting offset of the next file.
            return this.rollNextFile(head.getFileFromOffset());
        }

        // First file is available: its starting offset is the minimum.
        return head.getFileFromOffset();
    }

往上返回到這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp 按offset查詢存儲時間

/**
 * Reads the store timestamp of the message located at a commit-log offset.
 *
 * @param offset physical offset of the message in the commit log
 * @param size number of bytes to select for the message
 * @return the long read at {@code MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION} of the selected
 *         buffer, or {@code -1} when the offset is below the minimum offset or no buffer is found
 */
public long pickupStoreTimestamp(final long offset, final int size) {
        if (offset < this.getMinOffset()) {
            return -1;
        }

        // Select the buffer that holds the message =>
        final SelectMappedBufferResult selected = this.getMessage(offset, size);
        if (selected == null) {
            return -1;
        }

        try {
            // Read the message store time from its fixed position.
            return selected.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
        } finally {
            selected.release();
        }
    }

進入這個方法org.apache.rocketmq.store.CommitLog#getMessage 獲取時間、offset屬性存儲的映射buffer

/**
 * Selects the buffer slice for a message at a commit-log offset.
 *
 * @param offset physical offset of the message in the commit log
 * @param size number of bytes to select
 * @return the selected buffer, or {@code null} when no mapped file is found for the offset
 */
public SelectMappedBufferResult getMessage(final long offset, final int size) {
        final int commitLogFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();

        // Find the mapped file by offset; only offset 0 asks for the first-file fallback =>
        final MappedFile owner = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (owner == null) {
            return null;
        }

        // Position inside the file is the offset modulo the configured file size.
        final int positionInFile = (int) (offset % commitLogFileSize);
        return owner.selectMappedBuffer(positionInFile, size);
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean) 按offset獲取映射文件

/**
 * Finds the mapped file whose offset range contains {@code offset}.
 *
 * <p>The file is first looked up by a computed index into {@code mappedFiles}; if that does not
 * yield a file covering the offset, the whole collection is scanned. Any exception is logged and
 * results in {@code null}.
 *
 * @param offset the offset to locate
 * @param returnFirstOnNotFound when {@code true}, the first mapped file is returned if no file
 *        covers the offset
 * @return the file covering {@code offset}; the first file when none covers it and
 *         {@code returnFirstOnNotFound} is set; otherwise {@code null}
 */
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
            // First and last mapped files of the queue.
            final MappedFile head = this.getFirstMappedFile();
            final MappedFile tail = this.getLastMappedFile();
            if (head == null || tail == null) {
                return null;
            }

            final boolean coveredByQueue = offset >= head.getFileFromOffset()
                && offset < tail.getFileFromOffset() + this.mappedFileSize;
            if (coveredByQueue) {
                // Index of the file in the queue, relative to the first file.
                final int slot = (int) ((offset / this.mappedFileSize) - (head.getFileFromOffset() / this.mappedFileSize));
                MappedFile indexed = null;
                try {
                    indexed = this.mappedFiles.get(slot);
                } catch (Exception ignored) {
                }

                // Accept the indexed file only when the offset lies inside its range.
                if (indexed != null && offset >= indexed.getFileFromOffset()
                    && offset < indexed.getFileFromOffset() + this.mappedFileSize) {
                    return indexed;
                }

                // Index lookup did not produce a covering file: scan every mapped file.
                for (MappedFile scanned : this.mappedFiles) {
                    if (offset >= scanned.getFileFromOffset()
                        && offset < scanned.getFileFromOffset() + this.mappedFileSize) {
                        return scanned;
                    }
                }
            } else {
                // Offset lies outside the range covered by the queue's files.
                LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                    offset,
                    head.getFileFromOffset(),
                    tail.getFileFromOffset() + this.mappedFileSize,
                    this.mappedFileSize,
                    this.mappedFiles.size());
            }

            // Fallback requested by the caller. The article's author wonders whether this check
            // would read better earlier in the method, or is placed here on purpose.
            if (returnFirstOnNotFound) {
                return head;
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#searchOffsetByTimestamp 解析完畢。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索