說在前面
管理請求 INVOKE_BROKER_TO_RESET_OFFSET:重置 broker offset
源碼解析
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#resetOffset 重置broker offset緩存
public RemotingCommand resetOffset(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final ResetOffsetRequestHeader requestHeader = (ResetOffsetRequestHeader) request.decodeCommandCustomHeader(ResetOffsetRequestHeader.class); log.info("[reset-offset] reset offset started by {}. topic={}, group={}, timestamp={}, isForce={}", RemotingHelper.parseChannelRemoteAddr(ctx.channel()), requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp(), requestHeader.isForce()); boolean isC = false; LanguageCode language = request.getLanguage(); switch (language) { case CPP: isC = true; break; } // =》 return this.brokerController.getBroker2Client().resetOffset(requestHeader.getTopic(), requestHeader.getGroup(), requestHeader.getTimestamp(), requestHeader.isForce(), isC); }
進入這個方法org.apache.rocketmq.broker.client.net.Broker2Client#resetOffset(java.lang.String, java.lang.String, long, boolean, boolean) 重置offset
public RemotingCommand resetOffset(String topic, String group, long timeStamp, boolean isForce, boolean isC) { final RemotingCommand response = RemotingCommand.createResponseCommand(null); // 按topic從緩存中獲取topic配置信息=》 TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(topic); if (null == topicConfig) { log.error("[reset-offset] reset offset failed, no topic in this broker. topic={}", topic); response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("[reset-offset] reset offset failed, no topic in this broker. topic=" + topic); return response; } Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>(); for (int i = 0; i < topicConfig.getWriteQueueNums(); i++) { MessageQueue mq = new MessageQueue(); mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName()); mq.setTopic(topic); mq.setQueueId(i); // 查詢消費者offset=》 long consumerOffset = this.brokerController.getConsumerOffsetManager().queryOffset(group, topic, i); if (-1 == consumerOffset) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark(String.format("THe consumer group <%s> not exist", group)); return response; } long timeStampOffset; if (timeStamp == -1) { // 按topic和queueId查詢到最大offset=》 timeStampOffset = this.brokerController.getMessageStore().getMaxOffsetInQueue(topic, i); } else { // 按時間、topic、queueId查詢時間offset=》 timeStampOffset = this.brokerController.getMessageStore().getOffsetInQueueByTime(topic, i, timeStamp); } if (timeStampOffset < 0) { log.warn("reset offset is invalid. 
topic={}, queueId={}, timeStampOffset={}", topic, i, timeStampOffset); timeStampOffset = 0; } if (isForce || timeStampOffset < consumerOffset) { // 獲取比較小的offset offsetTable.put(mq, timeStampOffset); } else { offsetTable.put(mq, consumerOffset); } } ResetOffsetRequestHeader requestHeader = new ResetOffsetRequestHeader(); requestHeader.setTopic(topic); requestHeader.setGroup(group); requestHeader.setTimestamp(timeStamp); RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.RESET_CONSUMER_CLIENT_OFFSET, requestHeader); if (isC) { // c++ language ResetOffsetBodyForC body = new ResetOffsetBodyForC(); // 轉換offset集合=》 List<MessageQueueForC> offsetList = convertOffsetTable2OffsetList(offsetTable); body.setOffsetTable(offsetList); request.setBody(body.encode()); } else { // other language ResetOffsetBody body = new ResetOffsetBody(); body.setOffsetTable(offsetTable); request.setBody(body.encode()); } // 從緩存中獲取消費組信息 ConsumerGroupInfo consumerGroupInfo = this.brokerController.getConsumerManager().getConsumerGroupInfo(group); // 消費組緩存的client channel if (consumerGroupInfo != null && !consumerGroupInfo.getAllChannel().isEmpty()) { ConcurrentMap<Channel, ClientChannelInfo> channelInfoTable = consumerGroupInfo.getChannelInfoTable(); for (Map.Entry<Channel, ClientChannelInfo> entry : channelInfoTable.entrySet()) { int version = entry.getValue().getVersion(); if (version >= MQVersion.Version.V3_0_7_SNAPSHOT.ordinal()) { try { // 重置offset請求=》 this.brokerController.getRemotingServer().invokeOneway(entry.getKey(), request, 5000); log.info("[reset-offset] reset offset success. topic={}, group={}, clientId={}", topic, group, entry.getValue().getClientId()); } catch (Exception e) { log.error("[reset-offset] reset offset exception. topic={}, group={}", new Object[] {topic, group}, e); } } else { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("the client does not support this feature. 
version=" + MQVersion.getVersionDesc(version)); log.warn("[reset-offset] the client does not support this feature. version={}", RemotingHelper.parseChannelRemoteAddr(entry.getKey()), MQVersion.getVersionDesc(version)); return response; } } } else { String errorInfo = String.format("Consumer not online, so can not reset offset, Group: %s Topic: %s Timestamp: %d", requestHeader.getGroup(), requestHeader.getTopic(), requestHeader.getTimestamp()); log.error(errorInfo); response.setCode(ResponseCode.CONSUMER_NOT_ONLINE); response.setRemark(errorInfo); return response; } response.setCode(ResponseCode.SUCCESS); ResetOffsetBody resBody = new ResetOffsetBody(); resBody.setOffsetTable(offsetTable); response.setBody(resBody.encode()); return response; }
進入這個方法org.apache.rocketmq.broker.topic.TopicConfigManager#selectTopicConfig 按topic獲取topic配置
public TopicConfig selectTopicConfig(final String topic) { // 從topic配置緩存信息中查詢當前topic的配置 return this.topicConfigTable.get(topic); }
進入這個方法org.apache.rocketmq.broker.offset.ConsumerOffsetManager#queryOffset 按group, topic, queueId查詢offset
public long queryOffset(final String group, final String topic, final int queueId) { // topic@group 從本地offset緩存中查詢 String key = topic + TOPIC_GROUP_SEPARATOR + group; ConcurrentMap<Integer, Long> map = this.offsetTable.get(key); if (null != map) { Long offset = map.get(queueId); if (offset != null) return offset; } return -1; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#getMaxOffsetInQueue 按topic、queueId查詢消費隊列最大offset
public long getMaxOffsetInQueue(String topic, int queueId) { // 根據topic和queueId找到消費者隊列=》 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { // 獲取最大的offset =》 long offset = logic.getMaxOffsetInQueue(); return offset; } // 若是不存在指定topic和queueId的消費隊列直接返回0 return 0; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue 按topic、queueId查詢消費隊列
public ConsumeQueue findConsumeQueue(String topic, int queueId) { // 找到topic的全部消息隊列 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } // 按queue id查找消費者隊列 ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, // 消費者隊列存儲地址 user.home/store/consumequeue StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // 每一個文件存儲默認30W this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue 獲取消費隊列最大的offset
public long getMaxOffsetInQueue() { // =》 return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() { // 獲取存儲映射文件隊列中索引位置最大的映射文件=》 MappedFile mappedFile = getLastMappedFile(); if (mappedFile != null) { // 映射文件的起始offset+映射文件的可讀取的索引位置 return mappedFile.getFileFromOffset() + mappedFile.getReadPosition(); } // 若是隊列中沒有存儲映射文件直接返回0 return 0; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()獲取映射文件隊列中最後的映射文件
public MappedFile getLastMappedFile() { MappedFile mappedFileLast = null; while (!this.mappedFiles.isEmpty()) { try { mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1); break; } catch (IndexOutOfBoundsException e) { //continue; } catch (Exception e) { log.error("getLastMappedFile has exception.", e); break; } } return mappedFileLast; }
往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getOffsetInQueueByTime 按topic、queueId、timestamp查詢timeStampOffset
public long getOffsetInQueueByTime(String topic, int queueId, long timestamp) { // 根據topic找到隊列羣 =》 ConsumeQueue logic = this.findConsumeQueue(topic, queueId); if (logic != null) { // 按時間查詢offset=》 return logic.getOffsetInQueueByTime(timestamp); } return 0; }
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue按topic、queueId查詢消費隊列
public ConsumeQueue findConsumeQueue(String topic, int queueId) { // 找到topic的全部消息隊列 ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic); if (null == map) { ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128); ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap); if (oldMap != null) { map = oldMap; } else { map = newMap; } } // 按queue id查找消費者隊列 ConsumeQueue logic = map.get(queueId); if (null == logic) { ConsumeQueue newLogic = new ConsumeQueue( topic, queueId, // 消費者隊列存儲地址 user.home/store/consumequeue StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()), // 每一個文件存儲默認30W this.getMessageStoreConfig().getMapedFileSizeConsumeQueue(), this); ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic); if (oldLogic != null) { logic = oldLogic; } else { logic = newLogic; } } return logic; }
往上返回到這個方法org.apache.rocketmq.store.ConsumeQueue#getOffsetInQueueByTime
按timestamp查詢timeStampOffset
/**
 * Finds the consume-queue offset (counted in entries) whose message store time best matches
 * {@code timestamp}.
 *
 * <p>Picks a mapped file by time, then binary-searches its entries, which are
 * {@code CQ_STORE_UNIT_SIZE} bytes apart. Each probed entry is read as a long (physical
 * offset) followed by an int (size), and the message's store time is fetched from the
 * commit log. An exact match wins; otherwise the neighbour whose store time is closer to
 * {@code timestamp} is chosen.
 *
 * @param timestamp store time to look up
 * @return the matching offset, or {@code 0} when no mapped file or buffer is available or a
 *         probed message's store time cannot be read
 */
public long getOffsetInQueueByTime(final long timestamp) {
    // Pick the mapped file to search, based on the timestamp.
    MappedFile mappedFile = this.mappedFileQueue.getMappedFileByTime(timestamp);
    if (mappedFile != null) {
        long offset = 0;
        // Start the search at minLogicOffset when it falls after this file's start.
        int low = minLogicOffset > mappedFile.getFileFromOffset() ? (int) (minLogicOffset - mappedFile.getFileFromOffset()) : 0;
        int high = 0;
        int midOffset = -1, targetOffset = -1, leftOffset = -1, rightOffset = -1;
        long leftIndexValue = -1L, rightIndexValue = -1L;
        // Smallest physical offset reported by the message store.
        long minPhysicOffset = this.defaultMessageStore.getMinPhyOffset();
        // Buffer over the file starting at position 0.
        SelectMappedBufferResult sbr = mappedFile.selectMappedBuffer(0);
        if (null != sbr) {
            ByteBuffer byteBuffer = sbr.getByteBuffer();
            // Position of the last whole entry in the buffer.
            high = byteBuffer.limit() - CQ_STORE_UNIT_SIZE;
            try {
                while (high >= low) {
                    // Midpoint rounded down to an entry boundary.
                    midOffset = (low + high) / (2 * CQ_STORE_UNIT_SIZE) * CQ_STORE_UNIT_SIZE;
                    byteBuffer.position(midOffset);
                    long phyOffset = byteBuffer.getLong();
                    int size = byteBuffer.getInt();
                    // Entries below the minimum physical offset are skipped: search to the right.
                    if (phyOffset < minPhysicOffset) {
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        continue;
                    }
                    // Read the message's store time from the commit log by physical offset.
                    long storeTime = this.defaultMessageStore.getCommitLog().pickupStoreTimestamp(phyOffset, size);
                    if (storeTime < 0) {
                        // Store time unavailable: give up and report 0.
                        return 0;
                    } else if (storeTime == timestamp) {
                        targetOffset = midOffset;
                        break;
                    } else if (storeTime > timestamp) {
                        high = midOffset - CQ_STORE_UNIT_SIZE;
                        rightOffset = midOffset;
                        rightIndexValue = storeTime;
                    } else {
                        low = midOffset + CQ_STORE_UNIT_SIZE;
                        leftOffset = midOffset;
                        leftIndexValue = storeTime;
                    }
                }
                if (targetOffset != -1) {
                    // Exact store-time match.
                    offset = targetOffset;
                } else {
                    if (leftIndexValue == -1) {
                        // No earlier store time was seen: take the right neighbour.
                        offset = rightOffset;
                    } else if (rightIndexValue == -1) {
                        // No later store time was seen: take the left neighbour.
                        offset = leftOffset;
                    } else {
                        // Both neighbours seen: take the one closer in time; ties go left.
                        offset = Math.abs(timestamp - leftIndexValue) > Math.abs(timestamp - rightIndexValue) ? rightOffset : leftOffset;
                    }
                }
                // Convert the byte position to an entry index.
                return (mappedFile.getFileFromOffset() + offset) / CQ_STORE_UNIT_SIZE;
            } finally {
                // Release the buffer selected from the mapped file.
                sbr.release();
            }
        }
    }
    return 0;
}
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getMappedFileByTime按timestamp查詢MappedFile
public MappedFile getMappedFileByTime(final long timestamp) { Object[] mfs = this.copyMappedFiles(0); if (null == mfs) return null; for (int i = 0; i < mfs.length; i++) { MappedFile mappedFile = (MappedFile) mfs[i]; // 若是文件的最後修改時間大於等於參數時間 if (mappedFile.getLastModifiedTimestamp() >= timestamp) { return mappedFile; } } return (MappedFile) mfs[mfs.length - 1]; }
往上返回到這個方法org.apache.rocketmq.store.DefaultMessageStore#getMinPhyOffset 查詢minPhyOffset
@Override public long getMinPhyOffset() { // 獲取commitLog的最小偏移量 =》 return this.commitLog.getMinOffset(); }
進入這個方法org.apache.rocketmq.store.CommitLog#getMinOffset
public long getMinOffset() { // 獲取第一個映射文件=》 MappedFile mappedFile = this.mappedFileQueue.getFirstMappedFile(); if (mappedFile != null) { if (mappedFile.isAvailable()) { // 獲取映射文件的起始偏移量 return mappedFile.getFileFromOffset(); } else { // 獲取下個文件的起始偏移量=》 return this.rollNextFile(mappedFile.getFileFromOffset()); } } return -1; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#getFirstMappedFile找到映射文件隊列中第一個映射文件
public MappedFile getFirstMappedFile() { MappedFile mappedFileFirst = null; if (!this.mappedFiles.isEmpty()) { try { mappedFileFirst = this.mappedFiles.get(0); } catch (IndexOutOfBoundsException e) { //ignore } catch (Exception e) { log.error("getFirstMappedFile has exception.", e); } } return mappedFileFirst; }
進入這個方法org.apache.rocketmq.store.CommitLog#rollNextFile獲取下一個映射文件的起始偏移量
/**
 * Computes the starting offset of the file that follows the one containing {@code offset}.
 *
 * @param offset an offset inside the current file
 * @return {@code offset} rounded up to the next multiple of the configured commit-log
 *         mapped file size (a full file size is added when {@code offset} is already aligned)
 */
public long rollNextFile(final long offset) {
    final int fileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();
    // Distance from the start of the current file to the given offset.
    final long positionInFile = offset % fileSize;
    return offset + fileSize - positionInFile;
}
往上返回到這個方法org.apache.rocketmq.store.MappedFile#selectMappedBuffer(int)查詢SelectMappedBufferResult
public SelectMappedBufferResult selectMappedBuffer(int pos) { // 獲取文件讀取的位置 int readPosition = getReadPosition(); if (pos < readPosition && pos >= 0) { if (this.hold()) { // 建立新的緩衝區 ByteBuffer byteBuffer = this.mappedByteBuffer.slice(); byteBuffer.position(pos); int size = readPosition - pos; ByteBuffer byteBufferNew = byteBuffer.slice(); byteBufferNew.limit(size); return new SelectMappedBufferResult(this.fileFromOffset + pos, byteBufferNew, size, this); } } return null; }
往上返回到這個方法org.apache.rocketmq.store.CommitLog#pickupStoreTimestamp按offset、文件大小查詢存儲時間
public long pickupStoreTimestamp(final long offset, final int size) { if (offset >= this.getMinOffset()) { // =》 SelectMappedBufferResult result = this.getMessage(offset, size); if (null != result) { try { // 獲取消息存儲時間 return result.getByteBuffer().getLong(MessageDecoder.MESSAGE_STORE_TIMESTAMP_POSTION); } finally { result.release(); } } } return -1; }
進入這個方法org.apache.rocketmq.store.CommitLog#getMessage查詢buffer
public SelectMappedBufferResult getMessage(final long offset, final int size) { int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog(); // 根據offset找到映射文件 =》 MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0); if (mappedFile != null) { int pos = (int) (offset % mappedFileSize); return mappedFile.selectMappedBuffer(pos, size); } return null; }
進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)按offset查詢mappedFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) { try { // 獲取隊列中第一個映射文件 MappedFile firstMappedFile = this.getFirstMappedFile(); // 獲取隊列中最後一個映射文件 MappedFile lastMappedFile = this.getLastMappedFile(); if (firstMappedFile != null && lastMappedFile != null) { // 若是offset不在索引文件的offset範圍內 if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) { LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}", offset, firstMappedFile.getFileFromOffset(), lastMappedFile.getFileFromOffset() + this.mappedFileSize, this.mappedFileSize, this.mappedFiles.size()); } else { // 找到映射文件在隊列中的索引位置 int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize)); MappedFile targetFile = null; try { // 獲取索引文件 targetFile = this.mappedFiles.get(index); } catch (Exception ignored) { } // offset在目標文件的起始offset和結束offset範圍內 if (targetFile != null && offset >= targetFile.getFileFromOffset() && offset < targetFile.getFileFromOffset() + this.mappedFileSize) { return targetFile; } // 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件 for (MappedFile tmpMappedFile : this.mappedFiles) { if (offset >= tmpMappedFile.getFileFromOffset() && offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) { return tmpMappedFile; } } } // 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意 if (returnFirstOnNotFound) { return firstMappedFile; } } } catch (Exception e) { log.error("findMappedFileByOffset Exception", e); } return null; }
往上返回到這個方法org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeOneway 執行單向(oneway)rpc請求
/**
 * Sends a one-way request on the given channel by delegating to {@code invokeOnewayImpl}.
 *
 * @param channel       channel to write the request to
 * @param request       command to send
 * @param timeoutMillis timeout forwarded unchanged to {@code invokeOnewayImpl}
 * @throws InterruptedException            propagated from {@code invokeOnewayImpl}
 * @throws RemotingTooMuchRequestException propagated from {@code invokeOnewayImpl}
 * @throws RemotingTimeoutException        propagated from {@code invokeOnewayImpl}
 * @throws RemotingSendRequestException    propagated from {@code invokeOnewayImpl}
 */
@Override
public void invokeOneway(Channel channel, RemotingCommand request, long timeoutMillis)
    throws InterruptedException,
           RemotingTooMuchRequestException,
           RemotingTimeoutException,
           RemotingSendRequestException {
    invokeOnewayImpl(channel, request, timeoutMillis);
}
進入這個方法org.apache.rocketmq.remoting.netty.NettyRemotingAbstract#invokeOnewayImpl
public void invokeOnewayImpl(final Channel channel, final RemotingCommand request, final long timeoutMillis) throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException { request.markOnewayRPC(); // 獲取信號量的信號 boolean acquired = this.semaphoreOneway.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS); if (acquired) { final SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(this.semaphoreOneway); try { channel.writeAndFlush(request).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture f) throws Exception { once.release(); if (!f.isSuccess()) { log.warn("send a request command to channel <" + channel.remoteAddress() + "> failed."); } } }); } catch (Exception e) { once.release(); log.warn("write send a request command to channel <" + channel.remoteAddress() + "> failed."); throw new RemotingSendRequestException(RemotingHelper.parseChannelRemoteAddr(channel), e); } } else { if (timeoutMillis <= 0) { throw new RemotingTooMuchRequestException("invokeOnewayImpl invoke too fast"); } else { String info = String.format( "invokeOnewayImpl tryAcquire semaphore timeout, %dms, waiting thread nums: %d semaphoreAsyncValue: %d", timeoutMillis, this.semaphoreOneway.getQueueLength(), this.semaphoreOneway.availablePermits() ); log.warn(info); throw new RemotingTimeoutException(info); } } }
往上返回到這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#resetOffset結束
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣