rocketmq源碼解析之管理請求查詢消費時間②

說在前面java

管理請求 QUERY_CONSUME_TIME_SPAN 查詢消費時間apache

 

源碼解析緩存

接上篇微信

按topic、queueId、consumerOffset查詢最大時間,往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStampapp

@Override
    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
//        按topic和queueId查詢到消費隊列=》
        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue != null) {
//            按消費者的offset查詢存儲時間所在的buffer=》
            SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
//            =》
            return getStoreTime(result);
        }

        return -1;
    }

按topic和queueId查詢到消費隊列,進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueueide

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
//        找到topic的全部消息隊列
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

//        按queue id查找消費者隊列
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
//                消費者隊列存儲地址 user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//                每一個文件存儲默認30W
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

按消費者的offset查詢存儲時間所在的buffer,往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getIndexBufferthis

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
//        獲取最小的物理offset
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
//            根據offset查詢映射文件 =》
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }

根據offset查詢映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)code

public MappedFile findMappedFileByOffset(final long offset) {
//        =》
        return findMappedFileByOffset(offset, false);
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)blog

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
//            獲取隊列中第一個映射文件
            MappedFile firstMappedFile = this.getFirstMappedFile();
//            獲取隊列中最後一個映射文件
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
//                若是offset不在索引文件的offset範圍內
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
//                   找到映射文件在隊列中的索引位置
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
//                        獲取索引文件
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

//                    offset在目標文件的起始offset和結束offset範圍內
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getStoreTime索引

private long getStoreTime(SelectMappedBufferResult result) {
        if (result != null) {
            try {
                final long phyOffset = result.getByteBuffer().getLong();
                final int size = result.getByteBuffer().getInt();
//                根據SelectMappedBufferResult的offset和大小查找存儲時間=》
                long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                return storeTime;
            } catch (Exception e) {
            } finally {
                result.release();
            }
        }
        return -1;
    }

根據SelectMappedBufferResult的offset和大小查找存儲時間,進入這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp

public long pickupStoreTimestamp(final long offset, final int size) {
        if (offset >= this.getMinOffset()) {
//            =》
            SelectMappedBufferResult result = this.getMessage(offset, size);
            if (null != result) {
                try {
//                    獲取消息存儲時間
                    return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
                } finally {
                    result.release();
                }
            }
        }

        return -1;
    }

進入這個方法org.apache.rocketmq.store.CommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
//        根據offset找到映射文件 =》
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size);
        }
        return null;
    }

根據offset找到映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
//            獲取隊列中第一個映射文件
            MappedFile firstMappedFile = this.getFirstMappedFile();
//            獲取隊列中最後一個映射文件
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
//                若是offset不在索引文件的offset範圍內
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
//                   找到映射文件在隊列中的索引位置
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
//                        獲取索引文件
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

//                    offset在目標文件的起始offset和結束offset範圍內
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

按consumerGroup、topic、queueId查詢consumerOffset,往上返回到這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)

public long queryOffset(final String group, final String topic, final int queueId) {
    // topic@group 從本地offset緩存中查詢
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }

    return -1;
}

按topic、queueId、consumerOffset查詢消費時間,往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStamp

@Override
    public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {
//        按topic和queueId查詢到消費隊列=》
        ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);
        if (logicQueue != null) {
//            按消費者的offset查詢存儲時間所在的buffer=》
            SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);
//            =》
            return getStoreTime(result);
        }

        return -1;
    }

按topic和queueId查詢到消費隊列,進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue

public ConsumeQueue findConsumeQueue(String topic, int queueId) {
//        找到topic的全部消息隊列
        ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
        if (null == map) {
            ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
            ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
            if (oldMap != null) {
                map = oldMap;
            } else {
                map = newMap;
            }
        }

//        按queue id查找消費者隊列
        ConsumeQueue logic = map.get(queueId);
        if (null == logic) {
            ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
//                消費者隊列存儲地址 user.home/store/consumequeue
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
//                每一個文件存儲默認30W
                this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
                this);
            ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
            if (oldLogic != null) {
                logic = oldLogic;
            } else {
                logic = newLogic;
            }
        }

        return logic;
    }

按消費者的offset查詢存儲時間所在的buffer,往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {
        int mappedFileSize = this.mappedFileSize;
//        獲取最小的物理offset
        long offset = startIndex * CQ_STORE_UNIT_SIZE;
        if (offset >= this.getMinLogicOffset()) {
//            根據offset查詢映射文件 =》
            MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);
            if (mappedFile != null) {
                SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));
                return result;
            }
        }
        return null;
    }

根據offset查詢映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)

public MappedFile findMappedFileByOffset(final long offset) {
//        =》
        return findMappedFileByOffset(offset, false);
    }

進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
//            獲取隊列中第一個映射文件
            MappedFile firstMappedFile = this.getFirstMappedFile();
//            獲取隊列中最後一個映射文件
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
//                若是offset不在索引文件的offset範圍內
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
//                   找到映射文件在隊列中的索引位置
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
//                        獲取索引文件
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

//                    offset在目標文件的起始offset和結束offset範圍內
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getStoreTime

private long getStoreTime(SelectMappedBufferResult result) {
        if (result != null) {
            try {
                final long phyOffset = result.getByteBuffer().getLong();
                final int size = result.getByteBuffer().getInt();
//                根據SelectMappedBufferResult的offset和大小查找存儲時間=》
                long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                return storeTime;
            } catch (Exception e) {
            } finally {
                result.release();
            }
        }
        return -1;
    }

根據SelectMappedBufferResult的offset和大小查找存儲時間,進入這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp

public long pickupStoreTimestamp(final long offset, final int size) {
        if (offset >= this.getMinOffset()) {
//            =》
            SelectMappedBufferResult result = this.getMessage(offset, size);
            if (null != result) {
                try {
//                    獲取消息存儲時間
                    return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION);
                } finally {
                    result.release();
                }
            }
        }

        return -1;
    }

進入這個方法org.apache.rocketmq.store.CommitLog#getMessage

public SelectMappedBufferResult getMessage(final long offset, final int size) {
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
//        根據offset找到映射文件 =》
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            int pos = (int) (offset % mappedFileSize);
            return mappedFile.selectMappedBuffer(pos, size);
        }
        return null;
    }

根據offset找到映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {
        try {
//            獲取隊列中第一個映射文件
            MappedFile firstMappedFile = this.getFirstMappedFile();
//            獲取隊列中最後一個映射文件
            MappedFile lastMappedFile = this.getLastMappedFile();
            if (firstMappedFile != null && lastMappedFile != null) {
//                若是offset不在索引文件的offset範圍內
                if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {
                    LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",
                        offset,
                        firstMappedFile.getFileFromOffset(),
                        lastMappedFile.getFileFromOffset() + this.mappedFileSize,
                        this.mappedFileSize,
                        this.mappedFiles.size());
                } else {
//                   找到映射文件在隊列中的索引位置
                    int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));
                    MappedFile targetFile = null;
                    try {
//                        獲取索引文件
                        targetFile = this.mappedFiles.get(index);
                    } catch (Exception ignored) {
                    }

//                    offset在目標文件的起始offset和結束offset範圍內
                    if (targetFile != null && offset >= targetFile.getFileFromOffset()
                        && offset < targetFile.getFileFromOffset() + this.mappedFileSize) {
                        return targetFile;
                    }

//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件
                    for (MappedFile tmpMappedFile : this.mappedFiles) {
                        if (offset >= tmpMappedFile.getFileFromOffset()
                            && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {
                            return tmpMappedFile;
                        }
                    }
                }

//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意
                if (returnFirstOnNotFound) {
                    return firstMappedFile;
                }
            }
        } catch (Exception e) {
            log.error("findMappedFileByOffset Exception", e);
        }

        return null;
    }

接下篇

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索