RocketMQ源碼解讀——同一消費組下不一樣消費者訂閱關係不一樣時

RocketMQ源碼解讀——同一消費組下不一樣消費者訂閱關係不一樣時

@(rocketmq源碼解讀)java


先解釋一下題目,咱們假設有一個Producer和兩個ConsumerProducerTOPICATOPICB發送消息,兩個Consumer分別訂閱兩個topic。咱們看下這時候會出現的問題,以及根據源碼分析一下爲何出現問題。ide

現象

現象其實仍是比較隱蔽的,broker上會打印:the consumer's subscription not exist,group ...的日誌(Consumer端也會打印相似的日誌)。源碼分析

還會有一些subscription changed, group: ...相似的日誌,而且若是仔細的話還會發現,其中一個消費者消費消息時,另一個就不會消費。ui

源碼分析

咱們看一下爲何會致使這樣的問題,一開始生看或者debug都是很難下手,這時候可能就須要使用必殺技(通常不外傳那種)——this

問天問地,谷歌百度必應。我直接問了一個大神——芋艿。大神說這種狀況會出問題,具體緣由他也記不清了,致使這種現象的問題應該是消費關係不停地相互覆蓋spa

好了,聽到這句話咱們就有入口了,至少知道應該從Broker上找起。debug

順藤摸瓜找到了緣由,下面一塊兒看一下源碼。日誌

首先咱們知道,消費者的兩種實現(推和拉)中都維護一個MQClientInstance,這個類很是重要,在啓動消費者的時候,都會去啓動這個類,咱們看下啓動的代碼,其中有這麼一部分:code

// Start various schedule tasks
this.startScheduledTask();
複製代碼

這裏啓動了好多定時任務,咱們追進去看一下:ip

this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {

    @Override
    public void run() {
        try {
            MQClientInstance.this.cleanOfflineBroker();
            //定時發送心跳
            MQClientInstance.this.sendHeartbeatToAllBrokerWithLock();
        } catch (Exception e) {
            log.error("ScheduledTask sendHeartbeatToAllBroker exception", e);
        }
    }
}, 1000, this.clientConfig.getHeartbeatBrokerInterval(), TimeUnit.MILLISECONDS);
複製代碼

這裏咱們看到,消費者會定時發送心跳給Broker,咱們繼續追進去,最後找到sendHeartbeatToAllBroker方法:

//給全部的broker發送心跳
if (!this.brokerAddrTable.isEmpty()) {
    long times = this.sendHeartbeatTimesTotal.getAndIncrement();
    Iterator<Entry<String, HashMap<Long, String>>> it = this.brokerAddrTable.entrySet().iterator();
    while (it.hasNext()) {
        Entry<String, HashMap<Long, String>> entry = it.next();
        String brokerName = entry.getKey();
        HashMap<Long, String> oneTable = entry.getValue();
        if (oneTable != null) {
            for (Map.Entry<Long, String> entry1 : oneTable.entrySet()) {
                Long id = entry1.getKey();
                String addr = entry1.getValue();
                if (addr != null) {
                    if (consumerEmpty) {
                        if (id != MixAll.MASTER_ID)
                            continue;
                    }

                    try {
	                    //真正發送心跳的部分
                        int version = this.mQClientAPIImpl.sendHearbeat(addr, heartbeatData, 3000);
                        if (!this.brokerVersionTable.containsKey(brokerName)) {
                            this.brokerVersionTable.put(brokerName, new HashMap<String, Integer>(4));
                        }
                        this.brokerVersionTable.get(brokerName).put(addr, version);
                        if (times % 20 == 0) {
                            log.info("send heart beat to broker[{} {} {}] success", brokerName, id, addr);
                            log.info(heartbeatData.toString());
                        }
                    } catch (Exception e) {
                        if (this.isBrokerInNameServer(addr)) {
                            log.info("send heart beat to broker[{} {} {}] failed", brokerName, id, addr);
                        } else {
                            log.info("send heart beat to broker[{} {} {}] exception, because the broker not up, forget it", brokerName,
                                id, addr);
                        }
                    }
                }
            }
        }
    }
}
複製代碼

這裏會向全部的Broker發送心跳,咱們根據咱們的例子,這時候Broker是一臺,咱們再去Broker上看一下Broker如何處理心跳消息,咱們根據發送的是HEART_BEAT類型的消息,能夠在Broker上看到,這類消息使用ClientManageProcessor處理,咱們看下處理心跳的部分(heartBeat方法):

//循環全部發送過來的數據
for (ConsumerData data : heartbeatData.getConsumerDataSet()) {
	//根據消費組的名字獲取broker上記錄的消費消息
    SubscriptionGroupConfig subscriptionGroupConfig = this.brokerController.getSubscriptionGroupManager().findSubscriptionGroupConfig(data.getGroupName());
    boolean isNotifyConsumerIdsChangedEnable = true;
    if (null != subscriptionGroupConfig) {
        isNotifyConsumerIdsChangedEnable = subscriptionGroupConfig.isNotifyConsumerIdsChangedEnable();
        int topicSysFlag = 0;
        if (data.isUnitMode()) {
            topicSysFlag = TopicSysFlag.buildSysFlag(false, true);
        }
        String newTopic = MixAll.getRetryTopic(data.getGroupName());
        this.brokerController.getTopicConfigManager().createTopicInSendMessageBackMethod(
            newTopic,
            subscriptionGroupConfig.getRetryQueueNums(),
            PermName.PERM_WRITE | PermName.PERM_READ, topicSysFlag);
    }
	//註冊消費者
    boolean changed = this.brokerController.getConsumerManager().registerConsumer(
        data.getGroupName(),
        clientChannelInfo,
        data.getConsumeType(),
        data.getMessageModel(),
        data.getConsumeFromWhere(),
        data.getSubscriptionDataSet(),
        isNotifyConsumerIdsChangedEnable
    );

    if (changed) {
        log.info("registerConsumer info changed {} {}",
            data.toString(),
            RemotingHelper.parseChannelRemoteAddr(ctx.channel())
        );
    }
}
複製代碼

咱們能夠看到,broker會根據consumer放過來的消息,獲取本身這邊記錄的消費者訂閱的信息,注意,獲取時是按照消費組獲取的,咱們看下registerConsumer

//根據消費組獲取消費者信息
ConsumerGroupInfo consumerGroupInfo = this.consumerTable.get(group);
if (null == consumerGroupInfo) {
    ConsumerGroupInfo tmp = new ConsumerGroupInfo(group, consumeType, messageModel, consumeFromWhere);
	//注意這裏,這裏consumerTable的鍵就是group
    ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);
    consumerGroupInfo = prev != null ? prev : tmp;
}
boolean r1 =
    consumerGroupInfo.updateChannel(clientChannelInfo, consumeType, messageModel,
        consumeFromWhere);
boolean r2 = consumerGroupInfo.updateSubscription(subList);
if (r1 || r2) {
    if (isNotifyConsumerIdsChangedEnable) {
        this.consumerIdsChangeListener.handle(ConsumerGroupEvent.CHANGE, group, consumerGroupInfo.getAllChannel());
    }
}
this.consumerIdsChangeListener.handle(ConsumerGroupEvent.REGISTER, group, subList);
return r1 || r2;
複製代碼

咱們注意ConsumerGroupInfo prev = this.consumerTable.putIfAbsent(group, tmp);這裏,這句話告訴咱們consumerTable中存放的消費者信息是按照消費組來的,那麼一個組的消費信息若是不同,按照咱們的例子中,則訂閱了TOPICA的消費者心跳信息告訴Broker:咱們組訂閱的是TOPICA!而後Broker就記錄下來了。過了一會訂閱了TOPICB的消費者心跳信息高速Broker:咱們訂閱的是TOPICB

這裏就致使了訂閱消息相互覆蓋,那麼拉取消息時,確定有一個消費者無法拉到消息,由於Broker上查詢不到訂閱信息。

至此咱們就知道了致使上述現象的緣由。

相關文章
相關標籤/搜索