說在前面apache
請求處理 獲取消費者運行信息微信
源碼解析app
進入這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumerRunningInfoide
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final GetConsumerRunningInfoRequestHeader requestHeader =(GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class);// 獲取消費者的運行信息=》ConsumerRunningInfo consumerRunningInfo = this.mqClientFactory.consumerRunningInfo(requestHeader.getConsumerGroup());if (null != consumerRunningInfo) {if (requestHeader.isJstackEnable()) {Map<Thread, StackTraceElement[]> map = Thread.getAllStackTraces();// 獲取全部的線程堆棧信息String jstack = UtilAll.jstack(map);consumerRunningInfo.setJstack(jstack);}response.setCode(ResponseCode.SUCCESS);response.setBody(consumerRunningInfo.encode());} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer Group <%s> not exist in this consumer", requestHeader.getConsumerGroup()));}return response; }
進入這個方法,獲取消費者的運行信息,org.apache.rocketmq.client.impl.factory.MQClientInstance#consumerRunningInfoui
public ConsumerRunningInfo consumerRunningInfo(final String consumerGroup) {// 獲取消費組的消費者MQConsumerInner mqConsumerInner = this.consumerTable.get(consumerGroup);// 獲取消費者的消費者運行信息=》ConsumerRunningInfo consumerRunningInfo = mqConsumerInner.consumerRunningInfo();// 獲取namesrv地址列表List<String> nsList = this.mQClientAPIImpl.getRemotingClient().getNameServerAddressList();StringBuilder strBuilder = new StringBuilder();if (nsList != null) {for (String addr : nsList) {strBuilder.append(addr).append(";");}}String nsAddr = strBuilder.toString();consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_NAMESERVER_ADDR, nsAddr);consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CONSUME_TYPE, mqConsumerInner.consumeType().name());consumerRunningInfo.getProperties().put(ConsumerRunningInfo.PROP_CLIENT_VERSION,MQVersion.getVersionDesc(MQVersion.CURRENT_VERSION));return consumerRunningInfo; }
進入這個方法,獲取消費者的消費者運行信息,mqPullConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#consumerRunningInfothis
@Overridepublic ConsumerRunningInfo consumerRunningInfo() {// 消費者運行信息ConsumerRunningInfo info = new ConsumerRunningInfo();// mqPullConsumer信息Properties prop = MixAll.object2Properties(this.defaultMQPullConsumer);// 消費開始時間prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));info.setProperties(prop);// 訂閱數據=》info.getSubscriptionSet().addAll(this.subscriptions());return info; }
進入這個方法,訂閱數據,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#subscriptions線程
@Overridepublic Set<SubscriptionData> subscriptions() {Set<SubscriptionData> result = new HashSet<SubscriptionData>();// 獲取註冊的topicsSet<String> topics = this.defaultMQPullConsumer.getRegisterTopics();if (topics != null) {synchronized (topics) {for (String t : topics) {SubscriptionData ms = null;try {// 按消費組、topic構建訂閱信息=》ms = FilterAPI.buildSubscriptionData(this.groupName(), t, SubscriptionData.SUB_ALL);} catch (Exception e) {log.error("parse subscription error", e);}ms.setSubVersion(0L);result.add(ms);}}}return result; }
進入這個方法,按消費組、topic構建訂閱信息,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionDatacode
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 訂閱全部的topicif (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {// 解析tags,多個tag用|分開String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {// 解析訂閱信息中的tagsubscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData; }這個方法,獲取消費者的消費者運行信息,mqPullConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPullConsumerImpl#consumerRunningInfo
往上返回到這個方法,獲取消費者的消費者運行信息,mqPushConsumer,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#consumerRunningInfoorm
@Overridepublic ConsumerRunningInfo consumerRunningInfo() {// 消費者運行信息ConsumerRunningInfo info = new ConsumerRunningInfo();// MQPushConsumer信息Properties prop = MixAll.object2Properties(this.defaultMQPushConsumer);// 消費順序prop.put(ConsumerRunningInfo.PROP_CONSUME_ORDERLY, String.valueOf(this.consumeOrderly));// 消費線程池線程數prop.put(ConsumerRunningInfo.PROP_THREADPOOL_CORE_SIZE, String.valueOf(this.consumeMessageService.getCorePoolSize()));// 消費開始時間prop.put(ConsumerRunningInfo.PROP_CONSUMER_START_TIMESTAMP, String.valueOf(this.consumerStartTimestamp));info.setProperties(prop);// 訂閱信息=》Set<SubscriptionData> subSet = this.subscriptions();info.getSubscriptionSet().addAll(subSet);Iterator<Entry<MessageQueue, ProcessQueue>> it = this.rebalanceImpl.getProcessQueueTable().entrySet().iterator();while (it.hasNext()) {Entry<MessageQueue, ProcessQueue> next = it.next();MessageQueue mq = next.getKey();ProcessQueue pq = next.getValue();// 消息處理隊列ProcessQueueInfo pqinfo = new ProcessQueueInfo();pqinfo.setCommitOffset(this.offsetStore.readOffset(mq, ReadOffsetType.MEMORY_FIRST_THEN_STORE));pq.fillProcessQueueInfo(pqinfo);info.getMqTable().put(mq, pqinfo);}for (SubscriptionData sd : subSet) {// 消費狀態ConsumeStatus consumeStatus = this.mQClientFactory.getConsumerStatsManager().consumeStatus(this.groupName(), sd.getTopic());info.getStatusTable().put(sd.getTopic(), consumeStatus);}return info; }
進入這個方法,訂閱信息,org.apache.rocketmq.client.impl.consumer.DefaultMQPushConsumerImpl#subscriptionsblog
@Overridepublic Set<SubscriptionData> subscriptions() {Set<SubscriptionData> subSet = new HashSet<SubscriptionData>();// 獲取topic訂閱數據subSet.addAll(this.rebalanceImpl.getSubscriptionInner().values());return subSet; }
往上返回到這個方法,org.apache.rocketmq.client.impl.ClientRemotingProcessor#getConsumerRunningInfo結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣