說在前面java
管理請求 QUERY_CORRECTION_OFFSET 查詢修改後的offsetapache
源碼解析微信
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffsetthis
private RemotingCommand queryCorrectionOffset(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);QueryCorrectionOffsetHeader requestHeader =(QueryCorrectionOffsetHeader) request.decodeCommandCustomHeader(QueryCorrectionOffsetHeader.class);// 在全部的消費組中查詢最小的offset=》Map<Integer, Long> correctionOffset = this.brokerController.getConsumerOffsetManager().queryMinOffsetInAllGroup(requestHeader.getTopic(), requestHeader.getFilterGroups());// =》按topic和消費組查找offset=》Map<Integer, Long> compareOffset =this.brokerController.getConsumerOffsetManager().queryOffset(requestHeader.getTopic(), requestHeader.getCompareGroup());if (compareOffset != null && !compareOffset.isEmpty()) {for (Map.Entry<Integer, Long> entry : compareOffset.entrySet()) {Integer queueId = entry.getKey();correctionOffset.put(queueId,correctionOffset.get(queueId) > entry.getValue() ? Long.MAX_VALUE : correctionOffset.get(queueId));}}QueryCorrectionOffsetBody body = new QueryCorrectionOffsetBody();body.setCorrectionOffsets(correctionOffset);response.setBody(body.encode());response.setCode(ResponseCode.SUCCESS);response.setRemark(null);return response;}
進入這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryMinOffsetInAllGroup查詢全部消費組中最小的offset3d
public Map<Integer, Long> queryMinOffsetInAllGroup(final String topic, final String filterGroups) {Map<Integer, Long> queueMinOffset = new HashMap<Integer, Long>();Set<String> topicGroups = this.offsetTable.keySet();if (!UtilAll.isBlank(filterGroups)) {for (String group : filterGroups.split(",")) {Iterator<String> it = topicGroups.iterator();while (it.hasNext()) {if (group.equals(it.next().split(TOPIC_GROUP_SEPARATOR)[1])) {// 若是這個組是過濾的消費組從集合中刪除it.remove();}}}}for (Map.Entry<String, ConcurrentMap<Integer, Long>> offSetEntry : this.offsetTable.entrySet()) {String topicGroup = offSetEntry.getKey();String[] topicGroupArr = topicGroup.split(TOPIC_GROUP_SEPARATOR);if (topic.equals(topicGroupArr[0])) {for (Entry<Integer, Long> entry : offSetEntry.getValue().entrySet()) {// 查詢隊列最小的offset=》long minOffset = this.brokerController.getMessageStore().getMinOffsetInQueue(topic, entry.getKey());if (entry.getValue() >= minOffset) {Long offset = queueMinOffset.get(entry.getKey());if (offset == null) {queueMinOffset.put(entry.getKey(), Math.min(Long.MAX_VALUE, entry.getValue()));} else {queueMinOffset.put(entry.getKey(), Math.min(entry.getValue(), offset));}}}}}return queueMinOffset;}
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue,查詢消費隊列中最小的offsetcode
public long getMinOffsetInQueue(String topic, int queueId) {// 根據topic和queueId查詢消費者隊列 =》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 獲取隊列中的最小offsetreturn logic.getMinOffsetInQueue();}return -1;}
進入這個方法,org.apache.rocketmq.store.DefaultMessageStore#getMinOffsetInQueue 根據topic和queueId查詢消費者隊列blog
public long getMinOffsetInQueue(String topic, int queueId) {// 根據topic和queueId查詢消費者隊列 =》ConsumeQueue logic = this.findConsumeQueue(topic, queueId);if (logic != null) {// 獲取隊列中的最小offsetreturn logic.getMinOffsetInQueue();}return -1;}
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue隊列
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic;}
往上返回到這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset(java.lang.String, java.lang.String) 按topic和消費組查找offsetrem
public Map<Integer, Long> queryOffset(final String group, final String topic) {// topic@groupString key = topic + TOPIC_GROUP_SEPARATOR + group;return this.offsetTable.get(key);}
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#queryCorrectionOffset結束get
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣