【源碼分析】Canal之Binlog的尋找過程

binlog的尋找過程可能的場景以下:java

  • instance第一次啓動
  • 發生數據庫主備切換
  • canal server HA狀況下的切換

因此這個過程是可以保證binlog不丟失的關鍵點。mysql

本文從源碼的角度來分析下啓動過程當中的binlog尋找過程。sql

1、流程圖

下圖是根據源碼畫出的流程圖,須要結合源碼分析來一塊兒看。
流程圖數據庫

2、源碼分析

入口在AbstractEventParser的start()方法中,這個start方法實際上是instance的整個啓動過程。具體啓動過程當中都作了哪些事情,請見另外一篇文章的分析。這塊再也不贅述。咱們主要看的地方是緩存

// 4. 獲取最後的位置信息
EntryPosition position = findStartPosition(erosaConnection);

這一行就是獲取binlog的解析位置,也是本文着重要分析的地方。由於咱們目前所配置的都是MysqlEventParser,因此咱們分析的也是這個類中的相關代碼。服務器

protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException {
    if (isGTIDMode()) {
        // GTID模式下,CanalLogPositionManager裏取最後的gtid,沒有則取instanc配置中的
        LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination);
        if (logPosition != null) {
            return logPosition.getPostion();
        }

        if (StringUtils.isNotEmpty(masterPosition.getGtid())) {
            return masterPosition;
        }
    }

    EntryPosition startPosition = findStartPositionInternal(connection);
    if (needTransactionPosition.get()) {
        logger.warn("prepare to find last position : {}", startPosition.toString());
        Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition);
        if (!preTransactionStartPosition.equals(startPosition.getPosition())) {
            logger.warn("find new start Transaction Position , old : {} , new : {}",
                startPosition.getPosition(),
                preTransactionStartPosition);
            startPosition.setPosition(preTransactionStartPosition);
        }
        needTransactionPosition.compareAndSet(true, false);
    }
    return startPosition;
}

2.1 GTID模式

咱們目前的數據庫架構通常都是M-S,因此binlog的位點極可能不一致,這就須要開啓數據庫GTID模式(經過在instance.properties中配置canal.instance.gtidon=true便可開啓),這是一個全局的事務ID,可以防止主從位點不一致的狀況下,找不到位點的問題。目前這塊是從CanalLogPositionManager中取最後的GTID。default-instance.xml中,使用的CanalLogPositionManager是FailbackLogPositionManager,一個兩級的位點管理器,XML配置以下:架構

<!-- 解析位點記錄 -->
<property name="logPositionManager">
    <bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager">
        <constructor-arg>
            <bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" />
        </constructor-arg>
        <constructor-arg>
            <bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager">
                <constructor-arg ref="metaManager"/>
            </bean>
        </constructor-arg>
    </bean>
</property>

一級是放到本地緩存中,第二級直接打了個info日誌,有點弱,其實考慮的狀況是性能,另外一個考慮多是由於DB的主從切換,並不會致使instance掛掉,內存中仍是存儲了以前DB的一些解析位點信息。其實都沒有放到zk中,不利於作HA,因此這塊目前還不是很完善,真正要使用GTID的話,須要對CanalLogPositionManager進行修改。目前已經提供了其餘的一些實現,包括定時刷新到zk中等等。源碼分析

若是CanalLogPositionManager中沒有存儲的話,也能夠在instance.properties裏面指定位點和GTID信息,也能從binlog中獲取。性能

2.2 非GTID模式

若是canal沒有開啓GTID模式,那麼咱們就須要走一個binlog的尋找過程。fetch

EntryPosition startPosition = findStartPositionInternal(connection);

這個方法是個冗長的方法,裏面的判斷邏輯就是上面的流程圖,咱們來梳理一下。

首先仍是從CanalLogPositionManager中獲取,也就是基本上從內存中獲取LogPosition。

LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);

2.2.1 內存中不存在LogPosition

2.2.1.1

首先判斷配置文件中的主庫信息是否與當前的數據庫鏈接connection的地址一致,若是一致,若是一致,那麼直接取properties文件中的master的位點信息。

2.2.1.2

若是主庫不一致,那麼判斷從庫standby的connection地址,若是是從庫,那麼直接取從庫的位點信息。

咱們能夠在xml配置中看到properties的一些信息。

<!-- 解析起始位點 -->
<property name="masterPosition">
    <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
        <property name="journalName" value="${canal.instance.master.journal.name}" />
        <property name="position" value="${canal.instance.master.position}" />
        <property name="timestamp" value="${canal.instance.master.timestamp}" />
        <property name="gtid" value="${canal.instance.master.gtid}" />
    </bean>
</property>
<property name="standbyPosition">
    <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition">
        <property name="journalName" value="${canal.instance.standby.journal.name}" />
        <property name="position" value="${canal.instance.standby.position}" />
        <property name="timestamp" value="${canal.instance.standby.timestamp}" />
        <property name="gtid" value="${canal.instance.standby.gtid}" />
    </bean>
</property>

2.2.1.3

若是內存中沒有,配置文件中也沒有,那麼系統默認從當前時間開始消費。

entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最後一個位置進行消費

protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) {
    MysqlConnection mysqlConnection = (MysqlConnection) connection;
    final EntryPosition endPosition = findEndPosition(mysqlConnection);//獲取當前最新的位點信息
    if (tableMetaTSDB != null) {
        long startTimestamp = System.currentTimeMillis();
        return findAsPerTimestampInSpecificLogFile(mysqlConnection,
            startTimestamp,
            endPosition,
            endPosition.getJournalName(),
            true);
    } else {
        return endPosition;
    }
}

這裏的findEndPosition()方法,其實就是執行了一個Mysql命令:

show master status

返回的內容中,包含binlog文件信息和位點position,甚至包括GTID信息。

找到了最新的binlog位點信息後,根據當前時間戳和binlog的時間戳等信息,去服務器上面尋找binlog。其實邏輯基本上都在findAsPerTimestampInSpecificLogFile()中,這個方法是根據時間戳去尋找,離時間戳最近(小於時間戳)的一個事務起始位置。因爲這塊的代碼比較長,因此咱們只作分析,不作代碼粘貼,具體的代碼在MysqlEventParser這個類中。整個尋找的過程以下:

先看一下這個seek的過程,見代碼註釋:

/**
 * 加速主備切換時的查找速度,作一些特殊優化,好比只解析事務頭或者尾
 */
public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();//在mysql中執行一些dump以前的命令

    sendBinlogDump(binlogfilename, binlogPosition);//指定位點和binlog文件,發送dump命令,COM_BINLOG_DUMP
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());//開始獲取
    LogDecoder decoder = new LogDecoder();
    decoder.handle(LogEvent.ROTATE_EVENT);
    decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT);
    decoder.handle(LogEvent.QUERY_EVENT);
    decoder.handle(LogEvent.XID_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {//遍歷獲取
        LogEvent event = null;
        event = decoder.decode(fetcher, context);//解析爲event

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {//調用SinkFunction.sink()過濾
            break;
        }
    }
}

下面咱們看下數據過濾這塊:

  • 起始位置爲4,也就是跳過一個魔法值,具體能夠看binlog的結構說明
  • 以後就是一個過濾的過程
    • 首先把事件event解析一個entry,這個entry使用的是消息模型EntryProtocol.proto
    • 首先判斷當前事件是否爲事務開始或者結束的位置,若是是,判斷事件的時間,若是在咱們須要的時間以後,直接過濾這條entry
    • 若是當前entry的binlog文件名和最新的binlog文件名相同,而且最新的位點小於entry的位點,那麼直接過濾
    • 若是當前entry的類型表示的是事務開始或者事務結束,那麼直接取當前entry的位點信息,利用當前entry構建位點信息,也就是找到了咱們須要的事務起點。

2.2.1.4

若是binlog文件名爲空,首先判斷時間戳是否存在,若是存在,那麼直接按照時間戳去取,不然默認從當前最後一個位置進行消費。

// 若是沒有指定binlogName,嘗試按照timestamp進行查找
if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) {
    logger.warn("prepare to find start position {}:{}:{}",
        new Object[] { "", "", entryPosition.getTimestamp() });
    return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp());
} else {
    logger.warn("prepare to find start position just show master status");
    return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最後一個位置進行消費
}

這塊咱們看下findByStartTimestamp()這個方法,也就是隻根據時間來查找binlog。這塊的邏輯是這樣的:

  • 首先獲取最新和最老的binlog文件
  • 從最新的binlog中,根據時間去找,調用的方法也是findAsPerTimestampInSpecificLogFile()
  • 若是已經從最新的到最老的binlog文件中找遍了,沒找到,說明根本沒有對應時間的binlog
  • 不然不斷的遍歷binlog文件,由於binlog文件名的後綴都是連續的,因此能夠很快的尋找

2.2.1.5

binlog文件名不爲空,首先判斷是否有位點信息,若是有的話,直接根據當前內存中存儲的位點和文件信息去Mysql獲取。

不然,根據當前內存中管理的時間戳去獲取,根據時間戳和binlog文件名去獲取位點。固然,若是時間戳也不存在,直接從binlog文件名的文件開頭去獲取binlog。

2.2.2 內存中存在歷史成功記錄

2.2.2.1 內存中的位點信息對應的數據庫ip和當前鏈接的ip一致

若是dump錯誤的次數超過了必定的閾值,默認是2次,也就是連續幾回定位失敗,有幾種狀況:

  • binlog位點被刪除
  • vip模式的mysql,發生了主備切換

這種須要進行一次判斷,判斷內容:

boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null)
                && logPosition.getPostion().getServerId() != null
                && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));

判斷幾個,第一個配置文件中的standby爲空,第二個內存中的logPosition存在數據庫ip,第三個內存中的logPosition的數據庫ip和當前數據庫鏈接connection的數據庫ip不一致。

知足這三個條件,說明發生了vip的主備切換,此時須要把logPosition中的時間戳向前推一個回退時間,默認60s,而後根據新的時間戳去找binlog文件和位點信息。

if (case2) {
    long timestamp = logPosition.getPostion().getTimestamp();
    long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000;
    logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "",
            logPosition.getPostion().getTimestamp() });
    EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp);
    // 從新置爲一下
    dumpErrorCount = 0;
    return findPosition;
}

2.2.2.2 不一致的狀況

說明發生了主從切換,這種狀況下,直接把logPosition中的時間回退60s,而後根據回退後的時間去binlog中尋找,而後返回。

相關文章
相關標籤/搜索