RocketMQ探索-broker註冊到namesrv中

根據前面介紹,能夠看到broker在啓動的時候要向namesrv註冊本身,這裏咱們就來看看broker怎麼向namesrv註冊本身。

在BrokerController的start()中會向namesrv註冊本身,並啓動定時註冊Broker到Name Server的任務。

  • start()方法中的註冊方法:

       // 啓動時,強制註冊
        this.registerBrokerAll(true, false);ui

        // 定時註冊Broker到Name Server
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {this

            @Override
            public void run() {
                try {
                    BrokerController.this.registerBrokerAll(true, false);
                } catch (Throwable e) {
                    log.error("registerBrokerAll Exception", e);
                }
            }
        }, 1000 * 10, 1000 * 30, TimeUnit.MILLISECONDS);spa

  • 接着看registerBrokerAll方法:
/**
 * Registers this broker with the name server(s) and applies the registration
 * result to the local broker state.
 *
 * <p>The topic configuration is sent along with the registration. If the broker's
 * permission is not both readable and writable, a copy of the topic table is sent
 * in which every topic carries the broker-level permission.
 *
 * @param checkOrderConfig when {@code true}, the KV table returned by the
 *                         registration is handed to the topic config manager via
 *                         {@code updateOrderTopicConfig}
 * @param oneway           forwarded unchanged to {@code brokerOuterAPI.registerBrokerAll}
 */
public synchronized void registerBrokerAll(final boolean checkOrderConfig, boolean oneway) {
    TopicConfigSerializeWrapper wrapper = this.getTopicConfigManager().buildTopicConfigSerializeWrapper();

    // The broker is not fully read/write: rebuild the topic table so that each
    // topic is advertised with the broker's current permission.
    if (!(PermName.isWriteable(this.getBrokerConfig().getBrokerPermission())
            && PermName.isReadable(this.getBrokerConfig().getBrokerPermission()))) {
        ConcurrentHashMap<String, TopicConfig> restrictedTable = new ConcurrentHashMap<String, TopicConfig>();
        for (TopicConfig source : wrapper.getTopicConfigTable().values()) {
            String topicName = source.getTopicName();
            TopicConfig restricted = new TopicConfig(
                    topicName,
                    source.getReadQueueNums(),
                    source.getWriteQueueNums(),
                    this.brokerConfig.getBrokerPermission());
            restrictedTable.put(topicName, restricted);
        }
        wrapper.setTopicConfigTable(restrictedTable);
    }

    // Registration payload: cluster name, broker address, broker name, broker id,
    // HA server address, the topic table (name, read/write queue counts, permission)
    // and the filter server list.
    RegisterBrokerResult result = this.brokerOuterAPI.registerBrokerAll(
            this.brokerConfig.getBrokerClusterName(),
            this.getBrokerAddr(),
            this.brokerConfig.getBrokerName(),
            this.brokerConfig.getBrokerId(),
            this.getHAServerAddr(),
            wrapper,
            this.filterServerManager.buildNewFilterServerList(),
            oneway,
            this.brokerConfig.getRegisterBrokerTimeoutMills());

    if (result == null) {
        // Nothing came back; leave local state untouched.
        return;
    }

    if (this.updateMasterHAServerAddrPeriodically && result.getHaServerAddr() != null) {
        this.messageStore.updateHaMasterAddress(result.getHaServerAddr());
    }

    this.slaveSynchronize.setMasterAddr(result.getMasterAddr());

    // Refresh the ordered-message topic configuration from the returned KV table.
    if (checkOrderConfig) {
        this.getTopicConfigManager().updateOrderTopicConfig(result.getKvTable());
    }
}
  • 接着看BrokerOuterAPI類的registerBrokerAll方法
/**
 * Registers the broker with every name server address known to the remoting client.
 *
 * <p>Each address is tried independently: a failure on one name server is logged
 * and does not prevent registration with the others.
 *
 * @param clusterName        cluster name sent with the registration
 * @param brokerAddr         broker address sent with the registration
 * @param brokerName         broker name sent with the registration
 * @param brokerId           broker id sent with the registration
 * @param haServerAddr       HA server address sent with the registration
 * @param topicConfigWrapper topic configuration sent in the request body
 * @param filterServerList   filter server list sent in the request body
 * @param oneway             passed through to {@code registerBroker}
 * @param timeoutMills       per-request timeout in milliseconds
 * @return the last non-null result obtained while iterating the address list, or
 *         {@code null} if the address list is {@code null} or no name server
 *         produced a result
 */
public RegisterBrokerResult registerBrokerAll(
        final String clusterName,
        final String brokerAddr,
        final String brokerName,
        final long brokerId,
        final String haServerAddr,
        final TopicConfigSerializeWrapper topicConfigWrapper,
        final List<String> filterServerList,
        final boolean oneway,
        final int timeoutMills
) {
    // Fetch the list of name server addresses.
    final List<String> addresses = this.remotingClient.getNameServerAddressList();
    if (addresses == null) {
        return null;
    }

    RegisterBrokerResult latest = null;
    // Register with each name server in turn.
    for (String namesrvAddr : addresses) {
        try {
            RegisterBrokerResult current = this.registerBroker(
                    namesrvAddr, clusterName, brokerAddr, brokerName, brokerId,
                    haServerAddr, topicConfigWrapper, filterServerList, oneway, timeoutMills);
            if (current != null) {
                latest = current;
            }

            log.info("register broker to name server {} OK", namesrvAddr);
        } catch (Exception e) {
            log.warn("registerBroker Exception, " + namesrvAddr, e);
        }
    }

    return latest;
}
  • 接着看BrokerOuterAPI類的registerBroker方法
/**
 * Sends a single REGISTER_BROKER request to one name server.
 *
 * @param namesrvAddr        address of the name server to register with
 * @param clusterName        cluster name placed in the request header
 * @param brokerAddr         broker address placed in the request header
 * @param brokerName         broker name placed in the request header
 * @param brokerId           broker id placed in the request header
 * @param haServerAddr       HA server address placed in the request header
 * @param topicConfigWrapper topic configuration placed in the request body
 * @param filterServerList   filter server list placed in the request body
 * @param oneway             when {@code true}, the request is sent one-way and no
 *                           response is read
 * @param timeoutMills       timeout in milliseconds passed to the remoting client
 * @return the decoded registration result on a SUCCESS response, or {@code null}
 *         when {@code oneway} is {@code true}
 * @throws MQBrokerException if the response code is anything other than SUCCESS
 */
private RegisterBrokerResult registerBroker(//
                                            final String namesrvAddr, //
                                            final String clusterName, // 1
                                            final String brokerAddr, // 2
                                            final String brokerName, // 3
                                            final long brokerId, // 4
                                            final String haServerAddr, // 5
                                            final TopicConfigSerializeWrapper topicConfigWrapper, // 6
                                            final List<String> filterServerList, // 7
                                            final boolean oneway,// 8
                                            final int timeoutMills// 9
) throws RemotingCommandException, MQBrokerException, RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException,
        InterruptedException {
    RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
    requestHeader.setBrokerAddr(brokerAddr);
    requestHeader.setBrokerId(brokerId);
    requestHeader.setBrokerName(brokerName);
    requestHeader.setClusterName(clusterName);
    requestHeader.setHaServerAddr(haServerAddr);
    // Wrap the header in a request command tagged with REGISTER_BROKER so the
    // receiving side knows which operation is being requested.
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.REGISTER_BROKER, requestHeader);

    RegisterBrokerBody requestBody = new RegisterBrokerBody();
    requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
    requestBody.setFilterServerList(filterServerList);
    request.setBody(requestBody.encode());

    // The request header and body are now complete.

    if (oneway) {
        try {
            this.remotingClient.invokeOneway(namesrvAddr, request, timeoutMills);
        } catch (RemotingTooMuchRequestException ignored) {
            // Deliberately best-effort: a one-way registration that is rejected
            // because too many requests are in flight is simply dropped.
        }
        return null;
    }

    RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            RegisterBrokerResponseHeader responseHeader =
                    (RegisterBrokerResponseHeader) response.decodeCommandCustomHeader(RegisterBrokerResponseHeader.class);
            RegisterBrokerResult result = new RegisterBrokerResult();
            result.setMasterAddr(responseHeader.getMasterAddr());
            result.setHaServerAddr(responseHeader.getHaServerAddr());
            // The KV table is optional; it is only decoded when a body is present.
            if (response.getBody() != null) {
                result.setKvTable(KVTable.decode(response.getBody(), KVTable.class));
            }
            return result;
        }
        default:
            break;
    }

    // Any non-SUCCESS response is surfaced to the caller with its code and remark.
    throw new MQBrokerException(response.getCode(), response.getRemark());
}
RemotingCommand response = this.remotingClient.invokeSync(namesrvAddr, request, timeoutMills);

調用invokeSync方法進行broker的註冊。通訊後面專門看看。

大致的說明都寫在了方法的註釋上面了。

相關文章
相關標籤/搜索