說在前面java
管理請求 QUERY_CONSUME_TIME_SPAN 查詢消費時間apache
源碼解析緩存
接上篇微信
按topic、queueId、consumerOffset查詢最大時間,往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStampapp
@Override public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { // 按topic和queueId查詢到消費隊列=》 ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { // 按消費者的offset查詢存儲時間所在的buffer=》 SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); // =》 return getStoreTime(result); } return -1; }
按topic和queueId查詢到消費隊列,進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueueide
public ConsumeQueue findConsumeQueue(String topic, int queueId) { // 找到topic的全部消息隊列 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } // 按queue id查找消費者隊列 ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, // 消費者隊列存儲地址 user.home/store/consumequeue StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // 每一個文件存儲默認30W this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
按消費者的offset查詢存儲時間所在的buffer,往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getIndexBufferthis
public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; // 獲取最小的物理offset long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { // 根據offset查詢映射文件 =》 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; }
根據offset查詢映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)code
public MappedFile findMappedFileByOffset(final long offset) { // =》 return findMappedFileByOffset(offset, false); }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)blog
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { // 獲取隊列中第一個映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); // 獲取隊列中最後一個映射文件 MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { // 若是offset不在索引文件的offset範圍內 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { // 找到映射文件在隊列中的索引位置 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 獲取索引文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } // offset在目標文件的起始offset和結束offset範圍內 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } // 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getStoreTime索引
private long getStoreTime(SelectMappedBufferResult result) { if (result != null) { try { final long phyOffset = result.getByteBuffer().getLong(); final int size = result.getByteBuffer().getInt(); // 根據SelectMappedBufferResult的offset和大小查找存儲時間=》 long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); return storeTime; } catch (Exception e) { } finally { result.release(); } } return -1; }
根據SelectMappedBufferResult的offset和大小查找存儲時間,進入這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp
public long pickupStoreTimestamp(final long offset, final int size) { if (offset >= this.getMinOffset()) { // =》 SelectMappedBufferResult result = this.getMessage(offset, size); if (null != result) { try { // 獲取消息存儲時間 return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); } finally { result.release(); } } } return -1; }
進入這個方法org.apache.rocketmq.store.CommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); // 根據offset找到映射文件 =》 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); } return null; }
根據offset找到映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { // 獲取隊列中第一個映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); // 獲取隊列中最後一個映射文件 MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { // 若是offset不在索引文件的offset範圍內 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { // 找到映射文件在隊列中的索引位置 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 獲取索引文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } // offset在目標文件的起始offset和結束offset範圍內 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } // 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
按consumerGroup、topic、queueId查詢consumerOffset,往上返回到這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)
public long queryOffset(final String group, final String topic, final int queueId) { // topic@group 從本地offset緩存中查詢 String key = topic + TOPIC_GROUP_SEPARATOR + group; ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } return -1; }
按topic、queueId、consumerOffset查詢消費時間,往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStamp
@Override public long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) { // 按topic和queueId查詢到消費隊列=》 ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId); if (logicQueue != null) { // 按消費者的offset查詢存儲時間所在的buffer=》 SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset); // =》 return getStoreTime(result); } return -1; }
按topic和queueId查詢到消費隊列,進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) { // 找到topic的全部消息隊列 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } // 按queue id查找消費者隊列 ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, // 消費者隊列存儲地址 user.home/store/consumequeue StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // 每一個文件存儲默認30W this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
按消費者的offset查詢存儲時間所在的buffer,往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer
public SelectMappedBufferResult getIndexBuffer(final long startIndex) { int mappedFileSize = this.mappedFileSize; // 獲取最小的物理offset long offset = startIndex * CQ_STORE_UNIT_SIZE; if (offset >= this.getMinLogicOffset()) { // 根據offset查詢映射文件 =》 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset); if (mappedFile != null) { SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize)); return result; } } return null; }
根據offset查詢映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)
public MappedFile findMappedFileByOffset(final long offset) { // =》 return findMappedFileByOffset(offset, false); }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { // 獲取隊列中第一個映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); // 獲取隊列中最後一個映射文件 MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { // 若是offset不在索引文件的offset範圍內 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { // 找到映射文件在隊列中的索引位置 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 獲取索引文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } // offset在目標文件的起始offset和結束offset範圍內 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } // 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getStoreTime
private long getStoreTime(SelectMappedBufferResult result) { if (result != null) { try { final long phyOffset = result.getByteBuffer().getLong(); final int size = result.getByteBuffer().getInt(); // 根據SelectMappedBufferResult的offset和大小查找存儲時間=》 long storeTime = this.getCommitLog().pickupStoreTimestamp(phyOffset, size); return storeTime; } catch (Exception e) { } finally { result.release(); } } return -1; }
根據SelectMappedBufferResult的offset和大小查找存儲時間,進入這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp
public long pickupStoreTimestamp(final long offset, final int size) { if (offset >= this.getMinOffset()) { // =》 SelectMappedBufferResult result = this.getMessage(offset, size); if (null != result) { try { // 獲取消息存儲時間 return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); } finally { result.release(); } } } return -1; }
進入這個方法org.apache.rocketmq.store.CommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); // 根據offset找到映射文件 =》 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); } return null; }
根據offset找到映射文件,進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { // 獲取隊列中第一個映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); // 獲取隊列中最後一個映射文件 MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { // 若是offset不在索引文件的offset範圍內 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { // 找到映射文件在隊列中的索引位置 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 獲取索引文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } // offset在目標文件的起始offset和結束offset範圍內 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } // 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
接下篇
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣