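Preface
This post looks at the consumer management processor, specifically how it gets all consumers in a consumer group.
Source code analysis
Start with the method that gets all consumers by consumer group: org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroup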
    public RemotingCommand getConsumerListByGroup(ChannelHandlerContext ctx, RemotingCommand request)
        throws RemotingCommandException {
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(GetConsumerListByGroupResponseHeader.class);
        final GetConsumerListByGroupRequestHeader requestHeader =
            (GetConsumerListByGroupRequestHeader) request
                .decodeCommandCustomHeader(GetConsumerListByGroupRequestHeader.class);

        // Look up the consumer group info
        ConsumerGroupInfo consumerGroupInfo =
            this.brokerController.getConsumerManager().getConsumerGroupInfo(
                requestHeader.getConsumerGroup());
        if (consumerGroupInfo != null) {
            // Get all client IDs (this call is analyzed next)
            List<String> clientIds = consumerGroupInfo.getAllClientId();
            if (!clientIds.isEmpty()) {
                GetConsumerListByGroupResponseBody body = new GetConsumerListByGroupResponseBody();
                body.setConsumerIdList(clientIds);
                response.setBody(body.encode());
                response.setCode(ResponseCode.SUCCESS);
                response.setRemark(null);
                return response;
            } else {
                log.warn("getAllClientId failed, {} {}", requestHeader.getConsumerGroup(),
                    RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
            }
        } else {
            log.warn("getConsumerGroupInfo failed, {} {}", requestHeader.getConsumerGroup(),
                RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        }

        // Both failure paths (unknown group, or a group with no clients) end up here
        response.setCode(ResponseCode.SYSTEM_ERROR);
        response.setRemark("no consumer for this group, " + requestHeader.getConsumerGroup());
        return response;
    }
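The branching in this method can be sketched in isolation. The following is a minimal, simplified model and not RocketMQ code: the class ConsumerListLookupSketch, its Response type, the register helper, and the plain map standing in for the ConsumerManager are all hypothetical names introduced for illustration. It only shows that an unknown group and a group with no clients both produce the same error response.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class ConsumerListLookupSketch {
    // Hypothetical stand-ins for ResponseCode.SUCCESS and ResponseCode.SYSTEM_ERROR
    public static final int SUCCESS = 0;
    public static final int SYSTEM_ERROR = 1;

    // Simplified stand-in for RemotingCommand: a code, a remark and the decoded body
    public static final class Response {
        public final int code;
        public final String remark;
        public final List<String> consumerIds;

        Response(int code, String remark, List<String> consumerIds) {
            this.code = code;
            this.remark = remark;
            this.consumerIds = consumerIds;
        }
    }

    // Assumption: a plain map replaces the broker's ConsumerManager lookup
    private final Map<String, List<String>> groupTable = new HashMap<>();

    public void register(String group, String clientId) {
        groupTable.computeIfAbsent(group, g -> new ArrayList<>()).add(clientId);
    }

    public Response getConsumerListByGroup(String group) {
        List<String> clientIds = groupTable.get(group);
        if (clientIds != null && !clientIds.isEmpty()) {
            // Success path: return a copy of the client IDs
            return new Response(SUCCESS, null, new ArrayList<>(clientIds));
        }
        // Unknown group and empty group share one error response
        return new Response(SYSTEM_ERROR, "no consumer for this group, " + group,
            new ArrayList<>());
    }

    public static void main(String[] args) {
        ConsumerListLookupSketch broker = new ConsumerListLookupSketch();
        broker.register("groupA", "client-1");

        Response found = broker.getConsumerListByGroup("groupA");
        System.out.println(found.code + " " + found.consumerIds);

        Response missing = broker.getConsumerListByGroup("groupB");
        System.out.println(missing.code + " " + missing.remark);
    }
}
```

A caller therefore cannot tell from the response code alone whether the group is unknown or simply has no connected clients; in the original method only the broker-side warning log distinguishes the two cases.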
Next, step into the method that gets all clients: org.apache.rocketmq.broker.client.ConsumerGroupInfo#getAllClientId
    // Collect the client ID of every channel registered for this group
    public List<String> getAllClientId() {
        List<String> result = new ArrayList<>();

        Iterator<Entry<Channel, ClientChannelInfo>> it = this.channelInfoTable.entrySet().iterator();
        while (it.hasNext()) {
            Entry<Channel, ClientChannelInfo> entry = it.next();
            ClientChannelInfo clientChannelInfo = entry.getValue();
            result.add(clientChannelInfo.getClientId());
        }

        return result;
    }
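The same collection step can be tried out without a broker. This is a rough sketch under stated assumptions: ClientIdTableSketch, ClientInfo, and register are hypothetical names, a String key stands in for the Netty Channel, and the table is assumed to be a concurrent map. It shows that the method returns a fresh list, so callers can modify the result without touching the table.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

public class ClientIdTableSketch {
    // Hypothetical stand-in for ClientChannelInfo, keeping only the client ID
    public static final class ClientInfo {
        private final String clientId;

        public ClientInfo(String clientId) {
            this.clientId = clientId;
        }

        public String getClientId() {
            return clientId;
        }
    }

    // Assumption: a String key replaces the Netty Channel used as the real map key
    private final ConcurrentMap<String, ClientInfo> channelInfoTable = new ConcurrentHashMap<>();

    public void register(String channelKey, String clientId) {
        channelInfoTable.put(channelKey, new ClientInfo(clientId));
    }

    // Mirrors the loop above: one client ID per registered channel, in a new list
    public List<String> getAllClientId() {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, ClientInfo> entry : channelInfoTable.entrySet()) {
            result.add(entry.getValue().getClientId());
        }
        return result;
    }

    public static void main(String[] args) {
        ClientIdTableSketch table = new ClientIdTableSketch();
        table.register("channel-1", "client-A");
        table.register("channel-2", "client-B");

        // Iteration order of a ConcurrentHashMap is not guaranteed
        System.out.println(table.getAllClientId());
    }
}
```

Because the result is built from a map iteration, the order of the client IDs should not be relied on; a caller that needs a stable order has to sort the list itself.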
Control then returns to org.apache.rocketmq.broker.processor.ConsumerManageProcessor#getConsumerListByGroup, which ends here.
Closing remarks
This analysis only reflects my own views and is for reference only.