RocketMQ Source Code Analysis: Consumer Manage Processor (3)

Preface

This part covers the consumer manage processor and how it queries a consumer offset.

 

Source Code Analysis

Start in this method, which queries the consumer offset: org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset



private RemotingCommand queryConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request)
    throws RemotingCommandException {
    final RemotingCommand response =
        RemotingCommand.createResponseCommand(QueryConsumerOffsetResponseHeader.class);
    final QueryConsumerOffsetResponseHeader responseHeader =
        (QueryConsumerOffsetResponseHeader) response.readCustomHeader();
    final QueryConsumerOffsetRequestHeader requestHeader =
        (QueryConsumerOffsetRequestHeader) request.decodeCommandCustomHeader(QueryConsumerOffsetRequestHeader.class);

    // Look up the stored offset =>
    long offset = this.brokerController.getConsumerOffsetManager().queryOffset(
        requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());

    if (offset >= 0) {
        responseHeader.setOffset(offset);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
    } else {
        // Get the minimum offset of the queue =>
        long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(
            requestHeader.getTopic(), requestHeader.getQueueId());
        // Check whether consume offset 0 is on disk (not in memory) =>
        if (minOffset <= 0
            && !this.brokerController.getMessageStore().checkInDiskByConsumeOffset(
                requestHeader.getTopic(), requestHeader.getQueueId(), 0)) {
            responseHeader.setOffset(0L);
            response.setCode(ResponseCode.SUCCESS);
            response.setRemark(null);
        } else {
            response.setCode(ResponseCode.QUERY_NOT_FOUND);
            response.setRemark("Not found, V3_0_6_SNAPSHOT maybe this group consumer boot first");
        }
    }
    return response;
}
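The three outcomes of the method above are easier to see with the RocketMQ types stripped away. The following is only a sketch of the branching: the class QueryOffsetDecisionSketch, the method resolve, and the NOT_FOUND constant are hypothetical names, not part of RocketMQ. The disk check is passed as a supplier so that, as in the original `&&` expression, it only runs when the minimum offset is not positive.

```java
import java.util.function.BooleanSupplier;

// Hypothetical stand-alone model of the branches in queryConsumerOffset.
// None of these names exist in RocketMQ.
final class QueryOffsetDecisionSketch {
    // Stands in for the ResponseCode.QUERY_NOT_FOUND branch.
    static final long NOT_FOUND = -1L;

    /**
     * @param storedOffset     value returned by queryOffset (-1 when nothing is stored)
     * @param minOffset        value returned by getMinOffsetInQueue
     * @param firstEntryOnDisk stands in for checkInDiskByConsumeOffset(topic, queueId, 0)
     * @return the offset to report, or NOT_FOUND
     */
    static long resolve(long storedOffset, long minOffset, BooleanSupplier firstEntryOnDisk) {
        if (storedOffset >= 0) {
            // A stored offset exists: report it as is.
            return storedOffset;
        }
        if (minOffset <= 0 && !firstEntryOnDisk.getAsBoolean()) {
            // No stored offset, and the queue start is not on disk: start from 0.
            return 0L;
        }
        // Otherwise the broker answers QUERY_NOT_FOUND.
        return NOT_FOUND;
    }
}
```

For example, `QueryOffsetDecisionSketch.resolve(-1L, 0L, () -> false)` takes the middle branch and reports offset 0.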

Step into the offset lookup: org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)

public long queryOffset(final String group, final String topic, final int queueId) {
    // Key is topic@group; look it up in the local offset cache
    String key = topic + TOPIC_GROUP_SEPARATOR + group;
    ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);
    if (null != map) {
        Long offset = map.get(queueId);
        if (offset != null)
            return offset;
    }
    return -1;
}
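The lookup above is a two-level map: a "topic@group" key selects a per-queue map, and the queue id selects the offset. A minimal runnable sketch of that structure follows. The class OffsetTableSketch and its commitOffset helper are hypothetical, and the "@" separator is an assumption taken from the "topic@group" comment rather than from the constant's definition, which this walkthrough does not show.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the offset cache; not RocketMQ's ConsumerOffsetManager.
final class OffsetTableSketch {
    // Assumption: "@" is the separator, as the "topic@group" comment suggests.
    private static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> consume offset)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>();

    // Hypothetical helper so the sketch has something to query.
    void commitOffset(String group, String topic, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    // Same shape as queryOffset above: -1 means "nothing stored".
    long queryOffset(String group, String topic, int queueId) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        ConcurrentMap<Integer, Long> map = offsetTable.get(key);
        if (map != null) {
            Long offset = map.get(queueId);
            if (offset != null) {
                return offset;
            }
        }
        return -1;
    }
}
```

The -1 sentinel is what sends queryConsumerOffset into its else branch for a consumer group that has never committed an offset for that queue.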

Back in queryConsumerOffset, the next call fetches the minimum offset: org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue

public long getMinOffsetInQueue(String topic, int queueId) {
    // Find the consume queue by topic and queueId =>
    ConsumeQueue logic = this.findConsumeQueue(topic, queueId);
    if (logic != null) {
        // Get the minimum offset in the queue
        return logic.getMinOffsetInQueue();
    }
    return -1;
}

Step into the method that finds the consume queue by topic and queueId: org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue


public ConsumeQueue findConsumeQueue(String topic, int queueId) {
    // Find all consume queues for the topic
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    if (null == map) {
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) {
            map = oldMap;
        } else {
            map = newMap;
        }
    }

    // Look up the consume queue by queue id
    ConsumeQueue logic = map.get(queueId);
    if (null == logic) {
        ConsumeQueue newLogic = new ConsumeQueue(
            topic,
            queueId,
            // Consume queue storage path: user.home/store/consumequeue
            StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
            // Each file holds 300,000 entries by default
            this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),
            this);
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) {
            logic = oldLogic;
        } else {
            logic = newLogic;
        }
    }
    return logic;
}
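Note that findConsumeQueue creates the queue when it is missing, so as written it never returns null. The get / putIfAbsent pattern it uses at both levels guarantees that concurrent callers end up sharing one instance even if several of them construct a candidate. The sketch below reproduces the pattern with a placeholder type; LazyQueueTableSketch and its nested Queue class are hypothetical and only stand in for ConsumeQueue.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical model of the lazy two-level lookup; Queue stands in for ConsumeQueue.
final class LazyQueueTableSketch {
    static final class Queue {
        final String topic;
        final int queueId;

        Queue(String topic, int queueId) {
            this.topic = topic;
            this.queueId = queueId;
        }
    }

    // topic -> (queueId -> queue)
    private final ConcurrentMap<String, ConcurrentMap<Integer, Queue>> table =
        new ConcurrentHashMap<>();

    Queue findOrCreate(String topic, int queueId) {
        ConcurrentMap<Integer, Queue> map = table.get(topic);
        if (map == null) {
            ConcurrentMap<Integer, Queue> newMap = new ConcurrentHashMap<>();
            // putIfAbsent returns the existing value if another thread won the race.
            ConcurrentMap<Integer, Queue> oldMap = table.putIfAbsent(topic, newMap);
            map = (oldMap != null) ? oldMap : newMap;
        }

        Queue queue = map.get(queueId);
        if (queue == null) {
            Queue created = new Queue(topic, queueId);
            Queue old = map.putIfAbsent(queueId, created);
            // Keep the winner; a losing candidate is simply discarded.
            queue = (old != null) ? old : created;
        }
        return queue;
    }
}
```

On Java 8 and later the same effect can be had with `computeIfAbsent`; the explicit form is shown here because it mirrors the code above.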

Back up to queryConsumerOffset again. The next call checks whether the offset is on disk: org.apache.rocketmq.store.DefaultMessageStore#checkInDiskByConsumeOffset



@Override
public boolean checkInDiskByConsumeOffset(final String topic, final int queueId, long consumeOffset) {
    // Get the max offset from the commit log =>
    final long maxOffsetPy = this.commitLog.getMaxOffset();

    // Find the consume queue by topic and queue id =>
    ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);
    if (consumeQueue != null) {
        SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(consumeOffset);
        if (bufferConsumeQueue != null) {
            try {
                for (int i = 0; i < bufferConsumeQueue.getSize(); ) {
                    i += ConsumeQueue.CQ_STORE_UNIT_SIZE;
                    long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();
                    return checkInDiskByCommitOffset(offsetPy, maxOffsetPy);
                }
            } finally {
                bufferConsumeQueue.release();
            }
        } else {
            return false;
        }
    }
    return false;
}
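The loop above returns on its first iteration, so in effect it reads the first long of the first consume queue entry (the message's physical offset in the commit log) and hands it to checkInDiskByCommitOffset, which this walkthrough does not show. The sketch below is an assumption about how such a check could work: a message is treated as "on disk" when it lies further behind the commit log's max offset than some memory budget. The class DiskCheckSketch, the 20-byte entry layout, and the memoryBudgetBytes parameter are all illustrative assumptions, not confirmed by the code shown here.

```java
import java.nio.ByteBuffer;

// Hypothetical sketch; not RocketMQ's DefaultMessageStore.
final class DiskCheckSketch {
    // Assumed entry layout: 8-byte commit log offset, 4-byte size, 8-byte tag hash.
    static final int CQ_STORE_UNIT_SIZE = 20;

    // Reads the physical offset of the first entry without moving the buffer position.
    static long firstPhysicalOffset(ByteBuffer entries) {
        return entries.getLong(entries.position());
    }

    // Assumed rule: data further behind the write position than the memory
    // budget is unlikely to still be in the page cache, so count it as on disk.
    static boolean onDisk(long offsetPy, long maxOffsetPy, long memoryBudgetBytes) {
        return (maxOffsetPy - offsetPy) > memoryBudgetBytes;
    }
}
```

Under this assumption, a brand-new queue whose first message is still close to the commit log's write position is not "on disk", which is the case where queryConsumerOffset answers with offset 0.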

Step into the method that gets the max offset from the commit log: org.apache.rocketmq.store.CommitLog#getMaxOffset

public long getMaxOffset() {
    return this.mappedFileQueue.getMaxOffset();
}

Back up one level. The consume queue lookup by topic and queue id, org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue, was covered above.

Back up once more to org.apache.rocketmq.broker.processor.ConsumerManageProcessor#queryConsumerOffset, where the walkthrough ends.

 

Closing Remarks

This analysis reflects only my own understanding and is for reference only.

 
