上一篇大致看了broker是怎麼註冊到namesrv中的,這篇文章看看namesrv接收到broker的註冊請求的後,namesrv是怎麼處理的。app
注:由於後面準備單獨看看RocketMQ的通訊這塊,因此這裏就直接看namesrv處理broker註冊請求的方法。this
在前面的講namesrv時,在initialize---->registerProcessor方法中,向remotingServer(NettyRemotingServer通訊)註冊了DefaultRequestProcessor對象(在Netty接收到請求後,會調用DefaultRequestProcessor類的processRequest方法,對請求進行處理)。這個對象裏面中的processRequest方法就是處理各類請求的。spa
在processRequest方法中,根據request.getCode()來肯定是什麼請求(註冊broker的code:REGISTER_BROKER)。.net
case RequestCode.REGISTER_BROKER: Version brokerVersion = MQVersion.value2Version(request.getVersion()); if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) { return this.registerBrokerWithFilterServer(ctx, request); } else { return this.registerBroker(ctx, request); }
能夠看到broker的版本大於3_0_11就使用registerBrokerWithFilterServer進行broker的註冊。這裏又引出了一個FilterServer的概念,後面咱們再單獨看看。接着咱們分別來看看這2個方法:registerBrokerWithFilterServer和registerBroker(這2個方法註冊的過程大致同樣,只是在處理FilterServer有稍微的不一樣)。最終處理這個請求的地方是在RouteInfoManager的registerBroker方法中:code
public RegisterBrokerResult registerBroker(// final String clusterName,// 1 final String brokerAddr,// 2 final String brokerName,// 3 final long brokerId,// 4 final String haServerAddr,// 5 final TopicConfigSerializeWrapper topicConfigWrapper,// 6 final List<String> filterServerList, // 7 final Channel channel// 8 ) { RegisterBrokerResult result = new RegisterBrokerResult(); try { try { this.lock.writeLock().lockInterruptibly(); //根據集羣名字,獲取當前集羣下面的全部brokerName //brokerName表示是一組broker(主從):如一個brokerName的值爲:broker-a,可能包括一個master跟它的多個slave Set<String> brokerNames = this.clusterAddrTable.get(clusterName); if (null == brokerNames) { brokerNames = new HashSet<String>(); this.clusterAddrTable.put(clusterName, brokerNames); } brokerNames.add(brokerName); boolean registerFirst = false; //根據brokerName從brokerAddrTable中獲取brokerDate信息 BrokerData brokerData = this.brokerAddrTable.get(brokerName); // 若是當前不存在brokerDate,即尚未broker向namesrv註冊,則直接將當前broker信息put加入 if (null == brokerData) { registerFirst = true; brokerData = new BrokerData(); brokerData.setBrokerName(brokerName); HashMap<Long, String> brokerAddrs = new HashMap<Long, String>(); brokerData.setBrokerAddrs(brokerAddrs); this.brokerAddrTable.put(brokerName, brokerData); } // 保存當前註冊broker的brokerAddr地址信息都brokerData中去 String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr); registerFirst = registerFirst || (null == oldAddr); if (null != topicConfigWrapper // && MixAll.MASTER_ID == brokerId) { //若是topicConfigWrapper不爲空,且當前brokerId == 0,即爲當前broker爲master if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())// || registerFirst) {// 若是Topic配置信息發生變動或者該broker爲第一次註冊 ConcurrentHashMap<String, TopicConfig> tcTable = topicConfigWrapper.getTopicConfigTable();// 獲取全部topic信息 if (tcTable != null) { for(Map.Entry<String,TopicConfig> entry: tcTable.entrySet()){ this.createAndUpdateQueueData(brokerName, entry.getValue());// 根據brokername及topicconfig(read、write queue數量等)新增或者更新到topicQueueTable中 } } } } BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr, // new BrokerLiveInfo(// System.currentTimeMillis(), // topicConfigWrapper.getDataVersion(),// channel, // haServerAddr));// 更新最後變動時間(將brokerLiveTable中保存的對應的broker的更新時間戳,設置爲當前時間) if (null == prevBrokerLiveInfo) { log.info("new broker registerd, {} HAServer: {}", brokerAddr, haServerAddr); } // 更新Filter Server列表 if (filterServerList != null) { if (filterServerList.isEmpty()) { this.filterServerTable.remove(brokerAddr); } else { this.filterServerTable.put(brokerAddr, filterServerList); } } //若是當前broker爲slave節點,則將haServerAddr、masterAddr等信息設置到result返回值中 if (MixAll.MASTER_ID != brokerId) { // 經過brokename的brokedate獲取當前slave節點的master節點addr String masterAddr = brokerData.getBrokerAddrs().get(MixAll.MASTER_ID); if (masterAddr != null) { BrokerLiveInfo brokerLiveInfo = this.brokerLiveTable.get(masterAddr); if (brokerLiveInfo != null) { result.setHaServerAddr(brokerLiveInfo.getHaServerAddr()); result.setMasterAddr(masterAddr); } } } } finally { this.lock.writeLock().unlock(); } } catch (Exception e) { log.error("registerBroker Exception", e); } return result; }
broker定時上報,namesrv定時更新!對象