com.alibaba.rocketmq.tools.command.message.QueryMsgByKeySubCommandexpress
com.alibaba.rocketmq.tools.command.message.QueryMsgByIdSubCommandapache
com.alibaba.rocketmq.tools.command.message.QueryMsgByOffsetSubCommandapp
com.alibaba.rocketmq.broker.processor.QueryMessageProcessorless
/** * Copyright (C) 2010-2013 Alibaba Group Holding Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package com.alibaba.rocketmq.broker.processor; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.FileRegion; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.alibaba.rocketmq.broker.BrokerController; import com.alibaba.rocketmq.broker.pagecache.OneMessageTransfer; import com.alibaba.rocketmq.broker.pagecache.QueryMessageTransfer; import com.alibaba.rocketmq.common.constant.LoggerName; import com.alibaba.rocketmq.common.protocol.MQProtos.MQRequestCode; import com.alibaba.rocketmq.common.protocol.MQProtos.MQResponseCode; import com.alibaba.rocketmq.common.protocol.header.QueryMessageRequestHeader; import com.alibaba.rocketmq.common.protocol.header.QueryMessageResponseHeader; import com.alibaba.rocketmq.common.protocol.header.ViewMessageRequestHeader; import com.alibaba.rocketmq.remoting.exception.RemotingCommandException; import com.alibaba.rocketmq.remoting.netty.NettyRequestProcessor; import com.alibaba.rocketmq.remoting.protocol.RemotingCommand; import com.alibaba.rocketmq.remoting.protocol.RemotingProtos.ResponseCode; import com.alibaba.rocketmq.store.QueryMessageResult; import com.alibaba.rocketmq.store.SelectMapedBufferResult; /** * 查詢消息請求處理 * * @author shijia.wxr<vintage.wang@gmail.com> * @since 2013-7-26 */ public class QueryMessageProcessor implements NettyRequestProcessor { private static final Logger log = LoggerFactory.getLogger(LoggerName.BrokerLoggerName); private final BrokerController brokerController; public QueryMessageProcessor(final BrokerController brokerController) { this.brokerController = brokerController; } @Override public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { MQRequestCode code = MQRequestCode.valueOf(request.getCode()); switch (code) { case QUERY_MESSAGE: return this.queryMessage(ctx, request); case VIEW_MESSAGE_BY_ID: return this.viewMessageById(ctx, request); default: break; } return null; } public RemotingCommand queryMessage(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(QueryMessageResponseHeader.class); final QueryMessageResponseHeader responseHeader = (QueryMessageResponseHeader) response.getCustomHeader(); final QueryMessageRequestHeader requestHeader = (QueryMessageRequestHeader) request .decodeCommandCustomHeader(QueryMessageRequestHeader.class); // 校驗查詢時間範圍 long maxTimeSpan = this.brokerController.getBrokerConfig().getQueryMessageMaxTimeSpan() * 60 * 60 * 1000; long diff = requestHeader.getEndTimestamp() - requestHeader.getBeginTimestamp(); if (diff > maxTimeSpan) { response.setCode(ResponseCode.SYSTEM_ERROR_VALUE); response.setRemark("the time range is too long, broker limits " + maxTimeSpan + "h"); return response; } // 因爲使用sendfile,因此必需要設置 response.setOpaque(request.getOpaque()); final QueryMessageResult queryMessageResult = this.brokerController.getMessageStore().queryMessage(requestHeader.getTopic(), requestHeader.getKey(), requestHeader.getMaxNum(), requestHeader.getBeginTimestamp(), requestHeader.getEndTimestamp()); assert queryMessageResult != null; responseHeader.setIndexLastUpdatePhyoffset(queryMessageResult.getIndexLastUpdatePhyoffset()); responseHeader.setIndexLastUpdateTimestamp(queryMessageResult.getIndexLastUpdateTimestamp()); // 說明找到消息 if (queryMessageResult.getBufferTotalSize() > 0) { response.setCode(ResponseCode.SUCCESS_VALUE); response.setRemark(null); try { FileRegion fileRegion = new QueryMessageTransfer(response.encodeHeader(queryMessageResult .getBufferTotalSize()), queryMessageResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { queryMessageResult.release(); if (!future.isSuccess()) { log.error("transfer query message by pagecache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); queryMessageResult.release(); } return null; } response.setCode(MQResponseCode.QUERY_NOT_FOUND_VALUE); response.setRemark("can not find message, maybe time range not correct"); return response; } public RemotingCommand viewMessageById(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException { final RemotingCommand response = RemotingCommand.createResponseCommand(null); final ViewMessageRequestHeader requestHeader = (ViewMessageRequestHeader) request.decodeCommandCustomHeader(ViewMessageRequestHeader.class); // 因爲使用sendfile,因此必需要設置 response.setOpaque(request.getOpaque()); final SelectMapedBufferResult selectMapedBufferResult = this.brokerController.getMessageStore().selectOneMessageByOffset(requestHeader.getOffset()); if (selectMapedBufferResult != null) { response.setCode(ResponseCode.SUCCESS_VALUE); response.setRemark(null); try { FileRegion fileRegion = new OneMessageTransfer(response.encodeHeader(selectMapedBufferResult.getSize()), selectMapedBufferResult); ctx.channel().writeAndFlush(fileRegion).addListener(new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { selectMapedBufferResult.release(); if (!future.isSuccess()) { log.error("transfer one message by pagecache failed, ", future.cause()); } } }); } catch (Throwable e) { log.error("", e); selectMapedBufferResult.release(); } return null; } else { response.setCode(ResponseCode.SYSTEM_ERROR_VALUE); response.setRemark("can not find message by the offset, " + requestHeader.getOffset()); } return response; } }
@Override
public SelectMapedBufferResult selectOneMessageByOffset(long commitLogOffset) {
SelectMapedBufferResult sbr = this.commitLog.getMessage(commitLogOffset, 4);
if (null != sbr) {
try {
// 1 TOTALSIZE
int size = sbr.getByteBuffer().getInt();
return this.commitLog.getMessage(commitLogOffset, size);
}
finally {
sbr.release();
}
}ide
return null;
}ui
@Override
public QueryMessageResult queryMessage(String topic, String key, int maxNum, long begin, long end) {
QueryOffsetResult queryOffsetResult = this.indexService.queryOffset(topic, key, maxNum, begin, end);
QueryMessageResult queryMessageResult = new QueryMessageResult();this
queryMessageResult.setIndexLastUpdatePhyoffset(queryOffsetResult.getIndexLastUpdatePhyoffset());
queryMessageResult.setIndexLastUpdateTimestamp(queryOffsetResult.getIndexLastUpdateTimestamp());.net
for (Long offset : queryOffsetResult.getPhyOffsets()) {
SelectMapedBufferResult result = this.commitLog.getData(offset, false);
if (result != null) {
int size = result.getByteBuffer().getInt(0);
result.getByteBuffer().limit(size);
result.setSize(size);
queryMessageResult.addMessage(result);
}
}netty
return queryMessageResult;
}code