根據前面的介紹,可以看到broker在啓動的時候要向namesrv註冊自己,這裏咱們就來看看broker是怎麼向namesrv註冊自己的。
在BrokerController的start()中會向namesrv註冊自己,並啓動定時註冊Broker到Name Server的任務。
// Forced registration once at startup (checkOrderConfig = true, oneway = false).
this.registerBrokerAll(true, false);
// Periodically re-register the broker with the Name Server:
// first run after 10 s (1000 * 10 ms), then every 30 s (1000 * 30 ms).
this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
    @Override
    public void run() {
        try {
            BrokerController.this.registerBrokerAll(true, false);
        } catch (Throwable e) {
            // Nothing may escape run(): log and carry on so the next period still fires.
            // NOTE(review): assumes scheduledExecutorService is a java.util.concurrent
            // ScheduledExecutorService, which suppresses later runs once a run throws.
            log.error("registerBrokerAll Exception", e);
        }
    }
}, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);
/** * 定時註冊Broker到Name Server * @param checkOrderConfig * @param oneway */ public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) { TopicConfigSerializeWrapper topicConfigWrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper(); //每次註冊的時候都會 同步Broker最新的讀寫權限到每個topic中(TopicConfig對象中),放到TopicConfigTable中 //註冊的時候要攜帶這些信息 if (!PermName.isWriteable(this.getBrokerConfig().getBrokerPermission()) || !PermName.isReadable(this.getBrokerConfig().getBrokerPermission())) { ConcurrentHashMap<String, TopicConfig> topicConfigTable = new ConcurrentHashMap<String, TopicConfig>(); for (TopicConfig topicConfig : topicConfigWrapper.getTopicConfigTable().values()) { TopicConfig tmp = new TopicConfig(topicConfig.getTopicName(), topicConfig.getReadQueueNums(), topicConfig.getWriteQueueNums(), this.brokerConfig.getBrokerPermission()); topicConfigTable.put(topicConfig.getTopicName(), tmp); } topicConfigWrapper.setTopicConfigTable(topicConfigTable); } /** * 註冊時攜帶的信息是: * cluster name,broker 地址,broker name,broker id //0爲master 1爲slave,HA地址,即slave能夠經過該地址與master進行數據同步 * topic信息 包含topic name,read queue num, write queue num, permission */ RegisterBrokerResult registerBrokerResult = this.brokerOuterAPI.registerBrokerAll(// this.brokerConfig.getBrokerClusterName(), // this.getBrokerAddr(), // this.brokerConfig.getBrokerName(), // this.brokerConfig.getBrokerId(), // this.getHAServerAddr(), // topicConfigWrapper,// this.filterServerManager.buildNewFilterServerList(),// oneway,// this.brokerConfig.getRegisterBrokerTimeoutMills()); if (registerBrokerResult != null) { if (this.updateMasterHAServerAddrPeriodically && registerBrokerResult.getHaServerAddr() != null) { this.messageStore.updateHaMasterAddress(registerBrokerResult.getHaServerAddr()); } this.slaveSynchronize.setMasterAddr(registerBrokerResult.getMasterAddr()); // 檢查 topic config 的順序消息配置 if (checkOrderConfig) { this.getTopicConfigManager().updateOrderTopicConfig(registerBrokerResult.getKvTable()); 
} } }
/** * 註冊Broker到Name Server * @param clusterName * @param brokerAddr * @param brokerName * @param brokerId * @param haServerAddr * @param topicConfigWrapper * @param filterServerList * @param oneway * @param timeoutMills * @return */ public RegisterBrokerResult registerBrokerAll(// final String clusterName, // 1 final String brokerAddr, // 2 final String brokerName, // 3 final long brokerId, // 4 final String haServerAddr, // 5 final TopicConfigSerializeWrapper topicConfigWrapper, // 6 final List<String> filterServerList, // 7 final boolean oneway,// 8 final int timeoutMills// 9 ) { RegisterBrokerResult registerBrokerResult = null; //獲取namesrv的地址信息列表 List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList(); if (nameServerAddressList != null) { for (String namesrvAddr : nameServerAddressList) { //循環namesrv的信息列表把broker註冊到每個namesrv中 try { RegisterBrokerResult result = this.registerBroker(namesrvAddr, clusterName, brokerAddr, brokerName, brokerId, haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills); if (result != null) { registerBrokerResult = result; } log.info("register broker to name server {} OK", namesrvAddr); } catch (Exception e) { log.warn("registerBroker Exception, " + namesrvAddr, e); } } } return registerBrokerResult; }
private RegisterBrokerResult registerBroker(// final String namesrvAddr, // final String clusterName, // 1 final String brokerAddr, // 2 final String brokerName, // 3 final long brokerId, // 4 final String haServerAddr, // 5 final TopicConfigSerializeWrapper topicConfigWrapper, // 6 final List<String> filterServerList, // 7 final boolean oneway,// 8 final int timeoutMills// 9 ) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException { RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader(); requestHeader.setBrokerAddr(brokerAddr); requestHeader.setBrokerId(brokerId); requestHeader.setBrokerName(brokerName); requestHeader.setClusterName(clusterName); requestHeader.setHaServerAddr(haServerAddr); //這裏利用命令模式對請求的信息進行封裝,告訴接收請求的service端,該請求的目的是什麼(好比這裏的REGISTER_BROKER,告訴service端該請求是註冊broker) RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader); RegisterBrokerBody requestBody = new RegisterBrokerBody(); requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper); requestBody.setFilterServerList(filterServerList); request.setBody(requestBody.encode()); //以上就完成了向namesrv註冊broker的請求頭和請求體 if (oneway) { try { this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills); } catch (RemotingTooMuchRequestException e) { } return null; } RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills); assert response != null; switch (response.getCode()) { case ResponseCode.SUCCESS: { RegisterBrokerResponseHeader responseHeader = (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class); RegisterBrokerResult result = new RegisterBrokerResult(); result.setMasterAddr(responseHeader.getMasterAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); result.setHaServerAddr(responseHeader.getHaServerAddr()); if 
(response.getBody() != null) { result.setKvTable(KVTable.decode(response.getBody(), KVTable.class)); } return result; } default: break; } throw new MQBrokerException(response.getCode(), response.getRemark()); }
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
調用invokeSync方法進行broker的註冊。通訊部分後面再專門分析。
大致的說明都寫在方法的註釋裏了。