【Canal源碼分析】Canal Server的啓動和中止過程

本文主要解析下canal server的啓動過程,但願能有所收穫。java

1、序列圖

1.1 啓動

image

1.2 中止

image

2、源碼分析

整個server啓動的過程比較複雜,看圖難以理解,須要輔以文字說明。spring

首先程序的入口在CanalLauncher的main方法中。緩存

2.1 加載配置文件

String conf = System.getProperty("canal.conf", "classpath:canal.properties");
Properties properties = new Properties();
if (conf.startsWith(CLASSPATH_URL_PREFIX)) {
    conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX);
    properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf));
} else {
    properties.load(new FileInputStream(conf));
}

從canal.properties文件中load全部的配置信息,加載到上下文中。再也不贅述。網絡

2.2 構造CanalController

根據配置文件來構造CanalController,這塊的代碼比較多,主要分爲七個步驟,具體以下。socket

2.2.1 初始化全局參數配置

調用initGlobalConfig方法,過程以下:ide

  • 判斷運行模式,是從spring加載仍是manager加載,目前開源版本建議使用spring
  • 獲取是否懶加載
  • 若是是manager模式啓動,獲取manager的ip地址;若是是spring模式啓動,獲取spring xml的文件地址,加載到所有配置中
  • 構造一個實例構造器CanalInstanceGenerator,咱們用到的就是在spring的beanFactory中加上destination的bean,這個destination就是canal instance的名稱

這塊邏輯在CanalController的initGlobalConfig方法中。函數

2.2.2 初始化實例配置

這塊的邏輯是從instance.properties裏面初始化實例。源碼分析

private void initInstanceConfig(Properties properties) {
    String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS);
    String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT);

    for (String destination : destinations) {
        InstanceConfig config = parseInstanceConfig(properties, destination);
        InstanceConfig oldConfig = instanceConfigs.put(destination, config);

        if (oldConfig != null) {
            logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination,
                    oldConfig, config });
        }
    }
}

從這段代碼中能夠看出,咱們在一個canal.properties文件中,能夠配置多個destination,也就是能夠配置多個instance,不一樣的instance以逗號隔開。這裏主要看的是parseInstanceConfig()方法,裏面的邏輯以下:spa

  • 獲取啓動模式,是manager仍是spring,咱們這邊默認都是spring。
  • 獲取懶加載字段
  • 獲取spring xml配置文件地址

2.2.3 初始SocketChannel

從配置文件中獲取canal.socketChannel字段,放到全局變量中。code

2.2.4 準備canal server

從配置文件中分別獲取canal.id、ip、port(對外提供socket服務的端口),獲取一個內存級的server單例,同時也獲取一個對外提供Netty服務的單例。

cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID));
ip = getProperty(properties, CanalConstants.CANAL_IP);
port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT));
embededCanalServer = CanalServerWithEmbedded.instance();
embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator
canalServer = CanalServerWithNetty.instance();
canalServer.setIp(ip);
canalServer.setPort(port);

2.2.5 初始化系統目錄

從配置文件中獲取zk地址(canal.zkServers),啓動一個zk客戶端,而後初始化兩個系統目錄,分別是:

  • /otter/canal/destinations
  • /otter/canal/cluster

2.2.6 初始化系統監控

根據destination構造運行時監控,其實就是根據instance名來構造ServerRunningMonitor。其實就是實現了ServerRunningListener中的一些方法。

public interface ServerRunningListener {

    /**
     * 啓動時回調作點事情
     */
    public void processStart();

    /**
     * 關閉時回調作點事情
     */
    public void processStop();

    /**
     * 觸發如今輪到本身作爲active,須要載入上一個active的上下文數據
     */
    public void processActiveEnter();

    /**
     * 觸發一下當前active模式失敗
     */
    public void processActiveExit();

}

而後初始化一下ServerRunningMonitor。

runningMonitor.init();

這個init方法跟蹤的結果,其實就是執行了ServerRunningListener中的processStart方法。

public void processStart() {
    try {
        if (zkclientx != null) {
            final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port);
            initCid(path);
            zkclientx.subscribeStateChanges(new IZkStateListener() {

                public void handleStateChanged(KeeperState state) throws Exception {

                }

                public void handleNewSession() throws Exception {
                    initCid(path);
                }

                @Override
                public void handleSessionEstablishmentError(Throwable error) throws Exception {
                    logger.error("failed to connect to zookeeper", error);
                }
            });
        }
    } finally {
        MDC.remove(CanalConstants.MDC_DESTINATION);
    }
}

首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內容,其實就是server的地址,最後一個ip:port是個zk的臨時節點。而後訂閱一下節點事件,當節點有事件推送過來後,作一些動做。

2.2.7 初始化配置文件監控

若是canal.auto.scan配置爲true(默認爲true),首先定義一個InstanceAction,包含了啓動、中止、重啓instance的動做。

定義一個SpringInstanceConfigMonitor,配置定時掃描的事件爲canal.auto.scan.interval,默認5s,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結合起來。

2.3 啓動CanalController

上面的構造方法其實就是定義一些必要的內容,真正的啓動在這個方法中。

2.3.1 建立工做節點

建立臨時節點/otter/canal/cluster/ip:port,同時啓動監聽器.

2.3.2 啓動embeded服務

embededCanalServer.start();

這個start裏面,一個是將當前server的running狀態置爲true,同時根據destination構建CanalInstance。

2.3.3 HA啓動

遍歷Map<String, InstanceConfig>中的InstanceConfig,若是CanalInsance還沒啓動,若是不是懶加載的話,直接HA啓動ServerRunningMonitor。

ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination);
if (!config.getLazy() && !runningMonitor.isStart()) {
    runningMonitor.start();
}

public synchronized void start() {
    super.start();
    try {
        processStart();
        if (zkClient != null) {
            // 若是須要儘量釋放instance資源,不須要監聽running節點,否則即便stop了這臺機器,另外一臺機器立馬會start
            String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
            zkClient.subscribeDataChanges(path, dataListener);

            initRunning();
        } else {
            processActiveEnter();// 沒有zk,直接啓動
        }
    } catch (Exception e) {
        logger.error("start failed", e);
        // 沒有正常啓動,重置一下狀態,避免干擾下一次start
        stop();
    }

}

這裏面啓動的內容咱們來看看。

  • 首先調用super.start()把當前的running狀態置爲true。
  • 而後啓動zk節點的監聽(這邊的processStart是否多餘了?)。
  • 監聽路徑/otter/canal/destinations/{destination}/running節點的變化
zkClient.subscribeDataChanges(path, dataListener);
  • 這裏的dataListener是ServerRunningMonitor構造函數中定義的,就是定義一些zk節點監聽的動做。
    • 若是有數據變化,若是running節點中的內容ServerRunningData發生了變化,字段active變爲了false,並且address就是本機,說明本機出現了主動釋放,須要釋放運行時狀態。此時須要調用到processActiveExit方法,其實就是中止了本機的server中destination對應的instance。
    • 若是節點發生了刪除動做,若是上一次active的狀態就是本機,則即時觸發一下active搶佔,調用initRunning()方法,固然,若是啓動失敗,也不是當即切換,而是會等待5s,再嘗試啓動。這個啓動方法中,主要調用的是processActiveEnter()方法,來啓動了embededCanalServer.start(destination)。其實就是啓動canalInstance,這塊後續再分析。
  • 其實除了監聽器,在自己的ServerRunningMonitor的start方法中,也有initRunning方法。這塊啓動canalInstance的方法,咱們下一篇文章分析。

2.3.4 instance文件掃描啓動

在掃描以前,把destination和InstanceAction綁定到緩存中。

instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);

首先啓動一個全局掃描,而後再對應的destination配置文件的掃描。

if (autoScan) {
    instanceConfigMonitors.get(globalInstanceConfig.getMode()).start();
    for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
        if (!monitor.isStart()) {
            monitor.start();
        }
    }
}

這個start方法啓動了一個定時器,默認5s掃描一次。掃描的內容就是配置文件路徑下的內容,針對文件的新增、刪除、修改,對應InstanceAction中的start,stop和reload方法。也就是說,咱們在canal運行的過程當中,經過動態修改配置文件,來實現動態調整運行時參數,主要能夠用來進行重複消費,位點的遷移等等。

2.3.5 網絡接口啓動

CanalServerWithNetty的啓動,首先須要啓動CanalServerWithEmbedded,主要的業務邏輯在SessionHandler中。這塊實際上是暴露外部服務,給canal client進行調用。

2.4 增長關閉hook

Runtime.getRuntime().addShutdownHook(new Thread() {

    public void run() {
        try {
            logger.info("## stop the canal server");
            controller.stop();
        } catch (Throwable e) {
            logger.warn("##something goes wrong when stopping canal Server:", e);
        } finally {
            logger.info("## canal server is down.");
        }
    }

});

在server中止時,調用controller.stop()方法。

public void stop() throws Throwable {
    canalServer.stop();

    if (autoScan) {
        for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) {
            if (monitor.isStart()) {
                monitor.stop();
            }
        }
    }

    for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) {
        if (runningMonitor.isStart()) {
            runningMonitor.stop();
        }
    }

    // 釋放canal的工做節點
    releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port));
    logger.info("## stop the canal server[{}:{}]", ip, port);
        
    if (zkclientx != null) {
        zkclientx.close();
    }
}

主要是中止controller,server相關的monitor,instance相關的monitor,而後釋放zk節點,關閉zk鏈接。

相關文章
相關標籤/搜索