RocketMQ爲何要保證訂閱關係的一致性?

微信公衆號「後端進階」,專一後端技術分享:Java、Golang、WEB框架、分佈式中間件、服務治理等等。java

前段時間有個朋友向我提了一個問題,他說在搭建 RocketMQ 集羣過程當中遇到了關於消費訂閱的問題,具體問題以下:算法

而後他發了報錯的日誌給我看:apache

the consumer's subscription not exist 複製代碼

我第一時間在源碼裏找到了報錯的位置:後端

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:緩存

subscriptionData = consumerGroupInfo.findSubscriptionData(requestHeader.getTopic());
if (null == subscriptionData) {
  log.warn("the consumer's subscription not exist, group: {}, topic:{}", requestHeader.getConsumerGroup(), requestHeader.getTopic());
  response.setCode(ResponseCode.SUBSCRIPTION_NOT_EXIST);
  response.setRemark("the consumer's subscription not exist" + FAQUrl.suggestTodo(FAQUrl.SAME_GROUP_DIFFERENT_TOPIC));
  return response;
}
複製代碼

此處源碼是將該 Topic 的訂閱信息找出來,然而這裏卻沒找到,因此報了消費訂閱不存在的錯誤。bash

朋友還跟我講了他的消費集羣中,每一個消費者訂閱了本身的 Topic,他的消費組中 有 c1 和 c2 消費者,c1 訂閱了 topicA,而 c2 訂閱了 topicB。微信

這時我已經知道什麼緣由了,我先說一下消費者的訂閱信息在 broker 中是以 group 來分組的,數據結構以下:數據結構

org.apache.rocketmq.broker.client.ConsumerManager:負載均衡

private final ConcurrentMap<String/* Group */, ConsumerGroupInfo> consumerTable =
  new ConcurrentHashMap<String, ConsumerGroupInfo>(1024);
複製代碼

這意味着集羣中的每一個消費者在向 broker 註冊訂閱信息的時候相互覆蓋掉對方的訂閱信息了,這也是爲何同一個消費組應該擁有徹底同樣的訂閱關係的緣由,而朋友在同一個消費組的每一個消費者訂閱關係都不同,就出現了訂閱信息相互覆蓋的問題。框架

但是朋友這時又有疑惑了,他以爲每一個消費者訂閱本身的主題,貌似沒問題啊,邏輯上也行的通,他不明白爲何 RocketMQ 不容許這樣作,因而秉承着老司機的職業素養,下面我會從源碼的角度深度分析 RocketMQ 消費訂閱註冊,消息拉取,消息隊列負載與從新分佈機制,讓你們完全弄清 RocketMQ 消費訂閱機制。

消費者訂閱信息註冊

消費者在啓動時會向全部 broker 註冊訂閱信息,並啓動心跳機制,定時更新訂閱信息,每一個消費者都有一個 MQClientInstance,消費者啓動時會啓動這個類,啓動方法中會啓動一些列定時任務,其中:

org.apache.rocketmq.client.impl.factory.MQClientInstance#startScheduledTask:

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
  @Override
  public void run() {
    try {
      MQClientInstance.this.cleanOfflineBroker();
      MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
    } catch (Exception e) {
      log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
    }
  }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
複製代碼

上面是向集羣內全部 broker 發送訂閱心跳信息的定時任務,源碼繼續跟進去,發現會給集羣中的每一個 broker 都發送本身的 HeartbeatData,HeartbeatData 便是每一個客戶端的心跳數據,它包含了以下數據:

// 客戶端ID
private String clientID;
// 生產者信息
private Set<ProducerData> producerDataSet = new HashSet<ProducerData>();
// 消費者信息
private Set<ConsumerData> consumerDataSet = new HashSet<ConsumerData>();
複製代碼

其中消費者信息包含了客戶端訂閱的主題信息。

咱們繼續看看 broker 如何處理 HeartbeatData 數據,客戶端發送 HeartbeatData 時的請求類型爲 HEART_BEAT,咱們直接找到 broker 處理 HEART_BEAT 請求類型的邏輯:

org.apache.rocketmq.broker.processor.ClientManageProcessor#heartBeat:

public RemotingCommand heartBeat(ChannelHandlerContext ctx, RemotingCommand request) {
  RemotingCommand response = RemotingCommand.createResponseCommand(null);
  // 解碼,獲取 HeartbeatData
  HeartbeatData heartbeatData = HeartbeatData.decode(request.getBody(), HeartbeatData.class);
  ClientChannelInfo clientChannelInfo = new ClientChannelInfo(
    ctx.channel(),
    heartbeatData.getClientID(),
    request.getLanguage(),
    request.getVersion()
  );

  // 循環註冊消費者訂閱信息
  for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
    // 按消費組獲取訂閱配置信息
    SubscriptionGroupConfig subscriptionGroupConfig =
      this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(
      data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
      isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
      int topicSysFlag = 0;
      if (data.isUnitMode()) {
        topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
      }
      String newTopic = MixAll.getRetryTopic(data.getGroupName());
      this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
        newTopic,
        subscriptionGroupConfig.getRetryQueueNums(),
        PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }

    // 註冊消費者訂閱信息
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
      data.getGroupName(),
      clientChannelInfo,
      data.getConsumeType(),
      data.getMessageModel(),
      data.getConsumeFromWhere(),
      data.getSubscriptionDataSet(),
      isNotifyConsumerIdsChangedEnable
    );
    // ...
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
  }
複製代碼

在這裏咱們能夠看到,broker 收到 HEART_BEAT 請求後,將請求數據解壓獲取 HeartbeatData,根據 HeartbeatData 裏面的消費訂閱信息,循環進行註冊:

org.apache.rocketmq.broker.client.ConsumerManager#registerConsumer:

public boolean registerConsumer(final String group, final ClientChannelInfo clientChannelInfo, ConsumeType consumeType, MessageModel messageModel, ConsumeFromWhere consumeFromWhere, final Set<SubscriptionData> subList, boolean isNotifyConsumerIdsChangedEnable) {

  // 獲取消費組內的消費者信息
  ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
  // 若是消費組的消費者信息爲空,則新建一個
  if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
  }

  boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
                                    consumeFromWhere);
  // 更新訂閱信息,訂閱信息是按照消費組存放的,所以這步驟就會致使同一個消費組內的各個消費者客戶端的訂閱信息相互被覆蓋
  boolean r2 = consumerGroupInfo.updateSubscription(subList);

  if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
      this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
  }

  this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);

  return r1 || r2;
}
複製代碼

這步驟是 broker 更新消費者訂閱信息的核心方法,若是消費組的消費者信息 ConsumerGroupInfo 爲空,則新建一個,從名字可知道,訂閱信息是按照消費組進行存放的,所以在更新訂閱信息時,訂閱信息是按照消費組存放的,這步驟就會致使同一個消費組內的各個消費者客戶端的訂閱信息相互被覆蓋

消息拉取

在 MQClientInstance 啓動時,會啓動一條線程來處理消息拉取任務:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start pull service
this.pullMessageService.start();
複製代碼

pullMessageService 繼承了 ServiceThread,而 ServiceThread 實現了 Runnable 接口,它的 run 方法實現以下:

org.apache.rocketmq.client.impl.consumer.PullMessageService#run:

@Override
public void run() {
  while (!this.isStopped()) {
    try {
      // 從 pullRequestQueue 中獲取拉取消息請求對象
      PullRequest pullRequest = this.pullRequestQueue.take();
      // 執行消息拉取
      this.pullMessage(pullRequest);
    } catch (InterruptedException ignored) {
    } catch (Exception e) {
      log.error("Pull Message Service Run Method exception", e);
    }
  }
}
複製代碼

消費端拿到 PullRequest 對象進行拉取消息,pullRequestQueue 是一個阻塞隊列,若是 pullRequest 數據爲空,執行 take() 方法會一直阻塞,直到有新的 pullRequest 拉取任務進來,這裏是一個很關鍵的步驟,你可能會想,pullRequest 何時被建立而後放入 pullRequestQueue?pullRequest 它是在RebalanceImpl 中建立,它是 RocketMQ 消息隊列負載與從新分佈機制的實現

消息隊列負載與從新分佈

從上面消息拉取源碼分析可知,pullMessageService 啓動時因爲 pullRequestQueue 中沒有 pullRequest 對象,會一直阻塞,而在 MQClientInstance 啓動時,一樣會啓動一條線程來處理消息隊列負載與從新分佈任務:

org.apache.rocketmq.client.impl.factory.MQClientInstance#start:

// Start rebalance service
this.rebalanceService.start();
複製代碼

rebalanceService 一樣繼承了 ServiceThread,它的 run 方法以下:

@Override
public void run() {
  while (!this.isStopped()) {
    this.waitForRunning(waitInterval);
    this.mqClientFactory.doRebalance();
  }
}
複製代碼

繼續跟進去:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#doRebalance:

public void doRebalance(final boolean isOrder) {
  // 獲取消費者全部訂閱信息
  Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
  if (subTable != null) {
    for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
      final String topic = entry.getKey();
      try {
        // 消息隊列負載與從新分佈
        this.rebalanceByTopic(topic, isOrder);
      } catch (Throwable e) {
        if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
          log.warn("rebalanceByTopic Exception", e);
        }
      }
    }
  }
  this.truncateMessageQueueNotMyTopic();
}
複製代碼

這裏主要是獲取客戶端訂閱的主題,並根據主題進行消息隊列負載與從新分佈,subTable 存儲了消費者的訂閱信息,消費者進行消息訂閱時會填充到裏面,咱們接着往下:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
複製代碼

rebalanceByTopic 方法是實現 Consumer 端負載均衡的核心,咱們這裏以集羣模式的消息隊列負載與從新分佈,首先從 topicSubscribeInfoTable 中獲取訂閱主題的隊列信息,接着隨機從集羣中的一個 broker 中獲取消費組內某個 topic 的訂閱客戶端 ID 列表,這裏須要注意的是,爲何從集羣內任意一個 broker 就能夠獲取訂閱客戶端信息呢?前面的分析也說了,消費者客戶端啓動時會啓動一個線程,向全部 broker 發送心跳包。

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

// 若是 主題訂閱信息mqSet和主題訂閱客戶端不爲空,就執行消息隊列負載與從新分佈
if (mqSet != null && cidAll != null) {
  List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
  mqAll.addAll(mqSet);

  // 排序,確保每一個消息隊列只分配一個消費者
  Collections.sort(mqAll);
  Collections.sort(cidAll);

  // 消息隊列分配算法
  AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

  // 執行算法,並獲得隊列從新分配後的結果對象allocateResult
  List<MessageQueue> allocateResult = null;
  try {
    allocateResult = strategy.allocate(
      this.consumerGroup,
      this.mQClientFactory.getClientId(),
      mqAll,
      cidAll);
  } catch (Throwable e) {
    log.error("AllocateMessageQueueStrategy.allocate Exception. allocateMessageQueueStrategyName={}", strategy.getName(),
              e);
    return;
  }
  // ...
}
複製代碼

以上是消息負載均衡的核心邏輯,RocketMQ 自己提供了 5 種負載算法,默認使用 AllocateMessageQueueAveragely 平均分配算法,它分配算法特色以下:

假設有消費組 g1,有消費者 c1 和 c2,c1 訂閱了 topicA,c2 訂閱了 topicB,集羣內有 broker1 和broker2,假設 topicA 有 8 個消息隊列,broker_a(q0/q1/q2/q3) 和 broker_b(q0/q1/q2/q3),前面咱們知道 findConsumerIdList 方法會獲取消費組內全部消費者客戶端 ID,topicA 通過平均分配算法進行分配以後的消費狀況以下:

c1:broker_a(q0/q1/q2/q3)

c2:broker_b(q0/q1/q2/q3)

問題就出如今這裏,c2 根本沒有訂閱 topicA,但根據分配算法,卻要加上 c2 進行分配,這樣就會致使這種狀況有一半的消息被分配到 c2 進行消費,被分配到 c2 的消息隊列會延遲十幾秒甚至更久纔會被消費,topicB 同理

下面我用圖表示 topicA 和 topicB 通過 rebalance 以後的消費狀況:

至於爲何會報 the consumer's subscription not exist,咱們繼續往下擼:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#rebalanceByTopic:

if (mqSet != null && cidAll != null) {
  // ...
  Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
  if (allocateResult != null) {
    allocateResultSet.addAll(allocateResult);
  }
  // 用戶從新分配後的結果allocateResult來更新當前消費者負載的消息隊列緩存表processQueueTable,並生成 pullRequestList 放入 pullRequestQueue 阻塞隊列中
  boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet, isOrder);
  if (changed) {
    log.info(
      "rebalanced result changed. allocateMessageQueueStrategyName={}, group={}, topic={}, clientId={}, mqAllSize={}, cidAllSize={}, rebalanceResultSize={}, rebalanceResultSet={}",
      strategy.getName(), consumerGroup, topic, this.mQClientFactory.getClientId(), mqSet.size(), cidAll.size(),
      allocateResultSet.size(), allocateResultSet);
    this.messageQueueChanged(topic, mqSet, allocateResultSet);
  }
}
複製代碼

以上代碼邏輯主要是拿 mqSet 和 cidAll 進行消息隊列負載與從新分佈,獲得結果 allocateResult,它是一個 MessageQueue 列表,接着用 allocateResult 更新消費者負載的消息隊列緩存表 processQueueTable,生成 pullRequestList 放入 pullRequestQueue 阻塞隊列中:

org.apache.rocketmq.client.impl.consumer.RebalanceImpl#updateProcessQueueTableInRebalance:

List<PullRequest> pullRequestList = new ArrayList<PullRequest>();
// 循環執行,將mqSet訂閱數據封裝成PullRequest對象,並添加到pullRequestList中
for (MessageQueue mq : mqSet) {
  // 若是緩存列表不存在該訂閱信息,說明此次消息隊列從新分配後新增長的消息隊列
  if (!this.processQueueTable.containsKey(mq)) {
    if (isOrder && !this.lock(mq)) {
      log.warn("doRebalance, {}, add a new mq failed, {}, because lock failed", consumerGroup, mq);
      continue;
    }
    this.removeDirtyOffset(mq);
    ProcessQueue pq = new ProcessQueue();
    long nextOffset = this.computePullFromWhere(mq);
    if (nextOffset >= 0) {
      ProcessQueue pre = this.processQueueTable.putIfAbsent(mq, pq);
      if (pre != null) {
        log.info("doRebalance, {}, mq already exists, {}", consumerGroup, mq);
      } else {
        log.info("doRebalance, {}, add a new mq, {}", consumerGroup, mq);
        PullRequest pullRequest = new PullRequest();
        pullRequest.setConsumerGroup(consumerGroup);
        pullRequest.setNextOffset(nextOffset);
        pullRequest.setMessageQueue(mq);
        pullRequest.setProcessQueue(pq);
        pullRequestList.add(pullRequest);
        changed = true;
      }
    } else {
      log.warn("doRebalance, {}, add new mq failed, {}", consumerGroup, mq);
    }
  }
}
// 將pullRequestList添加到PullMessageService中的pullRequestQueue阻塞隊列中,以喚醒PullMessageService線程執行消息拉取
this.dispatchPullRequest(pullRequestList);
複製代碼

前面咱們講到消息拉取是從 pullRequestQueue 阻塞隊列中拿 pullRequest 執行拉取的,以上方法就是建立 pullRequest 的地方。

源碼分析到這裏,就能夠弄清楚爲何會報 the consumer's subscription not exist 這個錯誤了:

假設有消費者組 g1,g1下有消費者 c1 和消費者 c2,c1 訂閱了 topicA,c2 訂閱了 topicB,此時c2 先啓動,將 g1 的訂閱信息更新爲 topicB,c1 隨後啓動,將 g1 的訂閱信息覆蓋爲 topicA,c1 的 Rebalance 負載將 topicA 的 pullRequest 添加到 pullRequestQueue 中,而剛好此時 c2 心跳包又將 g1 的訂閱信息更新爲 topicB,那麼此時 c1 的 PullMessageService 線程拿到 pullRequestQueue 中 topicA 的 pullRequest 進行消息拉取,然而在 broker 端找不到消費者組 g1 下 topicA 的訂閱信息(由於此時剛好被 c2 心跳包給覆蓋了),就會報消費者訂閱信息不存在的錯誤了

公衆號「後端進階」,專一後端技術分享!
相關文章
相關標籤/搜索