說在前面java
消費者管理處理器,查詢消費者offsetapache
源碼解析緩存
進入這個方法,查詢消費者offset,org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset微信
private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);final QueryConsumerOffsetResponseHeader responseHeader =(QueryConsumerOffsetResponseHeader) response.readCustomHeader();final QueryConsumerOffsetRequestHeader requestHeader =(QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);// 查詢offset=》long offset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());if (offset >= 0) {responseHeader.setOffset(offset);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {// 獲取最小的offset=》long minOffset =this.brokerController.getMessageStore().getMinOffsetInQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (minOffset <= 0// 檢查持久化的offset=》&& !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {responseHeader.setOffset(0L);response.setCode(ResponseCode.SUCCESS);response.setRemark(null);} else {response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");}}return response; }
進入這個方法,查詢offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)app
public long queryOffset(final String group, final String topic, final int queueId) {// topic@group 從本地offset緩存中查詢String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {Long offset = map.get(queueId);if (offset != null)return offset;}return -1; }
返回到這個方法,獲取最小的offset,org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueueide
public long getMinOffsetInQueue(String topic, int queueId) {// 根據topic和queueId查詢消費者隊列 =》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 獲取隊列中的最小offsetreturn logic.getMinOffsetInQueue();}return -1; }
進入這個方法,根據topic和queueId查詢消費者隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueuethis
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic; }
往上返回到這個方法,檢查持久化的offset,org.apache.rocketmq.store.DefaultMessageStore#checkInDiskByConsumeOffsetcode
@Overridepublic boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {// 從commitLog中獲取最大的offset=》final long maxOffsetPy = this.commitLog.getMaxOffset();// 按隊列id和topic查找消息隊列=》ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);if (consumeQueue != null) {SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset);if (bufferConsumeQueue != null) {try {for (int i = 0; i < bufferConsumeQueue.getSize(); ) {i += ConsumeQueue.CQ_STORE_UNIT_SIZE;long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);}} finally {bufferConsumeQueue.release();}} else {return false;}}return false; }
進入這個方法,從commitLog中獲取最大的offset,org.apache.rocketmq.store.CommitLog#getMaxOffsetblog
public long getMaxOffset() {return this.mappedFileQueue.getMaxOffset(); }
往上返回到這個方法,按隊列id和topic查找消息隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue上面介紹過了。隊列
往上返回到這個方法,org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣