rocketmq消費負載均衡--push消費爲例

本文介紹了DefaultMQPushConsumerImpl消費者,客戶端負載均衡相關知識點。本文從DefaultMQPushConsumerImpl啓動過程到實現負載均衡,從源代碼一步一步分析,共分爲6個部分進行介紹,其中第6個部分  rebalanceByTopic 爲負載均衡的核心邏輯模塊,具體過程運用了圖文進行闡述。
 
介紹以前首先拋出幾個問題:
1. 要作負載均衡,首先要解決的一個問題是什麼?
2. 負載均衡是Client端處理仍是Broker端處理?
我的理解:
1. 要作負載均衡,首先要作的就是信號收集。
所謂信號收集,就是得知道每個consumerGroup有哪些consumer,對應的topic是誰。信號收集分爲Client端信號收集與Broker端信號收集兩個部分。
2. 負載均衡放在Client端處理。
具體作法是:消費者客戶端在啓動時完善rebalanceImpl實例,同時拷貝訂閱信息存放rebalanceImpl實例對象中,另外也是很重要的一個步驟 -- 經過心跳消息,不停的上報本身到全部Broker,註冊RegisterConsumer,等待上述過程準備好以後在Client端不斷執行的負載均衡服務線程從Broker端獲取一份全局信息(該consumerGroup下全部的消費Client),而後分配這些全局信息,獲取當前客戶端分配到的消費隊列。
 
本文具體的內容:
I. copySubscription
Client端信號收集,拷貝訂閱信息。
在DefaultMQPushConsumerImpl.start()時,會將消費者的topic訂閱關係設置到rebalanceImpl的SubscriptionInner的map中用於負載:
private void copySubscription() throws MQClientException {
        try {
      //注:一個consumer對象能夠訂閱多個topic
            Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData =
                            FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                                topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
            case BROADCASTING:
                break;
            case CLUSTERING:
                final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                SubscriptionData subscriptionData =
                        FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),//
                            retryTopic, SubscriptionData.SUB_ALL);
                this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
                break;
            default:
                break;
            }
        }
        catch (Exception e) {
            throw new MQClientException("subscription exception", e);
        }
    }
FilterAPI.buildSubscriptionData接口將訂閱關係轉換爲SubscriptionData 數據,其中subString包含訂閱tag等信息。另外,若是該消費者的消費模式爲集羣消費,則會將retry的topic一併放到。
 
II. 完善rebalanceImpl實例
Client繼續收集信息:
 this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
 this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
 this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer
                .getAllocateMessageQueueStrategy());
 this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
本文以DefaultMQPushConsumerImpl爲例,所以this對象類型爲DefaultMQPushConsumerImp。
 
III. this.rebalanceService.start()
開啓負載均衡服務。this.rebalanceService是一個RebalanceService實例對象,它繼承與ServiceThread,是一個線程類。 this.rebalanceService.start()執行時,也即執行RebalanceService線程體:
   @Override
    public void run() {
        log.info(this.getServiceName() + " service started");

        while (!this.isStoped()) {
            this.waitForRunning(WaitInterval);
            this.mqClientFactory.doRebalance();
        }

        log.info(this.getServiceName() + " service end");
    }

 

IV. this.mqClientFactory.doRebalance
客戶端遍歷消費組table,對該客戶端上全部消費者獨立進行負載均衡,分發消費隊列:
 public void doRebalance() {
        for (String group : this.consumerTable.keySet()) {
            MQConsumerInner impl = this.consumerTable.get(group);
            if (impl != null) {
                try {
                    impl.doRebalance();
                } catch (Exception e) {
                    log.error("doRebalance exception", e);
                }
            }
        }
    }

 

V. MQConsumerInner.doRebalance
因爲本文以DefaultMQPushConsumerImpl消費過程爲例,即DefaultMQPushConsumerImpl.doRebalance:
@Override
    public void doRebalance() {
        if (this.rebalanceImpl != null) {
            this.rebalanceImpl.doRebalance();
        }
    }
步驟II 中完善了rebalanceImpl實例,爲調用rebalanceImpl.doRebalance()提供了初始數據。
rebalanceImpl.doRebalance()過程以下:
public void doRebalance() {
     // 前文copySubscription中初始化了SubscriptionInner
        Map<String, SubscriptionData> subTable = this.getSubscriptionInner();
        if (subTable != null) {
            for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) {
                final String topic = entry.getKey();
                try {
                    this.rebalanceByTopic(topic);
                } catch (Exception e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                        log.warn("rebalanceByTopic Exception", e);
                    }
                }
            }
        }

        this.truncateMessageQueueNotMyTopic();
    }

 

VI. rebalanceByTopic -- 核心步驟之一
rebalanceByTopic方法中根據消費者的消費類型爲BROADCASTING或CLUSTERING作不一樣的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集羣消費負載均衡的邏輯。
集羣消費負載均衡邏輯主要代碼以下(省略了log等代碼):算法

//1.從topicSubscribeInfoTable列表中獲取與該topic相關的全部消息隊列
Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic);
//2. 從broker端獲取消費該消費組的全部客戶端clientId
List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup);
 f (null == mqSet) { ... }
if (null == cidAll) { ... }
if (mqSet != null && cidAll != null) {
        List<MessageQueue> mqAll = new ArrayList<MessageQueue>();
        mqAll.addAll(mqSet);
        Collections.sort(mqAll);
        Collections.sort(cidAll);

     // 3.建立DefaultMQPushConsumer對象時默認設置爲AllocateMessageQueueAveragely
        AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy;

        List<MessageQueue> allocateResult = null;
        try {
          // 4.調用AllocateMessageQueueAveragely.allocate方法,獲取當前client分配消費隊列
                allocateResult = strategy.allocate(
                this.consumerGroup, 
                this.mQClientFactory.getClientId(), 
                mqAll,
                cidAll);
            } catch (Throwable e) {
                        return;
                    }
         // 5. 將分配獲得的allocateResult 中的隊列放入allocateResultSet 集合
            Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>();
            if (allocateResult != null) {
                allocateResultSet.addAll(allocateResult);
            }
、
         //6. 更新updateProcessQueue
            boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet);
            if (changed) {
                    this.messageQueueChanged(topic, mqSet, allocateResultSet);
            }
}
注:BROADCASTING邏輯只包含上述的一、6。
集羣消費負載均衡邏輯中的一、二、4這三個點相關知識爲其核心過程,各個點相關知識以下:
 
第1點:從topicSubscribeInfoTable列表中獲取與該topic相關的全部消息隊列

 

第2點: 從broker端獲取消費該消費組的全部客戶端clientId
首先,消費者對象不斷地向全部broker發送心跳包,上報本身,註冊並更新訂閱關係以及客戶端ChannelInfoTable;以後,客戶端在作消費負載均衡時獲取那些消費客戶端,對這些客戶端進行負載均衡,分發消費的隊列。具體過程以下圖所示:

第4點:調用AllocateMessageQueueAveragely.allocate方法,獲取當前client分配消費隊列
 

 

注:上圖中cId一、cId二、...、cIdN經過 getConsumerIdListByGroup 獲取,它們在這個ConsumerGroup下全部在線客戶端列表中。
當前消費對進行負載均衡策略後獲取對應的消息消費隊列。具體的算法很簡單,能夠看源碼。
相關文章
相關標籤/搜索