rocketmq源碼解析之管理請求查詢消費隊列

說在前面

管理請求之查詢消費隊列

 

源碼解析

進入這個方法,org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryConsumeQueue











private RemotingCommand queryConsumeQueue(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {QueryConsumeQueueRequestHeader requestHeader =(QueryConsumeQueueRequestHeader) request.decodeCommandCustomHeader(QueryConsumeQueueRequestHeader.class);RemotingCommand response = RemotingCommand.createResponseCommand(null);//        獲取消費隊列=》ConsumeQueue consumeQueue = this.brokerController.getMessageStore().getConsumeQueue(requestHeader.getTopic(),requestHeader.getQueueId());if (consumeQueue == null) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("%d@%s is not exist!", requestHeader.getQueueId(), requestHeader.getTopic()));return response;}QueryConsumeQueueResponseBody body = new QueryConsumeQueueResponseBody();response.setCode(ResponseCode.SUCCESS);response.setBody(body.encode());body.setMaxQueueIndex(consumeQueue.getMaxOffsetInQueue());body.setMinQueueIndex(consumeQueue.getMinOffsetInQueue());MessageFilter messageFilter = null;if (requestHeader.getConsumerGroup() != null) {//            按消費組、topic查詢訂閱信息=》SubscriptionData subscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getConsumerGroup(), requestHeader.getTopic());body.setSubscriptionData(subscriptionData);if (subscriptionData == null) {body.setFilterData(String.format("%s@%s is not online!", requestHeader.getConsumerGroup(), requestHeader.getTopic()));} else {//                獲取消費者過濾信息=》ConsumerFilterData filterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(), requestHeader.getConsumerGroup());body.setFilterData(JSON.toJSONString(filterData, true));messageFilter = new ExpressionMessageFilter(subscriptionData, filterData,this.brokerController.getConsumerFilterManager());}}//        根據index獲取selectMappedBufferResult=》SelectMappedBufferResult result = consumeQueue.getIndexBuffer(requestHeader.getIndex());if (result == null) {response.setRemark(String.format("Index %d of %d@%s is 
not exist!", requestHeader.getIndex(), requestHeader.getQueueId(), requestHeader.getTopic()));return response;}try {List<ConsumeQueueData> queues = new ArrayList<>();for (int i = 0; i < result.getSize() && i < requestHeader.getCount() * ConsumeQueue.CQ_STORE_UNIT_SIZE; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {ConsumeQueueData one = new ConsumeQueueData();one.setPhysicOffset(result.getByteBuffer().getLong());one.setPhysicSize(result.getByteBuffer().getInt());one.setTagsCode(result.getByteBuffer().getLong());if (!consumeQueue.isExtAddr(one.getTagsCode())) {queues.add(one);continue;}ConsumeQueueExt.CqExtUnit cqExtUnit = consumeQueue.getExt(one.getTagsCode());if (cqExtUnit != null) {one.setExtendDataJson(JSON.toJSONString(cqExtUnit));if (cqExtUnit.getFilterBitMap() != null) {one.setBitMap(BitsArray.create(cqExtUnit.getFilterBitMap()).toString());}if (messageFilter != null) {one.setEval(messageFilter.isMatchedByConsumeQueue(cqExtUnit.getTagsCode(), cqExtUnit));}} else {one.setMsg("Cq extend not exist!addr: " + one.getTagsCode());}queues.add(one);}body.setQueueData(queues);} finally {result.release();}return response;    }

進入這個方法, 獲取消費隊列,org.apache.rocketmq.store.DefaultMessageStore#getConsumeQueue

/**
 * Looks up the consume queue registered for a topic and queue id.
 *
 * @param topic   topic name used as the key into {@code consumeQueueTable}
 * @param queueId queue id used as the key into the topic's queue map
 * @return the matching consume queue, or {@code null} when the topic has no
 *         queue map or the map has no entry for {@code queueId}
 */
@Override
public ConsumeQueue getConsumeQueue(String topic, int queueId) {
    final ConcurrentMap<Integer, ConsumeQueue> queuesOfTopic = consumeQueueTable.get(topic);
    return queuesOfTopic == null ? null : queuesOfTopic.get(queueId);
}

往上返回到這個方法,按消費組、topic查詢訂閱信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionData

public SubscriptionData findSubscriptionData(final String group, final String topic) {//        按組從緩存中獲取消費組信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {//            按topic從緩存中獲取訂閱數據return consumerGroupInfo.findSubscriptionData(topic);}return null;    }

往上返回到這個方法,獲取消費者過濾信息,org.apache.rocketmq.broker.filter.ConsumerFilterManager#get(java.lang.String, java.lang.String)

/**
 * Returns the filter data registered for a consumer group on a topic.
 *
 * @param topic         topic name used as the key into {@code filterDataByTopic}
 * @param consumerGroup consumer group name used as the key into the topic's group filter data
 * @return the filter data, or {@code null} when the topic is unknown, the topic
 *         has no group filter data, or the group has no entry
 */
public ConsumerFilterData get(final String topic, final String consumerGroup) {
    // NOTE(review): the topic entry is looked up up to three times here; a single
    // get() into a local would avoid the repeated lookups — confirm the value type first.
    if (!this.filterDataByTopic.containsKey(topic)
        || this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {
        return null;
    }

    return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup);
}

往上返回到這個方法,根據index獲取selectMappedBufferResult,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;//        獲取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {//            根據offset查詢映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null;    }

進入這個方法, 根據offset查詢映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)

public MappedFile findMappedFileByOffset(final long offset) {//        =》return findMappedFileByOffset(offset, false);    }

進入這個方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryConsumeQueue結束。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索