rocketmq源碼解析之管理請求查詢topic被哪些消費者消費

說在前面apache

管理請求 QUERY_TOPIC_CONSUME_BY_WHO 查詢topic被哪些消費者消費緩存

 

源碼解析微信

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho查詢topic被哪些消費者消費this

private RemotingCommand queryTopicConsumeByWho(ChannelHandlerContext ctx,
        RemotingCommand request) throws RemotingCommandException {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        QueryTopicConsumeByWhoRequestHeader requestHeader =
            (QueryTopicConsumeByWhoRequestHeader) request.decodeCommandCustomHeader(QueryTopicConsumeByWhoRequestHeader.class);
//      根據topic查找消費者的消費組信息=》
        HashSet<String> groups = this.brokerController.getConsumerManager().queryTopicConsumeByWho(requestHeader.getTopic());
        Set<String> groupInOffset = this.brokerController.getConsumerOffsetManager().whichGroupByTopic(requestHeader.getTopic());
        if (groupInOffset != null && !groupInOffset.isEmpty()) {
            groups.addAll(groupInOffset);
        }

        GroupList groupList = new GroupList();
        groupList.setGroupList(groups);
        byte[] body = groupList.encode();
        response.setBody(body);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

進入這個方法org.apache.rocketmq.broker.client.ConsumerManager#queryTopicConsumeByWho根據topic從消費組信息中查詢消費組code

public HashSet<String> queryTopicConsumeByWho(final String topic) {
        HashSet<String> groups = new HashSet<>();
//        遍歷換粗中消費組信息
        Iterator<Entry<String, ConsumerGroupInfo>> it = this.consumerTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConsumerGroupInfo> entry = it.next();
//          獲取組中的緩存訂閱信息
            ConcurrentMap<String, SubscriptionData> subscriptionTable =
                entry.getValue().getSubscriptionTable();
            if (subscriptionTable.containsKey(topic)) {
                groups.add(entry.getKey());
            }
        }
        return groups;
    }

往上返回到這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichGroupByTopic按topic從offset信息中查詢消費組blog

public Set<String> whichGroupByTopic(final String topic) {
        Set<String> groups = new HashSet<String>();
//        遍歷topic和組的offset信息
        Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<String, ConcurrentMap<Integer, Long>> next = it.next();
            String topicAtGroup = next.getKey();
            String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);
            if (arrays.length == 2) {
                if (topic.equals(arrays[0])) {
                    groups.add(arrays[1]);
                }
            }
        }

        return groups;
    }

往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryTopicConsumeByWho結束ip

 

說在最後get

本次解析僅表明我的觀點,僅供參考。源碼

 

加入技術微信羣it

釘釘技術羣

相關文章
相關標籤/搜索