說在前面
查詢消息處理器
源碼解析
進入到這個方法:org.apache.rocketmq.broker.processor.QueryMessageProcessor#processRequest
@Overridepublic RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {switch (request.getCode()) {// 查詢消息=》case RequestCode.QUERY_MESSAGE:return this.queryMessage(ctx, request);// 按id查詢消息=》case RequestCode.VIEW_MESSAGE_BY_ID:return this.viewMessageById(ctx, request);default:break;}return null; }
進入這個方法,查詢消息:org.apache.rocketmq.broker.processor.QueryMessageProcessor#queryMessage
public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response =RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class);final QueryMessageResponseHeader responseHeader =(QueryMessageResponseHeader) response.readCustomHeader();final QueryMessageRequestHeader requestHeader =(QueryMessageRequestHeader) request.decodeCommandCustomHeader(QueryMessageRequestHeader.class);response.setOpaque(request.getOpaque());String isUniqueKey = request.getExtFields().get(MixAll.UNIQUE_MSG_QUERY_FLAG);if (isUniqueKey != null && isUniqueKey.equals("true")) {requestHeader.setMaxNum(this.brokerController.getMessageStoreConfig().getDefaultQueryMaxNum());}// =》final QueryMessageResult queryMessageResult =this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(),requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(),requestHeader.getEndTimestamp());assert queryMessageResult != null;responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset());responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp());if (queryMessageResult.getBufferTotalSize() > 0) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);try {FileRegion fileRegion =new QueryMessageTransfer(response.encodeHeader(queryMessageResult.getBufferTotalSize()), queryMessageResult);ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {queryMessageResult.release();if (!future.isSuccess()) {log.error("transfer query message by page cache failed, ", future.cause());}}});} catch (Throwable e) {log.error("", e);queryMessageResult.release();}return null;}response.setCode(ResponseCode.QUERY_NOT_FOUND);response.setRemark("can not find message, maybe time range not correct");return response; }
進入這個方法:org.apache.rocketmq.store.DefaultMessageStore#queryMessage
@Overridepublic QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {QueryMessageResult queryMessageResult = new QueryMessageResult();long lastQueryMsgTime = end;for (int i = 0; i < 3; i++) {// 按topic、key、分頁數3二、開始時間、結束時間查詢offset=》QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, lastQueryMsgTime);if (queryOffsetResult.getPhyOffsets().isEmpty()) {break;}Collections.sort(queryOffsetResult.getPhyOffsets());queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());for (int m = 0; m < queryOffsetResult.getPhyOffsets().size(); m++) {long offset = queryOffsetResult.getPhyOffsets().get(m);try {boolean match = true;// 根據offset查詢消息=》MessageExt msg = this.lookMessageByOffset(offset);if (0 == m) {lastQueryMsgTime = msg.getStoreTimestamp();}// String[] keyArray = msg.getKeys().split(MessageConst.KEY_SEPARATOR);// if (topic.equals(msg.getTopic())) {// for (String k : keyArray) {// if (k.equals(key)) {// match = true;// break;// }// }// }if (match) {// 根據offset查詢消息數據=》SelectMappedBufferResult result = this.commitLog.getData(offset, false);if (result != null) {int size = result.getByteBuffer().getInt(0);result.getByteBuffer().limit(size);result.setSize(size);queryMessageResult.addMessage(result);}} else {log.warn("queryMessage hash duplicate, {} {}", topic, key);}} catch (Exception e) {log.error("queryMessage exception", e);}}if (queryMessageResult.getBufferTotalSize() > 0) {break;}if (lastQueryMsgTime < begin) {break;}}return queryMessageResult; }
進入這個方法,按topic、key、分頁數32、開始時間、結束時間查詢offset:org.apache.rocketmq.store.index.IndexService#queryOffset
public QueryOffsetResult queryOffset(String topic, String key, int maxNum, long begin, long end) {List<Long> phyOffsets = new ArrayList<Long>(maxNum);long indexLastUpdateTimestamp = 0;long indexLastUpdatePhyoffset = 0;// 分頁數32,最大批量數是64maxNum = Math.min(maxNum, this.defaultMessageStore.getMessageStoreConfig().getMaxMsgsNumBatch());try {this.readWriteLock.readLock().lock();if (!this.indexFileList.isEmpty()) {for (int i = this.indexFileList.size(); i > 0; i--) {IndexFile f = this.indexFileList.get(i - 1);boolean lastFile = i == this.indexFileList.size();if (lastFile) {indexLastUpdateTimestamp = f.getEndTimestamp();indexLastUpdatePhyoffset = f.getEndPhyOffset();}if (f.isTimeMatched(begin, end)) {// 按key=topic#key、分頁數32,開始時間、結束時間、最新文件查詢物理offsetf.selectPhyOffset(phyOffsets, buildKey(topic, key), maxNum, begin, end, lastFile);}if (f.getBeginTimestamp() < begin) {break;}if (phyOffsets.size() >= maxNum) {break;}}}} catch (Exception e) {log.error("queryMsg exception", e);} finally {this.readWriteLock.readLock().unlock();}return new QueryOffsetResult(phyOffsets, indexLastUpdateTimestamp, indexLastUpdatePhyoffset); }
往上返回到這個方法,按id查詢消息:org.apache.rocketmq.broker.processor.QueryMessageProcessor#viewMessageById
public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request)throws RemotingCommandException {final RemotingCommand response = RemotingCommand.createResponseCommand(null);final ViewMessageRequestHeader requestHeader =(ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class);response.setOpaque(request.getOpaque());// 按offste查詢消息=》final SelectMappedBufferResult selectMappedBufferResult =this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset());if (selectMappedBufferResult != null) {response.setCode(ResponseCode.SUCCESS);response.setRemark(null);try { // 這裏採用的是netty的fileRegion 文件 0 copy技術FileRegion fileRegion =new OneMessageTransfer(response.encodeHeader(selectMappedBufferResult.getSize()),selectMappedBufferResult);ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {selectMappedBufferResult.release();if (!future.isSuccess()) {log.error("Transfer one message from page cache failed, ", future.cause());}}});} catch (Throwable e) {log.error("", e);selectMappedBufferResult.release();}return null;} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("can not find message by the offset, " + requestHeader.getOffset());}return response; }
進入這個方法,按offset查詢消息:org.apache.rocketmq.store.DefaultMessageStore#selectOneMessageByOffset(long),前面介紹過了。
往上返回到這個方法:org.apache.rocketmq.broker.processor.QueryMessageProcessor#processRequest,結束。
說在最後
本次解析僅代表個人觀點,僅供參考。
加入技術微信羣
釘釘技術羣