說在前面apache
請求處理 從client獲取消費者狀態微信
源碼解析ide
進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatusthis
public Map<MessageQueue, Long> getConsumerStatus(String topic, String group) {MQConsumerInner impl = this.consumerTable.get(group);if (impl != null && impl instanceof DefaultMQPushConsumerImpl) {DefaultMQPushConsumerImpl consumer = (DefaultMQPushConsumerImpl) impl;return consumer.getOffsetStore().cloneOffsetTable(topic);} else if (impl != null && impl instanceof DefaultMQPullConsumerImpl) {DefaultMQPullConsumerImpl consumer = (DefaultMQPullConsumerImpl) impl;return consumer.getOffsetStore().cloneOffsetTable(topic);} else {return Collections.EMPTY_MAP;} }
進入這個方法, 獲取topic的消費隊列offset,org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#cloneOffsetTablecode
@Overridepublic Map<MessageQueue, Long> cloneOffsetTable(String topic) {Map<MessageQueue, Long> cloneOffsetTable = new HashMap<MessageQueue, Long>();for (Map.Entry<MessageQueue, AtomicLong> entry : this.offsetTable.entrySet()) {MessageQueue mq = entry.getKey();if (!UtilAll.isBlank(topic) && !topic.equals(mq.getTopic())) {continue;}cloneOffsetTable.put(mq, entry.getValue().get());}return cloneOffsetTable; }
返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumeStatus結束。blog
說在最後隊列
本次解析僅表明我的觀點,僅供參考。get
加入技術微信羣源碼
釘釘技術羣io