rocketmq源碼解析之管理請求直接消費消息

說在前面apache

管理請求 CONSUME_MESSAGE_DIRECTLY 直接消費消息微信

 

源碼解析app

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectlyide



private RemotingCommand consumeMessageDirectly(ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {final ConsumeMessageDirectlyResultRequestHeader requestHeader = (ConsumeMessageDirectlyResultRequestHeader) request.decodeCommandCustomHeader(ConsumeMessageDirectlyResultRequestHeader.class);request.getExtFields().put("brokerName", this.brokerController.getBrokerConfig().getBrokerName());SelectMappedBufferResult selectMappedBufferResult = null;try {MessageId messageId = MessageDecoder.decodeMessageId(requestHeader.getMsgId());//            從commit的offset位置獲取一條消息=》selectMappedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(messageId.getOffset());byte[] body = new byte[selectMappedBufferResult.getSize()];selectMappedBufferResult.getByteBuffer().get(body);request.setBody(body);} catch (UnknownHostException e) {} finally {if (selectMappedBufferResult != null) {selectMappedBufferResult.release();}}//        =》return this.callConsumer(RequestCode.CONSUME_MESSAGE_DIRECTLY, request, requestHeader.getConsumerGroup(),requestHeader.getClientId());}

進入這個方法org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long) 根據offset從commitLog中讀取消息this

@Overridepublic SelectMappedBufferResult selectOneMessageByOffset(long commitLogOffset) {//        SelectMappedBufferResult中存儲的是消息offset、建立時間=》SelectMappedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);if (null != sbr) {try {// 1 TOTALSIZEint size = sbr.getByteBuffer().getInt();//                =》return this.commitLog.getMessage(commitLogOffset, size);} finally {sbr.release();}}return null;}

進入這個方法org.apache.rocketmq.store.CommitLog#getMessage查詢消息.net

 

public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();//        根據offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null;}

進入這個方法org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)根據offset查詢映射文件netty

 




public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {//            獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();//            獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {//                若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {//                   找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {//                        獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}//                    offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}//                    若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}//                若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null;}

 

進入這個方法org.apache.rocketmq.broker.processor.AdminBrokerProcessor#callConsumer調用消費者code

 



private RemotingCommand callConsumer(final int requestCode,final RemotingCommand request,final String consumerGroup,final String clientId) throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);//        按消費組和clentId查詢client的渠道信息 =》ClientChannelInfo clientChannelInfo = this.brokerController.getConsumerManager().findChannel(consumerGroup, clientId);if (null == clientChannelInfo) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> <%s> not online", consumerGroup, clientId));return response;}if (clientChannelInfo.getVersion() < MQVersion.Version.V3_1_8_SNAPSHOT.ordinal()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("The Consumer <%s> Version <%s> too low to finish, please upgrade it to V3_1_8_SNAPSHOT",clientId,MQVersion.getVersionDesc(clientChannelInfo.getVersion())));return response;}try {RemotingCommand newRequest = RemotingCommand.createRequestCommand(requestCode, null);newRequest.setExtFields(request.getExtFields());newRequest.setBody(request.getBody());//            調用client=》return this.brokerController.getBroker2Client().callClient(clientChannelInfo.getChannel(), newRequest);} catch (RemotingTimeoutException e) {response.setCode(ResponseCode.CONSUME_MSG_TIMEOUT);response.setRemark(String.format("consumer <%s> <%s> Timeout: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;} catch (Exception e) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(String.format("invoke consumer <%s> <%s> Exception: %s", consumerGroup, clientId, RemotingHelper.exceptionSimpleDesc(e)));return response;}}

 

進入這個方法org.apache.rocketmq.broker.client.ConsumerManager#findChannel個護具clientId找到channelorm

 

public ClientChannelInfo findChannel(final String group, final String clientId) {//        獲取組的消費組信息ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);if (consumerGroupInfo != null) {//            按clineId從消費組信息中獲取client渠道信息return consumerGroupInfo.findChannel(clientId);}return null;}

 

進入這個方法org.apache.rocketmq.broker.client.net.Broker2Client#callClient調用clientblog

 

public RemotingCommand callClient(final Channel channel,final RemotingCommand request) throws RemotingSendRequestException, RemotingTimeoutException, InterruptedException {//        =》return this.brokerController.getRemotingServer().invokeSync(channel, request, 10000);}

 

進入這個方法

org.apache.rocketmq.remoting.netty.NettyRemotingServer#invokeSync同步執行請求

@Overridepublic void invokeAsync(Channel channel, RemotingCommand request, long timeoutMillis, InvokeCallback invokeCallback)throws InterruptedException, RemotingTooMuchRequestException, RemotingTimeoutException, RemotingSendRequestException {this.invokeAsyncImpl(channel, request, timeoutMillis, invokeCallback);}

 

這裏前面介紹過了

往上返回到這個方法

org.apache.rocketmq.broker.processor.AdminBrokerProcessor#consumeMessageDirectly結束

 

說在最後

本次解析僅表明我的觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索