消息消費以組的模式開展,一個消費組內能夠包含多個消費者,1個消費組可訂閱多個主題。消費組之間有集羣模式與廣播模式兩種。java
集羣模式下,主題下的同一消息只容許被消費組內的一個消費者消費,消費進度存儲在 broker 端。廣播模式下,則每一個消費者均可以消費該消息,消費進度存儲在消費者端。express
消息服務器與消費者的消息傳輸有 2 種方式:推模式、拉模式。拉模式,即消費者主動向消息服務器發送請求;推模式,即消息服務器向消費者推送消息。推模式,是基於拉模式實現的。服務器
集羣模式下,一個消費隊列同一時間,只容許被一個消費者消費,1個消費者,能夠消費多個消息隊列。併發
RocketMQ 只支持局部順序消息消費,即保證同一個消息隊列上的消息順序消費。若是想保證一個 Topic 下的順序消費,那麼只能將該主題的消息隊列設置爲 1。app
DefaultMQPushConsumerdom
// 消費者組 private String consumerGroup; // 消費模式,默認集羣 private MessageModel messageModel = MessageModel.CLUSTERING; // 根據消息進度從消息服務器拉取不到消息時從新計算消費策略 // CONSUME_FROM_LAST_OFFSET:從隊列當前最大偏移量開始消費 // CONSUME_FROM_FIRST_OFFSET 從最先可用的消息開始消費 // CONSUME_FROM_TIMESTAMP 從指定的時間戳開始消費 private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET; // 集羣模式下消息隊列負載策略 private AllocateMessageQueueStrategy allocateMessageQueueStrategy; // 訂閱消息 private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>(); // 消息業務監聽器 private MessageListener messageListener; // 消息消費進度存儲器 private OffsetStore offsetStore; // 消費者最小線程數 private int consumeThreadMin = 20; // 消費者最大線程數 private int consumeThreadMax = 20; // 併發消息消費時處理隊列最大跨度。表示若是消息處理隊列中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒再拉取消息 private int consumeConcurrentlyMaxSpan = 2000; // 每 1000 次流控後打印流控日誌 private int pullThresholdForQueue = 1000; // 推模式下拉取任務時間間隔,默認一次拉取任務完成繼續拉取 private long pullInterval = 0; // 消息併發消費時一次消費消息條數 private int consumeMessageBatchMaxSize = 1; // 每次消息拉取條數 private int pullBatchSize = 32; // 是否每次拉取消息都更新訂閱信息 private boolean postSubscriptionWhenPull = false; // 最大消費重試次數,消息消費次數超過 maxReconsumeTimes 還未成功,則將該消息轉移到一個失敗隊列,等待刪除 private int maxReconsumeTimes = -1; // 消費超時時間,單位 分鐘。 private long consumeTimeout = 15;
代碼位置:DefaultMQPushConsumerImpl#start異步
DefaultMQPushConsumerImpl#copySubscriptionide
private void copySubscription() throws MQClientException { try { Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); // 構建 SubscriptionData SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString); // 加入 RebalanceImpl 訂閱消息中 this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } // ... switch (this.defaultMQPushConsumer.getMessageModel()) { // ... case CLUSTERING: // 獲取重試的主題,格式: %RETRY% + 消費組名 final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); // 構建重試主題的訂閱消息 SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL); // 加入 RebalanceImpl 訂閱消息中 this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; // ... } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
if (this.defaultMQPushConsumer.getOffsetStore() != null) { this.offsetStore = this.defaultMQPushConsumer.getOffsetStore(); } else { switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: // 從本地獲取消費進度 this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; case CLUSTERING: // 從 broker 獲取消費進度 this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup()); break; default: break; } this.defaultMQPushConsumer.setOffsetStore(this.offsetStore); } this.offsetStore.load();
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) { this.consumeOrderly = true; this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner()); } else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) { this.consumeOrderly = false; this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner()); } this.consumeMessageService.start();
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this); // ... mQClientFactory.start();
RocketMQ 經過 PullMessageService 拉取消息。 函數
PullMessageService#runpost
public void run() { // stopped 是 volidate 修飾的變量,用於線程間通訊。 while (!this.isStopped()) { // .. // 阻塞隊列, 若是 pullRequestQueue 沒有元素,則阻塞 PullRequest pullRequest = this.pullRequestQueue.take(); // 消息拉取 this.pullMessage(pullRequest); // ... } }
// 消費者組 private String consumerGroup; // 消息隊列 private MessageQueue messageQueue; // 消息處理隊列,從 Broker 拉取到的消息先存入 ProcessQueue,而後再提交到消費者消費池消費 private ProcessQueue processQueue; // 待拉取的 MessageQueue 偏移量 private long nextOffset; // 是否被鎖定 private boolean lockedFirst = false;
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) { if (!isStopped()) { this.scheduledExecutorService.schedule(new Runnable() { @Override public void run() { PullMessageService.this.executePullRequestImmediately(pullRequest); } }, timeDelay, TimeUnit.MILLISECONDS); } else { log.warn("PullMessageServiceScheduledThread has shutdown"); } }
public void executePullRequestImmediately(final PullRequest pullRequest) { try { this.pullRequestQueue.put(pullRequest); } catch (InterruptedException e) { log.error("executePullRequestImmediately pullRequestQueue.put", e); } }
ProcessQueue 是 MessageQueue 在消費端的重現、快照。PullMessageService 從消息服務器默認每次拉取 32 條消息,按消息的隊列偏移量順序存放在 ProcessQueue 中,PullMessageService 再將消息提交到消費者消費線程池。消息消費成功後,從 ProcessQueue 中移除。
// 讀寫鎖 private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock(); // 消息存儲容器, k:消息偏移量,v:消息實體 private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>(); // ProcessQueue 中消息總數 private final AtomicLong msgCount = new AtomicLong(); // ProcessQueue 中消息總大小 private final AtomicLong msgSize = new AtomicLong(); // 當前 ProcessQueue 中包含的最大隊列偏移量 private volatile long queueOffsetMax = 0L; // 當前 ProcessQueue 是否被丟棄 private volatile boolean dropped = false; // 上一次開始消息拉取時間戳 private volatile long lastPullTimestamp = System.currentTimeMillis(); // 上一次消息消費時間戳 private volatile long lastConsumeTimestamp = System.currentTimeMillis();
代碼位置:DefaultMQPushConsumerImpl#pullMessage
final ProcessQueue processQueue = pullRequest.getProcessQueue(); // 被刪除 if (processQueue.isDropped()) { return; } // 設置拉取時間 pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis()); try { this.makeSureStateOK(); } catch (MQClientException e) { // 狀態非法,延遲 3 秒拉取消息 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); return; } // 消費者被掛起 if (this.isPause()) { // 延遲 1 秒拉取消息 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND); return; }
流控控制有如下幾個維度:
long cachedMessageCount = processQueue.getMsgCount().get(); long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024); // processQueue 的消息數量 大於 1000,觸發流控 if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) { // 50 毫秒再拉取消息 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { // ... 流控輸出語句 } return; } // processQueue 的消息大小 大於 100 MB,觸發流控 if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) { // 50 毫秒再拉取消息 this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueFlowControlTimes++ % 1000) == 0) { // ... 流控輸出語句 } return; } if (!this.consumeOrderly) { // 順序消費 // processQueue 中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒再拉取消息 if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) { this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL); if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) { // ... 流控輸出語句 } return; } } else { if (processQueue.isLocked()) { if (!pullRequest.isLockedFirst()) { final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue()); boolean brokerBusy = offset < pullRequest.getNextOffset(); // .. if (brokerBusy) { // ... } pullRequest.setLockedFirst(true); pullRequest.setNextOffset(offset); } } else { // 確保 processQueue 在消費以前必須被鎖定。 this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.info("pull message later because not locked in broker, {}", pullRequest); return; } }
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (null == subscriptionData) { this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException); log.warn("find the consumer's subscription failed, {}", pullRequest); return; }
拉消息系統標記:
// 從內存中讀取消費進度,若是大於0,則設置 commitOffsetEnable = true boolean commitOffsetEnable = false; long commitOffsetValue = 0L; if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) { commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY); if (commitOffsetValue > 0) { commitOffsetEnable = true; } } String subExpression = null; boolean classFilter = false; SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic()); if (sd != null) { if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) { subExpression = sd.getSubString(); } classFilter = sd.isClassFilterMode(); } int sysFlag = PullSysFlag.buildSysFlag( commitOffsetEnable, // commitOffset true, // suspend subExpression != null, // subscription classFilter // class filter );
this.pullAPIWrapper.pullKernelImpl( pullRequest.getMessageQueue(), // 從哪一個隊列拉取消息 subExpression, // 消息過濾表達式 subscriptionData.getExpressionType(), // 消息表達式類型 TAG、SQL92 subscriptionData.getSubVersion(), pullRequest.getNextOffset(), // 本次拉取消息偏移量 this.defaultMQPushConsumer.getPullBatchSize(), // 本次拉取最大消息條數,默認 32 sysFlag, // 拉消息 系統標記 commitOffsetValue, // 當前 MessageQueue 消費進度 BROKER_SUSPEND_MAX_TIME_MILLIS, // 消息拉取過程當中 容許 broker 掛起時間,默認 15 s CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 消息拉取超時時間,默認 30 s CommunicationMode.ASYNC, // 消息拉取模式,默認爲異步拉取 pullCallback // 從 broker 拉取到消息後的回調方法 );
最終調用 MQClientAPIImpl#pullMessageAsync 拉取消息
代碼位置:PullMessageProcessor#processRequest
代碼入口: MQClientAPIImpl#pullMessageAsync
處理請求回調,獲取 PullResult。並經過 PullCallback 回調至 DefaultMQPushConsumerImpl#pullMessage
private PullResult processPullResponse( final RemotingCommand response) throws MQBrokerException, RemotingCommandException { PullStatus pullStatus = PullStatus.NO_NEW_MSG; switch (response.getCode()) { case ResponseCode.SUCCESS: pullStatus = PullStatus.FOUND; break; case ResponseCode.PULL_NOT_FOUND: pullStatus = PullStatus.NO_NEW_MSG; break; case ResponseCode.PULL_RETRY_IMMEDIATELY: pullStatus = PullStatus.NO_MATCHED_MSG; break; case ResponseCode.PULL_OFFSET_MOVED: pullStatus = PullStatus.OFFSET_ILLEGAL; break; default: throw new MQBrokerException(response.getCode(), response.getRemark()); } PullMessageResponseHeader responseHeader = (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class); return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody()); }
代碼入口:PullAPIWrapper#processPullResult
代碼入口:DefaultMQPushConsumerImpl$PullCallback#onSuccess
// 設置下一次拉取的偏移量 long prevRequestOffset = pullRequest.getNextOffset(); pullRequest.setNextOffset(pullResult.getNextBeginOffset()); long pullRT = System.currentTimeMillis() - beginTimestamp; DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT); long firstMsgOffset = Long.MAX_VALUE; if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) { // 消息爲null,當即拉取新的消息 DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } else { firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset(); DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size()); // 將消息放入 ProcessQueue,並將消息交由 ConsumeMessageService 消費。 boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList()); DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest( pullResult.getMsgFoundList(), processQueue, pullRequest.getMessageQueue(), dispatchToConsume); // 根據 pullInterval 參數,等待 pullInterval 毫秒將 PullRequest 放入 pullRequestQueue 中。 // 推模式下, pullInterval 默認爲 0 if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) { DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval()); } else { DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest); } }
RocketMQ 推模式是循環向消息服務端發送消息拉取請求。消費者向 broker 拉取消息時,若是消息未到達消費隊列,而且未啓用 長輪詢機制,則會在服務端等待 shortPollingTimeMills(默認1秒) 時間後再去判斷消息是否已經到達消息隊列,若是消息未到達,則提示消息拉取客戶端 PULL_NOT_FOUND。若是開啓長輪詢模式,rocketMQ 會每 5s 輪詢檢查一次消息是否可達,同時一有新消息到達後立馬通知掛起線程再次驗證新消息是不是本身感興趣的消息,若是是則從 commitlog 文件提取消息返回給消息拉取客戶端,不然直到掛起超時,超時時間由消息拉取方在消息拉取時封裝在請求參數中,PUSH 模式默認 15s。 PULL 模式經過 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 設置。RocketMQ 經過在 Broker 端配置 longPollingEnable 爲 true 來開啓長輪詢模式。
RocketMQ 的長輪詢機制由 2 個線程共同完成。PullRequestHoldService、ReputMessageService。
PullRequestHoldService 添加消息拉取任務
代碼入口:PullRequestHoldService#suspendPullRequest
若是 broker 支持 長輪詢,則每 5s 嘗試一次,若是未開啓,則每 1s 嘗試一次
PullRequestHoldService 檢查是否可拉取消息
代碼入口:PullRequestHoldService#notifyMessageArriving
消息拉取長輪詢是指:rocketMQ 每 5s 輪詢檢查一次消息是否可達。有消息後,立馬通知掛起的線程處理消息。
須要經過在 broker 配置 longPollingEnable=true 開啓長輪詢模式。
RocketMQ 消息隊列從新分佈由 RebalanceService 線程來實現的。RebalanceService 隨着 MQClientInstance 的啓動而啓動。RebalanceService 默認每 20 秒,執行一次 MQClientInstance#doRebalance
代碼入口:RebalanceImpl#rebalanceByTopic
// 獲取主題的隊列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); // 向 broker 發送請求,獲取該主題下,該消費組的全部 消費者客戶端ID List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); if (null == mqSet) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic); } } if (null == cidAll) { log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic); } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); // 排序,同一消費組內,視圖一致,確保同一個消費者隊列不會被多個消費者分配 Collections.sort(mqAll); Collections.sort(cidAll); // 根據分配策略爲消費者分配隊列。 AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(), e); return; } Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } // 更新消費者消息隊列信息 boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } }
代碼入口:RebalanceImpl#updateProcessQueueTableInRebalance
若是 processQueue 中的 MessageQueue 不在剛分配的 MessageQueue 中, 那麼表示,該 MessageQueue 已分配給別的消費者,那麼須要刪除此 PullRequest
private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) { boolean changed = false; Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator(); while (it.hasNext()) { Entry<MessageQueue, ProcessQueue> next = it.next(); MessageQueue mq = next.getKey(); ProcessQueue pq = next.getValue(); // 若是 processQueue 中的 MessageQueue 不在剛分配的 MessageQueue 中,則刪除 MessageQueue。 if (mq.getTopic().equals(topic)) { if (!mqSet.contains(mq)) { pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; } } else if (pq.isPullExpired()) { switch (this.consumeType()) { case CONSUME_ACTIVELY: break; case CONSUME_PASSIVELY: pq.setDropped(true); if (this.removeUnnecessaryMessageQueue(mq, pq)) { it.remove(); changed = true; } break; default: break; } } } }
遍歷剛分配的 MessageQueue,若是 processQueueTable 中不包含,則表示是剛添加的。
首先隊列刪除消費進度,再爲之建立 ProcessQueue,計算下次拉取的偏移量。建立對應的 PullRequest,並加入到 pullRequestList 中, 喚醒 PullMessageService。
List<PullRequest> pullRequestList = new ArrayList<PullRequest>(); // 遍歷剛分配的 MessageQueue,若是 processQueue 中不包含,則表示是剛添加的。 for (MessageQueue mq : mqSet) { if (!this.processQueueTable.containsKey(mq)) { if (isOrder && !this.lock(mq)) { log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq); continue; } // 移除消費進度 this.removeDirtyOffset(mq); // 建立新的 ProcessQueue ProcessQueue pq = new ProcessQueue(); // 計算下次從哪裏拉取消息 long nextOffset = this.computePullFromWhere(mq); if (nextOffset >= 0) { ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq); if (pre != null) { log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq); } else { log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq); PullRequest pullRequest = new PullRequest(); pullRequest.setConsumerGroup(consumerGroup); pullRequest.setNextOffset(nextOffset); pullRequest.setMessageQueue(mq); pullRequest.setProcessQueue(pq); pullRequestList.add(pullRequest); changed = true; } } else { log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq); } } }
在計算下次拉取偏移量時, RocketMQ 提供了 3 種方式。可在消費者啓動時,調用DefaultMQPushConsumer#setConsumeFromWhere 設置
注意: ConsumeFromWhere 消費進度校驗只有在從磁盤中獲取的消費進度返回 -1 時才失效。即剛建立的 消費組,若是 broker 中已經有記錄該消費組的消費進度,那麼該值的設置是無效的
總代碼入口:ConsumeMessageConcurrentlyService#submitConsumeRequest
代碼位置:ConsumeMessageConcurrentlyService#submitConsumeRequest
ConsumeMessageConcurrentlyService$ConsumeRequest#run
public void run() { // 阻止消費者 消費 不屬於本身的 隊列 if (this.processQueue.isDropped()) { log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue); return; } MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener; ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue); ConsumeConcurrentlyStatus status = null; // 恢復重試消息主題名, rocketMQ 消息重試機制,決定了,若是發現消息的延時級別 delayTimeLevel 大於 0, // 會首先將重試主題存入消息的屬性中,而後設置主題名稱爲 SCHEDULE_TOPIC ,以便時間到後從新參與消息消費。 defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup()); // 消息消費前 hock // ... long beginTimestamp = System.currentTimeMillis(); boolean hasException = false; ConsumeReturnType returnType = ConsumeReturnType.SUCCESS; try { if (msgs != null && !msgs.isEmpty()) { for (MessageExt msg : msgs) { MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis())); } } // 消費消息,這裏是調用咱們的業務代碼。 status = listener.consumeMessage(Collections.unmodifiableList(msgs), context); } catch (Throwable e) { // 記錄異常 hasException = true; } // ... // 執行消費後 hock // ... // 再次對 processQueue dropped 進行校驗,若是爲 true,那麼不對結果進行處理。由於消息會被別的消費者從新消費 if (!processQueue.isDropped()) { ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this); } else { // 記錄異常 } }
ConsumeMessageConcurrentlyService$processConsumeResult
// 默認 Integer.MAX_VALUE int ackIndex = context.getAckIndex(); if (consumeRequest.getMsgs().isEmpty()) return; switch (status) { // 消費成功, ackIndex 被設置爲 consumeRequest.getMsgs().size() - 1 case CONSUME_SUCCESS: if (ackIndex >= consumeRequest.getMsgs().size()) { ackIndex = consumeRequest.getMsgs().size() - 1; } int ok = ackIndex + 1; int failed = consumeRequest.getMsgs().size() - ok; this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok); this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed); break; // 消費失敗, ackIndex 被設置爲 -1. case RECONSUME_LATER: ackIndex = -1; this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size()); break; default: break; } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString()); } break; // 集羣模式 case CLUSTERING: List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size()); // for 循環內的代碼,只有在消費失敗的時候,纔會執行到 // 消費失敗, ackIndex = -1。所以該批次消息都須要發 ack // 消息消費成功, ackIndex = consumeRequest.getMsgs().size() - 1。所以不會執行 for 循環。 for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) { MessageExt msg = consumeRequest.getMsgs().get(i); // 調用 producer 從新發送消息 boolean result = this.sendMessageBack(msg, context); // 消息發送 ACK 失敗, if (!result) { msg.setReconsumeTimes(msg.getReconsumeTimes() + 1); msgBackFailed.add(msg); } } // 若是存在發送 ACK 失敗的消息, 延遲 5 秒後,從新消費。 if (!msgBackFailed.isEmpty()) { consumeRequest.getMsgs().removeAll(msgBackFailed); this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue()); } break; default: break; } // 移除這批消息,返回 移除該批消息後最小的偏移量 long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs()); if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) { // 無論是消費成功,仍是消費失敗,都會更新消費進度 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true); }
總代碼入口:SendMessageProcessor#consumerSendMsgBack
客戶端在發送重試消息時,封裝了 ConsumerSendMsgBackRequestHeader。先看看這個類的屬性
// 消息物理偏移量 private Long offset; // 消費組 private String group; // 延遲等級 private Integer delayLevel; // 消息ID private String originMsgId; // 消息主題 private String originTopic; // 最大從新消費次數,默認 16 次 SubscriptionGroupConfig.retryMaxTimes 中定義 private Integer maxReconsumeTimes;
當客戶端調用 MQClientAPIImpl#consumerSendMessageBack ,發送消息時,服務由 SendMessageProcessor#consumerSendMsgBack 接收這次請求。
代碼入口:SendMessageProcessor#consumerSendMsgBack
注:若是以上某個環節出現錯誤,會致使消息客戶端從新封裝新的 ConsumeRequest,並延遲 5s 執行。
private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request) throws RemotingCommandException { // ... // 鉤子函數 // ... // 先獲取消費組訂閱配置信息 SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup()); // 不存在,直接返回 if (null == subscriptionGroupConfig) { response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST); response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " " + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST)); return response; } // 隊列權限判斷 // ... // 建立新的主題,並隨機選擇一個隊列 // %RETRY% + group String newTopic = MixAll.getRetryTopic(requestHeader.getGroup()); // 隨機選擇一個隊列 int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums(); // ... // ... // 根據 偏移量 獲取消息 MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset()); if (null == msgExt) { response.setCode(ResponseCode.SYSTEM_ERROR); response.setRemark("look message by offset failed, " + requestHeader.getOffset()); return response; } // ... int delayLevel = requestHeader.getDelayLevel(); // 消息超過最大重試次數,默認最大重試 16 次 int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes(); if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) { maxReconsumeTimes = requestHeader.getMaxReconsumeTimes(); } // 若是消息重試次數超過 maxReconsumeTimes,再次改寫 newTopic 主題爲 %DLQ%, // 該主題的權限爲只寫,說明消息一旦進入到 DLQ 隊列中,RocketMQ 將不在負責再次調度 // 消費,須要人工干預。 if (msgExt.getReconsumeTimes() >= maxReconsumeTimes || delayLevel < 0) { newTopic = MixAll.getDLQTopic(requestHeader.getGroup()); queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP; topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic, DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0); // ... } else { if (0 == delayLevel) { delayLevel = 3 + msgExt.getReconsumeTimes(); } msgExt.setDelayTimeLevel(delayLevel); } // 按照原消息,建立一個新的消息,主題: %RETRY% + group MessageExtBrokerInner msgInner = new MessageExtBrokerInner(); msgInner.setTopic(newTopic); msgInner.setBody(msgExt.getBody()); msgInner.setFlag(msgExt.getFlag()); MessageAccessor.setProperties(msgInner, msgExt.getProperties()); msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties())); msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags())); msgInner.setQueueId(queueIdInt); msgInner.setSysFlag(msgExt.getSysFlag()); msgInner.setBornTimestamp(msgExt.getBornTimestamp()); msgInner.setBornHost(msgExt.getBornHost()); msgInner.setStoreHost(this.getStoreHost()); msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1); String originMsgId = MessageAccessor.getOriginMessageId(msgExt); MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId); // 交由 commitlog 存入消息 PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner); if (putMessageResult != null) { switch (putMessageResult.getPutMessageStatus()) { case PUT_OK: // ... 返回正確結果 default: break; } // ... 返回錯誤結果 return response; } // ... 返回錯誤結果 return response; }
代碼入口:ScheduleMessageService#start
邏輯比較簡單,直接說主要的流程。
(以上,是查找消息的流程)
其中須要注意的一個細節是:ScheduleMessageService 每隔 10 秒鐘 持久化一次 延遲消息消費進度。
代碼入口:RebalanceImpl#updateProcessQueueTableInRebalance 以及加鎖邏輯 RebalanceImpl#lock
代碼入口:DefaultMQPushConsumerImpl#pullMessage
代碼入口:ConsumerMessageOrderly#start
ConsumerMessageOrderly 消費流程
代碼入口:ConsumerMessageOrderly$ConsumeRequest#run