rocketmq源碼解析之管理請求之查詢broker全部消費組狀態

說在前面java

管理請求之查詢broker全部消費組狀態apache

 

源碼解析緩存

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker微信









private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);GetConsumeStatsInBrokerHeader requestHeader =(GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);boolean isOrder = requestHeader.isOrder();//        獲取消費組訂閱信息ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups =brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =new ArrayList<Map<String, List<ConsumeStats>>>();long totalDiff = 0L;for (String group : subscriptionGroups.keySet()) {Map<String, List<ConsumeStats>> subscripTopicConsumeMap = new HashMap<String, List<ConsumeStats>>();//            消費組訂閱了哪些topicSet<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group);List<ConsumeStats> consumeStatsList = new ArrayList<ConsumeStats>();for (String topic : topics) {ConsumeStats consumeStats = new ConsumeStats();//                查詢topic的配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);if (null == topicConfig) {log.warn("consumeStats, topic config not exist, {}", topic);continue;}if (isOrder && !topicConfig.isOrder()) {continue;}{//                    按消費組和topic查詢訂閱信息=》SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);if (null == findSubscriptionData&& this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);continue;}}//                topic寫隊列的數量默認16for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue();mq.setTopic(topic);mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());mq.setQueueId(i);OffsetWrapper offsetWrapper = new OffsetWrapper();//                    獲取隊列最大的offset=》long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);if (brokerOffset < 0)brokerOffset = 0;//                    查詢消費者的offset=》long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group,topic,i);if (consumerOffset < 0)consumerOffset = 0;offsetWrapper.setBrokerOffset(brokerOffset);offsetWrapper.setConsumerOffset(consumerOffset);long timeOffset = consumerOffset - 1;if (timeOffset >= 0) {//                        按topic、queueId、timeOffset查詢最後的時間 =》long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);if (lastTimestamp > 0) {offsetWrapper.setLastTimestamp(lastTimestamp);}}consumeStats.getOffsetTable().put(mq, offsetWrapper);}//                按消費組和topic獲取tps=》double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic);consumeTps += consumeStats.getConsumeTps();consumeStats.setConsumeTps(consumeTps);totalDiff += consumeStats.computeTotalDiff();consumeStatsList.add(consumeStats);}subscripTopicConsumeMap.put(group, consumeStatsList);brokerConsumeStatsList.add(subscripTopicConsumeMap);}ConsumeStatsList consumeStats = new ConsumeStatsList();consumeStats.setBrokerAddr(brokerController.getBrokerAddr());consumeStats.setConsumeStatsList(brokerConsumeStatsList);consumeStats.setTotalDiff(totalDiff);response.setBody(consumeStats.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}

 

進入這個方法,消費組訂閱了哪些topic,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichTopicByConsumerapp

 

 


public Set<String> whichTopicByConsumer(final String group) {Set<String> topics = new HashSet<String>();//        遍歷消費者組、topic緩存信息Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2) {if (group.equals(arrays[1])) {topics.add(arrays[0]);}}}return topics;}

 

進入這個方法,查詢topic的配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#selectTopicConfigide

 

 

public TopicConfig selectTopicConfig(final String topic) {//        從topic配置緩存信息中查詢當前topic的配置return this.topicConfigTable.get(topic);}

 

進入這個方法,按消費組和topic查詢訂閱信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionDatafetch

public SubscriptionData findSubscriptionData(final String group, final String topic) {//        按組從緩存中獲取消費組信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {//            按topic從緩存中獲取訂閱數據return consumerGroupInfo.findSubscriptionData(topic);}return null;}

 

進入這個方法,獲取隊列最大的offset,org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueueui

public long getMaxOffsetInQueue(String topic, int queueId) {//        根據topic和queueId找到消費者隊列=》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {//            獲取最大的offset =》long offset = logic.getMaxOffsetInQueue();return offset;}//        若是不存在指定topic和queueId的消費隊列直接返回0return 0;}

 

進入這個方法,根據topic和queueId找到消費者隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueuethis

 


public ConsumeQueue findConsumeQueue(String topic, int queueId) {//        找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}//        按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,//                消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}

 

往上返回到這個方法, 獲取最大的offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueuecode

 

public long getMaxOffsetInQueue() {//        =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;}

 

進入這個方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset

 

public long getMaxOffset() {//        獲取存儲映射文件隊列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {//            映射文件的起始offset+映射文件的可讀取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}//        若是隊列中沒有存儲映射文件直接返回0return 0;}

 

進入這個方法,獲取存儲映射文件隊列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()

 


public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}

 

往上返回到這個方法,查詢消費者的offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)

 

public long queryOffset(final String group, final String topic, final int queueId) {// topic@group 從本地offset緩存中查詢String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {Long offset = map.get(queueId);if (offset != null)return offset;}return -1;}

 

進入這個方法,按topic、queueId、timeOffset查詢最後的時間,org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStamp

 

@Overridepublic long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {//        按topic和queueId查詢到消費隊列=》ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);if (logicQueue != null) {//            按消費者的offset查詢存儲時間所在的buffer=》SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);//            =》return getStoreTime(result);}return -1;}

 

進入這個方法,按topic和queueId查詢到消費隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue

 


public ConsumeQueue findConsumeQueue(String topic, int queueId) {//        找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}//        按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,//                消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),//                每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}

 

往上返回到這個方法,按消費者的offset查詢存儲時間所在的buffer,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer

 

public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;//        獲取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {//            根據offset查詢映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null;}

 

進入這個方法,根據offset查詢映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)

public MappedFile findMappedFileByOffset(final long offset) {//        =》return findMappedFileByOffset(offset, false);}

 

進入這個方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;    }

往上返回到這個方法,按消費組和topic獲取tps,org.apache.rocketmq.store.stats.BrokerStatsManager#tpsGroupGetNums

public double tpsGroupGetNums(final String group, final String topic) {final String statsKey = buildStatsKey(topic, group);return this.statsTable.get(GROUP_GET_NUMS).getStatsDataInMinute(statsKey).getTps();    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker結束。

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索