RocketMQ探索-namesrv接收到broker的註冊請求

上一篇大致看了broker是怎麼註冊到namesrv中的,這篇文章看看namesrv接收到broker的註冊請求的後,namesrv是怎麼處理的。app

注:由於後面準備單獨看看RocketMQ的通訊這塊,因此這裏就直接看namesrv處理broker註冊請求的方法。this

在前面的講namesrv時,在initialize---->registerProcessor方法中,向remotingServer(NettyRemotingServer通訊)註冊了DefaultRequestProcessor對象(在Netty接收到請求後,會調用DefaultRequestProcessor類的processRequest方法,對請求進行處理)。這個對象裏面中的processRequest方法就是處理各類請求的。spa

在processRequest方法中,根據request.getCode()來肯定是什麼請求(註冊broker的code:REGISTER_BROKER)。.net

case RequestCode.REGISTER_BROKER:
    Version brokerVersion = MQVersion.value2Version(request.getVersion());
    if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
        return this.registerBrokerWithFilterServer(ctx, request);
    }
    else {
        return this.registerBroker(ctx, request);
    }

能夠看到broker的版本大於3_0_11就使用registerBrokerWithFilterServer進行broker的註冊。這裏又引出了一個FilterServer的概念,後面咱們再單獨看看。接着咱們分別來看看這2個方法:registerBrokerWithFilterServer和registerBroker(這2個方法註冊的過程大致同樣,只是在處理FilterServer有稍微的不一樣)。最終處理這個請求的地方是在RouteInfoManager的registerBroker方法中:code

public RegisterBrokerResult registerBroker(//
                                           final String clusterName,// 1
                                           final String brokerAddr,// 2
                                           final String brokerName,// 3
                                           final long brokerId,// 4
                                           final String haServerAddr,// 5
                                           final TopicConfigSerializeWrapper topicConfigWrapper,// 6
                                           final List<String> filterServerList, // 7
                                           final Channel channel// 8
) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();


            //根據集羣名字,獲取當前集羣下面的全部brokerName
            //brokerName表示是一組broker(主從):如一個brokerName的值爲:broker-a,可能包括一個master跟它的多個slave
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;


            //根據brokerName從brokerAddrTable中獲取brokerDate信息
            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            // 若是當前不存在brokerDate,即尚未broker向namesrv註冊,則直接將當前broker信息put加入
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData();
                brokerData.setBrokerName(brokerName);
                HashMap<Long, String> brokerAddrs = new HashMap<Long, String>();
                brokerData.setBrokerAddrs(brokerAddrs);

                this.brokerAddrTable.put(brokerName, brokerData);
            }
            // 保存當前註冊broker的brokerAddr地址信息都brokerData中去
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);


            if (null != topicConfigWrapper //
                    && MixAll.MASTER_ID == brokerId) { //若是topicConfigWrapper不爲空,且當前brokerId == 0,即爲當前broker爲master
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())//
                        || registerFirst) {// 若是Topic配置信息發生變動或者該broker爲第一次註冊
                    ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();// 獲取全部topic信息
                    if (tcTable != null) {
                        for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){
                            this.createAndUpdateQueueData(brokerName, entry.getValue());// 根據brokername及topicconfig(read、write queue數量等)新增或者更新到topicQueueTable中
                        }
                    }
                }
            }


            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, //
                    new BrokerLiveInfo(//
                            System.currentTimeMillis(), //
                            topicConfigWrapper.getDataVersion(),//
                            channel, //
                            haServerAddr));// 更新最後變動時間(將brokerLiveTable中保存的對應的broker的更新時間戳,設置爲當前時間)
            if (null == prevBrokerLiveInfo) {
                log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr);
            }


            // 更新Filter Server列表
            if (filterServerList != null) {
                if (filterServerList.isEmpty()) {
                    this.filterServerTable.remove(brokerAddr);
                } else {
                    this.filterServerTable.put(brokerAddr, filterServerList);
                }
            }


            //若是當前broker爲slave節點,則將haServerAddr、masterAddr等信息設置到result返回值中
            if (MixAll.MASTER_ID != brokerId) {
                // 經過brokename的brokedate獲取當前slave節點的master節點addr
                String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID);
                if (masterAddr != null) {
                    BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr);
                    if (brokerLiveInfo != null) {
                        result.setHaServerAddr(brokerLiveInfo.getHaServerAddr());
                        result.setMasterAddr(masterAddr);
                    }
                }
            }
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }

    return result;
}
  1. 將當前請求註冊的broker信息保存或者更新到clusterAddrTable、brokerAddrTable中
  2. 將當前請求註冊的broker的topic信息,保存或者更新到topicQueueTable中

broker定時上報,namesrv定時更新!對象

相關文章
相關標籤/搜索