首先,咱們來看看該模塊下面的類圖,經過類圖就能夠清晰地掌握整個模塊的骨架結構。 java
EventTransactionBuffer是事件事務緩存區。它主要是在內存中開闢一個緩衝區,避免太高的flush頻率致使的IO次數過分而致使的性能問題。 mysql
CanalEventParser是數據複製的控制器。該接口是核心的數據複製接口。 sql
CanalLogPositionManager是日誌的位置管理器。提供了讀取和存儲當前日誌位置的接口。 數據庫
CanalHAController是高可用的複製控制器。 緩存
圖中全部接口都實現了CanalLifeCycle(生命週期接口)。 服務器
AbstractEventParser是一個模板方法的抽閒實現類,它 最大化共用mysql/oracle版本的實現類,提供了一些抽象方法交給子類實現。 多線程
AbstractMysqlEventParser是抽象的MySQL日誌複製控制器的模板類。共享了MySQL的日誌複製控制實現。 併發
LocalBinlogEventParser是基於本地MySQL的binlog文件的複製控制器實現類。 oracle
MysqlEventParser是基於向mysql server複製binlog實現類。該實現類是MySQL使用最多的一種實現方式。 dom
GroupEventParser是合多個EventParser進行合併處理,group只是作爲一個delegate處理。它是一個組合模式的實現。
從上圖所示能夠看出,canal項目並未實現oracle數據庫的日誌複製器的實現,也就是不支持oracle數據庫。
從類圖中的介紹能夠看出MysqlEventParser 是咱們最核心的一個實現類,本文將重點描述該類的一個時序。
該相似parse模塊中最核心的一個類, 它是一個事件解析的一個模板方法類,定義了事件解析的一個公共流程,幾乎全部的子類都是擴展自該類的,所以閱讀該類可以掌握最核心的binlog事件解析流程。
public AbstractEventParser(){ // 初始化一下 transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() { public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException { boolean successed = consumeTheEventAndProfilingIfNecessary(transaction); if (!running) { return; } if (!successed) { throw new CanalParseException("consume failed!"); } LogPosition position = buildLastTransactionPosition(transaction); if (position != null) { // 可能position爲空 logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position); } } }); }
首先看上述代碼,它是構造方法中的代碼,實例化本對象的同時,也實例化了一個EventTransactionBuffer對象。傳入了一個TransactionFlushCallback的回調匿名類對象。回調類中定義了一個flush方法,該方法實現的內容是先消費事件,若是消費成功了,則存儲當前的position。若是消費失敗則拋出異常信息。EventTransactionBuffer寫緩衝區的使用,是一種應對高併發的手段,它至關於在內存中收集一個個的事件,而後再批量的調用flush方法。這個與日誌中的實現是同樣的。
public void start() { super.start(); MDC.put("destination", destination); // 配置transaction buffer // 初始化緩衝隊列 transactionBuffer.setBufferSize(transactionSize);// 設置buffer大小 transactionBuffer.start(); // 構造bin log parser binlogParser = buildParser();// 初始化一下BinLogParser binlogParser.start(); // 啓動工做線程 parseThread = new Thread(new Runnable() { public void run() { MDC.put("destination", String.valueOf(destination)); ErosaConnection erosaConnection = null; while (running) { try { // 開始執行replication // 1. 構造Erosa鏈接 erosaConnection = buildErosaConnection(); // 2. 啓動一個心跳線程 startHeartBeat(erosaConnection); // 3. 執行dump前的準備工做 preDump(erosaConnection); erosaConnection.connect();// 連接 // 4. 獲取最後的位置信息 final EntryPosition startPosition = findStartPosition(erosaConnection); if (startPosition == null) { throw new CanalParseException("can't find start position for " + destination); } logger.info("find start position : {}", startPosition.toString()); // 從新連接,由於在找position過程當中可能有狀態,須要斷開後重建 erosaConnection.reconnect(); final SinkFunction sinkHandler = new SinkFunction<EVENT>() { private LogPosition lastPosition; public boolean sink(EVENT event) { try { CanalEntry.Entry entry = parseAndProfilingIfNecessary(event); if (!running) { return false; } if (entry != null) { exception = null; // 有正常數據流過,清空exception transactionBuffer.add(entry); // 記錄一下對應的positions this.lastPosition = buildLastPosition(entry); // 記錄一下最後一次有數據的時間 lastEntryTime = System.currentTimeMillis(); } return running; } catch (TableIdNotFoundException e) { throw e; } catch (Exception e) { // 記錄一下,出錯的位點信息 processError(e, this.lastPosition, startPosition.getJournalName(), startPosition.getPosition()); throw new CanalParseException(e); // 繼續拋出異常,讓上層統一感知 } } }; // 4. 開始dump數據 if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) { erosaConnection.dump(startPosition.getTimestamp(), sinkHandler); } else { erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler); } } catch (TableIdNotFoundException e) { exception = e; // 特殊處理TableIdNotFound異常,出現這樣的異常,一種可能就是起始的position是一個事務當中,致使tablemap // Event時間沒解析過 needTransactionPosition.compareAndSet(false, true); logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); } catch (Throwable e) { exception = e; if (!running) { if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) { throw new CanalParseException(String.format("dump address %s has an error, retrying. ", runningInfo.getAddress().toString()), e); } } else { logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); sendAlarm(destination, ExceptionUtils.getFullStackTrace(e)); } } finally { // 從新置爲中斷狀態 Thread.interrupted(); // 關閉一下連接 afterDump(erosaConnection); try { if (erosaConnection != null) { erosaConnection.disconnect(); } } catch (IOException e1) { if (!running) { throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", runningInfo.getAddress().toString()), e1); } else { logger.error("disconnect address {} has an error, retrying., caused by ", runningInfo.getAddress().toString(), e1); } } } // 出異常了,退出sink消費,釋放一下狀態 eventSink.interrupt(); transactionBuffer.reset();// 重置一下緩衝隊列,從新記錄數據 binlogParser.reset();// 從新置位 if (running) { // sleep一段時間再進行重試 try { Thread.sleep(10000 + RandomUtils.nextInt(10000)); } catch (InterruptedException e) { } } } MDC.remove("destination"); } }); parseThread.setUncaughtExceptionHandler(handler); parseThread.setName(String.format("destination = %s , address = %s , EventParser", destination, runningInfo == null ? null : runningInfo.getAddress().toString())); parseThread.start(); }
start()方法是實現了生命週期的啓動方法,是被上層的組件調用的,parser組件的start方法應該是被instance組件調用的。該方法開始啓動組件,接收binlog,而且解析處理它。該方法的流程是這樣的。
問題是:沒作一次dump以後,就會進入一次循環,會再次建立和鏈接,這樣鏈接就沒法複用,高併發性能豈不是不好,服務器壓力也很是大呢?這個問題從源碼開起來是有些問題,先記錄下來,後面經過查看其它部分源碼和調試就能獲得答案了。
通過排查dump方法內部的實現,發現該方法內部是會阻塞的,當鏈接的緩衝區中沒有新的內容的狀況下,會阻塞請求,等待數據。所以鏈接是能夠被複用的。