canal源碼分析——parse模塊源碼分析

高層類圖

首先,咱們來看看該模塊下面的類圖,經過類圖就能夠清晰地掌握整個模塊的骨架結構。 java

EventTransactionBuffer是事件事務緩存區。它主要是在內存中開闢一個緩衝區,避免太高的flush頻率致使的IO次數過分而致使的性能問題。 mysql

CanalEventParser是數據複製的控制器。該接口是核心的數據複製接口。 sql

CanalLogPositionManager是日誌的位置管理器。提供了讀取和存儲當前日誌位置的接口。 數據庫

CanalHAController是高可用的複製控制器。 緩存

圖中全部接口都實現了CanalLifeCycle(生命週期接口)。 服務器

AbstractEventParser是一個模板方法的抽閒實現類,它 最大化共用mysql/oracle版本的實現類,提供了一些抽象方法交給子類實現。 多線程

AbstractMysqlEventParser是抽象的MySQL日誌複製控制器的模板類。共享了MySQL的日誌複製控制實現。 併發

LocalBinlogEventParser是基於本地MySQL的binlog文件的複製控制器實現類。 oracle

MysqlEventParser是基於向mysql server複製binlog實現類。該實現類是MySQL使用最多的一種實現方式。 dom

GroupEventParser是合多個EventParser進行合併處理,group只是作爲一個delegate處理。它是一個組合模式的實現。

從上圖所示能夠看出,canal項目並未實現oracle數據庫的日誌複製器的實現,也就是不支持oracle數據庫。

MysqlEventParser時序圖

從類圖中的介紹能夠看出MysqlEventParser 是咱們最核心的一個實現類,本文將重點描述該類的一個時序。


AbstractEventParser類源碼解析

該相似parse模塊中最核心的一個類, 它是一個事件解析的一個模板方法類,定義了事件解析的一個公共流程,幾乎全部的子類都是擴展自該類的,所以閱讀該類可以掌握最核心的binlog事件解析流程。

解析器對象實例化

public AbstractEventParser(){
        // 初始化一下
        transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() {

            public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException {
                boolean successed = consumeTheEventAndProfilingIfNecessary(transaction);
                if (!running) {
                    return;
                }

                if (!successed) {
                    throw new CanalParseException("consume failed!");
                }

                LogPosition position = buildLastTransactionPosition(transaction);
                if (position != null) { // 可能position爲空
                    logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position);
                }
            }
        });
    }


首先看上述代碼,它是構造方法中的代碼,實例化本對象的同時,也實例化了一個EventTransactionBuffer對象。傳入了一個TransactionFlushCallback的回調匿名類對象。回調類中定義了一個flush方法,該方法實現的內容是先消費事件,若是消費成功了,則存儲當前的position。若是消費失敗則拋出異常信息。EventTransactionBuffer寫緩衝區的使用,是一種應對高併發的手段,它至關於在內存中收集一個個的事件,而後再批量的調用flush方法。這個與日誌中的實現是同樣的。

啓動解析器方法

public void start() {
        super.start();
        MDC.put("destination", destination);
        // 配置transaction buffer
        // 初始化緩衝隊列
        transactionBuffer.setBufferSize(transactionSize);// 設置buffer大小
        transactionBuffer.start();
        // 構造bin log parser
        binlogParser = buildParser();// 初始化一下BinLogParser
        binlogParser.start();
        // 啓動工做線程
        parseThread = new Thread(new Runnable() {

            public void run() {
                MDC.put("destination", String.valueOf(destination));
                ErosaConnection erosaConnection = null;
                while (running) {
                    try {

                        // 開始執行replication
                        // 1. 構造Erosa鏈接
                        erosaConnection = buildErosaConnection();

                        // 2. 啓動一個心跳線程
                        startHeartBeat(erosaConnection);

                        // 3. 執行dump前的準備工做
                        preDump(erosaConnection);

                        erosaConnection.connect();// 連接
                        // 4. 獲取最後的位置信息
                        final EntryPosition startPosition = findStartPosition(erosaConnection);
                        if (startPosition == null) {
                            throw new CanalParseException("can't find start position for " + destination);
                        }
                        logger.info("find start position : {}", startPosition.toString());
                        // 從新連接,由於在找position過程當中可能有狀態,須要斷開後重建
                        erosaConnection.reconnect();

                        final SinkFunction sinkHandler = new SinkFunction<EVENT>() {

                            private LogPosition lastPosition;

                            public boolean sink(EVENT event) {
                                try {
                                    CanalEntry.Entry entry = parseAndProfilingIfNecessary(event);

                                    if (!running) {
                                        return false;
                                    }

                                    if (entry != null) {
                                        exception = null; // 有正常數據流過,清空exception
                                        transactionBuffer.add(entry);
                                        // 記錄一下對應的positions
                                        this.lastPosition = buildLastPosition(entry);
                                        // 記錄一下最後一次有數據的時間
                                        lastEntryTime = System.currentTimeMillis();
                                    }
                                    return running;
                                } catch (TableIdNotFoundException e) {
                                    throw e;
                                } catch (Exception e) {
                                    // 記錄一下,出錯的位點信息
                                    processError(e,
                                        this.lastPosition,
                                        startPosition.getJournalName(),
                                        startPosition.getPosition());
                                    throw new CanalParseException(e); // 繼續拋出異常,讓上層統一感知
                                }
                            }

                        };

                        // 4. 開始dump數據
                        if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) {
                            erosaConnection.dump(startPosition.getTimestamp(), sinkHandler);
                        } else {
                            erosaConnection.dump(startPosition.getJournalName(),
                                startPosition.getPosition(),
                                sinkHandler);
                        }

                    } catch (TableIdNotFoundException e) {
                        exception = e;
                        // 特殊處理TableIdNotFound異常,出現這樣的異常,一種可能就是起始的position是一個事務當中,致使tablemap
                        // Event時間沒解析過
                        needTransactionPosition.compareAndSet(false, true);
                        logger.error(String.format("dump address %s has an error, retrying. caused by ",
                            runningInfo.getAddress().toString()), e);
                    } catch (Throwable e) {
                        exception = e;
                        if (!running) {
                            if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) {
                                throw new CanalParseException(String.format("dump address %s has an error, retrying. ",
                                    runningInfo.getAddress().toString()), e);
                            }
                        } else {
                            logger.error(String.format("dump address %s has an error, retrying. caused by ",
                                runningInfo.getAddress().toString()), e);
                            sendAlarm(destination, ExceptionUtils.getFullStackTrace(e));
                        }
                    } finally {
                        // 從新置爲中斷狀態
                        Thread.interrupted();
                        // 關閉一下連接
                        afterDump(erosaConnection);
                        try {
                            if (erosaConnection != null) {
                                erosaConnection.disconnect();
                            }
                        } catch (IOException e1) {
                            if (!running) {
                                throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ",
                                    runningInfo.getAddress().toString()),
                                    e1);
                            } else {
                                logger.error("disconnect address {} has an error, retrying., caused by ",
                                    runningInfo.getAddress().toString(),
                                    e1);
                            }
                        }
                    }
                    // 出異常了,退出sink消費,釋放一下狀態
                    eventSink.interrupt();
                    transactionBuffer.reset();// 重置一下緩衝隊列,從新記錄數據
                    binlogParser.reset();// 從新置位

                    if (running) {
                        // sleep一段時間再進行重試
                        try {
                            Thread.sleep(10000 + RandomUtils.nextInt(10000));
                        } catch (InterruptedException e) {
                        }
                    }
                }
                MDC.remove("destination");
            }
        });

        parseThread.setUncaughtExceptionHandler(handler);
        parseThread.setName(String.format("destination = %s , address = %s , EventParser",
            destination,
            runningInfo == null ? null : runningInfo.getAddress().toString()));
        parseThread.start();
    }

start()方法是實現了生命週期的啓動方法,是被上層的組件調用的,parser組件的start方法應該是被instance組件調用的。該方法開始啓動組件,接收binlog,而且解析處理它。該方法的流程是這樣的。


  1. 初始化並啓動transactionBuffer組件。
  2. 構造binlogParser組件,並啓動它。
  3. 開啓新的線程並啓動它。避免阻塞上級組件的啓動。
  • 開啓循環,直到終止組件運行。判斷標誌是protected volatile boolean running = false。定義爲volatile修飾的成員變量,讓多線程可見。
  • 構造erosa鏈接。
  • 啓動一個心跳線程。用Timer實現。會按期消費一個事件類型爲EntryType.HEARTBEAT的事件。應該是告知下游組件,上有組件還活着。
  • dump數據庫複製日誌前的準備處理。
  • erosa鏈接建立鏈接。
  • 查找日誌起始位置。
  • erosa鏈接重建鏈接。由於在找position過程當中可能有狀態,須要斷開後重建
  • 開始dump數據庫複製日誌。傳入一個回調的SinkFunction匿名類對象。回調方法sink的實現就是解析dump到的日誌事件,將其轉化爲Entry對象。並強Entry對象加入到緩衝區transactionBuffer中,而且記錄當前日誌位置和時間。
  • 最後dump後的處理。關閉鏈接等過後處理。
  • 若未中止運行,則再次進入第一步。

問題是:沒作一次dump以後,就會進入一次循環,會再次建立和鏈接,這樣鏈接就沒法複用,高併發性能豈不是不好,服務器壓力也很是大呢?這個問題從源碼開起來是有些問題,先記錄下來,後面經過查看其它部分源碼和調試就能獲得答案了。

通過排查dump方法內部的實現,發現該方法內部是會阻塞的,當鏈接的緩衝區中沒有新的內容的狀況下,會阻塞請求,等待數據。所以鏈接是能夠被複用的。

相關文章
相關標籤/搜索