rocketmq源碼解析之管理請求clone消費組的offset

說在前面apache

管理請求之clone消費組的offset緩存

 

源碼解析微信

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cloneGroupOffsetthis






private RemotingCommand cloneGroupOffset(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);CloneGroupOffsetRequestHeader requestHeader =(CloneGroupOffsetRequestHeader) request.decodeCommandCustomHeader(CloneGroupOffsetRequestHeader.class);Set<String> topics;if (UtilAll.isBlank(requestHeader.getTopic())) {//            查詢那些topic被這個消費組在消費=》topics = this.brokerController.getConsumerOffsetManager().whichTopicByConsumer(requestHeader.getSrcGroup());} else {topics = new HashSet<String>();topics.add(requestHeader.getTopic());}for (String topic : topics) {//            按topic查詢到topic的配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic);if (null == topicConfig) {log.warn("[cloneGroupOffset], topic config not exist, {}", topic);continue;}if (!requestHeader.isOffline()) {//                查詢topic、消費組的訂閱信息SubscriptionData findSubscriptionData =this.brokerController.getConsumerManager().findSubscriptionData(requestHeader.getSrcGroup(), topic);if (this.brokerController.getConsumerManager().findSubscriptionDataCount(requestHeader.getSrcGroup()) > 0&& findSubscriptionData == null) {log.warn("[cloneGroupOffset], the consumer group[{}], topic[{}] not exist", requestHeader.getSrcGroup(), topic);continue;}}//            clone offset=》this.brokerController.getConsumerOffsetManager().cloneOffset(requestHeader.getSrcGroup(), requestHeader.getDestGroup(),requestHeader.getTopic());}response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}

 

進入這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#whichTopicByConsumer,查詢那些topic被這個消費組在消費code

 


public Set<String> whichTopicByConsumer(final String group) {Set<String> topics = new HashSet<String>();//        遍歷消費者組、topic緩存信息Iterator<Entry<String, ConcurrentMap<Integer, Long>>> it = this.offsetTable.entrySet().iterator();while (it.hasNext()) {Entry<String, ConcurrentMap<Integer, Long>> next = it.next();String topicAtGroup = next.getKey();String[] arrays = topicAtGroup.split(TOPIC_GROUP_SEPARATOR);if (arrays.length == 2) {if (group.equals(arrays[1])) {topics.add(arrays[0]);}}}return topics;}

 

進入這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#selectTopicConfig 按topic查詢到topic的配置信息blog

public TopicConfig selectTopicConfig(final String topic) {//        從topic配置緩存信息中查詢當前topic的配置return this.topicConfigTable.get(topic);}

 

進入這個方法,查詢topic、消費組的訂閱信息,org.apache.rocketmq.broker.client.ConsumerManager#findSubscriptionDataip

 

public SubscriptionData findSubscriptionData(final String group, final String topic) {//        按組從緩存中獲取消費組信息ConsumerGroupInfo consumerGroupInfo = this.getConsumerGroupInfo(group);if (consumerGroupInfo != null) {//            按topic從緩存中獲取訂閱數據return consumerGroupInfo.findSubscriptionData(topic);}return null;}

 

進入這個方法,clone offset,org.apache.rocketmq.broker.offset.ConsumerOffsetManager#cloneOffsetget

 

public void cloneOffset(final String srcGroup, final String destGroup, final String topic) {ConcurrentMap<Integer, Long> offsets = this.offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + srcGroup);if (offsets != null) {this.offsetTable.put(topic + TOPIC_GROUP_SEPARATOR + destGroup, new ConcurrentHashMap<Integer, Long>(offsets));}}

 

往上返回到這個方法,org.apache.rocketmq.broker.processor.AdminBrokerProcessor#cloneGroupOffset結束。源碼

 

說在最後it

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索