這部分代碼其實在ServerRunningMonitor的start()方法中。針對不一樣的destination,啓動不一樣的CanalInstance。主要的方法在於initRunning()。java
private void initRunning() { if (!isStart()) { return; } String path = ZookeeperPathUtils.getDestinationServerRunning(destination); // 序列化 byte[] bytes = JsonUtils.marshalToByte(serverData); try { mutex.set(false); zkClient.create(path, bytes, CreateMode.EPHEMERAL); activeData = serverData; processActiveEnter();// 觸發一下事件 mutex.set(true); } catch (ZkNodeExistsException e) { bytes = zkClient.readData(path, true); if (bytes == null) {// 若是不存在節點,當即嘗試一次 initRunning(); } else { activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class); } } catch (ZkNoNodeException e) { zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試建立父節點 initRunning(); } }
首先在zk中新增一個臨時節點,表示的是正在運行destination的ip和端口,而後觸發一下processActiveEnter()。咱們主要看下這個方法,在controller啓動時定義的。mysql
public void processActiveEnter() { try { MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination)); embededCanalServer.start(destination); } finally { MDC.remove(CanalConstants.MDC_DESTINATION); } } public void start(final String destination) { final CanalInstance canalInstance = canalInstances.get(destination); if (!canalInstance.isStart()) { try { MDC.put("destination", destination); canalInstance.start(); logger.info("start CanalInstances[{}] successfully", destination); } finally { MDC.remove("destination"); } } }
主要在embededCanalServer.start中,咱們看下這個canalInstance.start(),跟蹤到AbstractCanalInstance。spring
在默認的instance配置文件中,咱們選擇的metaManager是PeriodMixedMetaManager,定時(默認1s)刷新數據到zk中,因此咱們主要關注這個類的start方法。這個類繼承了MemoryMetaManager,首先啓動一個MemoryMetaManager,而後再啓動一個ZooKeeperMetaManager。sql
destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() { public List<ClientIdentity> apply(String destination) { return zooKeeperMetaManager.listAllSubscribeInfo(destination); } });
從/otter/canal/destinations/{destination}獲取全部的client信息,返回的內容是List<ClientIdentity>,包括了destination、clientId、filter等等。緩存
根據ClientIdentity去zk獲取指針,從zk的/otter/canal/destinations/{destination}/{clientId}/cursor下面去獲取,返回的內容是個LogPosition。app
cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() { public Position apply(ClientIdentity clientIdentity) { Position position = zooKeeperMetaManager.getCursor(clientIdentity); if (position == null) { return nullCursor; // 返回一個空對象標識,避免出現異常 } else { return position; } } });
有可能返回一個空。ide
建立一個基於內存的MemoryClientIdentityBatch,包含位點的start、end、ack信息。而後從zk節點/otter/canal/destinations/{destination}/{clientId}/mark獲取,取出來的數據進行排序,而後從/otter/canal/destinations/{destination}/{clientId}/mark/{batchId}中取出PositionRange這個類,描述的是一個position的範圍。源碼分析
batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() { public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) { // 讀取一下zookeeper信息,初始化一次 MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity); Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity); for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) { batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加記錄到指定batchId } return batches; } });
// 啓動定時工做任務 executor.scheduleAtFixedRate(new Runnable() { public void run() { List<ClientIdentity> tasks = new ArrayList<ClientIdentity>(updateCursorTasks); for (ClientIdentity clientIdentity : tasks) { try { // 定時將內存中的最新值刷到zookeeper中,屢次變動只刷一次 zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity)); updateCursorTasks.remove(clientIdentity); } catch (Throwable e) { // ignore logger.error("period update" + clientIdentity.toString() + " curosr failed!", e); } } } }, period, period, TimeUnit.MILLISECONDS);
定時刷新position到zk後,從任務中刪除。刷新的頻率爲1s。ui
這塊比較簡單。this
if (!alarmHandler.isStart()) { alarmHandler.start(); }
其實默認是LogAlarmHandler,用於發送告警信息的。
啓動EventStore,默認是MemoryEventStoreWithBuffer。start方法也比較簡單。
public void start() throws CanalStoreException { super.start(); if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } indexMask = bufferSize - 1; entries = new Event[bufferSize]; }
這塊默認是EntryEventSink。這塊也不復雜。
public void start() { super.start(); Assert.notNull(eventStore); for (CanalEventDownStreamHandler handler : getHandlers()) { if (!handler.isStart()) { handler.start(); } } }
正常的啓動,將running狀態置爲true。
if (!eventParser.isStart()) { beforeStartEventParser(eventParser); eventParser.start(); afterStartEventParser(eventParser); }
咱們分別看下。
protected void beforeStartEventParser(CanalEventParser eventParser) { boolean isGroup = (eventParser instanceof GroupEventParser); if (isGroup) { // 處理group的模式 List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers(); for (CanalEventParser singleEventParser : eventParsers) {// 須要遍歷啓動 startEventParserInternal(singleEventParser, true); } } else { startEventParserInternal(eventParser, false); } }
判斷是否是集羣的parser(用於分庫),若是是GroupParser,須要一個個啓動CanalEventParser。咱們主要看下startEventParserInternal方法。咱們只關注MysqlEventParser,由於他支持HA。
if (eventParser instanceof MysqlEventParser) { MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser; CanalHAController haController = mysqlEventParser.getHaController(); if (haController instanceof HeartBeatHAController) { ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser); } if (!haController.isStart()) { haController.start(); } }
啓動一個HeartBeatHAController。主要做用是用於當parser失敗次數超過閾值時,執行mysql的主備切換。
這裏也區分是GroupParser仍是單個的MysqlParser,其實最終都是啓動Parser,不過前者是啓動多個而已。咱們看下單個的start方法。具體實如今AbstractMysqlEventParser中
public void start() throws CanalParseException { if (enableTsdb) { if (tableMetaTSDB == null) { // 初始化 tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml); } } super.start(); }
首先若是啓用了Tsdb功能(也就是DDL後表結構的回溯),那麼須要從xml中初始化表結構源數據,而後調用AbstractEventParser的start方法。
protected void afterStartEventParser(CanalEventParser eventParser) { // 讀取一下歷史訂閱的filter信息 List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination); for (ClientIdentity clientIdentity : clientIdentitys) { subscribeChange(clientIdentity); } }
這塊訂閱的主要是filter的變化。
public boolean subscribeChange(ClientIdentity identity) { if (StringUtils.isNotEmpty(identity.getFilter())) { logger.info("subscribe filter change to " + identity.getFilter()); AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter()); boolean isGroup = (eventParser instanceof GroupEventParser); if (isGroup) { // 處理group的模式 List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers(); for (CanalEventParser singleEventParser : eventParsers) {// 須要遍歷啓動 ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter); } } else { ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter); } } // filter的處理規則 // a. parser處理數據過濾處理 // b. sink處理數據的路由&分發,一份parse數據通過sink後能夠分發爲多份,每份的數據能夠根據本身的過濾規則不一樣而有不一樣的數據 // 後續內存版的一對多分發,能夠考慮 return true; }
至此,CanalInstance啓動成功。
一樣的,中止的觸發也是在ServerRunningMonitor中,中止的代碼以下:
public void stop() { super.stop(); logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination }); if (eventParser.isStart()) { beforeStopEventParser(eventParser); eventParser.stop(); afterStopEventParser(eventParser); } if (eventSink.isStart()) { eventSink.stop(); } if (eventStore.isStart()) { eventStore.stop(); } if (metaManager.isStart()) { metaManager.stop(); } if (alarmHandler.isStart()) { alarmHandler.stop(); } logger.info("stop successful...."); }
和啓動同樣,在先後也能夠作一些事情。
public void stop() { super.stop(); stopHeartBeat(); // 先中止心跳 parseThread.interrupt(); // 嘗試中斷 eventSink.interrupt(); try { parseThread.join();// 等待其結束 } catch (InterruptedException e) { // ignore } if (binlogParser.isStart()) { binlogParser.stop(); } if (transactionBuffer.isStart()) { transactionBuffer.stop(); } }
首先關閉心跳的定時器,而後中斷解析線程,等待當前運行的任務結束後,中止binlogParser,清空transactionBuffer。這裏看下怎麼清空transactionBuffer的。
public void stop() throws CanalStoreException { putSequence.set(INIT_SQEUENCE); flushSequence.set(INIT_SQEUENCE); entries = null; super.stop(); }
將put和flush的序列置爲初始序列,也就是再也不容許向隊列中put數據。
中止parser後,中止位點管理和HAController。其實只是將running置爲false。
相似於啓動,中止也不復雜。
public void stop() { super.stop(); for (CanalEventDownStreamHandler handler : getHandlers()) { if (handler.isStart()) { handler.stop(); } } }
主要部分在這邊
public void cleanAll() throws CanalStoreException { final ReentrantLock lock = this.lock; lock.lock(); try { putSequence.set(INIT_SQEUENCE); getSequence.set(INIT_SQEUENCE); ackSequence.set(INIT_SQEUENCE); putMemSize.set(0); getMemSize.set(0); ackMemSize.set(0); entries = null; // for (int i = 0; i < entries.length; i++) { // entries[i] = null; // } } finally { lock.unlock(); } }
其實也是將RingBuffer的指針置爲初始值。
咱們看下PeriodMixedMetaManager。主要調用了兩塊的stop,一個是MemoryMetaManager,另外一個是ZooKeeperMetaManager。清理內存中的數據,而後讓zk的管理器running狀態改成false。
將running狀態置爲false。