A look at rocketmq's SlaveSynchronize

This article takes a look at rocketmq's SlaveSynchronize.

BrokerController

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/BrokerController.java

public class BrokerController {

    //......

    private void handleSlaveSynchronize(BrokerRole role) {
        if (role == BrokerRole.SLAVE) {
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
            slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        BrokerController.this.slaveSynchronize.syncAll();
                    }
                    catch (Throwable e) {
                        log.error("ScheduledTask SlaveSynchronize syncAll error.", e);
                    }
                }
            }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS);
        } else {
            //handle the slave synchronise
            if (null != slaveSyncFuture) {
                slaveSyncFuture.cancel(false);
            }
            this.slaveSynchronize.setMasterAddr(null);
        }
    }

    //......
}
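  • BrokerController has a handleSlaveSynchronize method. When role is BrokerRole.SLAVE, it cancels any previously scheduled task, resets the master address to null, and registers a scheduled task that runs BrokerController.this.slaveSynchronize.syncAll() every 10 seconds after an initial delay of 3 seconds. For any other role it only cancels the existing task and clears the master address.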
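The cancel-then-reschedule pattern described above can be sketched in isolation. This is a minimal sketch, not the broker's code: the class RoleSwitchSketch, its fields, and the counter standing in for syncAll() are all hypothetical, and only the ScheduledExecutorService calls come from the JDK.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the broker; only the scheduling pattern follows the source.
class RoleSwitchSketch {
    private final ScheduledExecutorService scheduler =
        Executors.newSingleThreadScheduledExecutor();
    private ScheduledFuture<?> syncFuture;
    final AtomicInteger syncCount = new AtomicInteger();

    // Always cancel the old task; schedule a new one only when acting as a slave.
    synchronized void handleRole(boolean slave, long initialDelayMs, long periodMs) {
        if (syncFuture != null) {
            syncFuture.cancel(false); // false: an in-flight run is allowed to finish
            syncFuture = null;
        }
        if (slave) {
            syncFuture = scheduler.scheduleAtFixedRate(() -> {
                try {
                    syncCount.incrementAndGet(); // placeholder for syncAll()
                } catch (Throwable e) {
                    // An exception escaping the task would suppress all later runs.
                    System.err.println("sync failed: " + e);
                }
            }, initialDelayMs, periodMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized boolean isScheduled() {
        return syncFuture != null && !syncFuture.isCancelled();
    }

    void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        RoleSwitchSketch sketch = new RoleSwitchSketch();
        sketch.handleRole(true, 30, 100);  // the broker uses 3000 and 10000
        Thread.sleep(350);
        sketch.handleRole(false, 0, 0);    // no longer a slave: the task is cancelled
        System.out.println("runs while slave: " + sketch.syncCount.get());
        sketch.shutdown();
    }
}
```

The try/catch inside the task matters because scheduleAtFixedRate stops scheduling further executions once one execution throws, which is presumably why the original Runnable catches Throwable as well.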

SlaveSynchronize

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.BROKER_LOGGER_NAME);
    private final BrokerController brokerController;
    private volatile String masterAddr = null;

    public SlaveSynchronize(BrokerController brokerController) {
        this.brokerController = brokerController;
    }

    public String getMasterAddr() {
        return masterAddr;
    }

    public void setMasterAddr(String masterAddr) {
        this.masterAddr = masterAddr;
    }

    public void syncAll() {
        this.syncTopicConfig();
        this.syncConsumerOffset();
        this.syncDelayOffset();
        this.syncSubscriptionGroupConfig();
    }

    //......
}
  • SlaveSynchronize's syncAll method calls syncTopicConfig, syncConsumerOffset, syncDelayOffset, and syncSubscriptionGroupConfig in turn

syncTopicConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {
    
    //......

    private void syncTopicConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                TopicConfigSerializeWrapper topicWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak);
                if (!this.brokerController.getTopicConfigManager().getDataVersion()
                    .equals(topicWrapper.getDataVersion())) {

                    this.brokerController.getTopicConfigManager().getDataVersion()
                        .assignNewOne(topicWrapper.getDataVersion());
                    this.brokerController.getTopicConfigManager().getTopicConfigTable().clear();
                    this.brokerController.getTopicConfigManager().getTopicConfigTable()
                        .putAll(topicWrapper.getTopicConfigTable());
                    this.brokerController.getTopicConfigManager().persist();

                    log.info("Update slave topic config from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncTopicConfig Exception, {}", masterAddrBak, e);
            }
        }
    }

    //......
}
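  • syncTopicConfig runs only when masterAddr is non-null and differs from the broker's own address. It fetches a TopicConfigSerializeWrapper via this.brokerController.getBrokerOuterAPI().getAllTopicConfig(masterAddrBak), then checks whether its dataVersion equals this.brokerController.getTopicConfigManager().getDataVersion(). If they differ, it updates brokerController.getTopicConfigManager() with the wrapper's data (adopting the version, clearing the topic config table, and refilling it) and then persists it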
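The version-gated replace described above can be sketched with plain collections. This is a simplified sketch under stated assumptions: VersionedTableSketch, the long version field (standing in for DataVersion), and the topic-to-queue-count map are hypothetical and are not RocketMQ types.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, simplified stand-ins; not RocketMQ's TopicConfigManager or DataVersion.
class VersionedTableSketch {
    long version;                                                 // stand-in for DataVersion
    final Map<String, Integer> table = new ConcurrentHashMap<>(); // topic -> queue count

    // Returns true when the local table was replaced by the master's snapshot.
    boolean syncFrom(long masterVersion, Map<String, Integer> masterTable) {
        if (version == masterVersion) {
            return false;          // same version: nothing copied, nothing persisted
        }
        version = masterVersion;   // adopt the master's version
        table.clear();             // drop entries that no longer exist on the master
        table.putAll(masterTable); // copy the master's entries
        return true;               // the real method persists at this point
    }

    public static void main(String[] args) {
        VersionedTableSketch slave = new VersionedTableSketch();
        slave.table.put("stale-topic", 4);
        System.out.println(slave.syncFrom(7L, Map.of("orders", 8)) + " " + slave.table);
        // prints: true {orders=8}
        System.out.println(slave.syncFrom(7L, Map.of("orders", 8)));
        // prints: false
    }
}
```

Because clear() and putAll() are two separate calls, a concurrent reader could briefly observe an empty table between them; the same ordering appears in the quoted source.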

syncConsumerOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {
    
    //......

    private void syncConsumerOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                ConsumerOffsetSerializeWrapper offsetWrapper =
                    this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak);
                this.brokerController.getConsumerOffsetManager().getOffsetTable()
                    .putAll(offsetWrapper.getOffsetTable());
                this.brokerController.getConsumerOffsetManager().persist();
                log.info("Update slave consumer offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncConsumerOffset Exception, {}", masterAddrBak, e);
            }
        }
    }

    //......
}
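  • syncConsumerOffset fetches a ConsumerOffsetSerializeWrapper via this.brokerController.getBrokerOuterAPI().getAllConsumerOffset(masterAddrBak), then uses its data to update brokerController.getConsumerOffsetManager().getOffsetTable() and persists it. Unlike syncTopicConfig, there is no dataVersion check and the local table is not cleared first; the master's entries are merged in with putAll on every run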
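A small sketch of what merging with putAll (without a preceding clear) implies. The class OffsetMergeSketch and the sample keys are hypothetical; the nested map shape (a string key mapping to per-queue offsets) is an assumption made for illustration.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the slave's offset table.
class OffsetMergeSketch {
    // Assumed shape: "topic@group" -> (queueId -> offset)
    final Map<String, Map<Integer, Long>> offsetTable = new ConcurrentHashMap<>();

    // Mirrors the putAll-only update: entries are added or overwritten, never removed.
    void syncFrom(Map<String, Map<Integer, Long>> masterTable) {
        offsetTable.putAll(masterTable);
    }

    public static void main(String[] args) {
        OffsetMergeSketch slave = new OffsetMergeSketch();
        slave.offsetTable.put("orders@groupA", new ConcurrentHashMap<>(Map.of(0, 5L)));
        slave.offsetTable.put("retired@groupB", new ConcurrentHashMap<>(Map.of(0, 1L)));

        Map<String, Map<Integer, Long>> master = new HashMap<>();
        master.put("orders@groupA", new ConcurrentHashMap<>(Map.of(0, 9L, 1, 3L)));
        slave.syncFrom(master);

        System.out.println(slave.offsetTable.get("orders@groupA").get(0));   // prints: 9
        System.out.println(slave.offsetTable.containsKey("retired@groupB")); // prints: true
    }
}
```

In this sketch the key that exists only on the slave survives the sync, while the shared key is replaced wholesale by the master's per-queue map.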

syncDelayOffset

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {
    
    //......

    private void syncDelayOffset() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                String delayOffset =
                    this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak);
                if (delayOffset != null) {

                    String fileName =
                        StorePathConfigHelper.getDelayOffsetStorePath(this.brokerController
                            .getMessageStoreConfig().getStorePathRootDir());
                    try {
                        MixAll.string2File(delayOffset, fileName);
                    } catch (IOException e) {
                        log.error("Persist file Exception, {}", fileName, e);
                    }
                }
                log.info("Update slave delay offset from master, {}", masterAddrBak);
            } catch (Exception e) {
                log.error("SyncDelayOffset Exception, {}", masterAddrBak, e);
            }
        }
    }

    //......
}
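  • syncDelayOffset fetches delayOffset as a string via this.brokerController.getBrokerOuterAPI().getAllDelayOffset(masterAddrBak) and, if it is not null, persists it to the delay offset store file with MixAll.string2File(delayOffset, fileName). There is no version check here either, and the info log line is written whether or not the file was actually updated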
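Persisting a string to a file, as described above, can be sketched with the JDK alone. This is not MixAll.string2File; it is a hypothetical helper (StringToFileSketch.writeString) that writes to a temporary sibling file and then moves it over the target, and the file names used in roundTrip are chosen only for illustration.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper; a JDK-only sketch, not RocketMQ's MixAll.string2File.
class StringToFileSketch {
    // Write to "<name>.tmp" first, then move it over the target file.
    static void writeString(String content, Path target) throws IOException {
        Path parent = target.toAbsolutePath().getParent();
        if (parent != null) {
            Files.createDirectories(parent); // make sure the directory exists
        }
        Path tmp = target.resolveSibling(target.getFileName() + ".tmp");
        Files.write(tmp, content.getBytes(StandardCharsets.UTF_8));
        Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
    }

    // Writes the content twice to a temp directory and reads it back.
    static String roundTrip(String content) {
        try {
            Path dir = Files.createTempDirectory("delay-offset-sketch");
            Path file = dir.resolve("config").resolve("delayOffset.json"); // illustrative name
            writeString(content, file);
            writeString(content, file); // second call replaces the existing file
            return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip("{\"offsetTable\":{}}"));
    }
}
```

Writing to a temporary file first means a failure during the write leaves the previous file in place rather than a half-written one.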

syncSubscriptionGroupConfig

rocketmq-all-4.6.0-source-release/broker/src/main/java/org/apache/rocketmq/broker/slave/SlaveSynchronize.java

public class SlaveSynchronize {
    
    //......

    private void syncSubscriptionGroupConfig() {
        String masterAddrBak = this.masterAddr;
        if (masterAddrBak != null  && !masterAddrBak.equals(brokerController.getBrokerAddr())) {
            try {
                SubscriptionGroupWrapper subscriptionWrapper =
                    this.brokerController.getBrokerOuterAPI()
                        .getAllSubscriptionGroupConfig(masterAddrBak);

                if (!this.brokerController.getSubscriptionGroupManager().getDataVersion()
                    .equals(subscriptionWrapper.getDataVersion())) {
                    SubscriptionGroupManager subscriptionGroupManager =
                        this.brokerController.getSubscriptionGroupManager();
                    subscriptionGroupManager.getDataVersion().assignNewOne(
                        subscriptionWrapper.getDataVersion());
                    subscriptionGroupManager.getSubscriptionGroupTable().clear();
                    subscriptionGroupManager.getSubscriptionGroupTable().putAll(
                        subscriptionWrapper.getSubscriptionGroupTable());
                    subscriptionGroupManager.persist();
                    log.info("Update slave Subscription Group from master, {}", masterAddrBak);
                }
            } catch (Exception e) {
                log.error("SyncSubscriptionGroup Exception, {}", masterAddrBak, e);
            }
        }
    }

    //......
}
  • syncSubscriptionGroupConfig fetches a SubscriptionGroupWrapper via this.brokerController.getBrokerOuterAPI().getAllSubscriptionGroupConfig(masterAddrBak), then checks whether its dataVersion equals this.brokerController.getSubscriptionGroupManager().getDataVersion(). If they differ, it adopts the wrapper's version, replaces the contents of subscriptionGroupManager.getSubscriptionGroupTable() with the wrapper's data, and then persists it

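Summary

BrokerController has a handleSlaveSynchronize method. When role is BrokerRole.SLAVE, it registers a scheduled task that runs BrokerController.this.slaveSynchronize.syncAll() every 10 seconds. SlaveSynchronize's syncAll method in turn calls syncTopicConfig, syncConsumerOffset, syncDelayOffset, and syncSubscriptionGroupConfig. The topic config and subscription group syncs copy data only when the master's dataVersion differs from the local one, while the consumer offset and delay offset syncs copy on every run.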
