說在前面
管理請求 QUERY_TOPIC_CONSUME_BY_WHO:查詢 topic 被哪些消費者消費。
源碼解析
進入這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho,查詢 topic 被哪些消費者消費。
private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); QueryTopicConsumeByWhoRequestHeader requestHeader = (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class); // 根據topic查找消費者的消費組信息=》 HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic()); Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic()); if (groupInOffset != null && !groupInOffset.isEmpty()) { groups.addAll(groupInOffset); } GroupList groupList = new GroupList(); groupList.setGroupList(groups); byte[] body = groupList.encode(); response.setBody(body); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法 org.apache.rocketmq.broker.client.ConsumerManager#queryTopicConsumeByWho,根據 topic 從消費組信息中查詢消費組。
public HashSet<String> queryTopicConsumeByWho(final String topic) { HashSet<String> groups = new HashSet<>(); // 遍歷換粗中消費組信息 Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConsumerGroupInfo> entry = it.next(); // 獲取組中的緩存訂閱信息 ConcurrentMap<String, SubscriptionData> subscriptionTable = entry.getValue().getSubscriptionTable(); if (subscriptionTable.containsKey(topic)) { groups.add(entry.getKey()); } } return groups; }
往上返回到這個方法 org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichGroupByTopic,按 topic 從 offset 信息中查詢消費組。
public Set<String> whichGroupByTopic(final String topic) { Set<String> groups = new HashSet<String>(); // 遍歷topic和組的offset信息 Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator(); while (it.hasNext()) { Entry<String, ConcurrentMap<Integer, Long>> next = it.next(); String topicAtGroup = next.getKey(); String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR); if (arrays.length == 2) { if (topic.equals(arrays[0])) { groups.add(arrays[1]); } } } return groups; }
往上返回到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho,結束。
說在最後
本次解析僅代表我的個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣