【Canal源碼分析】Canal Instance啓動和中止

1、序列圖

1.1 啓動

instance啓動.png

1.2 中止

instance中止.png

2、源碼分析

2.1 啓動

這部分代碼其實在ServerRunningMonitor的start()方法中。針對不一樣的destination,啓動不一樣的CanalInstance。主要的方法在於initRunning()。java

private void initRunning() {
    if (!isStart()) {
        return;
    }

    String path = ZookeeperPathUtils.getDestinationServerRunning(destination);
    // 序列化
    byte[] bytes = JsonUtils.marshalToByte(serverData);
    try {
        mutex.set(false);
        zkClient.create(path, bytes, CreateMode.EPHEMERAL);
        activeData = serverData;
        processActiveEnter();// 觸發一下事件
        mutex.set(true);
    } catch (ZkNodeExistsException e) {
        bytes = zkClient.readData(path, true);
        if (bytes == null) {// 若是不存在節點,當即嘗試一次
            initRunning();
        } else {
            activeData = JsonUtils.unmarshalFromByte(bytes, ServerRunningData.class);
        }
    } catch (ZkNoNodeException e) {
        zkClient.createPersistent(ZookeeperPathUtils.getDestinationPath(destination), true); // 嘗試建立父節點
        initRunning();
    }
}

首先在zk中新增一個臨時節點,表示的是正在運行destination的ip和端口,而後觸發一下processActiveEnter()。咱們主要看下這個方法,在controller啓動時定義的。mysql

public void processActiveEnter() {
    try {
        MDC.put(CanalConstants.MDC_DESTINATION, String.valueOf(destination));
        embededCanalServer.start(destination);
    } finally {
        MDC.remove(CanalConstants.MDC_DESTINATION);
    }
}

public void start(final String destination) {
    final CanalInstance canalInstance = canalInstances.get(destination);
    if (!canalInstance.isStart()) {
        try {
            MDC.put("destination", destination);
            canalInstance.start();
            logger.info("start CanalInstances[{}] successfully", destination);
        } finally {
            MDC.remove("destination");
        }
    }
}

主要在embededCanalServer.start中,咱們看下這個canalInstance.start(),跟蹤到AbstractCanalInstance。spring

2.1.1 啓動metaManager

在默認的instance配置文件中,咱們選擇的metaManager是PeriodMixedMetaManager,定時(默認1s)刷新數據到zk中,因此咱們主要關注這個類的start方法。這個類繼承了MemoryMetaManager,首先啓動一個MemoryMetaManager,而後再啓動一個ZooKeeperMetaManager。sql

2.1.1.1 獲取全部destination和client

destinations = MigrateMap.makeComputingMap(new Function<String, List<ClientIdentity>>() {

    public List<ClientIdentity> apply(String destination) {
        return zooKeeperMetaManager.listAllSubscribeInfo(destination);
    }
});

從/otter/canal/destinations/{destination}獲取全部的client信息,返回的內容是List<ClientIdentity>,包括了destination、clientId、filter等等。緩存

2.1.1.2 獲取client指針cursor

根據ClientIdentity去zk獲取指針,從zk的/otter/canal/destinations/{destination}/{clientId}/cursor下面去獲取,返回的內容是個LogPosition。app

cursors = MigrateMap.makeComputingMap(new Function<ClientIdentity, Position>() {

    public Position apply(ClientIdentity clientIdentity) {
        Position position = zooKeeperMetaManager.getCursor(clientIdentity);
        if (position == null) {
            return nullCursor; // 返回一個空對象標識,避免出現異常
        } else {
            return position;
        }
    }
});

有可能返回一個空。ide

2.1.1.3 獲取批次batch

建立一個基於內存的MemoryClientIdentityBatch,包含位點的start、end、ack信息。而後從zk節點/otter/canal/destinations/{destination}/{clientId}/mark獲取,取出來的數據進行排序,而後從/otter/canal/destinations/{destination}/{clientId}/mark/{batchId}中取出PositionRange這個類,描述的是一個position的範圍。源碼分析

batches = MigrateMap.makeComputingMap(new Function<ClientIdentity, MemoryClientIdentityBatch>() {

    public MemoryClientIdentityBatch apply(ClientIdentity clientIdentity) {
        // 讀取一下zookeeper信息,初始化一次
        MemoryClientIdentityBatch batches = MemoryClientIdentityBatch.create(clientIdentity);
        Map<Long, PositionRange> positionRanges = zooKeeperMetaManager.listAllBatchs(clientIdentity);
        for (Map.Entry<Long, PositionRange> entry : positionRanges.entrySet()) {
            batches.addPositionRange(entry.getValue(), entry.getKey()); // 添加記錄到指定batchId
        }
        return batches;
    }
});

2.1.1.4 啓動定時刷zk任務

// 啓動定時工做任務
executor.scheduleAtFixedRate(new Runnable() {

    public void run() {
        List<ClientIdentity> tasks = new ArrayList<ClientIdentity>(updateCursorTasks);
        for (ClientIdentity clientIdentity : tasks) {
            try {
                // 定時將內存中的最新值刷到zookeeper中,屢次變動只刷一次
                zooKeeperMetaManager.updateCursor(clientIdentity, getCursor(clientIdentity));
                updateCursorTasks.remove(clientIdentity);
            } catch (Throwable e) {
                // ignore
                logger.error("period update" + clientIdentity.toString() + " curosr failed!", e);
            }
        }
    }
}, period, period, TimeUnit.MILLISECONDS);

定時刷新position到zk後,從任務中刪除。刷新的頻率爲1s。ui

2.1.2 啓動alarmHandler

這塊比較簡單。this

if (!alarmHandler.isStart()) {
    alarmHandler.start();
}

其實默認是LogAlarmHandler,用於發送告警信息的。

2.1.3 啓動eventStore

啓動EventStore,默認是MemoryEventStoreWithBuffer。start方法也比較簡單。

public void start() throws CanalStoreException {
    super.start();
    if (Integer.bitCount(bufferSize) != 1) {
        throw new IllegalArgumentException("bufferSize must be a power of 2");
    }

    indexMask = bufferSize - 1;
    entries = new Event[bufferSize];
}

2.1.4 啓動eventSink

這塊默認是EntryEventSink。這塊也不復雜。

public void start() {
    super.start();
    Assert.notNull(eventStore);

    for (CanalEventDownStreamHandler handler : getHandlers()) {
        if (!handler.isStart()) {
            handler.start();
        }
    }
}

正常的啓動,將running狀態置爲true。

2.1.5 啓動eventParser

if (!eventParser.isStart()) {
    beforeStartEventParser(eventParser);
    eventParser.start();
    afterStartEventParser(eventParser);
}

咱們分別看下。

2.1.5.1 beforeStartEventParser

protected void beforeStartEventParser(CanalEventParser eventParser) {

    boolean isGroup = (eventParser instanceof GroupEventParser);
    if (isGroup) {
        // 處理group的模式
        List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
        for (CanalEventParser singleEventParser : eventParsers) {// 須要遍歷啓動
            startEventParserInternal(singleEventParser, true);
        }
    } else {
        startEventParserInternal(eventParser, false);
    }
}

判斷是否是集羣的parser(用於分庫),若是是GroupParser,須要一個個啓動CanalEventParser。咱們主要看下startEventParserInternal方法。咱們只關注MysqlEventParser,由於他支持HA。

if (eventParser instanceof MysqlEventParser) {
    MysqlEventParser mysqlEventParser = (MysqlEventParser) eventParser;
    CanalHAController haController = mysqlEventParser.getHaController();

    if (haController instanceof HeartBeatHAController) {
        ((HeartBeatHAController) haController).setCanalHASwitchable(mysqlEventParser);
    }

    if (!haController.isStart()) {
        haController.start();
    }
}

啓動一個HeartBeatHAController。主要做用是用於當parser失敗次數超過閾值時,執行mysql的主備切換。

2.1.5.2 eventParser.start()

這裏也區分是GroupParser仍是單個的MysqlParser,其實最終都是啓動Parser,不過前者是啓動多個而已。咱們看下單個的start方法。具體實如今AbstractMysqlEventParser中

public void start() throws CanalParseException {
    if (enableTsdb) {
        if (tableMetaTSDB == null) {
            // 初始化
            tableMetaTSDB = TableMetaTSDBBuilder.build(destination, tsdbSpringXml);
        }
    }

    super.start();
}

首先若是啓用了Tsdb功能(也就是DDL後表結構的回溯),那麼須要從xml中初始化表結構源數據,而後調用AbstractEventParser的start方法。

  • 首先初始化緩衝隊列transactionBuffer,默認隊列長度爲1024
  • 初始化BinlogParser,將其running狀態置爲true
  • 啓動工做線程parseThread,開始訂閱binlog,這個線程中作的事在下一篇文章中有。

2.1.5.3 afterStartEventParser

protected void afterStartEventParser(CanalEventParser eventParser) {
    // 讀取一下歷史訂閱的filter信息
    List<ClientIdentity> clientIdentitys = metaManager.listAllSubscribeInfo(destination);
    for (ClientIdentity clientIdentity : clientIdentitys) {
        subscribeChange(clientIdentity);
    }
}

這塊訂閱的主要是filter的變化。

public boolean subscribeChange(ClientIdentity identity) {
    if (StringUtils.isNotEmpty(identity.getFilter())) {
        logger.info("subscribe filter change to " + identity.getFilter());
        AviaterRegexFilter aviaterFilter = new AviaterRegexFilter(identity.getFilter());

        boolean isGroup = (eventParser instanceof GroupEventParser);
        if (isGroup) {
            // 處理group的模式
            List<CanalEventParser> eventParsers = ((GroupEventParser) eventParser).getEventParsers();
            for (CanalEventParser singleEventParser : eventParsers) {// 須要遍歷啓動
                ((AbstractEventParser) singleEventParser).setEventFilter(aviaterFilter);
            }
        } else {
            ((AbstractEventParser) eventParser).setEventFilter(aviaterFilter);
        }

    }

    // filter的處理規則
    // a. parser處理數據過濾處理
    // b. sink處理數據的路由&分發,一份parse數據通過sink後能夠分發爲多份,每份的數據能夠根據本身的過濾規則不一樣而有不一樣的數據
    // 後續內存版的一對多分發,能夠考慮
    return true;
}

至此,CanalInstance啓動成功。

2.2 中止

一樣的,中止的觸發也是在ServerRunningMonitor中,中止的代碼以下:

public void stop() {
    super.stop();
    logger.info("stop CannalInstance for {}-{} ", new Object[] { canalId, destination });

    if (eventParser.isStart()) {
        beforeStopEventParser(eventParser);
        eventParser.stop();
        afterStopEventParser(eventParser);
    }

    if (eventSink.isStart()) {
        eventSink.stop();
    }

    if (eventStore.isStart()) {
        eventStore.stop();
    }

    if (metaManager.isStart()) {
        metaManager.stop();
    }

    if (alarmHandler.isStart()) {
        alarmHandler.stop();
    }

    logger.info("stop successful....");
}

2.2.1 中止EventParser

和啓動同樣,在先後也能夠作一些事情。

  • 中止前,目前默認什麼都不作;
  • 中止時,咱們主要看MysqlEventParser
    • 首先斷開mysql的鏈接
    • 清理緩存中表結構源數據tableMetaCache.clearTableMeta()
    • 調用AbstractMysqlEventParser的stop方法,首先從spring上下文中,刪除tableMetaTSDB。而後調用AbstractEventParser中的stop方法。
public void stop() {
    super.stop();

    stopHeartBeat(); // 先中止心跳
    parseThread.interrupt(); // 嘗試中斷
    eventSink.interrupt();
    try {
        parseThread.join();// 等待其結束
    } catch (InterruptedException e) {
        // ignore
    }

    if (binlogParser.isStart()) {
        binlogParser.stop();
    }
    if (transactionBuffer.isStart()) {
        transactionBuffer.stop();
    }
}

首先關閉心跳的定時器,而後中斷解析線程,等待當前運行的任務結束後,中止binlogParser,清空transactionBuffer。這裏看下怎麼清空transactionBuffer的。

public void stop() throws CanalStoreException {
    putSequence.set(INIT_SQEUENCE);
    flushSequence.set(INIT_SQEUENCE);

    entries = null;
    super.stop();
}

將put和flush的序列置爲初始序列,也就是再也不容許向隊列中put數據。

中止parser後,中止位點管理和HAController。其實只是將running置爲false。

2.2.2 中止EventSink

相似於啓動,中止也不復雜。

public void stop() {
    super.stop();

    for (CanalEventDownStreamHandler handler : getHandlers()) {
        if (handler.isStart()) {
            handler.stop();
        }
    }
}

2.2.3 中止EventStore

主要部分在這邊

public void cleanAll() throws CanalStoreException {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        putSequence.set(INIT_SQEUENCE);
        getSequence.set(INIT_SQEUENCE);
        ackSequence.set(INIT_SQEUENCE);

        putMemSize.set(0);
        getMemSize.set(0);
        ackMemSize.set(0);
        entries = null;
        // for (int i = 0; i < entries.length; i++) {
        // entries[i] = null;
        // }
    } finally {
        lock.unlock();
    }
}

其實也是將RingBuffer的指針置爲初始值。

2.2.4 中止metaManager

咱們看下PeriodMixedMetaManager。主要調用了兩塊的stop,一個是MemoryMetaManager,另外一個是ZooKeeperMetaManager。清理內存中的數據,而後讓zk的管理器running狀態改成false。

2.2.5 中止alarmHandler

將running狀態置爲false。

相關文章
相關標籤/搜索