本文主要解析下canal server的啓動過程,但願能有所收穫。java
整個server啓動的過程比較複雜,看圖難以理解,須要輔以文字說明。spring
首先程序的入口在CanalLauncher的main方法中。緩存
String conf = System.getProperty("canal.conf", "classpath:canal.properties"); Properties properties = new Properties(); if (conf.startsWith(CLASSPATH_URL_PREFIX)) { conf = StringUtils.substringAfter(conf, CLASSPATH_URL_PREFIX); properties.load(CanalLauncher.class.getClassLoader().getResourceAsStream(conf)); } else { properties.load(new FileInputStream(conf)); }
從canal.properties文件中load全部的配置信息,加載到上下文中。再也不贅述。網絡
根據配置文件來構造CanalController,這塊的代碼比較多,主要分爲七個步驟,具體以下。socket
調用initGlobalConfig方法,過程以下:ide
這塊邏輯在CanalController的initGlobalConfig方法中。函數
這塊的邏輯是從instance.properties裏面初始化實例。源碼分析
private void initInstanceConfig(Properties properties) { String destinationStr = getProperty(properties, CanalConstants.CANAL_DESTINATIONS); String[] destinations = StringUtils.split(destinationStr, CanalConstants.CANAL_DESTINATION_SPLIT); for (String destination : destinations) { InstanceConfig config = parseInstanceConfig(properties, destination); InstanceConfig oldConfig = instanceConfigs.put(destination, config); if (oldConfig != null) { logger.warn("destination:{} old config:{} has replace by new config:{}", new Object[] { destination, oldConfig, config }); } } }
從這段代碼中能夠看出,咱們在一個canal.properties文件中,能夠配置多個destination,也就是能夠配置多個instance,不一樣的instance以逗號隔開。這裏主要看的是parseInstanceConfig()方法,裏面的邏輯以下:spa
從配置文件中獲取canal.socketChannel字段,放到全局變量中。code
從配置文件中分別獲取canal.id、ip、port(對外提供socket服務的端口),獲取一個內存級的server單例,同時也獲取一個對外提供Netty服務的單例。
cid = Long.valueOf(getProperty(properties, CanalConstants.CANAL_ID)); ip = getProperty(properties, CanalConstants.CANAL_IP); port = Integer.valueOf(getProperty(properties, CanalConstants.CANAL_PORT)); embededCanalServer = CanalServerWithEmbedded.instance(); embededCanalServer.setCanalInstanceGenerator(instanceGenerator);// 設置自定義的instanceGenerator canalServer = CanalServerWithNetty.instance(); canalServer.setIp(ip); canalServer.setPort(port);
從配置文件中獲取zk地址(canal.zkServers),啓動一個zk客戶端,而後初始化兩個系統目錄,分別是:
根據destination構造運行時監控,其實就是根據instance名來構造ServerRunningMonitor。其實就是實現了ServerRunningListener中的一些方法。
public interface ServerRunningListener { /** * 啓動時回調作點事情 */ public void processStart(); /** * 關閉時回調作點事情 */ public void processStop(); /** * 觸發如今輪到本身作爲active,須要載入上一個active的上下文數據 */ public void processActiveEnter(); /** * 觸發一下當前active模式失敗 */ public void processActiveExit(); }
而後初始化一下ServerRunningMonitor。
runningMonitor.init();
這個init方法跟蹤的結果,其實就是執行了ServerRunningListener中的processStart方法。
public void processStart() { try { if (zkclientx != null) { final String path = ZookeeperPathUtils.getDestinationClusterNode(destination, ip + ":" + port); initCid(path); zkclientx.subscribeStateChanges(new IZkStateListener() { public void handleStateChanged(KeeperState state) throws Exception { } public void handleNewSession() throws Exception { initCid(path); } @Override public void handleSessionEstablishmentError(Throwable error) throws Exception { logger.error("failed to connect to zookeeper", error); } }); } } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } }
首先獲取了/otter/canal/destinations/{destination}/cluster/ip:port的內容,其實就是server的地址,最後一個ip:port是個zk的臨時節點。而後訂閱一下節點事件,當節點有事件推送過來後,作一些動做。
若是canal.auto.scan配置爲true(默認爲true),首先定義一個InstanceAction,包含了啓動、中止、重啓instance的動做。
定義一個SpringInstanceConfigMonitor,配置定時掃描的事件爲canal.auto.scan.interval,默認5s,掃描canal.conf.dir目錄下的文件,與上面定義的InstanceAction結合起來。
上面的構造方法其實就是定義一些必要的內容,真正的啓動在這個方法中。
建立臨時節點/otter/canal/cluster/ip:port,同時啓動監聽器.
embededCanalServer.start();
這個start裏面,一個是將當前server的running狀態置爲true,同時根據destination構建CanalInstance。
遍歷Map<String, InstanceConfig>中的InstanceConfig,若是CanalInsance還沒啓動,若是不是懶加載的話,直接HA啓動ServerRunningMonitor。
ServerRunningMonitor runningMonitor = ServerRunningMonitors.getRunningMonitor(destination); if (!config.getLazy() && !runningMonitor.isStart()) { runningMonitor.start(); } public synchronized void start() { super.start(); try { processStart(); if (zkClient != null) { // 若是須要儘量釋放instance資源,不須要監聽running節點,否則即便stop了這臺機器,另外一臺機器立馬會start String path = ZookeeperPathUtils.getDestinationServerRunning(destination); zkClient.subscribeDataChanges(path, dataListener); initRunning(); } else { processActiveEnter();// 沒有zk,直接啓動 } } catch (Exception e) { logger.error("start failed", e); // 沒有正常啓動,重置一下狀態,避免干擾下一次start stop(); } }
這裏面啓動的內容咱們來看看。
zkClient.subscribeDataChanges(path, dataListener);
在掃描以前,把destination和InstanceAction綁定到緩存中。
instanceConfigMonitors.get(config.getMode()).register(destination, defaultAction);
首先啓動一個全局掃描,而後再對應的destination配置文件的掃描。
if (autoScan) { instanceConfigMonitors.get(globalInstanceConfig.getMode()).start(); for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (!monitor.isStart()) { monitor.start(); } } }
這個start方法啓動了一個定時器,默認5s掃描一次。掃描的內容就是配置文件路徑下的內容,針對文件的新增、刪除、修改,對應InstanceAction中的start,stop和reload方法。也就是說,咱們在canal運行的過程當中,經過動態修改配置文件,來實現動態調整運行時參數,主要能夠用來進行重複消費,位點的遷移等等。
CanalServerWithNetty的啓動,首先須要啓動CanalServerWithEmbedded,主要的業務邏輯在SessionHandler中。這塊實際上是暴露外部服務,給canal client進行調用。
Runtime.getRuntime().addShutdownHook(new Thread() { public void run() { try { logger.info("## stop the canal server"); controller.stop(); } catch (Throwable e) { logger.warn("##something goes wrong when stopping canal Server:", e); } finally { logger.info("## canal server is down."); } } });
在server中止時,調用controller.stop()方法。
public void stop() throws Throwable { canalServer.stop(); if (autoScan) { for (InstanceConfigMonitor monitor : instanceConfigMonitors.values()) { if (monitor.isStart()) { monitor.stop(); } } } for (ServerRunningMonitor runningMonitor : ServerRunningMonitors.getRunningMonitors().values()) { if (runningMonitor.isStart()) { runningMonitor.stop(); } } // 釋放canal的工做節點 releaseCid(ZookeeperPathUtils.getCanalClusterNode(ip + ":" + port)); logger.info("## stop the canal server[{}:{}]", ip, port); if (zkclientx != null) { zkclientx.close(); } }
主要是中止controller,server相關的monitor,instance相關的monitor,而後釋放zk節點,關閉zk鏈接。