說在前面
管理請求之查詢 broker 全部消費組狀態
源碼解析
進入這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker
private RemotingCommand fetchAllConsumeStatsInBroker(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);GetConsumeStatsInBrokerHeader requestHeader =(GetConsumeStatsInBrokerHeader) request.decodeCommandCustomHeader(GetConsumeStatsInBrokerHeader.class);boolean isOrder = requestHeader.isOrder();// 獲取消費組訂閱信息ConcurrentMap<String, SubscriptionGroupConfig> subscriptionGroups =brokerController.getSubscriptionGroupManager().getSubscriptionGroupTable();List<Map<String/* subscriptionGroupName */, List<ConsumeStats>>> brokerConsumeStatsList =new ArrayList<Map<String, List<ConsumeStats>>>();long totalDiff = 0L;for (String group : subscriptionGroups.keySet()) {Map<String, List<ConsumeStats>> subscripTopicConsumeMap = new HashMap<String, List<ConsumeStats>>();// 消費組訂閱了哪些topicSet<String> topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(group);List<ConsumeStats> consumeStatsList = new ArrayList<ConsumeStats>();for (String topic : topics) {ConsumeStats consumeStats = new ConsumeStats();// 查詢topic的配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);if (null == topicConfig) {log.warn("consumeStats, topic config not exist, {}", topic);continue;}if (isOrder && !topicConfig.isOrder()) {continue;}{// 按消費組和topic查詢訂閱信息=》SubscriptionData findSubscriptionData = this.brokerController.getConsumerManager().findSubscriptionData(group, topic);if (null == findSubscriptionData&& this.brokerController.getConsumerManager().findSubscriptionDataCount(group) > 0) {log.warn("consumeStats, the consumer group[{}], topic[{}] not exist", group, topic);continue;}}// topic寫隊列的數量默認16for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) {MessageQueue mq = new MessageQueue();mq.setTopic(topic);mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());mq.setQueueId(i);OffsetWrapper offsetWrapper = new 
OffsetWrapper();// 獲取隊列最大的offset=》long brokerOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i);if (brokerOffset < 0)brokerOffset = 0;// 查詢消費者的offset=》long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group,topic,i);if (consumerOffset < 0)consumerOffset = 0;offsetWrapper.setBrokerOffset(brokerOffset);offsetWrapper.setConsumerOffset(consumerOffset);long timeOffset = consumerOffset - 1;if (timeOffset >= 0) {// 按topic、queueId、timeOffset查詢最後的時間 =》long lastTimestamp = this.brokerController.getMessageStore().getMessageStoreTimeStamp(topic, i, timeOffset);if (lastTimestamp > 0) {offsetWrapper.setLastTimestamp(lastTimestamp);}}consumeStats.getOffsetTable().put(mq, offsetWrapper);}// 按消費組和topic獲取tps=》double consumeTps = this.brokerController.getBrokerStatsManager().tpsGroupGetNums(group, topic);consumeTps += consumeStats.getConsumeTps();consumeStats.setConsumeTps(consumeTps);totalDiff += consumeStats.computeTotalDiff();consumeStatsList.add(consumeStats);}subscripTopicConsumeMap.put(group, consumeStatsList);brokerConsumeStatsList.add(subscripTopicConsumeMap);}ConsumeStatsList consumeStats = new ConsumeStatsList();consumeStats.setBrokerAddr(brokerController.getBrokerAddr());consumeStats.setConsumeStatsList(brokerConsumeStatsList);consumeStats.setTotalDiff(totalDiff);response.setBody(consumeStats.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
進入這個方法,消費組訂閱了哪些topic,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichTopicByConsumer
public Set<String> whichTopicByConsumer(final String group) {Set<String> topics = new HashSet<String>();// 遍歷消費者組、topic緩存信息Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2) {if (group.equals(arrays[1])) {topics.add(arrays[0]);}}}return topics;}
進入這個方法,查詢topic的配置信息,org.apache.rocketmq.broker.topic.TopicConfigManager#selectTopicConfig
public TopicConfig selectTopicConfig(final String topic) {// 從topic配置緩存信息中查詢當前topic的配置return this.topicConfigTable.get(topic);}
進入這個方法,按消費組和topic查詢訂閱信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionData
public SubscriptionData findSubscriptionData(final String group, final String topic) {// 按組從緩存中獲取消費組信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {// 按topic從緩存中獲取訂閱數據return consumerGroupInfo.findSubscriptionData(topic);}return null;}
進入這個方法,獲取隊列最大的offset,org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue
public long getMaxOffsetInQueue(String topic, int queueId) {// 根據topic和queueId找到消費者隊列=》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 獲取最大的offset =》long offset = logic.getMaxOffsetInQueue();return offset;}// 若是不存在指定topic和queueId的消費隊列直接返回0return 0;}
進入這個方法,根據topic和queueId找到消費者隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}
往上返回到這個方法,獲取最大的offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue
public long getMaxOffsetInQueue() {// =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE;}
進入這個方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() {// 獲取存儲映射文件隊列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {// 映射文件的起始offset+映射文件的可讀取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}// 若是隊列中沒有存儲映射文件直接返回0return 0;}
進入這個方法,獲取存儲映射文件隊列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()
public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast;}
往上返回到這個方法,查詢消費者的offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String, int)
public long queryOffset(final String group, final String topic, final int queueId) {// topic@group 從本地offset緩存中查詢String key = topic + TOPIC_GROUP_SEPARATOR + group;ConcurrentMap<Integer, Long> map = this.offsetTable.get(key);if (null != map) {Long offset = map.get(queueId);if (offset != null)return offset;}return -1;}
進入這個方法,按topic、queueId、timeOffset查詢最後的時間,org.apache.rocketmq.store.DefaultMessageStore#getMessageStoreTimeStamp
@Overridepublic long getMessageStoreTimeStamp(String topic, int queueId, long consumeQueueOffset) {// 按topic和queueId查詢到消費隊列=》ConsumeQueue logicQueue = this.findConsumeQueue(topic, queueId);if (logicQueue != null) {// 按消費者的offset查詢存儲時間所在的buffer=》SelectMappedBufferResult result = logicQueue.getIndexBuffer(consumeQueueOffset);// =》return getStoreTime(result);}return -1;}
進入這個方法,按topic和queueId查詢到消費隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}
往上返回到這個方法,按消費者的offset查詢存儲時間所在的buffer,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;// 獲取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {// 根據offset查詢映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null;}
進入這個方法,根據offset查詢映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)
public MappedFile findMappedFileByOffset(final long offset) {// =》return findMappedFileByOffset(offset, false);}
進入這個方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null; }
往上返回到這個方法,按消費組和topic獲取tps,org.apache.rocketmq.store.stats.BrokerStatsManager#tpsGroupGetNums
/**
 * Returns the TPS recorded under {@code GROUP_GET_NUMS} for a consumer group on a topic.
 *
 * @param group consumer group name
 * @param topic topic name
 * @return the TPS of the per-minute stats data for the key built from topic and group
 */
public double tpsGroupGetNums(final String group, final String topic) {
    final String key = buildStatsKey(topic, group);
    return this.statsTable
        .get(GROUP_GET_NUMS)
        .getStatsDataInMinute(key)
        .getTps();
}
往上返回到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#fetchAllConsumeStatsInBroker,結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣