private void copySubscription() throws MQClientException { try { //注:一個consumer對象能夠訂閱多個topic Map<String, String> sub = this.defaultMQPushConsumer.getSubscription(); if (sub != null) { for (final Map.Entry<String, String> entry : sub.entrySet()) { final String topic = entry.getKey(); final String subString = entry.getValue(); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// topic, subString); this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData); } } if (null == this.messageListenerInner) { this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener(); } switch (this.defaultMQPushConsumer.getMessageModel()) { case BROADCASTING: break; case CLUSTERING: final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup()); SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),// retryTopic, SubscriptionData.SUB_ALL); this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData); break; default: break; } } catch (Exception e) { throw new MQClientException("subscription exception", e); } }
// Hand the consumer's configuration over to the rebalance component:
// consumer group, message model, queue-allocation strategy and client factory.
this.rebalanceImpl.setConsumerGroup(
    this.defaultMQPushConsumer.getConsumerGroup());
this.rebalanceImpl.setMessageModel(
    this.defaultMQPushConsumer.getMessageModel());
this.rebalanceImpl.setAllocateMessageQueueStrategy(
    this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
this.rebalanceImpl.setmQClientFactory(
    this.mQClientFactory);
/**
 * Service loop: until the service is reported as stopped, calls
 * {@code waitForRunning(WaitInterval)} and then asks the client factory to
 * rebalance. Logs a line when the loop starts and when it ends.
 */
@Override
public void run() {
    log.info(getServiceName() + " service started");

    while (!isStoped()) {
        // Presumably blocks for up to WaitInterval or until woken up - confirm in waitForRunning.
        waitForRunning(WaitInterval);
        mqClientFactory.doRebalance();
    }

    log.info(getServiceName() + " service end");
}
/**
 * Triggers a rebalance on every consumer registered in {@code consumerTable}.
 *
 * <p>The table's values are iterated directly instead of walking the key set
 * and looking each key up again. An exception thrown by one consumer is
 * logged and does not prevent the remaining consumers from rebalancing.
 */
public void doRebalance() {
    for (MQConsumerInner impl : this.consumerTable.values()) {
        // Defensive null guard retained from the original implementation.
        if (impl == null) {
            continue;
        }
        try {
            impl.doRebalance();
        } catch (Exception e) {
            log.error("doRebalance exception", e);
        }
    }
}
/**
 * Delegates the rebalance request to {@code rebalanceImpl}; does nothing when
 * no rebalance component has been set.
 */
@Override
public void doRebalance() {
    if (this.rebalanceImpl == null) {
        return;
    }
    this.rebalanceImpl.doRebalance();
}
public void doRebalance() { // 前文copySubscription中初始化了SubscriptionInner Map<String, SubscriptionData> subTable = this.getSubscriptionInner(); if (subTable != null) { for (final Map.Entry<String, SubscriptionData> entry : subTable.entrySet()) { final String topic = entry.getKey(); try { this.rebalanceByTopic(topic); } catch (Exception e) { if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) { log.warn("rebalanceByTopic Exception", e); } } } } this.truncateMessageQueueNotMyTopic(); }
VI. rebalanceByTopic -- 核心步驟之一
rebalanceByTopic方法中根據消費者的消費類型爲BROADCASTING或CLUSTERING作不一樣的邏輯處理。CLUSTERING邏輯包括BROADCASTING邏輯,本部分只介紹集羣消費負載均衡的邏輯。
集羣消費負載均衡邏輯的主要代碼如下(省略了log等代碼):
//1.從topicSubscribeInfoTable列表中獲取與該topic相關的全部消息隊列 Set<MessageQueue> mqSet = this.topicSubscribeInfoTable.get(topic); //2. 從broker端獲取消費該消費組的全部客戶端clientId List<String> cidAll = this.mQClientFactory.findConsumerIdList(topic, consumerGroup); f (null == mqSet) { ... } if (null == cidAll) { ... } if (mqSet != null && cidAll != null) { List<MessageQueue> mqAll = new ArrayList<MessageQueue>(); mqAll.addAll(mqSet); Collections.sort(mqAll); Collections.sort(cidAll); // 3.建立DefaultMQPushConsumer對象時默認設置爲AllocateMessageQueueAveragely AllocateMessageQueueStrategy strategy = this.allocateMessageQueueStrategy; List<MessageQueue> allocateResult = null; try { // 4.調用AllocateMessageQueueAveragely.allocate方法,獲取當前client分配消費隊列 allocateResult = strategy.allocate( this.consumerGroup, this.mQClientFactory.getClientId(), mqAll, cidAll); } catch (Throwable e) { return; } // 5. 將分配獲得的allocateResult 中的隊列放入allocateResultSet 集合 Set<MessageQueue> allocateResultSet = new HashSet<MessageQueue>(); if (allocateResult != null) { allocateResultSet.addAll(allocateResult); } 、 //6. 更新updateProcessQueue boolean changed = this.updateProcessQueueTableInRebalance(topic, allocateResultSet); if (changed) { this.messageQueueChanged(topic, mqSet, allocateResultSet); } }