RocketMQ 源碼解析:請求處理——獲取消費者運行信息

說在前面

請求處理:獲取消費者運行信息

 

源碼解析

進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumerRunningInfo



private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetConsumerRunningInfoRequestHeader requestHeader =(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);//        獲取消費者的運行信息=》ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());if (null != consumerRunningInfo) {if (requestHeader.isJstackEnable()) {Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();//                獲取全部的線程堆棧信息String jstack = UtilAll.jstack(map);consumerRunningInfo.setJstack(jstack);}response.setCode(ResponseCode.SUCCESS);response.setBody(consumerRunningInfo.encode());} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));}return response;    }

進入這個方法,獲取消費者的運行信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#consumerRunningInfo





public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {//        獲取消費組的消費者MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);//        獲取消費者的消費者運行信息=》ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();//        獲取namesrv地址列表List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();StringBuilder strBuilder = new StringBuilder();if (nsList != null) {for (String addr : nsList) {strBuilder.append(addr).append(";");}}String nsAddr = strBuilder.toString();consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));return consumerRunningInfo;    }

進入這個方法,獲取消費者的消費者運行信息,mqPullConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#consumerRunningInfo


@Overridepublic ConsumerRunningInfo consumerRunningInfo() {//        消費者運行信息ConsumerRunningInfo info = new ConsumerRunningInfo();//        mqPullConsumer信息Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer);//        消費開始時間prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));info.setProperties(prop);//        訂閱數據=》info.getSubscriptionSet().addAll(this.subscriptions());return info;    }

進入這個方法,訂閱數據,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#subscriptions


@Overridepublic Set<SubscriptionData> subscriptions() {Set<SubscriptionData> result = new HashSet<SubscriptionData>();//        獲取註冊的topicsSet<String> topics = this.defaultMQPullConsumer.getRegisterTopics();if (topics != null) {synchronized (topics) {for (String t : topics) {SubscriptionData ms = null;try {//                        按消費組、topic構建訂閱信息=》ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);} catch (Exception e) {log.error("parse subscription error", e);}ms.setSubVersion(0L);result.add(ms);}}}return result;    }

進入這個方法,按消費組、topic構建訂閱信息,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionData


public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);//        訂閱全部的topicif (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {//            解析tags,多個tag用|分開String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {//                            解析訂閱信息中的tagsubscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData;    }這個方法,獲取消費者的消費者運行信息,mqPullConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#consumerRunningInfo

往上返回到這個方法,獲取消費者的消費者運行信息,mqPushConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#consumerRunningInfo








@Overridepublic ConsumerRunningInfo consumerRunningInfo() {//        消費者運行信息ConsumerRunningInfo info = new ConsumerRunningInfo();//        MQPushConsumer信息Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);//        消費順序prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));//        消費線程池線程數prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));//        消費開始時間prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));info.setProperties(prop);//        訂閱信息=》Set<SubscriptionData> subSet = this.subscriptions();info.getSubscriptionSet().addAll(subSet);Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();//            消息處理隊列ProcessQueueInfo pqinfo = new ProcessQueueInfo();pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));pq.fillProcessQueueInfo(pqinfo);info.getMqTable().put(mq, pqinfo);}for (SubscriptionData sd : subSet) {//            消費狀態ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());info.getStatusTable().put(sd.getTopic(), consumeStatus);}return info;    }

進入這個方法,訂閱信息,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscriptions


@Overridepublic Set<SubscriptionData> subscriptions() {Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();//        獲取topic訂閱數據subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());return subSet;    }

往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumerRunningInfo,處理結束。

 

說在最後

本次解析僅代表我的個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索