RocketMQ 源碼解析:查詢消息處理器

說在前面

查詢消息處理器

 

源碼解析

進入到這個方法:org.apache.rocketmq.broker.processor.QueryMessageProcessor#processRequest

@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {//            查詢消息=》case RequestCode.QUERY_MESSAGE:return this.queryMessage(ctx, request);//                按id查詢消息=》case RequestCode.VIEW_MESSAGE_BY_ID:return this.viewMessageById(ctx, request);default:break;}return null;    }

進入這個方法,查詢消息:org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage








public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);final QueryMessageResponseHeader responseHeader =(QueryMessageResponseHeader) response.readCustomHeader();final QueryMessageRequestHeader requestHeader =(QueryMessageRequestHeader) request.decodeCommandCustomHeader(QueryMessageRequestHeader.class);response.setOpaque(request.getOpaque());String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);if (isUniqueKey != null && isUniqueKey.equals("true")) {requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());}//        =》final QueryMessageResult queryMessageResult =this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),requestHeader.getEndTimestamp());assert queryMessageResult != null;responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());if (queryMessageResult.getBufferTotalSize() > 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);try {FileRegion fileRegion =new QueryMessageTransfer(response.encodeHeader(queryMessageResult.getBufferTotalSize()), queryMessageResult);ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {queryMessageResult.release();if (!future.isSuccess()) {log.error("transfer query message by page cache failed, ", future.cause());}}});} catch (Throwable e) {log.error("", e);queryMessageResult.release();}return null;}response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("can not find message, maybe time range not correct");return response;    }

進入這個方法:org.apache.rocketmq.store.DefaultMessageStore#queryMessage












@Overridepublic QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {QueryMessageResult queryMessageResult = new QueryMessageResult();long lastQueryMsgTime = end;for (int i = 0; i < 3; i++) {//            按topic、key、分頁數3二、開始時間、結束時間查詢offset=》QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);if (queryOffsetResult.getPhyOffsets().isEmpty()) {break;}Collections.sort(queryOffsetResult.getPhyOffsets());queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {long offset = queryOffsetResult.getPhyOffsets().get(m);try {boolean match = true;//                    根據offset查詢消息=》MessageExt msg = this.lookMessageByOffset(offset);if (0 == m) {lastQueryMsgTime = msg.getStoreTimestamp();}//                    String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);//                    if (topic.equals(msg.getTopic())) {//                        for (String k : keyArray) {//                            if (k.equals(key)) {//                                match = true;//                                break;//                            }//                        }//                    }if (match) {//                        根據offset查詢消息數據=》SelectMappedBufferResult result = this.commitLog.getData(offset, false);if (result != null) {int size = result.getByteBuffer().getInt(0);result.getByteBuffer().limit(size);result.setSize(size);queryMessageResult.addMessage(result);}} else {log.warn("queryMessage hash duplicate, {} {}", topic, key);}} catch (Exception e) {log.error("queryMessage exception", e);}}if (queryMessageResult.getBufferTotalSize() > 0) {break;}if (lastQueryMsgTime < begin) {break;}}return queryMessageResult;    }

進入這個方法,按topic、key、分頁數32、開始時間、結束時間查詢offset:org.apache.rocketmq.store.index.IndexService#queryOffset






public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {List<Long> phyOffsets = new ArrayList<Long>(maxNum);long indexLastUpdateTimestamp = 0;long indexLastUpdatePhyoffset = 0;//        分頁數32,最大批量數是64maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());try {this.readWriteLock.readLock().lock();if (!this.indexFileList.isEmpty()) {for (int i = this.indexFileList.size(); i > 0; i--) {IndexFile f = this.indexFileList.get(i - 1);boolean lastFile = i == this.indexFileList.size();if (lastFile) {indexLastUpdateTimestamp = f.getEndTimestamp();indexLastUpdatePhyoffset = f.getEndPhyOffset();}if (f.isTimeMatched(begin, end)) {//                        按key=topic#key、分頁數32,開始時間、結束時間、最新文件查詢物理offsetf.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);}if (f.getBeginTimestamp() < begin) {break;}if (phyOffsets.size() >= maxNum) {break;}}}} catch (Exception e) {log.error("queryMsg exception", e);} finally {this.readWriteLock.readLock().unlock();}return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset);    }

往上返回到這個方法,按id查詢消息:org.apache.rocketmq.broker.processor.QueryMessageProcessor#viewMessageById





public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ViewMessageRequestHeader requestHeader =(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);response.setOpaque(request.getOpaque());//        按offste查詢消息=》final SelectMappedBufferResult selectMappedBufferResult =this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());if (selectMappedBufferResult != null) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);try {            //   這裏採用的是netty的fileRegion 文件 0 copy技術FileRegion fileRegion =new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),selectMappedBufferResult);ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {selectMappedBufferResult.release();if (!future.isSuccess()) {log.error("Transfer one message from page cache failed, ", future.cause());}}});} catch (Throwable e) {log.error("", e);selectMappedBufferResult.release();}return null;} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("can not find message by the offset, " + requestHeader.getOffset());}return response;    }

進入這個方法,按offset查詢消息:org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long),前面介紹過了。

往上返回到這個方法,org.apache.rocketmq.broker.processor.QueryMessageProcessor#processRequest,結束。

 

說在最後

本次解析僅代表個人觀點,僅供參考。

 

加入技術微信羣

釘釘技術羣

相關文章
相關標籤/搜索