Preface
The admin request GET_ALL_CONSUMER_OFFSET retrieves the offsets of all consumers.
Source code analysis
Start in the method org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllConsumerOffset, which retrieves the offsets of all consumers:
    private RemotingCommand getAllConsumerOffset(ChannelHandlerContext ctx, RemotingCommand request) {
        final RemotingCommand response = RemotingCommand.createResponseCommand(null);
        // Encode the consumer offsets as JSON
        String content = this.brokerController.getConsumerOffsetManager().encode();
        if (content != null && content.length() > 0) {
            try {
                response.setBody(content.getBytes(MixAll.DEFAULT_CHARSET));
            } catch (UnsupportedEncodingException e) {
                log.error("get all consumer offset from master error.", e);
                response.setCode(ResponseCode.SYSTEM_ERROR);
                response.setRemark("UnsupportedEncodingException " + e);
                return response;
            }
        } else {
            // Nothing to return: report an error to the caller
            log.error("No consumer offset in this broker, client: {} ", ctx.channel().remoteAddress());
            response.setCode(ResponseCode.SYSTEM_ERROR);
            response.setRemark("No consumer offset in this broker");
            return response;
        }
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }
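The branching in the handler above can be isolated in a small, dependency-free sketch. Everything here is a stand-in rather than RocketMQ's API: the Response class is a hypothetical replacement for RemotingCommand, the two integer constants are placeholders for ResponseCode.SUCCESS and ResponseCode.SYSTEM_ERROR, and the use of StandardCharsets.UTF_8 assumes that MixAll.DEFAULT_CHARSET names UTF-8.

```java
import java.nio.charset.StandardCharsets;

class GetAllConsumerOffsetSketch {
    // Placeholder codes for this sketch only (stand-ins for ResponseCode constants).
    static final int SUCCESS = 0;
    static final int SYSTEM_ERROR = 1;

    // Hypothetical minimal replacement for RemotingCommand.
    static final class Response {
        int code;
        String remark;
        byte[] body;
    }

    // Mirrors the handler's two outcomes: an error when there is nothing
    // to send, otherwise the encoded offsets as the response body.
    static Response handle(String encodedOffsets) {
        Response response = new Response();
        if (encodedOffsets == null || encodedOffsets.isEmpty()) {
            response.code = SYSTEM_ERROR;
            response.remark = "No consumer offset in this broker";
            return response;
        }
        // Assumption: UTF-8. A Charset constant cannot throw
        // UnsupportedEncodingException, so no try/catch is needed here.
        response.body = encodedOffsets.getBytes(StandardCharsets.UTF_8);
        response.code = SUCCESS;
        response.remark = null;
        return response;
    }

    public static void main(String[] args) {
        Response ok = handle("{\"offsetTable\":{}}");
        System.out.println("code=" + ok.code + ", bodyLength=" + ok.body.length);
        Response empty = handle("");
        System.out.println("code=" + empty.code + ", remark=" + empty.remark);
    }
}
```

Note that an empty result is reported as an error rather than as a successful response with an empty body, so a caller has to be prepared for SYSTEM_ERROR on a broker that has not stored any offsets yet.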
Next, step into org.apache.rocketmq.broker.offset.ConsumerOffsetManager#encode(boolean), which performs the encoding:
    public String encode(final boolean prettyFormat) {
        return RemotingSerializable.toJson(this, prettyFormat);
    }
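To get a feel for what the encoded string contains, here is a hand-rolled sketch that serializes a nested topic@group -> queueId -> offset map under an "offsetTable" field. It is not how RemotingSerializable.toJson works internally, and the exact text the broker produces (key quoting, whitespace, pretty printing) may differ; the class name, the encode helper, and the sample topic and group names are all hypothetical.

```java
import java.util.Map;
import java.util.TreeMap;

class OffsetJsonSketch {
    // Hypothetical encoder: writes {"offsetTable":{"topic@group":{"queueId":offset,...},...}}.
    // Assumes keys contain no quotes or backslashes, so no escaping is done.
    static String encode(Map<String, Map<Integer, Long>> offsetTable) {
        StringBuilder sb = new StringBuilder("{\"offsetTable\":{");
        boolean firstGroup = true;
        for (Map.Entry<String, Map<Integer, Long>> group : offsetTable.entrySet()) {
            if (!firstGroup) {
                sb.append(',');
            }
            firstGroup = false;
            sb.append('"').append(group.getKey()).append("\":{");
            boolean firstQueue = true;
            for (Map.Entry<Integer, Long> queue : group.getValue().entrySet()) {
                if (!firstQueue) {
                    sb.append(',');
                }
                firstQueue = false;
                sb.append('"').append(queue.getKey()).append("\":").append(queue.getValue());
            }
            sb.append('}');
        }
        return sb.append("}}").toString();
    }

    public static void main(String[] args) {
        // TreeMap keeps iteration order deterministic for the demo.
        Map<String, Map<Integer, Long>> table = new TreeMap<>();
        Map<Integer, Long> queues = new TreeMap<>();
        queues.put(0, 100L);
        queues.put(1, 200L);
        table.put("TopicA@GroupA", queues);
        System.out.println(encode(table));
    }
}
```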
Where the consumer offsets are kept in memory:
    // Consumer offsets
    private ConcurrentMap<String/* topic@group */, ConcurrentMap<Integer/*queueId*/, Long/*offset*/>> offsetTable =
        new ConcurrentHashMap<String, ConcurrentMap<Integer, Long>>(512);
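The two-level layout of this table can be tried out in isolation. The sketch below reuses only the shape of offsetTable and the topic@group key format given in its comment; the class name, commitOffset, queryOffset, and the -1 "not found" convention are hypothetical helpers for illustration, not methods quoted from ConsumerOffsetManager.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

class OffsetTableSketch {
    // Separator taken from the "topic@group" comment on offsetTable.
    static final String TOPIC_GROUP_SEPARATOR = "@";

    // topic@group -> (queueId -> offset), same shape as offsetTable.
    private final ConcurrentMap<String, ConcurrentMap<Integer, Long>> offsetTable =
        new ConcurrentHashMap<>(512);

    // Hypothetical helper: store the offset for one queue of a topic/group pair.
    void commitOffset(String topic, String group, int queueId, long offset) {
        String key = topic + TOPIC_GROUP_SEPARATOR + group;
        // computeIfAbsent creates the inner map atomically on first use.
        offsetTable.computeIfAbsent(key, k -> new ConcurrentHashMap<>()).put(queueId, offset);
    }

    // Hypothetical helper: returns -1 when no offset has been stored.
    long queryOffset(String topic, String group, int queueId) {
        ConcurrentMap<Integer, Long> queues = offsetTable.get(topic + TOPIC_GROUP_SEPARATOR + group);
        if (queues == null) {
            return -1L;
        }
        Long offset = queues.get(queueId);
        return offset == null ? -1L : offset;
    }

    public static void main(String[] args) {
        OffsetTableSketch table = new OffsetTableSketch();
        table.commitOffset("TopicA", "GroupA", 0, 100L);
        System.out.println(table.queryOffset("TopicA", "GroupA", 0));
        System.out.println(table.queryOffset("TopicA", "GroupB", 0));
    }
}
```

Keying the outer map by topic@group means one lookup finds all queue offsets for a consumer group on a topic, and two groups consuming the same topic never share an entry.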
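Control then returns to org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getAllConsumerOffset, where the request handling ends.
Closing remarks
This analysis reflects only my own understanding and is offered for reference only.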