說在前面
管理請求 INVOKE_BROKER_TO_GET_CONSUMER_STATUS:獲取消費者狀態
源碼解析
進入這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerStatus,獲取消費者狀態。
public RemotingCommand getConsumerStatus(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerStatusRequestHeader requestHeader = (GetConsumerStatusRequestHeader) request.decodeCommandCustomHeader(GetConsumerStatusRequestHeader.class); log.info("[get-consumer-status] get consumer status by {}. topic={}, group={}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup()); // =》 return this.brokerController.getBroker2Client().getConsumeStatus(requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getClientAddr()); }
進入這個方法 org.apache.rocketmq.broker.client.net.Broker2Client#getConsumeStatus
public RemotingCommand getConsumeStatus(String topic, String group, String originClientId) { final RemotingCommand result = RemotingCommand.createResponseCommand(null); GetConsumerStatusRequestHeader requestHeader = new GetConsumerStatusRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_CONSUMER_STATUS_FROM_CLIENT, requestHeader); Map<String, Map<MessageQueue, Long>> consumerStatusTable = new HashMap<String, Map<MessageQueue, Long>>(); // 獲取消費組的渠道緩存信息 ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = this.brokerController.getConsumerManager().getConsumerGroupInfo(group).getChannelInfoTable(); if (null == channelInfoTable || channelInfoTable.isEmpty()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark(String.format("No Any Consumer online in the consumer group: [%s]", group)); return result; } for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); String clientId = entry.getValue().getClientId(); if (version < MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { result.setCode(ResponseCode.SYSTEM_ERROR); result.setRemark("the client does not support this feature. version=" + MQVersion.getVersionDesc(version)); log.warn("[get-consumer-status] the client does not support this feature. 
version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return result; } else if (UtilAll.isBlank(originClientId) || originClientId.equals(clientId)) { try { // countDownLatch實現的同步執行 =》 RemotingCommand response = this.brokerController.getRemotingServer().invokeSync(entry.getKey(), request, 5000); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { if (response.getBody() != null) { GetConsumerStatusBody body = GetConsumerStatusBody.decode(response.getBody(), GetConsumerStatusBody.class); consumerStatusTable.put(clientId, body.getMessageQueueTable()); log.info( "[get-consumer-status] get consumer status success. topic={}, group={}, channelRemoteAddr={}", topic, group, clientId); } } default: break; } } catch (Exception e) { log.error( "[get-consumer-status] get consumer status exception. topic={}, group={}, offset={}", new Object[] {topic, group}, e); } if (!UtilAll.isBlank(originClientId) && originClientId.equals(clientId)) { break; } } } result.setCode(ResponseCode.SUCCESS); GetConsumerStatusBody resBody = new GetConsumerStatusBody(); resBody.setConsumerTable(consumerStatusTable); result.setBody(resBody.encode()); return result; } }
進入這個方法 org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl,同步執行 RPC 請求。
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 緩存正在進行的響應 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); // 響應解析完畢會解除countDownLatch的阻塞 =》 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // 這裏用countDownLatch實現的阻塞 =》 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
這裏是用 CountDownLatch 實現的同步。
往上返回到這個方法 org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerStatus,結束。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣