binlog的尋找過程可能的場景以下:java
因此這個過程是可以保證binlog不丟失的關鍵點。mysql
本文從源碼的角度來分析下啓動過程當中的binlog尋找過程。sql
下圖是根據源碼畫出的流程圖,須要結合源碼分析來一塊兒看。
數據庫
入口在AbstractEventParser的start()方法中,這個start方法實際上是instance的整個啓動過程。具體啓動過程當中都作了哪些事情,請見另外一篇文章的分析。這塊再也不贅述。咱們主要看的地方是緩存
// 4. 獲取最後的位置信息 EntryPosition position = findStartPosition(erosaConnection);
這一行就是獲取binlog的解析位置,也是本文着重要分析的地方。由於咱們目前所配置的都是MysqlEventParser,因此咱們分析的也是這個類中的相關代碼。服務器
protected EntryPosition findStartPosition(ErosaConnection connection) throws IOException { if (isGTIDMode()) { // GTID模式下,CanalLogPositionManager裏取最後的gtid,沒有則取instanc配置中的 LogPosition logPosition = getLogPositionManager().getLatestIndexBy(destination); if (logPosition != null) { return logPosition.getPostion(); } if (StringUtils.isNotEmpty(masterPosition.getGtid())) { return masterPosition; } } EntryPosition startPosition = findStartPositionInternal(connection); if (needTransactionPosition.get()) { logger.warn("prepare to find last position : {}", startPosition.toString()); Long preTransactionStartPosition = findTransactionBeginPosition(connection, startPosition); if (!preTransactionStartPosition.equals(startPosition.getPosition())) { logger.warn("find new start Transaction Position , old : {} , new : {}", startPosition.getPosition(), preTransactionStartPosition); startPosition.setPosition(preTransactionStartPosition); } needTransactionPosition.compareAndSet(true, false); } return startPosition; }
咱們目前的數據庫架構通常都是M-S,因此binlog的位點極可能不一致,這就須要開啓數據庫GTID模式(經過在instance.properties中配置canal.instance.gtidon=true便可開啓),這是一個全局的事務ID,可以防止主從位點不一致的狀況下,找不到位點的問題。目前這塊是從CanalLogPositionManager中取最後的GTID。default-instance.xml中,使用的CanalLogPositionManager是FailbackLogPositionManager,一個兩級的位點管理器,XML配置以下:架構
<!-- 解析位點記錄 --> <property name="logPositionManager"> <bean class="com.alibaba.otter.canal.parse.index.FailbackLogPositionManager"> <constructor-arg> <bean class="com.alibaba.otter.canal.parse.index.MemoryLogPositionManager" /> </constructor-arg> <constructor-arg> <bean class="com.alibaba.otter.canal.parse.index.MetaLogPositionManager"> <constructor-arg ref="metaManager"/> </bean> </constructor-arg> </bean> </property>
一級是放到本地緩存中,第二級直接打了個info日誌,有點弱,其實考慮的狀況是性能,另外一個考慮多是由於DB的主從切換,並不會致使instance掛掉,內存中仍是存儲了以前DB的一些解析位點信息。其實都沒有放到zk中,不利於作HA,因此這塊目前還不是很完善,真正要使用GTID的話,須要對CanalLogPositionManager進行修改。目前已經提供了其餘的一些實現,包括定時刷新到zk中等等。源碼分析
若是CanalLogPositionManager中沒有存儲的話,也能夠在instance.properties裏面指定位點和GTID信息,也能從binlog中獲取。性能
若是canal沒有開啓GTID模式,那麼咱們就須要走一個binlog的尋找過程。fetch
EntryPosition startPosition = findStartPositionInternal(connection);
這個方法是個冗長的方法,裏面的判斷邏輯就是上面的流程圖,咱們來梳理一下。
首先仍是從CanalLogPositionManager中獲取,也就是基本上從內存中獲取LogPosition。
LogPosition logPosition = logPositionManager.getLatestIndexBy(destination);
首先判斷配置文件中的主庫信息是否與當前的數據庫鏈接connection的地址一致,若是一致,若是一致,那麼直接取properties文件中的master的位點信息。
若是主庫不一致,那麼判斷從庫standby的connection地址,若是是從庫,那麼直接取從庫的位點信息。
咱們能夠在xml配置中看到properties的一些信息。
<!-- 解析起始位點 --> <property name="masterPosition"> <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition"> <property name="journalName" value="${canal.instance.master.journal.name}" /> <property name="position" value="${canal.instance.master.position}" /> <property name="timestamp" value="${canal.instance.master.timestamp}" /> <property name="gtid" value="${canal.instance.master.gtid}" /> </bean> </property> <property name="standbyPosition"> <bean class="com.alibaba.otter.canal.protocol.position.EntryPosition"> <property name="journalName" value="${canal.instance.standby.journal.name}" /> <property name="position" value="${canal.instance.standby.position}" /> <property name="timestamp" value="${canal.instance.standby.timestamp}" /> <property name="gtid" value="${canal.instance.standby.gtid}" /> </bean> </property>
若是內存中沒有,配置文件中也沒有,那麼系統默認從當前時間開始消費。
entryPosition = findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最後一個位置進行消費 protected EntryPosition findEndPositionWithMasterIdAndTimestamp(MysqlConnection connection) { MysqlConnection mysqlConnection = (MysqlConnection) connection; final EntryPosition endPosition = findEndPosition(mysqlConnection);//獲取當前最新的位點信息 if (tableMetaTSDB != null) { long startTimestamp = System.currentTimeMillis(); return findAsPerTimestampInSpecificLogFile(mysqlConnection, startTimestamp, endPosition, endPosition.getJournalName(), true); } else { return endPosition; } }
這裏的findEndPosition()方法,其實就是執行了一個Mysql命令:
show master status
返回的內容中,包含binlog文件信息和位點position,甚至包括GTID信息。
找到了最新的binlog位點信息後,根據當前時間戳和binlog的時間戳等信息,去服務器上面尋找binlog。其實邏輯基本上都在findAsPerTimestampInSpecificLogFile()中,這個方法是根據時間戳去尋找,離時間戳最近(小於時間戳)的一個事務起始位置。因爲這塊的代碼比較長,因此咱們只作分析,不作代碼粘貼,具體的代碼在MysqlEventParser這個類中。整個尋找的過程以下:
先看一下這個seek的過程,見代碼註釋:
/** * 加速主備切換時的查找速度,作一些特殊優化,好比只解析事務頭或者尾 */ public void seek(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings();//在mysql中執行一些dump以前的命令 sendBinlogDump(binlogfilename, binlogPosition);//指定位點和binlog文件,發送dump命令,COM_BINLOG_DUMP DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel());//開始獲取 LogDecoder decoder = new LogDecoder(); decoder.handle(LogEvent.ROTATE_EVENT); decoder.handle(LogEvent.FORMAT_DESCRIPTION_EVENT); decoder.handle(LogEvent.QUERY_EVENT); decoder.handle(LogEvent.XID_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) {//遍歷獲取 LogEvent event = null; event = decoder.decode(fetcher, context);//解析爲event if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) {//調用SinkFunction.sink()過濾 break; } } }
下面咱們看下數據過濾這塊:
若是binlog文件名爲空,首先判斷時間戳是否存在,若是存在,那麼直接按照時間戳去取,不然默認從當前最後一個位置進行消費。
// 若是沒有指定binlogName,嘗試按照timestamp進行查找 if (entryPosition.getTimestamp() != null && entryPosition.getTimestamp() > 0L) { logger.warn("prepare to find start position {}:{}:{}", new Object[] { "", "", entryPosition.getTimestamp() }); return findByStartTimeStamp(mysqlConnection, entryPosition.getTimestamp()); } else { logger.warn("prepare to find start position just show master status"); return findEndPositionWithMasterIdAndTimestamp(mysqlConnection); // 默認從當前最後一個位置進行消費 }
這塊咱們看下findByStartTimestamp()這個方法,也就是隻根據時間來查找binlog。這塊的邏輯是這樣的:
binlog文件名不爲空,首先判斷是否有位點信息,若是有的話,直接根據當前內存中存儲的位點和文件信息去Mysql獲取。
不然,根據當前內存中管理的時間戳去獲取,根據時間戳和binlog文件名去獲取位點。固然,若是時間戳也不存在,直接從binlog文件名的文件開頭去獲取binlog。
若是dump錯誤的次數超過了必定的閾值,默認是2次,也就是連續幾回定位失敗,有幾種狀況:
這種須要進行一次判斷,判斷內容:
boolean case2 = (standbyInfo == null || standbyInfo.getAddress() == null) && logPosition.getPostion().getServerId() != null && !logPosition.getPostion().getServerId().equals(findServerId(mysqlConnection));
判斷幾個,第一個配置文件中的standby爲空,第二個內存中的logPosition存在數據庫ip,第三個內存中的logPosition的數據庫ip和當前數據庫鏈接connection的數據庫ip不一致。
知足這三個條件,說明發生了vip的主備切換,此時須要把logPosition中的時間戳向前推一個回退時間,默認60s,而後根據新的時間戳去找binlog文件和位點信息。
if (case2) { long timestamp = logPosition.getPostion().getTimestamp(); long newStartTimestamp = timestamp - fallbackIntervalInSeconds * 1000; logger.warn("prepare to find start position by last position {}:{}:{}", new Object[] { "", "", logPosition.getPostion().getTimestamp() }); EntryPosition findPosition = findByStartTimeStamp(mysqlConnection, newStartTimestamp); // 從新置爲一下 dumpErrorCount = 0; return findPosition; }
說明發生了主從切換,這種狀況下,直接把logPosition中的時間回退60s,而後根據回退後的時間去binlog中尋找,而後返回。