說在前面apache
管理請求 CONSUME_MESSAGE_DIRECTLY 直接消費消息微信
源碼解析app
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectlyide
private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());SelectMappedBufferResult selectMappedBufferResult = null;try {MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());// 從commit的offset位置獲取一條消息=》selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());byte[] body = new byte[selectMappedBufferResult.getSize()];selectMappedBufferResult.getByteBuffer().get(body);request.setBody(body);} catch (UnknownHostException e) {} finally {if (selectMappedBufferResult != null) {selectMappedBufferResult.release();}}// =》return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),requestHeader.getClientId());}
進入這個方法org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long) 根據offset從commitLog中讀取消息this
@Overridepublic SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {// SelectMappedBufferResult中存儲的是消息offset、建立時間=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);if (null != sbr) {try {// 1 TOTALSIZEint size = sbr.getByteBuffer().getInt();// =》return this.commitLog.getMessage(commitLogOffset, size);} finally {sbr.release();}}return null;}
進入這個方法org.apache.rocketmq.store.CommitLog#getMessage查詢消息.net
public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();// 根據offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null;}
進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)根據offset查詢映射文件netty
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}
進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer調用消費者code
private RemotingCommand callConsumer(final int requestCode,final RemotingCommand request,final String consumerGroup,final String clientId) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);// 按消費組和clentId查詢client的渠道信息 =》ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);if (null == clientChannelInfo) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));return response;}if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",clientId,MQVersion.getVersionDesc(clientChannelInfo.getVersion())));return response;}try {RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());// 調用client=》return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);} catch (RemotingTimeoutException e) {response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);response.setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;} catch (Exception e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;}}
進入這個方法org.apache.rocketmq.broker.client.ConsumerManager#findChannel個護具clientId找到channelorm
public ClientChannelInfo findChannel(final String group, final String clientId) {// 獲取組的消費組信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (consumerGroupInfo != null) {// 按clineId從消費組信息中獲取client渠道信息return consumerGroupInfo.findChannel(clientId);}return null;}
進入這個方法org.apache.rocketmq.broker.client.net.Broker2Client#callClient調用clientblog
public RemotingCommand callClient(final Channel channel,final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {// =》return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);}
進入這個方法
org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeSync同步執行請求
@Overridepublic void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);}
這裏前面介紹過了
往上返回到這個方法
org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly結束
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣