RocketMQ主從如何同步消息消費進度?

微信公衆號「後端進階」,專一後端技術分享:Java、Golang、WEB框架、分佈式中間件、服務治理等等。

前面我也跟你們講述了 RocketMQ 讀寫分離的規則,可是你可能會問,主從服務器之間的消費進度是如何保持同步的?下面我來給你們解答一下。java

若是消費者消費模式不一樣,也會有不一樣的保存方式,消費者端的消息消費進度保存到 OffsetStore 中,他有兩個實現類:apache

org.apache.rocketmq.client.consumer.store.LocalFileOffsetStore // 本地消費進度保存實現
org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore // 遠程消費進度保存實現

其中,若是是廣播模式消費,消息的消費進度是保存到本地,若是是集羣消費模式,消息的消費進度則是保存到 Broker,但不管是保存到本地,仍是保存到 Broker,消費者都會在本地留一份緩存,咱們暫且看看集羣消費模式下,消息消費進度的緩存是如何保存的:後端

org.apache.rocketmq.client.consumer.store.RemoteBrokerOffsetStore#updateOffset:緩存

public void updateOffset(MessageQueue mq, long offset, boolean increaseOnly) {
  if (mq != null) {
    AtomicLong offsetOld = this.offsetTable.get(mq);
    if (null == offsetOld) {
      offsetOld = this.offsetTable.putIfAbsent(mq, new AtomicLong(offset));
    }

    if (null != offsetOld) {
      if (increaseOnly) {
        MixAll.compareAndIncreaseOnly(offsetOld, offset);
      } else {
        offsetOld.set(offset);
      }
    }
  }
}

消息者在消費完消息後,會調用以上方法,講消費進度放入 offsetTable 緩存中,當 Rebalance 負載從新分配生成 PullRequest 對象時,會調用 RemoteBrokerOffsetStore.readOffset 方法從 offsetTable 緩存中取出對應的消費進度緩存值,再將該值放進 PullRequest 對象中,接下來消息拉取時就很將消息消費進度緩存發送到 Broker 端,因此咱們繼續看 Broker 端的處理邏輯。服務器

以前整理 Broker 啓動流程時,發現 Broker 啓動時會開啓一個定時任務:微信

org.apache.rocketmq.broker.BrokerController#initialize:框架

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            BrokerController.this.slaveSynchronize.syncAll();
        } catch (Throwable e) {
            log.error("ScheduledTask syncAll slave exception", e);
        }
    }
}, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);

若是 Broker 是從服務器,則會開啓以上定時任務。分佈式

org.apache.rocketmq.broker.slave.SlaveSynchronize#syncAll:ide

public void syncAll() {
  this.syncTopicConfig();
  this.syncConsumerOffset();
  this.syncDelayOffset();
  this.syncSubscriptionGroupConfig();
}

在主服務器沒有宕機的狀況下,從服務器會定時從主服務器中同步消息消費進度等信息,那如今問題來了,因爲這個同步是單方面同步,即只會從服務器同步主服務器,那若是主服務器宕機了以後,消費者切換成從服務器拉取消息進行消費,若是以後主服務器啓動了,從服務器在把已經消費過的偏移量同步過來,那豈不是形成同步消費了?this

其實消費者取在拉取消息的時候,若是消費者的緩存中存在消費進度,也會向 Broker 更新消息消費進度,因此即便是主服務器掛了,在它從新啓動以後,消費者的消費進度沒有丟失,依然會更新主服務器的消息消費進度,這樣一來,消費端與主服務器只掛了器中一個,並不會致使消息從新被消費,具體代碼邏輯以下:

org.apache.rocketmq.broker.processor.PullMessageProcessor#processRequest:

boolean storeOffsetEnable = brokerAllowSuspend;
storeOffsetEnable = storeOffsetEnable && hasCommitOffsetFlag;
storeOffsetEnable = storeOffsetEnable
    && this.brokerController.getMessageStoreConfig().getBrokerRole() != BrokerRole.SLAVE;
if (storeOffsetEnable) {
 this.brokerController.getConsumerOffsetManager().commitOffset(RemotingHelper.parseChannelRemoteAddr(channel), requestHeader.getConsumerGroup(), requestHeader.getTopic(), requestHeader.getQueueId(), requestHeader.getCommitOffset());
}

其中 brokerAllowSuspend 表示 broker 是否容許掛起,該值默認爲 true,hasCommitOffsetFlag 表示息消費者在內存中是否緩存了消息消費進度,從代碼邏輯可看出,若是 Broker 爲主服務器,而且 brokerAllowSuspend 和 hasCommitOffsetFlag 都爲true,那麼就會將消費者消費進度更新到本地。
公衆號「後端進階」,專一後端技術分享!

關注公衆號回覆關鍵字「後端」免費領取後端開發大禮包!
相關文章
相關標籤/搜索