Preface
The admin request GET_MIN_OFFSET retrieves the minimum offset of a queue.
Source code analysis
Step into org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which handles the request and returns the minimum offset.
private RemotingCommand getMinOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(GetMinOffsetResponseHeader.class);
    final GetMinOffsetResponseHeader responseHeader = (GetMinOffsetResponseHeader) response.readCustomHeader();
    final GetMinOffsetRequestHeader requestHeader =
        (GetMinOffsetRequestHeader) request.decodeCommandCustomHeader(GetMinOffsetRequestHeader.class);

    // Look up the minimum offset by topic and queueId =>
    long offset = this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId());

    responseHeader.setOffset(offset);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}
Step into org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue, which looks up the minimum offset by topic and queueId.
public long getMinOffsetInQueue(String topic, int queueId) {
    // Find the consume queue by topic and queueId =>
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        // Return the minimum offset in that queue
        return logic.getMinOffsetInQueue();
    }

    return -1;
}
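The post does not show ConsumeQueue#getMinOffsetInQueue itself. As a rough sketch, and assuming (this is my reading of the RocketMQ source, not something stated above) that a consume queue stores fixed-size 20-byte entries and tracks the byte position of its first valid entry, the logical minimum offset is that byte position divided by the entry size. The class name MinOffsetSketch and the method below are hypothetical and only illustrate the arithmetic.

```java
final class MinOffsetSketch {
    // Assumption: fixed entry size in bytes, mirroring what RocketMQ calls CQ_STORE_UNIT_SIZE.
    static final int UNIT_SIZE = 20;

    // Converts the byte position of the first valid entry into a logical queue offset.
    static long minOffsetInQueue(long minLogicOffsetBytes) {
        return minLogicOffsetBytes / UNIT_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(minOffsetInQueue(0));    // prints 0
        System.out.println(minOffsetInQueue(6000)); // prints 300
    }
}
```

Under that assumption, the offset returned to the client counts entries, not bytes.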
Step into org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, which finds the consume queue by topic and queueId. This method was covered in an earlier post.
public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    // Find the consume queue by queue id
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage path: user.home/store/consumequeue
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // By default each file stores 300,000 entries
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }

    return logic;
}
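The lookup above is a two-level get-or-create built on ConcurrentMap#putIfAbsent: if two threads race to create the same entry, both end up with whichever instance was stored first. The following is a minimal, self-contained sketch of that pattern. QueueStub and QueueTableSketch are hypothetical stand-ins, not RocketMQ classes.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical stand-in for ConsumeQueue; only object identity matters here.
final class QueueStub {
    final String topic;
    final int queueId;

    QueueStub(String topic, int queueId) {
        this.topic = topic;
        this.queueId = queueId;
    }
}

final class QueueTableSketch {
    // topic -> (queueId -> queue), the same shape as consumeQueueTable
    private final ConcurrentMap<String, ConcurrentMap<Integer, QueueStub>> table =
        new ConcurrentHashMap<>();

    QueueStub findOrCreate(String topic, int queueId) {
        ConcurrentMap<Integer, QueueStub> map = table.get(topic);
        if (map == null) {
            ConcurrentMap<Integer, QueueStub> newMap = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing value if another thread stored one first
            ConcurrentMap<Integer, QueueStub> oldMap = table.putIfAbsent(topic, newMap);
            map = (oldMap != null) ? oldMap : newMap;
        }

        QueueStub queue = map.get(queueId);
        if (queue == null) {
            QueueStub created = new QueueStub(topic, queueId);
            QueueStub existing = map.putIfAbsent(queueId, created);
            queue = (existing != null) ? existing : created;
        }
        return queue;
    }

    public static void main(String[] args) {
        QueueTableSketch t = new QueueTableSketch();
        QueueStub a = t.findOrCreate("TopicTest", 0);
        QueueStub b = t.findOrCreate("TopicTest", 0);
        System.out.println(a == b); // prints true
    }
}
```

One consequence visible in findConsumeQueue as shown: it always returns a queue, creating one when none exists, so the null check in getMinOffsetInQueue and its -1 fallback are not reached through this path.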
Control then returns up to org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMinOffset, which completes the request.
Closing remarks
This analysis reflects only my own understanding and is offered for reference.