說在前面apache
管理請求 GET_CONSUMER_RUNNING_INFO 查詢消費者運行時信息緩存
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerRunningInfoide
private RemotingCommand getConsumerRunningInfo(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final GetConsumerRunningInfoRequestHeader requestHeader = (GetConsumerRunningInfoRequestHeader) request.decodeCommandCustomHeader(GetConsumerRunningInfoRequestHeader.class); // =》 return this.callConsumer(RequestCode.GET_CONSUMER_RUNNING_INFO, request, requestHeader.getConsumerGroup(), requestHeader.getClientId()); }
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumerthis
private RemotingCommand callConsumer( final int requestCode, final RemotingCommand request, final String consumerGroup, final String clientId) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); // 按消費組和clentId查詢client的渠道信息 =》 ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId); if (null == clientChannelInfo) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId)); return response; } if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT", clientId, MQVersion.getVersionDesc(clientChannelInfo.getVersion()))); return response; } try { RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null); newRequest.setExtFields(request.getExtFields()); newRequest.setBody(request.getBody()); // 調用client=》 return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest); } catch (RemotingTimeoutException e) { response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT); response .setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } catch (Exception e) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark( String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e))); return response; } }
進入這個方法org.apache.rocketmq.broker.client.ConsumerManager#findChannel根據consumerGroup、clientId查詢client渠道信息.net
public ClientChannelInfo findChannel(final String group, final String clientId) { // 獲取組的消費組信息 ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group); if (consumerGroupInfo != null) { // 按clineId從消費組信息中獲取client渠道信息 return consumerGroupInfo.findChannel(clientId); } return null; }
進入這個方法org.apache.rocketmq.broker.client.net.Broker2Client#callClient rpc調用netty
public RemotingCommand callClient(final Channel channel, final RemotingCommand request ) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException { // =》 return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000); }
同步請求,進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeSyncImpl 以前介紹過了code
public RemotingCommand invokeSyncImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingSendRequestException, RemotingTimeoutException { final int opaque = request.getOpaque(); try { final ResponseFuture responseFuture = new ResponseFuture(channel, opaque, timeoutMillis, null, null); // 緩存正在進行的響應 this.responseTable.put(opaque, responseFuture); final SocketAddress addr = channel.remoteAddress(); channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { if (f.isSuccess()) { responseFuture.setSendRequestOK(true); return; } else { responseFuture.setSendRequestOK(false); } responseTable.remove(opaque); responseFuture.setCause(f.cause()); // 響應解析完畢會解除countDownLatch的阻塞 =》 responseFuture.putResponse(null); log.warn("send a request command to channel <" + addr + "> failed."); } }); // 這裏用countDownLatch實現的阻塞 =》 RemotingCommand responseCommand = responseFuture.waitResponse(timeoutMillis); if (null == responseCommand) { if (responseFuture.isSendRequestOK()) { throw new RemotingTimeoutException(RemotingHelper.parseSocketAddressAddr(addr), timeoutMillis, responseFuture.getCause()); } else { throw new RemotingSendRequestException(RemotingHelper.parseSocketAddressAddr(addr), responseFuture.getCause()); } } return responseCommand; } finally { this.responseTable.remove(opaque); } }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getConsumerRunningInfo結束orm
說在最後blog
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣