說在前面java
消息拉取處理器express
源碼解析apache
進入這個方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.ChannelHandlerContext, org.apache.rocketmq.remoting.protocol.RemotingCommand)緩存
@Overridepublic RemotingCommand processRequest(final ChannelHandlerContext ctx,RemotingCommand request) throws RemotingCommandException {return this.processRequest(ctx.channel(), request, true); }
進入這個方法,org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest(io.netty.channel.Channel, org.apache.rocketmq.remoting.protocol.RemotingCommand, boolean)微信
private RemotingCommand processRequest(final Channel channel, RemotingCommand request, boolean brokerAllowSuspend)throws RemotingCommandException {RemotingCommand response = RemotingCommand.createResponseCommand(PullMessageResponseHeader.class);final PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.readCustomHeader();final PullMessageRequestHeader requestHeader =(PullMessageRequestHeader) request.decodeCommandCustomHeader(PullMessageRequestHeader.class);// 請求編號response.setOpaque(request.getOpaque());log.debug("receive PullMessage request command, {}", request);// 沒有讀取權限if (!PermName.isReadable(this.brokerController.getBrokerConfig().getBrokerPermission())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark(String.format("the broker[%s] pulling message is forbidden", this.brokerController.getBrokerConfig().getBrokerIP1()));return response;}// 找到消費組的訂閱信息=》SubscriptionGroupConfig subscriptionGroupConfig =this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getConsumerGroup());if (null == subscriptionGroupConfig) {response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);response.setRemark(String.format("subscription group [%s] does not exist, %s", requestHeader.getConsumerGroup(), FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)));return response;}// 若是消費組訂閱信息不容許消費if (!subscriptionGroupConfig.isConsumeEnable()) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("subscription group no permission, " + requestHeader.getConsumerGroup());return response;}final boolean hasSuspendFlag = PullSysFlag.hasSuspendFlag(requestHeader.getSysFlag());final boolean hasCommitOffsetFlag = PullSysFlag.hasCommitOffsetFlag(requestHeader.getSysFlag());final boolean hasSubscriptionFlag = PullSysFlag.hasSubscriptionFlag(requestHeader.getSysFlag());final long suspendTimeoutMillisLong = hasSuspendFlag ? requestHeader.getSuspendTimeoutMillis() : 0;// 查找topic配置信息TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());if (null == topicConfig) {log.error("the topic {} not exist, consumer: {}", requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(channel));response.setCode(ResponseCode.TOPIC_NOT_EXIST);response.setRemark(String.format("topic[%s] not exist, apply first please! %s", requestHeader.getTopic(), FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL)));return response;}// 沒有topic配置信息讀的權限if (!PermName.isReadable(topicConfig.getPerm())) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the topic[" + requestHeader.getTopic() + "] pulling message is forbidden");return response;}if (requestHeader.getQueueId() < 0 || requestHeader.getQueueId() >= topicConfig.getReadQueueNums()) {String errorInfo = String.format("queueId[%d] is illegal, topic:[%s] topicConfig.readQueueNums:[%d] consumer:[%s]",requestHeader.getQueueId(), requestHeader.getTopic(), topicConfig.getReadQueueNums(), channel.remoteAddress());log.warn(errorInfo);response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark(errorInfo);return response;}SubscriptionData subscriptionData = null;ConsumerFilterData consumerFilterData = null;if (hasSubscriptionFlag) {try {// 構建訂閱信息=》subscriptionData = FilterAPI.build(requestHeader.getTopic(), requestHeader.getSubscription(), requestHeader.getExpressionType());// 若是訂閱類型是tasg,構建過濾消費的數據=》if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {consumerFilterData = ConsumerFilterManager.build(requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getSubscription(),requestHeader.getExpressionType(), requestHeader.getSubVersion());assert consumerFilterData != null;}} catch (Exception e) {log.warn("Parse the consumer's subscription[{}] failed, group: {}", requestHeader.getSubscription(),requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_PARSE_FAILED);response.setRemark("parse the consumer's subscription failed");return response;}} else {// 獲取消費組信息ConsumerGroupInfo consumerGroupInfo =this.brokerController.getConsumerManager().getConsumerGroupInfo(requestHeader.getConsumerGroup());if (null == consumerGroupInfo) {log.warn("the consumer's group info not exist, group: {}", requestHeader.getConsumerGroup());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark("the consumer's group info not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}// 訂閱信息中不是廣播消費是集羣消費,消費組信息的消息模式是廣播消費if (!subscriptionGroupConfig.isConsumeBroadcastEnable()&& consumerGroupInfo.getMessageModel() == MessageModel.BROADCASTING) {response.setCode(ResponseCode.NO_PERMISSION);response.setRemark("the consumer group[" + requestHeader.getConsumerGroup() + "] can not consume by broadcast way");return response;}// 從消費組信息中找到訂閱數據subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());if (null == subscriptionData) {log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));return response;}// 訂閱數據中版本號小於請求數據中版本號,訂閱數據不是最新的if (subscriptionData.getSubVersion() < requestHeader.getSubVersion()) {log.warn("The broker's subscription is not latest, group: {} {}", requestHeader.getConsumerGroup(),subscriptionData.getSubString());response.setCode(ResponseCode.SUBSCRIPTION_NOT_LATEST);response.setRemark("the consumer's subscription not latest");return response;}// 訂閱信息中表達式類型不是tag類型if (!ExpressionType.isTagType(subscriptionData.getExpressionType())) {// 按topic過濾的數據中獲取要過濾的消費數據=》consumerFilterData = this.brokerController.getConsumerFilterManager().get(requestHeader.getTopic(),requestHeader.getConsumerGroup());if (consumerFilterData == null) {response.setCode(ResponseCode.FILTER_DATA_NOT_EXIST);response.setRemark("The broker's consumer filter data is not exist!Your expression may be wrong!");return response;}// 消費過濾數據中的版本號小於請求的版本號,過濾數據不是最新的if (consumerFilterData.getClientVersion() < requestHeader.getSubVersion()) {log.warn("The broker's consumer filter data is not latest, group: {}, topic: {}, serverV: {}, clientV: {}",requestHeader.getConsumerGroup(), requestHeader.getTopic(), consumerFilterData.getClientVersion(), requestHeader.getSubVersion());response.setCode(ResponseCode.FILTER_DATA_NOT_LATEST);response.setRemark("the consumer's consumer filter data not latest");return response;}}}// 消息過濾模式不是tag,broker配置中不容許過濾if (!ExpressionType.isTagType(subscriptionData.getExpressionType())&& !this.brokerController.getBrokerConfig().isEnablePropertyFilter()) {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("The broker does not support consumer to filter message by " + subscriptionData.getExpressionType());return response;}MessageFilter messageFilter;if (this.brokerController.getBrokerConfig().isFilterSupportRetry()) {messageFilter = new ExpressionForRetryMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());} else {messageFilter = new ExpressionMessageFilter(subscriptionData, consumerFilterData,this.brokerController.getConsumerFilterManager());}// 獲取消息=》final GetMessageResult getMessageResult =this.brokerController.getMessageStore().getMessage(requestHeader.getConsumerGroup(), requestHeader.getTopic(),requestHeader.getQueueId(), requestHeader.getQueueOffset(), requestHeader.getMaxMsgNums(), messageFilter);if (getMessageResult != null) {response.setRemark(getMessageResult.getStatus().name());responseHeader.setNextBeginOffset(getMessageResult.getNextBeginOffset());responseHeader.setMinOffset(getMessageResult.getMinOffset());responseHeader.setMaxOffset(getMessageResult.getMaxOffset());if (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (this.brokerController.getMessageStoreConfig().getBrokerRole()) {case ASYNC_MASTER:case SYNC_MASTER:break;case SLAVE:if (!this.brokerController.getBrokerConfig().isSlaveReadEnable()) {response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}break;}if (this.brokerController.getBrokerConfig().isSlaveReadEnable()) {// consume too slow ,redirect to another machine 消費太慢直接轉向另外一臺機器if (getMessageResult.isSuggestPullingFromSlave()) {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getWhichBrokerWhenConsumeSlowly());}// consume okelse {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());}} else {responseHeader.setSuggestWhichBrokerId(MixAll.MASTER_ID);}switch (getMessageResult.getStatus()) {case FOUND:response.setCode(ResponseCode.SUCCESS);break;case MESSAGE_WAS_REMOVING:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case NO_MATCHED_LOGIC_QUEUE:case NO_MESSAGE_IN_QUEUE:if (0 != requestHeader.getQueueOffset()) {response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the broker store no queue data, fix the request offset {} to {}, Topic: {} QueueId: {} Consumer Group: {}",requestHeader.getQueueOffset(),getMessageResult.getNextBeginOffset(),requestHeader.getTopic(),requestHeader.getQueueId(),requestHeader.getConsumerGroup());} else {response.setCode(ResponseCode.PULL_NOT_FOUND);}break;case NO_MATCHED_MESSAGE:response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);break;case OFFSET_FOUND_NULL:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_OVERFLOW_BADLY:response.setCode(ResponseCode.PULL_OFFSET_MOVED);// XXX: warn and notify melog.info("the request offset: {} over flow badly, broker max offset: {}, consumer: {}",requestHeader.getQueueOffset(), getMessageResult.getMaxOffset(), channel.remoteAddress());break;case OFFSET_OVERFLOW_ONE:response.setCode(ResponseCode.PULL_NOT_FOUND);break;case OFFSET_TOO_SMALL:response.setCode(ResponseCode.PULL_OFFSET_MOVED);log.info("the request offset too small. group={}, topic={}, requestOffset={}, brokerMinOffset={}, clientIp={}",requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueOffset(),getMessageResult.getMinOffset(), channel.remoteAddress());break;default:assert false;break;}if (this.hasConsumeMessageHook()) {ConsumeMessageContext context = new ConsumeMessageContext();context.setConsumerGroup(requestHeader.getConsumerGroup());context.setTopic(requestHeader.getTopic());context.setQueueId(requestHeader.getQueueId());String owner = request.getExtFields().get(BrokerStatsManager.COMMERCIAL_OWNER);switch (response.getCode()) {case ResponseCode.SUCCESS:int commercialBaseCount = brokerController.getBrokerConfig().getCommercialBaseCount();int incValue = getMessageResult.getMsgCount4Commercial() * commercialBaseCount;context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_SUCCESS);context.setCommercialRcvTimes(incValue);context.setCommercialRcvSize(getMessageResult.getBufferTotalSize());context.setCommercialOwner(owner);break;case ResponseCode.PULL_NOT_FOUND:if (!brokerAllowSuspend) {context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);context.setCommercialRcvTimes(1);context.setCommercialOwner(owner);}break;case ResponseCode.PULL_RETRY_IMMEDIATELY:case ResponseCode.PULL_OFFSET_MOVED:context.setCommercialRcvStats(BrokerStatsManager.StatsType.RCV_EPOLLS);context.setCommercialRcvTimes(1);context.setCommercialOwner(owner);break;default:assert false;break;}// 執行消息消費的鉤子方法在消費消息以前=》this.executeConsumeMessageHookBefore(context);}switch (response.getCode()) {case ResponseCode.SUCCESS:this.brokerController.getBrokerStatsManager().incGroupGetNums(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getMessageCount());this.brokerController.getBrokerStatsManager().incGroupGetSize(requestHeader.getConsumerGroup(), requestHeader.getTopic(),getMessageResult.getBufferTotalSize());this.brokerController.getBrokerStatsManager().incBrokerGetNums(getMessageResult.getMessageCount());if (this.brokerController.getBrokerConfig().isTransferMsgByHeap()) {final long beginTimeMills = this.brokerController.getMessageStore().now();final byte[] r = this.readGetMessageResult(getMessageResult, requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId());this.brokerController.getBrokerStatsManager().incGroupGetLatency(requestHeader.getConsumerGroup(),requestHeader.getTopic(), requestHeader.getQueueId(),(int) (this.brokerController.getMessageStore().now() - beginTimeMills));response.setBody(r);} else {try {FileRegion fileRegion =new ManyMessageTransfer(response.encodeHeader(getMessageResult.getBufferTotalSize()), getMessageResult);channel.writeAndFlush(fileRegion).addListener(new ChannelFutureListener() {@Overridepublic void operationComplete(ChannelFuture future) throws Exception {getMessageResult.release();if (!future.isSuccess()) {log.error("transfer many message by pagecache failed, {}", channel.remoteAddress(), future.cause());}}});} catch (Throwable e) {log.error("transfer many message by pagecache exception", e);getMessageResult.release();}response = null;}break;case ResponseCode.PULL_NOT_FOUND:if (brokerAllowSuspend && hasSuspendFlag) {long pollingTimeMills = suspendTimeoutMillisLong;// 不是長輪詢拉取消息獲取短輪詢的超時時間if (!this.brokerController.getBrokerConfig().isLongPollingEnable()) {pollingTimeMills = this.brokerController.getBrokerConfig().getShortPollingTimeMills();}String topic = requestHeader.getTopic();long offset = requestHeader.getQueueOffset();int queueId = requestHeader.getQueueId();// 構建拉取消息請求PullRequest pullRequest = new PullRequest(request, channel, pollingTimeMills,this.brokerController.getMessageStore().now(), offset, subscriptionData, messageFilter);// 暫停消息拉取服務=》this.brokerController.getPullRequestHoldService().suspendPullRequest(topic, queueId, pullRequest);response = null;break;}case ResponseCode.PULL_RETRY_IMMEDIATELY:break;case ResponseCode.PULL_OFFSET_MOVED:if (this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE|| this.brokerController.getMessageStoreConfig().isOffsetCheckInSlave()) {MessageQueue mq = new MessageQueue();mq.setTopic(requestHeader.getTopic());mq.setQueueId(requestHeader.getQueueId());mq.setBrokerName(this.brokerController.getBrokerConfig().getBrokerName());OffsetMovedEvent event = new OffsetMovedEvent();event.setConsumerGroup(requestHeader.getConsumerGroup());event.setMessageQueue(mq);event.setOffsetRequest(requestHeader.getQueueOffset());event.setOffsetNew(getMessageResult.getNextBeginOffset());this.generateOffsetMovedEvent(event);log.warn("PULL_OFFSET_MOVED:correction offset. topic={}, groupId={}, requestOffset={}, newOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(), event.getOffsetRequest(), event.getOffsetNew(),responseHeader.getSuggestWhichBrokerId());} else {responseHeader.setSuggestWhichBrokerId(subscriptionGroupConfig.getBrokerId());response.setCode(ResponseCode.PULL_RETRY_IMMEDIATELY);log.warn("PULL_OFFSET_MOVED:none correction. topic={}, groupId={}, requestOffset={}, suggestBrokerId={}",requestHeader.getTopic(), requestHeader.getConsumerGroup(), requestHeader.getQueueOffset(),responseHeader.getSuggestWhichBrokerId());}break;default:assert false;}} else {response.setCode(ResponseCode.SYSTEM_ERROR);response.setRemark("store getMessage return null");}boolean storeOffsetEnable = brokerAllowSuspend;storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;storeOffsetEnable = storeOffsetEnable&& this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;if (storeOffsetEnable) {// 提交offset=》this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel),requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());}return response; }
進入這個方法,找到消費組的訂閱信息,org.apache.rocketmq.broker.subscription.SubscriptionGroupManager#findSubscriptionGroupConfigapp
public SubscriptionGroupConfig findSubscriptionGroupConfig(final String group) {// 從緩存中獲取組的訂閱信息SubscriptionGroupConfig subscriptionGroupConfig = this.subscriptionGroupTable.get(group);if (null == subscriptionGroupConfig) {// 自動建立消費組或者是系統自用的消費組if (brokerController.getBrokerConfig().isAutoCreateSubscriptionGroup() || MixAll.isSysConsumerGroup(group)) {subscriptionGroupConfig = new SubscriptionGroupConfig();subscriptionGroupConfig.setGroupName(group);SubscriptionGroupConfig preConfig = this.subscriptionGroupTable.putIfAbsent(group, subscriptionGroupConfig);if (null == preConfig) {log.info("auto create a subscription group, {}", subscriptionGroupConfig.toString());}// 更新數據的版本號this.dataVersion.nextVersion();// 持久化=》this.persist();}}return subscriptionGroupConfig; }
進入這個方法,構建訂閱信息,org.apache.rocketmq.common.filter.FilterAPI#buildeclipse
public static SubscriptionData build(final String topic, final String subString,final String type) throws Exception {if (ExpressionType.TAG.equals(type) || type == null) {// =》return buildSubscriptionData(null, topic, subString);}if (subString == null || subString.length() < 1) {throw new IllegalArgumentException("Expression can't be null! " + type);}SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);subscriptionData.setExpressionType(type);return subscriptionData; }
進入這個方法,org.apache.rocketmq.common.filter.FilterAPI#buildSubscriptionDataide
public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,String subString) throws Exception {SubscriptionData subscriptionData = new SubscriptionData();subscriptionData.setTopic(topic);subscriptionData.setSubString(subString);// 訂閱全部的topicif (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {subscriptionData.setSubString(SubscriptionData.SUB_ALL);} else {// 解析tags,多個tag用|分開String[] tags = subString.split("\\|\\|");if (tags.length > 0) {for (String tag : tags) {if (tag.length() > 0) {String trimString = tag.trim();if (trimString.length() > 0) {// 解析訂閱信息中的tagsubscriptionData.getTagsSet().add(trimString);subscriptionData.getCodeSet().add(trimString.hashCode());}}}} else {throw new Exception("subString split error");}}return subscriptionData; }
往上返回到這個方法,若是訂閱類型是tag,構建過濾消費的數據,org.apache.rocketmq.broker.filter.ConsumerFilterManager#buildui
public static ConsumerFilterData build(final String topic, final String consumerGroup,final String expression, final String type,final long clientVersion) {if (ExpressionType.isTagType(type)) {return null;}ConsumerFilterData consumerFilterData = new ConsumerFilterData();consumerFilterData.setTopic(topic);consumerFilterData.setConsumerGroup(consumerGroup);consumerFilterData.setBornTime(System.currentTimeMillis());consumerFilterData.setDeadTime(0);consumerFilterData.setExpression(expression);consumerFilterData.setExpressionType(type);consumerFilterData.setClientVersion(clientVersion);try {consumerFilterData.setCompiledExpression(FilterFactory.INSTANCE.get(type).compile(expression));} catch (Throwable e) {log.error("parse error: expr={}, topic={}, group={}, error={}", expression, topic, consumerGroup, e.getMessage());return null;}return consumerFilterData; }
往上返回到這個方法,按topic過濾的數據中獲取要過濾的消費數據,org.apache.rocketmq.broker.filter.ConsumerFilterManager#get(java.lang.String, java.lang.String)this
public ConsumerFilterData get(final String topic, final String consumerGroup) {if (!this.filterDataByTopic.containsKey(topic)) {return null;}if (this.filterDataByTopic.get(topic).getGroupFilterData().isEmpty()) {return null;}return this.filterDataByTopic.get(topic).getGroupFilterData().get(consumerGroup); }
往上返回到這個方法,獲取消息,org.apache.rocketmq.store.DefaultMessageStore#getMessage
public GetMessageResult getMessage(final String group, final String topic, final int queueId, final long offset,final int maxMsgNums,final MessageFilter messageFilter) {// 消息存儲服務關閉了if (this.shutdown) {log.warn("message store has shutdown, so getMessage is forbidden");return null;}// 消息存儲不能讀取if (!this.runningFlags.isReadable()) {log.warn("message store is not readable, so getMessage is forbidden " + this.runningFlags.getFlagBits());return null;}long beginTime = this.getSystemClock().now();GetMessageStatus status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;long nextBeginOffset = offset;long minOffset = 0;long maxOffset = 0;GetMessageResult getResult = new GetMessageResult();// 從commitLog中獲取最大的offset=》final long maxOffsetPy = this.commitLog.getMaxOffset();// 找到指定topic、queueId的消息隊列=》ConsumeQueue consumeQueue = findConsumeQueue(topic, queueId);if (consumeQueue != null) {// 找到最小的offsetminOffset = consumeQueue.getMinOffsetInQueue();// 找到最大的offset=》maxOffset = consumeQueue.getMaxOffsetInQueue();// 消息隊列中沒有消息if (maxOffset == 0) {status = GetMessageStatus.NO_MESSAGE_IN_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);// offset小於最小的offset了} else if (offset < minOffset) {status = GetMessageStatus.OFFSET_TOO_SMALL;nextBeginOffset = nextOffsetCorrection(offset, minOffset);// offset太大,大於最大的offset了} else if (offset == maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_ONE;nextBeginOffset = nextOffsetCorrection(offset, offset);// offset太大,超過最大的offset了} else if (offset > maxOffset) {status = GetMessageStatus.OFFSET_OVERFLOW_BADLY;if (0 == minOffset) {nextBeginOffset = nextOffsetCorrection(offset, minOffset);} else {nextBeginOffset = nextOffsetCorrection(offset, maxOffset);}} else {// 獲取消費隊列的buffer=》SelectMappedBufferResult bufferConsumeQueue = consumeQueue.getIndexBuffer(offset);if (bufferConsumeQueue != null) {try {status = GetMessageStatus.NO_MATCHED_MESSAGE;long nextPhyFileStartOffset = Long.MIN_VALUE;long maxPhyOffsetPulling = 0;int i = 0;final int maxFilterMessageCount = Math.max(16000, maxMsgNums * ConsumeQueue.CQ_STORE_UNIT_SIZE);final boolean diskFallRecorded = this.messageStoreConfig.isDiskFallRecorded();ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();for (; i < bufferConsumeQueue.getSize() && i < maxFilterMessageCount; i += ConsumeQueue.CQ_STORE_UNIT_SIZE) {long offsetPy = bufferConsumeQueue.getByteBuffer().getLong();int sizePy = bufferConsumeQueue.getByteBuffer().getInt();long tagsCode = bufferConsumeQueue.getByteBuffer().getLong();maxPhyOffsetPulling = offsetPy;if (nextPhyFileStartOffset != Long.MIN_VALUE) {if (offsetPy < nextPhyFileStartOffset)continue;}boolean isInDisk = checkInDiskByCommitOffset(offsetPy, maxOffsetPy);// 檢查磁盤if (this.isTheBatchFull(sizePy, maxMsgNums, getResult.getBufferTotalSize(), getResult.getMessageCount(),isInDisk)) {break;}boolean extRet = false, isTagsCodeLegal = true;if (consumeQueue.isExtAddr(tagsCode)) {// 擴展消息存儲文件=》extRet = consumeQueue.getExt(tagsCode, cqExtUnit);if (extRet) {tagsCode = cqExtUnit.getTagsCode();} else {// can't find ext content.Client will filter messages by tag also.log.error("[BUG] can't find consume queue extend file content!addr={}, offsetPy={}, sizePy={}, topic={}, group={}",tagsCode, offsetPy, sizePy, topic, group);isTagsCodeLegal = false;}}if (messageFilter != null&& !messageFilter.isMatchedByConsumeQueue(isTagsCodeLegal ? tagsCode : null, extRet ? cqExtUnit : null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}continue;}// 從commitLog中找到offset和大小的buffer=》SelectMappedBufferResult selectResult = this.commitLog.getMessage(offsetPy, sizePy);if (null == selectResult) {if (getResult.getBufferTotalSize() == 0) {// 消息刪除status = GetMessageStatus.MESSAGE_WAS_REMOVING;}// 根據offset找到下個映射文件=》nextPhyFileStartOffset = this.commitLog.rollNextFile(offsetPy);continue;}if (messageFilter != null&& !messageFilter.isMatchedByCommitLog(selectResult.getByteBuffer().slice(), null)) {if (getResult.getBufferTotalSize() == 0) {status = GetMessageStatus.NO_MATCHED_MESSAGE;}// release...selectResult.release();continue;}this.storeStatsService.getGetMessageTransferedMsgCount().incrementAndGet();getResult.addMessage(selectResult);status = GetMessageStatus.FOUND;nextPhyFileStartOffset = Long.MIN_VALUE;}if (diskFallRecorded) {long fallBehind = maxOffsetPy - maxPhyOffsetPulling;brokerStatsManager.recordDiskFallBehindSize(group, topic, queueId, fallBehind);}nextBeginOffset = offset + (i / ConsumeQueue.CQ_STORE_UNIT_SIZE);long diff = maxOffsetPy - maxPhyOffsetPulling;long memory = (long) (StoreUtil.TOTAL_PHYSICAL_MEMORY_SIZE* (this.messageStoreConfig.getAccessMessageInMemoryMaxRatio() / 100.0));getResult.setSuggestPullingFromSlave(diff > memory);} finally {bufferConsumeQueue.release();}} else {status = GetMessageStatus.OFFSET_FOUND_NULL;nextBeginOffset = nextOffsetCorrection(offset, consumeQueue.rollNextFile(offset));log.warn("consumer request topic: " + topic + "offset: " + offset + " minOffset: " + minOffset + " maxOffset: "+ maxOffset + ", but access logic queue failed.");}}} else {status = GetMessageStatus.NO_MATCHED_LOGIC_QUEUE;nextBeginOffset = nextOffsetCorrection(offset, 0);}if (GetMessageStatus.FOUND == status) {this.storeStatsService.getGetMessageTimesTotalFound().incrementAndGet();} else {this.storeStatsService.getGetMessageTimesTotalMiss().incrementAndGet();}long eclipseTime = this.getSystemClock().now() - beginTime;this.storeStatsService.setGetMessageEntireTimeMax(eclipseTime);getResult.setStatus(status);getResult.setNextBeginOffset(nextBeginOffset);getResult.setMaxOffset(maxOffset);getResult.setMinOffset(minOffset);return getResult; }
進入這個方法,從commitLog中獲取最大的offset,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset
public long getMaxOffset() {// 獲取存儲映射文件隊列中索引位置最大的映射文件=》MappedFile mappedFile = getLastMappedFile();if (mappedFile != null) {// 映射文件的起始offset+映射文件的可讀取的索引位置return mappedFile.getFileFromOffset() + mappedFile.getReadPosition();}// 若是隊列中沒有存儲映射文件直接返回0return 0; }
進入這個方法,獲取存儲映射文件隊列中索引位置最大的映射文件,org.apache.rocketmq.store.MappedFileQueue#getLastMappedFile()
public MappedFile getLastMappedFile() {MappedFile mappedFileLast = null;while (!this.mappedFiles.isEmpty()) {try {mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);break;} catch (IndexOutOfBoundsException e) {//continue;} catch (Exception e) {log.error("getLastMappedFile has exception.", e);break;}}return mappedFileLast; }
往上返回到這個方法,找到指定topic、queueId的消息隊列,org.apache.rocketmq.store.DefaultMessageStore#findConsumeQueue
public ConsumeQueue findConsumeQueue(String topic, int queueId) {// 找到topic的全部消息隊列ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);if (null == map) {ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);if (oldMap != null) {map = oldMap;} else {map = newMap;}}// 按queue id查找消費者隊列ConsumeQueue logic = map.get(queueId);if (null == logic) {ConsumeQueue newLogic = new ConsumeQueue(topic,queueId,// 消費者隊列存儲地址 user.home/store/consumequeueStorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),// 每一個文件存儲默認30Wthis.getMessageStoreConfig().getMapedFileSizeConsumeQueue(),this);ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);if (oldLogic != null) {logic = oldLogic;} else {logic = newLogic;}}return logic; }
往上返回到這個方法,找到最大的offset,org.apache.rocketmq.store.ConsumeQueue#getMaxOffsetInQueue
public long getMaxOffsetInQueue() {// =》return this.mappedFileQueue.getMaxOffset() / CQ_STORE_UNIT_SIZE; }
進入這個方法,org.apache.rocketmq.store.MappedFileQueue#getMaxOffset上面介紹過了。
往上返回到這個方法,獲取消費隊列的buffer,org.apache.rocketmq.store.ConsumeQueue#getIndexBuffer
public SelectMappedBufferResult getIndexBuffer(final long startIndex) {int mappedFileSize = this.mappedFileSize;// 獲取最小的物理offsetlong offset = startIndex * CQ_STORE_UNIT_SIZE;if (offset >= this.getMinLogicOffset()) {// 根據offset查詢映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset);if (mappedFile != null) {SelectMappedBufferResult result = mappedFile.selectMappedBuffer((int) (offset % mappedFileSize));return result;}}return null; }
進入這個方法,根據offset查詢映射文件,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long)
public MappedFile findMappedFileByOffset(final long offset) {// =》return findMappedFileByOffset(offset, false); }
進入這個方法,org.apache.rocketmq.store.MappedFileQueue#findMappedFileByOffset(long, boolean)
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) {try {// 獲取隊列中第一個映射文件MappedFile firstMappedFile = this.getFirstMappedFile();// 獲取隊列中最後一個映射文件MappedFile lastMappedFile = this.getLastMappedFile();if (firstMappedFile != null && lastMappedFile != null) {// 若是offset不在索引文件的offset範圍內if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) {LOG_ERROR.warn("Offset not matched. Request offset: {}, firstOffset: {}, lastOffset: {}, mappedFileSize: {}, mappedFiles count: {}",offset,firstMappedFile.getFileFromOffset(),lastMappedFile.getFileFromOffset() + this.mappedFileSize,this.mappedFileSize,this.mappedFiles.size());} else {// 找到映射文件在隊列中的索引位置int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.mappedFileSize));MappedFile targetFile = null;try {// 獲取索引文件targetFile = this.mappedFiles.get(index);} catch (Exception ignored) {}// offset在目標文件的起始offset和結束offset範圍內if (targetFile != null && offset >= targetFile.getFileFromOffset()&& offset < targetFile.getFileFromOffset() + this.mappedFileSize) {return targetFile;}// 若是按索引在隊列中找不到映射文件就遍歷隊列查找映射文件for (MappedFile tmpMappedFile : this.mappedFiles) {if (offset >= tmpMappedFile.getFileFromOffset()&& offset < tmpMappedFile.getFileFromOffset() + this.mappedFileSize) {return tmpMappedFile;}}}// 若是offset=0獲取隊列中第一個映射文件,我的感受這個邏輯是否放在前面判斷更爲合理,仍是放在這裏另有深意if (returnFirstOnNotFound) {return firstMappedFile;}}} catch (Exception e) {log.error("findMappedFileByOffset Exception", e);}return null; }
往上返回到這個方法,從commitLog中找到offset和大小的buffer,org.apache.rocketmq.store.CommitLog#getMessage
public SelectMappedBufferResult getMessage(final long offset, final int size) {int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMapedFileSizeCommitLog();// 根據offset找到映射文件 =》MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);if (mappedFile != null) {int pos = (int) (offset % mappedFileSize);return mappedFile.selectMappedBuffer(pos, size);}return null; }
接下篇。
說在最後
本次解析僅表明我的觀點,僅供參考。
加入技術微信羣
釘釘技術羣