RocketMQ(4.6.1)系列教程--消息消費篇

RocketMQ 消息消費

消息消費總覽

消息消費以組的模式開展,一個消費組內能夠包含多個消費者,1個消費組可訂閱多個主題。消費組之間有集羣模式與廣播模式兩種。java

集羣模式下,主題下的同一消息只容許被消費組內的一個消費者消費,消費進度存儲在 broker 端。廣播模式下,則每一個消費者均可以消費該消息,消費進度存儲在消費者端。express

消息服務器與消費者的消息傳輸有 2 種方式:推模式、拉模式。拉模式,即消費者主動向消息服務器發送請求;推模式,即消息服務器向消費者推送消息。推模式,是基於拉模式實現的。服務器

集羣模式下,一個消費隊列同一時間,只容許被一個消費者消費,1個消費者,能夠消費多個消息隊列。併發

RocketMQ 只支持局部順序消息消費,即保證同一個消息隊列上的消息順序消費。若是想保證一個 Topic 下的順序消費,那麼只能將該主題的消息隊列設置爲 1。app

認識消息消費者

DefaultMQPushConsumerdom

// 消費者組
private String consumerGroup;
// 消費模式,默認集羣
private MessageModel messageModel = MessageModel.CLUSTERING;
// 根據消息進度從消息服務器拉取不到消息時從新計算消費策略
// CONSUME_FROM_LAST_OFFSET:從隊列當前最大偏移量開始消費
// CONSUME_FROM_FIRST_OFFSET 從最先可用的消息開始消費
// CONSUME_FROM_TIMESTAMP 從指定的時間戳開始消費
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
// 集羣模式下消息隊列負載策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;
// 訂閱消息
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();
// 消息業務監聽器
private MessageListener messageListener;
// 消息消費進度存儲器
private OffsetStore offsetStore;
// 消費者最小線程數
private int consumeThreadMin = 20;
// 消費者最大線程數
private int consumeThreadMax = 20;
// 併發消息消費時處理隊列最大跨度。表示若是消息處理隊列中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒再拉取消息
private int consumeConcurrentlyMaxSpan = 2000;
// 每 1000 次流控後打印流控日誌
private int pullThresholdForQueue = 1000;
// 推模式下拉取任務時間間隔,默認一次拉取任務完成繼續拉取
private long pullInterval = 0;
// 消息併發消費時一次消費消息條數
private int consumeMessageBatchMaxSize = 1;
// 每次消息拉取條數
private int pullBatchSize = 32;
// 是否每次拉取消息都更新訂閱信息
private boolean postSubscriptionWhenPull = false;
// 最大消費重試次數,消息消費次數超過 maxReconsumeTimes 還未成功,則將該消息轉移到一個失敗隊列,等待刪除
private int maxReconsumeTimes = -1;
// 消費超時時間,單位 分鐘。
private long consumeTimeout = 15;

消費者啓動流程

代碼位置:DefaultMQPushConsumerImpl#start異步

  1. 構建 SubscriptionData 加入到 RebalanceImpl 訂閱消息中。

DefaultMQPushConsumerImpl#copySubscriptionide

private void copySubscription() throws MQClientException {
  try {
    Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
    if (sub != null) {
      for (final Map.Entry<String, String> entry : sub.entrySet()) {
        final String topic = entry.getKey();
        final String subString = entry.getValue();
        // 構建 SubscriptionData
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), topic, subString);
        // 加入 RebalanceImpl 訂閱消息中
        this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
      }
    }
        // ...
    switch (this.defaultMQPushConsumer.getMessageModel()) {
       // ...
      case CLUSTERING:
        // 獲取重試的主題,格式: %RETRY% + 消費組名
        final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
        // 構建重試主題的訂閱消息
        SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), retryTopic, SubscriptionData.SUB_ALL);
        // 加入 RebalanceImpl 訂閱消息中
        this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
        break;
       // ...
    }
  } catch (Exception e) {
    throw new MQClientException("subscription exception", e);
  }
}
  1. 初始化 MQClientInstance、RebalanceImpl 等。
  2. 初始化消息消費進度,集羣模式,進度放在 broker,廣播模式,本地
if (this.defaultMQPushConsumer.getOffsetStore() != null) {
  this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
} else {
  switch (this.defaultMQPushConsumer.getMessageModel()) {
    case BROADCASTING:
      // 從本地獲取消費進度
      this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
      break;
    case CLUSTERING:
      // 從 broker 獲取消費進度
      this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
      break;
    default:
      break;
  }
  this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
}
this.offsetStore.load();
  1. 根據是不是順序消費,建立不一樣的 ConsumeMessageService。該類主要負責消息的消費
if (this.getMessageListenerInner() instanceof MessageListenerOrderly) {
  this.consumeOrderly = true;
  this.consumeMessageService = new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
} else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) {
  this.consumeOrderly = false;
  this.consumeMessageService = new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
}
this.consumeMessageService.start();
  1. 向 MQClientInstance 註冊消費者,並啓動 MQClientInstance。在一個 JVM 中的全部消費者、生產者持有同一個 MQClientInstance,MQClientInstance 只會啓動一次。
boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
// ...
mQClientFactory.start();

消息拉取

PullMessageService 消息拉取

RocketMQ 經過 PullMessageService 拉取消息。 函數

PullMessageService#runpost

public void run() {
  // stopped 是 volidate 修飾的變量,用於線程間通訊。
  while (!this.isStopped()) {
  // .. 
      // 阻塞隊列, 若是 pullRequestQueue 沒有元素,則阻塞
      PullRequest pullRequest = this.pullRequestQueue.take();
      // 消息拉取 
      this.pullMessage(pullRequest);
   // ...
  }
}
認識 PullRequest
// 消費者組
private String consumerGroup;
// 消息隊列
private MessageQueue messageQueue;
// 消息處理隊列,從 Broker 拉取到的消息先存入 ProcessQueue,而後再提交到消費者消費池消費
private ProcessQueue processQueue;
// 待拉取的 MessageQueue 偏移量
private long nextOffset;
// 是否被鎖定
private boolean lockedFirst = false;
PullMessageService 添加 PullRequest 的方式
  • 延時添加
public void executePullRequestLater(final PullRequest pullRequest, final long timeDelay) {
  if (!isStopped()) {
    this.scheduledExecutorService.schedule(new Runnable() {
      @Override
      public void run() {
        PullMessageService.this.executePullRequestImmediately(pullRequest);
      }
    }, timeDelay, TimeUnit.MILLISECONDS);
  } else {
    log.warn("PullMessageServiceScheduledThread has shutdown");
  }
}
  • 當即添加
public void executePullRequestImmediately(final PullRequest pullRequest) {
  try {
    this.pullRequestQueue.put(pullRequest);
  } catch (InterruptedException e) {
    log.error("executePullRequestImmediately pullRequestQueue.put", e);
  }
}

認識 ProcessQueue

ProcessQueue 是 MessageQueue 在消費端的重現、快照。PullMessageService 從消息服務器默認每次拉取 32 條消息,按消息的隊列偏移量順序存放在 ProcessQueue 中,PullMessageService 再將消息提交到消費者消費線程池。消息消費成功後,從 ProcessQueue 中移除。

// 讀寫鎖
private final ReadWriteLock lockTreeMap = new ReentrantReadWriteLock();
// 消息存儲容器, k:消息偏移量,v:消息實體
private final TreeMap<Long, MessageExt> msgTreeMap = new TreeMap<Long, MessageExt>();
// ProcessQueue 中消息總數
private final AtomicLong msgCount = new AtomicLong();
// ProcessQueue 中消息總大小
private final AtomicLong msgSize = new AtomicLong();
// 當前 ProcessQueue 中包含的最大隊列偏移量
private volatile long queueOffsetMax = 0L;
// 當前 ProcessQueue 是否被丟棄
private volatile boolean dropped = false;
// 上一次開始消息拉取時間戳
private volatile long lastPullTimestamp = System.currentTimeMillis();
// 上一次消息消費時間戳
private volatile long lastConsumeTimestamp = System.currentTimeMillis();

消息拉取流程

代碼位置:DefaultMQPushConsumerImpl#pullMessage

消息客戶端拉取消息
  1. 獲取 ProcessQueue,並對 ProcessQueue 作校驗
final ProcessQueue processQueue = pullRequest.getProcessQueue();
// 被刪除
if (processQueue.isDropped()) {
  return;
}
// 設置拉取時間
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());

try {
  this.makeSureStateOK();
} catch (MQClientException e) {
  // 狀態非法,延遲 3 秒拉取消息
  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  return;
}
// 消費者被掛起 
if (this.isPause()) {
  // 延遲 1 秒拉取消息
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
  return;
}
  1. 對消息拉取進行流量控制

流控控制有如下幾個維度:

  • processQueue 的消息數量 大於 1000, processQueue 的消息大小 大於 100 MB,將延遲 50 毫秒後拉取消息
  • processQueue 中偏移量最大的消息與偏移量最小的消息的跨度超過 2000 則延遲 50 毫秒再拉取消息
long cachedMessageCount = processQueue.getMsgCount().get();
long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);

// processQueue 的消息數量 大於 1000,觸發流控
if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) {
  // 50 毫秒再拉取消息
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  if ((queueFlowControlTimes++ % 1000) == 0) {
        // ... 流控輸出語句
  }
  return;
}

// processQueue 的消息大小 大於 100 MB,觸發流控
if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) {
  // 50 毫秒再拉取消息
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
  if ((queueFlowControlTimes++ % 1000) == 0) {
     // ... 流控輸出語句
  }
  return;
}

if (!this.consumeOrderly) {
  // 順序消費
  // processQueue 中偏移量最大的消息與偏移量最小的消息的跨度超過2000則延遲50毫秒再拉取消息
  if (processQueue.getMaxSpan() > this.defaultMQPushConsumer.getConsumeConcurrentlyMaxSpan()) {
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
    if ((queueMaxSpanFlowControlTimes++ % 1000) == 0) {
         // ... 流控輸出語句
    }
    return;
  }
} else {
  if (processQueue.isLocked()) {
    if (!pullRequest.isLockedFirst()) {
      final long offset = this.rebalanceImpl.computePullFromWhere(pullRequest.getMessageQueue());
      boolean brokerBusy = offset < pullRequest.getNextOffset();
      // ..
      if (brokerBusy) {
        // ... 
      }

      pullRequest.setLockedFirst(true);
      pullRequest.setNextOffset(offset);
    }
  } else {
    // 確保 processQueue 在消費以前必須被鎖定。
    this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
    log.info("pull message later because not locked in broker, {}", pullRequest);
    return;
  }
}
  1. 根據主題拉取訂閱的消息,若是爲空,延遲 3 秒,再拉取
final SubscriptionData subscriptionData = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (null == subscriptionData) {
  this.executePullRequestLater(pullRequest, pullTimeDelayMillsWhenException);
  log.warn("find the consumer's subscription failed, {}", pullRequest);
  return;
}
  1. 構建消息拉取系統標記

拉消息系統標記:
image.png

  • FLAG_COMMIT_OFFSET:表示從內存中讀取的消費進度大於0,則設置該標記位
  • FLAG_SUSPEND:表示消息拉取時支持掛起
  • FLAG_SUBSCRIPTION:消息過濾機制爲表達式,設置該標記位
  • FLAG_CLASS_FILTER:消息過濾機制爲類過濾模式
  • FLAG_LITE_PULL_MESSAGE:精簡拉取消息
// 從內存中讀取消費進度,若是大於0,則設置 commitOffsetEnable = true
boolean commitOffsetEnable = false;
long commitOffsetValue = 0L;
if (MessageModel.CLUSTERING == this.defaultMQPushConsumer.getMessageModel()) {
  commitOffsetValue = this.offsetStore.readOffset(pullRequest.getMessageQueue(), ReadOffsetType.READ_FROM_MEMORY);
  if (commitOffsetValue > 0) {
    commitOffsetEnable = true;
  }
}

String subExpression = null;
boolean classFilter = false;
SubscriptionData sd = this.rebalanceImpl.getSubscriptionInner().get(pullRequest.getMessageQueue().getTopic());
if (sd != null) {
  if (this.defaultMQPushConsumer.isPostSubscriptionWhenPull() && !sd.isClassFilterMode()) {
    subExpression = sd.getSubString();
  }

  classFilter = sd.isClassFilterMode();
}

int sysFlag = PullSysFlag.buildSysFlag(
  commitOffsetEnable, // commitOffset
  true, // suspend
  subExpression != null, // subscription
  classFilter // class filter
);
  1. 向消息服務端拉取消息
this.pullAPIWrapper.pullKernelImpl(
  pullRequest.getMessageQueue(), // 從哪一個隊列拉取消息 
  subExpression, // 消息過濾表達式
  subscriptionData.getExpressionType(), // 消息表達式類型 TAG、SQL92
  subscriptionData.getSubVersion(), 
  pullRequest.getNextOffset(), // 本次拉取消息偏移量
  this.defaultMQPushConsumer.getPullBatchSize(), // 本次拉取最大消息條數,默認 32 
  sysFlag, // 拉消息 系統標記
  commitOffsetValue, // 當前 MessageQueue 消費進度
  BROKER_SUSPEND_MAX_TIME_MILLIS, // 消息拉取過程當中 容許 broker 掛起時間,默認 15 s
  CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND, // 消息拉取超時時間,默認 30 s
  CommunicationMode.ASYNC, // 消息拉取模式,默認爲異步拉取
  pullCallback // 從 broker 拉取到消息後的回調方法
);

最終調用 MQClientAPIImpl#pullMessageAsync 拉取消息

消息服務端 broker 組裝消息

代碼位置:PullMessageProcessor#processRequest

  1. 根據訂閱消息,構建消息過濾器
  2. 調用 MessageStore.getMessage 查找消息
  3. 根據主題名與隊列編號獲取消息消費隊列
  4. 消息偏移量異常狀況校對下一次拉取偏移量
  5. 根據 PullRequest 填充 responseHeader 的 nextBeginOffset、minOffset、maxOffset
  6. 根據主從同步延遲,若是從節點數據包含下一次拉取的偏移量,設置下一次拉取任務的 brokerId
  7. 若是 commitlog 標記可用而且當前節點爲主節點,則更新消息消費進度
消息拉取客戶端處理消息

代碼入口: MQClientAPIImpl#pullMessageAsync

處理請求回調,獲取 PullResult。並經過 PullCallback 回調至 DefaultMQPushConsumerImpl#pullMessage

  1. 處理拉取到的消息,將結果封裝爲 PullResult,並經過 PullCallback 將 PullResult 回調
private PullResult processPullResponse(
  final RemotingCommand response) throws MQBrokerException, RemotingCommandException {
  PullStatus pullStatus = PullStatus.NO_NEW_MSG;
  switch (response.getCode()) {
    case ResponseCode.SUCCESS:
      pullStatus = PullStatus.FOUND;
      break;
    case ResponseCode.PULL_NOT_FOUND:
      pullStatus = PullStatus.NO_NEW_MSG;
      break;
    case ResponseCode.PULL_RETRY_IMMEDIATELY:
      pullStatus = PullStatus.NO_MATCHED_MSG;
      break;
    case ResponseCode.PULL_OFFSET_MOVED:
      pullStatus = PullStatus.OFFSET_ILLEGAL;
      break;

    default:
      throw new MQBrokerException(response.getCode(), response.getRemark());
  }
  
  PullMessageResponseHeader responseHeader =
    (PullMessageResponseHeader) response.decodeCommandCustomHeader(PullMessageResponseHeader.class);

  return new PullResultExt(pullStatus, responseHeader.getNextBeginOffset(), responseHeader.getMinOffset(), responseHeader.getMaxOffset(), null, responseHeader.getSuggestWhichBrokerId(), response.getBody());
}
  1. 處理回調的 PullResult ,解碼消息,並過濾消息

代碼入口:PullAPIWrapper#processPullResult

  1. 假設消息被找到,處理消息被找到的邏輯

代碼入口:DefaultMQPushConsumerImpl$PullCallback#onSuccess

// 設置下一次拉取的偏移量
long prevRequestOffset = pullRequest.getNextOffset();
pullRequest.setNextOffset(pullResult.getNextBeginOffset());
long pullRT = System.currentTimeMillis() - beginTimestamp;
DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullRT(pullRequest.getConsumerGroup(), pullRequest.getMessageQueue().getTopic(), pullRT);

long firstMsgOffset = Long.MAX_VALUE;
if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {
  // 消息爲null,當即拉取新的消息
  DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
} else {
  firstMsgOffset = pullResult.getMsgFoundList().get(0).getQueueOffset();
 DefaultMQPushConsumerImpl.this.getConsumerStatsManager().incPullTPS(pullRequest.getConsumerGroup(),pullRequest.getMessageQueue().getTopic(), pullResult.getMsgFoundList().size());
    
  // 將消息放入 ProcessQueue,並將消息交由 ConsumeMessageService 消費。
  boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());
  DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(
    pullResult.getMsgFoundList(),
    processQueue,
    pullRequest.getMessageQueue(),
    dispatchToConsume);
 
// 根據 pullInterval 參數,等待 pullInterval 毫秒將 PullRequest 放入 pullRequestQueue 中。
// 推模式下, pullInterval 默認爲 0
  if (DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval() > 0) {
    DefaultMQPushConsumerImpl.this.executePullRequestLater(pullRequest, DefaultMQPushConsumerImpl.this.defaultMQPushConsumer.getPullInterval());
  } else {
    DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);
  }
}
消息拉取流程圖

image.png

消息拉取長輪詢機制

RocketMQ 推模式是循環向消息服務端發送消息拉取請求。消費者向 broker 拉取消息時,若是消息未到達消費隊列,而且未啓用 長輪詢機制,則會在服務端等待 shortPollingTimeMills(默認1秒) 時間後再去判斷消息是否已經到達消息隊列,若是消息未到達,則提示消息拉取客戶端 PULL_NOT_FOUND。若是開啓長輪詢模式,rocketMQ 會每 5s 輪詢檢查一次消息是否可達,同時一有新消息到達後立馬通知掛起線程再次驗證新消息是不是本身感興趣的消息,若是是則從 commitlog 文件提取消息返回給消息拉取客戶端,不然直到掛起超時,超時時間由消息拉取方在消息拉取時封裝在請求參數中,PUSH 模式默認 15s。 PULL 模式經過 DefaultMQPullConsumer#setBrokerSuspendMaxTimeMillis 設置。RocketMQ 經過在 Broker 端配置 longPollingEnable 爲 true 來開啓長輪詢模式。

RocketMQ 的長輪詢機制由 2 個線程共同完成。PullRequestHoldService、ReputMessageService。

  • PullRequestHoldService 添加消息拉取任務
    代碼入口:PullRequestHoldService#suspendPullRequest

    1. 經過 topic + '&' + queueId 構建 key
    2. 從 ConcurrentMap<String/ topic@queueId /, ManyPullRequest> pullRequestTable 中獲取對應的 ManyPullRequest。ManyPullRequest 中存儲了隊列堆積的消息拉取任務
    3. 將新的消息拉取任務,放入 ManyPullRequest 中。
  • PullRequestHoldService 執行消息拉取任務
    代碼入口:PullRequestHoldService#run

    若是 broker 支持 長輪詢,則每 5s 嘗試一次,若是未開啓,則每 1s 嘗試一次

  • PullRequestHoldService 檢查是否可拉取消息
    代碼入口:PullRequestHoldService#notifyMessageArriving

    1. 遍歷全部拉取任務,獲取該主題下隊列的最大偏移量,若是大於待拉取偏移量,說明有新的消息到達,調用 notifyMessageArriving 觸發消息拉取
    2. 若是隊列的最大偏移量大於待拉取偏移量,且消息匹配,則調用 executeRequestWhenWakeup 將消息返回給消息拉取客戶端,不然等待下一次拉取
    3. 若是掛起超時,則不繼續等待將直接返回客戶端消息未找到。
      (不繼續等待客戶端,直接將消息返回的代碼入口:PullMessageProcessor#executeRequestWhenWakeup)
  • ReputMessageService 線程每 1毫秒,調用 PullRequestHoldService#notifyMessageArriving。長輪詢模式,消息準實時。

消息拉取長輪詢是指:rocketMQ 每 5s 輪詢檢查一次消息是否可達。有消息後,立馬通知掛起的線程處理消息。

須要經過在 broker 配置 longPollingEnable=true 開啓長輪詢模式。

消息隊列負載與從新分佈機制

RocketMQ 消息隊列從新分佈由 RebalanceService 線程來實現的。RebalanceService 隨着 MQClientInstance 的啓動而啓動。RebalanceService 默認每 20 秒,執行一次 MQClientInstance#doRebalance

集羣模式下,主題的消息隊列負載

代碼入口:RebalanceImpl#rebalanceByTopic

  1. 獲取主題的隊列,向 broker 發送請求,獲取主題下,消費組全部消費者客戶端ID。
  2. 只有當 2 者均不爲空時,纔有必要進行 rebalance。
  3. 在 rebalance 時,須要對 隊列,還有消費者客戶端 ID 進行排序,以確保同一個消費組下的視圖是一致的。
  4. 根據 分配策略 AllocateMessageQueueStrategy 爲 消費者分配隊列。
// 獲取主題的隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
// 向 broker 發送請求,獲取該主題下,該消費組的全部 消費者客戶端ID
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
if (null == mqSet) {
  if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
    log.warn("doRebalance, {}, but the topic[{}] not exist.", consumerGroup, topic);
  }
}
if (null == cidAll) {
  log.warn("doRebalance, {} {}, get consumer id list failed", consumerGroup, topic);
}


if (mqSet != null && cidAll != null) {
  List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  mqAll.addAll(mqSet);
    
  // 排序,同一消費組內,視圖一致,確保同一個消費者隊列不會被多個消費者分配
  Collections.sort(mqAll);
  Collections.sort(cidAll);

  // 根據分配策略爲消費者分配隊列。
  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;
  List<MessageQueue> allocateResult = null;
  try {
    allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
  } catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
              e);
    return;
  }

  Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  if (allocateResult != null) {
    allocateResultSet.addAll(allocateResult);
  }
    
  // 更新消費者消息隊列信息
  boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  if (changed) {
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
  }
}
  1. 更新消費者消息隊列信息

代碼入口:RebalanceImpl#updateProcessQueueTableInRebalance

若是 processQueue 中的 MessageQueue 不在剛分配的 MessageQueue 中, 那麼表示,該 MessageQueue 已分配給別的消費者,那麼須要刪除此 PullRequest

private boolean updateProcessQueueTableInRebalance(final String topic, final Set<MessageQueue> mqSet, final boolean isOrder) {                                                                                                                      
  boolean changed = false;                                                                                                                      
    
  Iterator<Entry<MessageQueue, ProcessQueue>> it = this.processQueueTable.entrySet().iterator();                                                
  while (it.hasNext()) {                                                                                                                        
    Entry<MessageQueue, ProcessQueue> next = it.next();                                                                                       
    MessageQueue mq = next.getKey();                                                                                                          
    ProcessQueue pq = next.getValue();                                                                                                        
         // 若是 processQueue 中的 MessageQueue 不在剛分配的 MessageQueue 中,則刪除 MessageQueue。
    if (mq.getTopic().equals(topic)) {                                                                                                        
      if (!mqSet.contains(mq)) {                                                                                                            
        pq.setDropped(true);                                                                                                              
        if (this.removeUnnecessaryMessageQueue(mq, pq)) {                                                                                 
          it.remove();                                                                                                                  
          changed = true;                                                                                                                    
        }                                                                                                                                 
      } else if (pq.isPullExpired()) {     
        switch (this.consumeType()) {                                                                                                     
          case CONSUME_ACTIVELY:                                                                                                        
            break;                                                                                                                    
          case CONSUME_PASSIVELY:                                                                                                       
            pq.setDropped(true);                                                                                                      
            if (this.removeUnnecessaryMessageQueue(mq, pq)) {                                                                         
              it.remove();                                                                                                          
              changed = true;                                                                                                       
            }                                                                                                                         
            break;                                                                                                                    
          default:                                                                                                                      
            break;                                                                                                                    
        }                                                                                                                                 
      }                                                                                                                                     
    }                                                                                                                                         
  }

遍歷剛分配的 MessageQueue,若是 processQueueTable 中不包含,則表示是剛添加的。

首先隊列刪除消費進度,再爲之建立 ProcessQueue,計算下次拉取的偏移量。建立對應的 PullRequest,並加入到 pullRequestList 中, 喚醒 PullMessageService。

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 遍歷剛分配的 MessageQueue,若是 processQueue 中不包含,則表示是剛添加的。
for (MessageQueue mq : mqSet) {
  if (!this.processQueueTable.containsKey(mq)) {
    if (isOrder && !this.lock(mq)) {
      log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
      continue;
    }
      // 移除消費進度
    this.removeDirtyOffset(mq);
    // 建立新的 ProcessQueue
    ProcessQueue pq = new ProcessQueue();
    // 計算下次從哪裏拉取消息
    long nextOffset = this.computePullFromWhere(mq);
    if (nextOffset >= 0) {
      ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
      if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
      } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
      }
    } else {
      log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    }
  }
}

在計算下次拉取偏移量時, RocketMQ 提供了 3 種方式。可在消費者啓動時,調用DefaultMQPushConsumer#setConsumeFromWhere 設置

  • ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:從隊列當前最大偏移量開始消費
  • ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET 從最先可用的消息開始消費
  • ConsumeFromWhere.CONSUME_FROM_TIMESTAMP 從指定的時間戳開始消費

注意: ConsumeFromWhere 消費進度校驗只有在從磁盤中獲取的消費進度返回 -1 時才失效。即剛建立的 消費組,若是 broker 中已經有記錄該消費組的消費進度,那麼該值的設置是無效的

PullMessageService 與 RebalanceService 線程交互圖

image.png

消息消費過程

消息消費

總代碼入口:ConsumeMessageConcurrentlyService#submitConsumeRequest

  1. 若是消息數量大於 32 則分頁處理

代碼位置:ConsumeMessageConcurrentlyService#submitConsumeRequest

  1. 每次進行消費時,都會判斷 processQueue 是否被刪除,阻止消費者 消費 不屬於本身的 隊列
  2. 恢復重試消息主題名, rocketMQ 消息重試機制,決定了,若是發現消息的延時級別 delayTimeLevel 大於 0,會首先將重試主題存入消息的屬性中,而後設置主題名稱爲 SCHEDULE_TOPIC ,以便時間到後從新參與消息消費。
  3. 在消費以前,執行 hock
  4. 執行,咱們編寫的消費代碼
  5. 在消費以後,執行 hock
  6. 消費完畢後,再次驗證 processQueue 是否被刪除,若是被刪除,不處理結果

ConsumeMessageConcurrentlyService$ConsumeRequest#run

public void run() {
  // 阻止消費者 消費 不屬於本身的 隊列
  if (this.processQueue.isDropped()) {
    log.info("the message queue not be able to consume, because it's dropped. group={} {}", ConsumeMessageConcurrentlyService.this.consumerGroup, this.messageQueue);
    return;
  }

  MessageListenerConcurrently listener = ConsumeMessageConcurrentlyService.this.messageListener;
  ConsumeConcurrentlyContext context = new ConsumeConcurrentlyContext(messageQueue);
  ConsumeConcurrentlyStatus status = null;
  // 恢復重試消息主題名, rocketMQ 消息重試機制,決定了,若是發現消息的延時級別 delayTimeLevel 大於 0,
  // 會首先將重試主題存入消息的屬性中,而後設置主題名稱爲 SCHEDULE_TOPIC ,以便時間到後從新參與消息消費。
  defaultMQPushConsumerImpl.resetRetryAndNamespace(msgs, defaultMQPushConsumer.getConsumerGroup());

  // 消息消費前 hock
  // ...

  long beginTimestamp = System.currentTimeMillis();
  boolean hasException = false;
  ConsumeReturnType returnType = ConsumeReturnType.SUCCESS;
  try {
    if (msgs != null && !msgs.isEmpty()) {
      for (MessageExt msg : msgs) {
        MessageAccessor.setConsumeStartTimeStamp(msg, String.valueOf(System.currentTimeMillis()));
      }
    }
    // 消費消息,這裏是調用咱們的業務代碼。
    status = listener.consumeMessage(Collections.unmodifiableList(msgs), context);
  } catch (Throwable e) {
    // 記錄異常
    hasException = true;
  }
  // ... 

  // 執行消費後 hock
  // ...

  // 再次對 processQueue dropped 進行校驗,若是爲 true,那麼不對結果進行處理。由於消息會被別的消費者從新消費
  if (!processQueue.isDropped()) {
    ConsumeMessageConcurrentlyService.this.processConsumeResult(status, context, this);
  } else {
    // 記錄異常
  }
}
  1. 對消費者返回的結果,進行處理
  2. 若是消費成功,那麼 ack = consumeRequest.getMsgs().size() - 1。會直接更新消費進度。若是消費失敗,那麼 ack = -1,從新發送消息。若是在從新發送消息時,又失敗了,那麼會延遲 5 秒在繼續消費。
  3. 無論是消費成功,仍是失敗,都會更新消費進度

ConsumeMessageConcurrentlyService$processConsumeResult

// 默認 Integer.MAX_VALUE
int ackIndex = context.getAckIndex();

if (consumeRequest.getMsgs().isEmpty())
  return;

switch (status) {
   // 消費成功, ackIndex 被設置爲 consumeRequest.getMsgs().size() - 1
  case CONSUME_SUCCESS:
    if (ackIndex >= consumeRequest.getMsgs().size()) {
      ackIndex = consumeRequest.getMsgs().size() - 1;
    }
    int ok = ackIndex + 1;
    int failed = consumeRequest.getMsgs().size() - ok;
    this.getConsumerStatsManager().incConsumeOKTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), ok);
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(), failed);
    break;
   // 消費失敗, ackIndex 被設置爲 -1. 
  case RECONSUME_LATER:
    ackIndex = -1;
    this.getConsumerStatsManager().incConsumeFailedTPS(consumerGroup, consumeRequest.getMessageQueue().getTopic(),consumeRequest.getMsgs().size());
    break;
  default:
    break;
}

switch (this.defaultMQPushConsumer.getMessageModel()) {
  case BROADCASTING:
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
      MessageExt msg = consumeRequest.getMsgs().get(i);
      log.warn("BROADCASTING, the message consume failed, drop it, {}", msg.toString());
    }
    break;
    // 集羣模式
  case CLUSTERING:
    List<MessageExt> msgBackFailed = new ArrayList<MessageExt>(consumeRequest.getMsgs().size());
    // for 循環內的代碼,只有在消費失敗的時候,纔會執行到
    // 消費失敗, ackIndex = -1。所以該批次消息都須要發 ack
    // 消息消費成功, ackIndex = consumeRequest.getMsgs().size() - 1。所以不會執行 for 循環。
    for (int i = ackIndex + 1; i < consumeRequest.getMsgs().size(); i++) {
      MessageExt msg = consumeRequest.getMsgs().get(i);
      // 調用 producer 從新發送消息
      boolean result = this.sendMessageBack(msg, context);
      // 消息發送 ACK 失敗,
      if (!result) {
        msg.setReconsumeTimes(msg.getReconsumeTimes() + 1);
        msgBackFailed.add(msg);
      }
    }
    // 若是存在發送 ACK 失敗的消息, 延遲 5 秒後,從新消費。
    if (!msgBackFailed.isEmpty()) {
      consumeRequest.getMsgs().removeAll(msgBackFailed);
      this.submitConsumeRequestLater(msgBackFailed, consumeRequest.getProcessQueue(), consumeRequest.getMessageQueue());
    }
    break;
  default:
    break;
}
// 移除這批消息,返回 移除該批消息後最小的偏移量
long offset = consumeRequest.getProcessQueue().removeMessage(consumeRequest.getMsgs());
if (offset >= 0 && !consumeRequest.getProcessQueue().isDropped()) {
  // 無論是消費成功,仍是消費失敗,都會更新消費進度
 this.defaultMQPushConsumerImpl.getOffsetStore().updateOffset(consumeRequest.getMessageQueue(), offset, true);
}

消息確認

總代碼入口:SendMessageProcessor#consumerSendMsgBack

客戶端在發送重試消息時,封裝了 ConsumerSendMsgBackRequestHeader。先看看這個類的屬性

// 消息物理偏移量
private Long offset;
// 消費組
private String group;
// 延遲等級
private Integer delayLevel;
// 消息ID
private String originMsgId;
// 消息主題
private String originTopic;
// 最大從新消費次數,默認 16 次   SubscriptionGroupConfig.retryMaxTimes 中定義
private Integer maxReconsumeTimes;

當客戶端調用 MQClientAPIImpl#consumerSendMessageBack ,發送消息時,服務由 SendMessageProcessor#consumerSendMsgBack 接收這次請求。

代碼入口:SendMessageProcessor#consumerSendMsgBack

  1. 先獲取消費組訂閱配置信息,不存在則直接返回
  2. 建立主題:%RETRY% + group,並隨機選擇一個隊列
  3. 用原來的消息,建立一個新的消息
  4. 若是重試消息的最大重試次數超過 16 次(默認),則將消息放入 %DLQ% 隊列。等待人工處理
  5. 由 Commitlog.putMessage 存入消息。

注:若是以上某個環節出現錯誤,會致使消息客戶端從新封裝新的 ConsumeRequest,並延遲 5s 執行。

private RemotingCommand consumerSendMsgBack(final ChannelHandlerContext ctx, final RemotingCommand request)
        throws RemotingCommandException {
  // ...

  // 鉤子函數
  // ... 
    
  // 先獲取消費組訂閱配置信息
  SubscriptionGroupConfig subscriptionGroupConfig =
 this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(requestHeader.getGroup());
  // 不存在,直接返回
  if (null == subscriptionGroupConfig) {
    response.setCode(ResponseCode.SUBSCRIPTION_GROUP_NOT_EXIST);
    response.setRemark("subscription group not exist, " + requestHeader.getGroup() + " "
                       + FAQUrl.suggestTodo(FAQUrl.SUBSCRIPTION_GROUP_NOT_EXIST));
    return response;
  }

  
  // 隊列權限判斷
  // ...
 
  // 建立新的主題,並隨機選擇一個隊列
  // %RETRY% + group
  String newTopic = MixAll.getRetryTopic(requestHeader.getGroup());
  // 隨機選擇一個隊列
  int queueIdInt = Math.abs(this.random.nextInt() % 99999999) % subscriptionGroupConfig.getRetryQueueNums();

  // ...
  // ...

  // 根據 偏移量 獲取消息
  MessageExt msgExt = this.brokerController.getMessageStore().lookMessageByOffset(requestHeader.getOffset());
  if (null == msgExt) {
    response.setCode(ResponseCode.SYSTEM_ERROR);
    response.setRemark("look message by offset failed, " + requestHeader.getOffset());
    return response;
  }

  // ...

  int delayLevel = requestHeader.getDelayLevel();
  // 消息超過最大重試次數,默認最大重試 16 次
  int maxReconsumeTimes = subscriptionGroupConfig.getRetryMaxTimes();
  if (request.getVersion() >= MQVersion.Version.V3_4_9.ordinal()) {
    maxReconsumeTimes = requestHeader.getMaxReconsumeTimes();
  }

  // 若是消息重試次數超過 maxReconsumeTimes,再次改寫 newTopic 主題爲 %DLQ%,
  // 該主題的權限爲只寫,說明消息一旦進入到 DLQ 隊列中,RocketMQ 將不在負責再次調度
  // 消費,須要人工干預。
  if (msgExt.getReconsumeTimes() >= maxReconsumeTimes
      || delayLevel < 0) {
    newTopic = MixAll.getDLQTopic(requestHeader.getGroup());
    queueIdInt = Math.abs(this.random.nextInt() % 99999999) % DLQ_NUMS_PER_GROUP;

    topicConfig = this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(newTopic,
DLQ_NUMS_PER_GROUP, PermName.PERM_WRITE, 0);
    
    // ... 
  } else {
    if (0 == delayLevel) {
      delayLevel = 3 + msgExt.getReconsumeTimes();
    }
    msgExt.setDelayTimeLevel(delayLevel);
  }

  // 按照原消息,建立一個新的消息,主題: %RETRY% + group
  MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
  msgInner.setTopic(newTopic);
  msgInner.setBody(msgExt.getBody());
  msgInner.setFlag(msgExt.getFlag());
  MessageAccessor.setProperties(msgInner, msgExt.getProperties());
  msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgExt.getProperties()));
  msgInner.setTagsCode(MessageExtBrokerInner.tagsString2tagsCode(null, msgExt.getTags()));

  msgInner.setQueueId(queueIdInt);
  msgInner.setSysFlag(msgExt.getSysFlag());
  msgInner.setBornTimestamp(msgExt.getBornTimestamp());
  msgInner.setBornHost(msgExt.getBornHost());
  msgInner.setStoreHost(this.getStoreHost());
  msgInner.setReconsumeTimes(msgExt.getReconsumeTimes() + 1);
  String originMsgId = MessageAccessor.getOriginMessageId(msgExt);
  MessageAccessor.setOriginMessageId(msgInner, UtilAll.isBlank(originMsgId) ? msgExt.getMsgId() : originMsgId);
  
  // 交由 commitlog 存入消息
  PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
  if (putMessageResult != null) {
    switch (putMessageResult.getPutMessageStatus()) {
      case PUT_OK:
        // ... 返回正確結果
      default:
        break;
    }

    // ... 返回錯誤結果
    return response;
  }

  // ... 返回錯誤結果
  return response;
}

消息消費,消息確認 流程圖

消費者消費流程.png

定時消息

代碼入口:ScheduleMessageService#start

定時消息拉取流程

邏輯比較簡單,直接說主要的流程。

  1. 根據 隊列ID,延遲主題,找到消息隊列
  2. 遍歷消息隊列,找到消息偏移量
  3. 根據消息偏移量查找消息

(以上,是查找消息的流程)

  1. 找到消息後,從消息屬性中,獲取原消息主題,消息隊列,從新封裝一個消息(在發送延遲消息時,會將原消息的主題、隊列存入消息的屬性中。key 分別是 PROPERTY_REAL_TOPIC、PROPERTY_REAL_QUEUE_ID)
  2. 將消息放入 commitlog,並轉發到對應的消息消費隊列。

其中須要注意的一個細節是:ScheduleMessageService 每隔 10 秒鐘 持久化一次 延遲消息消費進度。

定時消息拉取流程圖

image.png

順序消息

  1. 順序消息在 消息隊列負載時,就須要向 broker 請求加鎖該隊列,加鎖成功後,纔會分配該隊列,不然不會分配。若是當次隊列負載未加鎖成功,則會在下一次繼續嘗試加鎖。

    代碼入口:RebalanceImpl#updateProcessQueueTableInRebalance 以及加鎖邏輯 RebalanceImpl#lock

  2. 在拉取消息時(由PullMessageService 負責消息拉取),若是消息處理隊列未被鎖定,則延遲 3s 後再將 PullRequest 對象放入到拉取任務中。若是是第一次拉取,則計算拉取偏移量。

    代碼入口:DefaultMQPushConsumerImpl#pullMessage

  3. 順序消費者,由 ConsumerMessageOrderly 實現。在啓動 ConsumerMessageOrderly 時, 會啓動一個線程,每隔 20秒,就鎖定當前分配的隊列。

    代碼入口:ConsumerMessageOrderly#start

  4. ConsumerMessageOrderly 消費流程

    1. ConsumerMessageOrderly 在消費時,消費線程池中,只會有一個線程在消費
    2. ConsumerMessageOrderly 的消費是以時間爲單位,一個消費組內的線程,默認最多消費 60s
    3. 每次從處理隊列中拉取 1 條(默認)消息,若是爲null,則表示沒有消息,退出本次循環
    4. 在消費消息前,執行 鉤子函數 (ConsumeMessageHook)
    5. 真正開始消費消息時,須要將消息上鎖。若是消息未被丟棄,則執行,咱們編寫的業務代碼
    6. 執行消費後 鉤子函數(ConsumeMessageHook)
    7. 若是消費成功,則提交消費進度(即從 ProcessQueue 中刪除該批消息)
    8. 若是消費失敗,若 消息重試次數大於或等於容許的最大重試次數,消息最終會被送入 DLQ 隊列。若不容許重試,那麼該批消息將會被提交(即消費成功)
    9. 消費成功後,保存消費進度

代碼入口:ConsumerMessageOrderly$ConsumeRequest#run

相關文章
相關標籤/搜索