【Canal源碼分析】parser工做過程

本文主要分析的部分是instance啓動時,parser的一個啓動和工做過程。主要關注的是AbstractEventParser的start()方法中的parseThread。html

1、序列圖

輸入圖片說明

2、源碼分析

parseThread中包含的內容比較清晰,代碼不是很長,咱們逐步分析下。java

2.1 構造數據庫鏈接

erosaConnection = buildErosaConnection();

這裏構造的,應該是一個mysql的連接,包括的內容都是從配置文件中過來的一些信息,包括mysql的地址,帳號密碼等。mysql

2.2 啓動心跳線程

startHeartBeat(erosaConnection);

這裏的心跳,感受是個假的心跳,並無用到connection相關的內容。啓動一個定時任務,默認3s發送一個心跳的binlog給sink階段,表名parser還在工做。在sink階段,會把心跳的binlog直接過濾,不會走到store過程。sql

2.3 dump以前準備工做

這一步的代碼也不復雜。數據庫

preDump(erosaConnection);

咱們看看preDump都可以作什麼?在MysqlEventParser中,咱們能夠看到,主要作了幾件事:緩存

  • 針對binlog格式進行過濾,也就是咱們在配置文件中指定binlog的格式,不過目前咱們默認的都是ROW模式。
  • 針對binlog image進行過濾,目前默認是FULL,也就是binlog記錄的是變動先後的數據,若是配置爲minimal,那麼只記錄變動後的值,能夠減小binlog的文件大小。
  • 構造表結構源數據的緩存TableMetaCache

2.4 獲取最後的位置信息

這一步是比較核心的,也是保證binlog不丟失的核心代碼。dom

EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
if (startPosition == null) {
    throw new CanalParseException("can't find start position for " + destination);
}

if (!processTableMeta(startPosition)) {
    throw new CanalParseException("can't find init table meta for " + destination
                                  + " with position : " + startPosition);
}

具體的findStartPosition是怎麼實現的,請查閱下一篇文章源碼分析

若是沒有找到最後的位置信息,那麼直接拋出異常,不然還要進行一次判斷,也就是processTableMeta,咱們看下這個方法作了什麼。fetch

protected boolean processTableMeta(EntryPosition position) {
    if (isGTIDMode()) {
        if (binlogParser instanceof LogEventConvert) {
            // 記錄gtid
            ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid()));
        }
    }

    if (tableMetaTSDB != null) {
        if (position.getTimestamp() == null || position.getTimestamp() <= 0) {
            throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0");
        }

        return tableMetaTSDB.rollback(position);
    }

    return true;
}

若是開啓了GTID模式,那麼直接設置GTID集合。若是tableMetaTSDB不爲空,那麼直接根據位置信息回滾到對應的表結構。這個tableMetaTSDB記錄的是一個表結構的時序,使用的是Druid的一個功能,把全部DDL記錄在數據庫中,通常來講,每24小時生成一份快照插入到數據庫中,這樣能解決DDL產生的表結構不一致的問題,也就是增長了一個表結構的回溯功能。ui

這邊的rollback主要作的事情爲:

  • 根據位置信息position從數據庫去查詢對應的信息,包括binlog文件名、位點等。而後記錄到內存中,使用的Druid的SchemaRepository.console方法。

2.5 開始dump數據

在dump以前,代碼中構造了一個sink類,也就是SinkFunction。裏面定義了一個sink方法,主要的內容是對哪些數據進行過濾。

try {
    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false);

    if (!running) {
        return false;
    }

    if (entry != null) {
        exception = null; // 有正常數據流過,清空exception
        transactionBuffer.add(entry);
        // 記錄一下對應的positions
        this.lastPosition = buildLastPosition(entry);
        // 記錄一下最後一次有數據的時間
        lastEntryTime = System.currentTimeMillis();
    }
    return running;
} catch (TableIdNotFoundException e) {
    throw e;
} catch (Throwable e) {
    if (e.getCause() instanceof TableIdNotFoundException) {
        throw (TableIdNotFoundException) e.getCause();
    }
    // 記錄一下,出錯的位點信息
    processSinkError(e,
        this.lastPosition,
        startPosition.getJournalName(),
        startPosition.getPosition());
    throw new CanalParseException(e); // 繼續拋出異常,讓上層統一感知
}

首先判斷parser是否在運行,若是不運行,那麼就直接拋棄。運行時,判斷entry是否爲空,不爲空的狀況下,直接將entry加入到transactionBuffer中。這裏咱們說下這個transactionBuffer,其實相似於Disruptor中的一個環形隊列(默認長度爲1024),維護了幾個指針,包括put、get、ack三個指針,裏面存儲了須要進行傳遞到下一階段的數據。

加到環形隊列以後,記錄一下當前的位置信息和時間。若是這個過程出錯了,須要記錄下出錯的位置信息,這裏的processSinkError其實就是打印了一下錯誤日誌,而後拋出了一個CanalException,讓上一層感知。

說了這麼多,還沒到真正開始dump的地方。下面開始吧。

if (isGTIDMode()) {
    erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler);
} else {
    if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
        erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
    } else {
        erosaConnection.dump(startPosition.getJournalName(),
                startPosition.getPosition(),
                sinkHandler);
    }
}

在新版本中,增長了GTID的模式,因此這裏的dump須要判斷怎麼dump,發送什麼命令給mysql來獲取什麼樣的binlog。

2.5.1 GTID模式

若是開啓了GTID模式(在instance.properties開啓),那麼須要發送COM_BINLOG_DUMP_GTID命令,而後開始接受binlog信息,進行binlog處理。

public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException {
    updateSettings();
    sendBinlogDumpGTID(gtidSet);

    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        LogEvent event = null;
        event = decoder.decode(fetcher, context);

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {
            break;
        }
    }
}

調用LogDecoder.decode方法,對二進制進行解析,解析爲咱們須要的LogEvent,若是解析失敗,拋出異常。不然進行sink,若是sink返回的false,那麼直接跳過,不然加入到transactionBuffer中。

2.5.2 非GTID模式

這塊有個邏輯判斷,若是找到的最後的位置信息中包含了時間戳,若是沒有binlog文件名,那麼在MysqlConnection中直接報錯,也就是必須既要有時間戳,又要有binlog文件名,才能進行dump操做。

這裏的dump分了兩步,第一步就是發送COM_REGISTER_SLAVE命令,假裝本身是一個slave,而後發送COM_BINLOG_DUMP命令接收binlog。

public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException {
    updateSettings();
    sendRegisterSlave();
    sendBinlogDump(binlogfilename, binlogPosition);
    DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize());
    fetcher.start(connector.getChannel());
    LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT);
    LogContext context = new LogContext();
    while (fetcher.fetch()) {
        LogEvent event = null;
        event = decoder.decode(fetcher, context);

        if (event == null) {
            throw new CanalParseException("parse failed");
        }

        if (!func.sink(event)) {
            break;
        }

        if (event.getSemival() == 1) {
            sendSemiAck(context.getLogPosition().getFileName(), binlogPosition);
        }
    }
}

這裏有個mysql半同步的標識,semival。若是semival==1,說明須要進行ack,發送SEMI_SYNC_ACK給master(咱們這邊more都不開啓)。

2.5.3 異常處理

若是整個過程當中發生了異常,有如下幾種處理方式:

  • 沒有找到表,說明起始的position在一個事務中,須要從新找到事務的開始點
  • 其餘異常,processDumpError,若是是IO異常,並且message中包含errno = 1236錯誤,表示從master讀取binlog發生致命錯誤,處理方法以下:http://blog.sina.com.cn/s/blog_a1e9c7910102wv2v.html。
  • 若是當前parser不在運行,拋出異常;若是在運行,拋出異常以後,發送一個告警信息。
  • 異常處理完成後,在finally中,首先將當前線程置爲interrupt,而後關閉mysql鏈接。若是關閉鏈接過程當中,拋出異常,須要進行處理。
  • 整個異常處理後,首先暫停sink過程,而後重置緩衝隊列TransctionBuffer,重置binlogParser。最後,若是parser還在運行,那麼sleep一段時間後重試。
} catch (TableIdNotFoundException e) {
    exception = e;
    // 特殊處理TableIdNotFound異常,出現這樣的異常,一種可能就是起始的position是一個事務當中,致使tablemap
    // Event時間沒解析過
    needTransactionPosition.compareAndSet(false, true);
    logger.error(String.format("dump address %s has an error, retrying. caused by ",
        runningInfo.getAddress().toString()), e);
} catch (Throwable e) {
    processDumpError(e);
    exception = e;
    if (!running) {
        if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
            throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
                runningInfo.getAddress().toString()), e);
        }
    } else {
        logger.error(String.format("dump address %s has an error, retrying. caused by ",
            runningInfo.getAddress().toString()), e);
        sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
    }
} finally {
    // 從新置爲中斷狀態
    Thread.interrupted();
    // 關閉一下連接
    afterDump(erosaConnection);
    try {
        if (erosaConnection != null) {
            erosaConnection.disconnect();
        }
    } catch (IOException e1) {
        if (!running) {
            throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
                runningInfo.getAddress().toString()),
                e1);
        } else {
            logger.error("disconnect address {} has an error, retrying., caused by ",
                runningInfo.getAddress().toString(),
                e1);
        }
    }
}
// 出異常了,退出sink消費,釋放一下狀態
eventSink.interrupt();
transactionBuffer.reset();// 重置一下緩衝隊列,從新記錄數據
binlogParser.reset();// 從新置位

if (running) {
    // sleep一段時間再進行重試
    try {
        Thread.sleep(10000 + RandomUtils.nextInt(10000));
    } catch (InterruptedException e) {
    }
}
相關文章
相關標籤/搜索