rocketmq源碼解析請求處理從client獲取消費者狀態

說在前面apache

請求處理 從client獲取消費者狀態微信

 

源碼解析ide

進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatusthis

public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {MQConsumerInner impl = this.consumerTable.get(group);if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;return consumer.getOffsetStore().cloneOffsetTable(topic);} else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;return consumer.getOffsetStore().cloneOffsetTable(topic);} else {return Collections.EMPTY_MAP;}    }

進入這個方法, 獲取topic的消費隊列offset,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#cloneOffsetTablecode

@Overridepublic Map<MessageQueue, Long> cloneOffsetTable(String topic) {Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {continue;}cloneOffsetTable.put(mq, entry.getValue().get());}return cloneOffsetTable;    }

返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus結束。blog

 

說在最後隊列

本次解析僅表明我的觀點,僅供參考。get

 

加入技術微信羣源碼

釘釘技術羣io

相關文章
相關標籤/搜索