RocketMQ Source Code Analysis: Consumer Manage Processor (1)

Preface

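This post covers the consumer manage processor, specifically how the broker returns all consumers that belong to a given consumer group.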

 

Source Code Analysis

Start with this method, which returns all consumers in a consumer group: org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroup


    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
        final GetConsumerListByGroupRequestHeader requestHeader =
            (GetConsumerListByGroupRequestHeader) request
                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

        // Look up the consumer group info
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(
                requestHeader.getConsumerGroup());
        if (consumerGroupInfo != null) {
            // Get all client IDs registered in the group =>
            List<String> clientIds = consumerGroupInfo.getAllClientId();
            if (!clientIds.isEmpty()) {
                GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
                body.setConsumerIdList(clientIds);
                response.setBody(body.encode());
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            } else {
                log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            }
        } else {
            log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        }

        // Either the group is unknown or it has no registered clients
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
        return response;
    }
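The control flow above boils down to "look up the group, collect its client IDs, and fall through to an error response when either step comes up empty". The following is only a minimal sketch of that flow using plain Java collections. The class ConsumerListSketch, its Response type, the register helper, and the numeric code values are all hypothetical stand-ins for illustration, not RocketMQ's real classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the broker-side lookup; not RocketMQ's real API.
class ConsumerListSketch {
    // Placeholder codes for this sketch only
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    // Simplified response: a code, an optional remark, and the client ID list
    static final class Response {
        final int code;
        final String remark;
        final List<String> consumerIds;

        Response(int code, String remark, List<String> consumerIds) {
            this.code = code;
            this.remark = remark;
            this.consumerIds = consumerIds;
        }
    }

    // group name -> (channel id -> client id); a String stands in for a Netty Channel
    private final Map<String, Map<String, String>> groups = new ConcurrentHashMap<>();

    // Hypothetical helper that registers one client connection under a group
    void register(String group, String channelId, String clientId) {
        groups.computeIfAbsent(group, g -> new ConcurrentHashMap<>()).put(channelId, clientId);
    }

    Response getConsumerListByGroup(String group) {
        Map<String, String> channels = groups.get(group);
        if (channels != null) {
            List<String> clientIds = new ArrayList<>(channels.values());
            if (!clientIds.isEmpty()) {
                return new Response(SUCCESS, null, clientIds);
            }
        }
        // Unknown group and empty group both end up in the same error branch
        return new Response(SYSTEM_ERROR, "no consumer for this group, " + group,
            new ArrayList<>());
    }

    public static void main(String[] args) {
        ConsumerListSketch sketch = new ConsumerListSketch();
        sketch.register("groupA", "ch-1", "client-1");
        Response found = sketch.getConsumerListByGroup("groupA");
        Response missing = sketch.getConsumerListByGroup("groupB");
        System.out.println(found.code + " " + found.consumerIds);
        System.out.println(missing.code + " " + missing.remark);
    }
}
```

Note that the caller cannot tell "group does not exist" apart from "group exists but has no clients" from the response alone; in the original method only the warning log distinguishes the two cases.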

Next, step into the method that collects all client IDs: org.apache.rocketmq.broker.client.ConsumerGroupInfo#getAllClientId



    // Collect the client ID of every channel registered in this group
    public List<String> getAllClientId() {
        List<String> result = new ArrayList<>();

        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();

        while (it.hasNext()) {
            Entry<Channel, ClientChannelInfo> entry = it.next();
            ClientChannelInfo clientChannelInfo = entry.getValue();
            result.add(clientChannelInfo.getClientId());
        }

        return result;
    }
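The loop above copies the client IDs into a fresh list, so the caller gets a snapshot rather than a live view of the channel table. Below is a small sketch of that idea, shown once as an explicit iterator loop and once as a stream. ClientIdSnapshotSketch and ClientInfo are hypothetical names, a String key stands in for the Netty Channel, and the use of a ConcurrentHashMap for the table is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

// Hypothetical sketch; not RocketMQ's real ConsumerGroupInfo.
class ClientIdSnapshotSketch {
    // Minimal stand-in for ClientChannelInfo
    static final class ClientInfo {
        private final String clientId;

        ClientInfo(String clientId) {
            this.clientId = clientId;
        }

        String getClientId() {
            return clientId;
        }
    }

    // Same shape as the original: walk the entries and copy each client ID out
    static List<String> allClientIdsLoop(Map<String, ClientInfo> table) {
        List<String> result = new ArrayList<>();
        Iterator<Map.Entry<String, ClientInfo>> it = table.entrySet().iterator();
        while (it.hasNext()) {
            result.add(it.next().getValue().getClientId());
        }
        return result;
    }

    // Equivalent stream form; only the values are needed, so entrySet() is skipped
    static List<String> allClientIdsStream(Map<String, ClientInfo> table) {
        return table.values().stream()
            .map(ClientInfo::getClientId)
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, ClientInfo> table = new ConcurrentHashMap<>();
        table.put("ch-1", new ClientInfo("client-1"));
        List<String> snapshot = allClientIdsLoop(table);
        // Changing the table afterwards does not change the list already returned
        table.put("ch-2", new ClientInfo("client-2"));
        System.out.println(snapshot.size() + " vs " + allClientIdsStream(table).size());
    }
}
```

Because the result is a copy, a consumer that connects or disconnects after the call is simply not reflected in the list that was already returned.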

Control then returns to org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroup, which builds the response and finishes.

 

Closing Remarks

This analysis reflects only my own view and is for reference only.

 
