說在前面apache
本次繼續解析管理請求,GET_MAX_OFFSET 獲取最大的offset安全
源碼解析微信
進入到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset獲取最大的offset併發
private RemotingCommand getMaxOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(GetMaxOffsetResponseHeader.class); final GetMaxOffsetResponseHeader responseHeader = (GetMaxOffsetResponseHeader) response.readCustomHeader(); final GetMaxOffsetRequestHeader requestHeader = (GetMaxOffsetRequestHeader) request.decodeCommandCustomHeader(GetMaxOffsetRequestHeader.class); // 根據topic和queueId獲取最大的offset =》 long offset = this.brokerController.getMessageStore().getMaxOffsetInQueue(requestHeader.getTopic(), requestHeader.getQueueId()); responseHeader.setOffset(offset); response.setCode(ResponseCode.SUCCESS); response.setRemark(null); return response; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue 根據topic和queueId獲取最大的offsetapp
public long getMaxOffsetInQueue(String topic, int queueId) { // 根據topic和queueId找到消費者隊列=》 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { // 獲取最大的offset =》 long offset = logic.getMaxOffsetInQueue(); return offset; } // 若是不存在指定topic和queueId的消費隊列直接返回0 return 0; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue 根據topic和queueId查詢消費隊列this
public ConsumeQueue findConsumeQueue(String topic, int queueId) { ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } // 按queue id查找消費者隊列 ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, // 消費者隊列存儲地址 user.home/store/consumequeue StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // 每一個文件存儲默認30W this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue 查詢最大的offset線程
public long getMaxOffsetInQueue() { // =》 return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getMaxOffsetcode
public long getMaxOffset() { // 獲取存儲映射文件隊列中索引位置最大的映射文件 MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { // 映射文件的起始offset+映射文件的可讀取的索引位置 return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); } // 若是隊列中沒有存儲映射文件直接返回0 return 0; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile() 獲取映射文件集合中最後一個映射文件blog
public MappedFile getLastMappedFile() { MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getLastMappedFile has exception.", e); break; } } return mappedFileLast; }
能夠看到映射文件集合是用CopyOnWriteArrayList實現索引
// 併發線程安全隊列存儲映射文件 private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#getMaxOffset結束
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣