本文主要分析的部分是instance啓動時,parser的一個啓動和工做過程。主要關注的是AbstractEventParser的start()方法中的parseThread。html
parseThread中包含的內容比較清晰,代碼不是很長,咱們逐步分析下。java
erosaConnection = buildErosaConnection();
這裏構造的,應該是一個mysql的連接,包括的內容都是從配置文件中過來的一些信息,包括mysql的地址,帳號密碼等。mysql
startHeartBeat(erosaConnection);
這裏的心跳,感受是個假的心跳,並無用到connection相關的內容。啓動一個定時任務,默認3s發送一個心跳的binlog給sink階段,表名parser還在工做。在sink階段,會把心跳的binlog直接過濾,不會走到store過程。sql
這一步的代碼也不復雜。數據庫
preDump(erosaConnection);
咱們看看preDump都可以作什麼?在MysqlEventParser中,咱們能夠看到,主要作了幾件事:緩存
這一步是比較核心的,也是保證binlog不丟失的核心代碼。dom
EntryPosition position = findStartPosition(erosaConnection); final EntryPosition startPosition = position; if (startPosition == null) { throw new CanalParseException("can't find start position for " + destination); } if (!processTableMeta(startPosition)) { throw new CanalParseException("can't find init table meta for " + destination + " with position : " + startPosition); }
具體的findStartPosition是怎麼實現的,請查閱下一篇文章。源碼分析
若是沒有找到最後的位置信息,那麼直接拋出異常,不然還要進行一次判斷,也就是processTableMeta,咱們看下這個方法作了什麼。fetch
protected boolean processTableMeta(EntryPosition position) { if (isGTIDMode()) { if (binlogParser instanceof LogEventConvert) { // 記錄gtid ((LogEventConvert) binlogParser).setGtidSet(MysqlGTIDSet.parse(position.getGtid())); } } if (tableMetaTSDB != null) { if (position.getTimestamp() == null || position.getTimestamp() <= 0) { throw new CanalParseException("use gtid and TableMeta TSDB should be config timestamp > 0"); } return tableMetaTSDB.rollback(position); } return true; }
若是開啓了GTID模式,那麼直接設置GTID集合。若是tableMetaTSDB不爲空,那麼直接根據位置信息回滾到對應的表結構。這個tableMetaTSDB記錄的是一個表結構的時序,使用的是Druid的一個功能,把全部DDL記錄在數據庫中,通常來講,每24小時生成一份快照插入到數據庫中,這樣能解決DDL產生的表結構不一致的問題,也就是增長了一個表結構的回溯功能。ui
這邊的rollback主要作的事情爲:
在dump以前,代碼中構造了一個sink類,也就是SinkFunction。裏面定義了一個sink方法,主要的內容是對哪些數據進行過濾。
try { CanalEntry.Entry entry = parseAndProfilingIfNecessary(event, false); if (!running) { return false; } if (entry != null) { exception = null; // 有正常數據流過,清空exception transactionBuffer.add(entry); // 記錄一下對應的positions this.lastPosition = buildLastPosition(entry); // 記錄一下最後一次有數據的時間 lastEntryTime = System.currentTimeMillis(); } return running; } catch (TableIdNotFoundException e) { throw e; } catch (Throwable e) { if (e.getCause() instanceof TableIdNotFoundException) { throw (TableIdNotFoundException) e.getCause(); } // 記錄一下,出錯的位點信息 processSinkError(e, this.lastPosition, startPosition.getJournalName(), startPosition.getPosition()); throw new CanalParseException(e); // 繼續拋出異常,讓上層統一感知 }
首先判斷parser是否在運行,若是不運行,那麼就直接拋棄。運行時,判斷entry是否爲空,不爲空的狀況下,直接將entry加入到transactionBuffer中。這裏咱們說下這個transactionBuffer,其實相似於Disruptor中的一個環形隊列(默認長度爲1024),維護了幾個指針,包括put、get、ack三個指針,裏面存儲了須要進行傳遞到下一階段的數據。
加到環形隊列以後,記錄一下當前的位置信息和時間。若是這個過程出錯了,須要記錄下出錯的位置信息,這裏的processSinkError其實就是打印了一下錯誤日誌,而後拋出了一個CanalException,讓上一層感知。
說了這麼多,還沒到真正開始dump的地方。下面開始吧。
if (isGTIDMode()) { erosaConnection.dump(MysqlGTIDSet.parse(startPosition.getGtid()), sinkHandler); } else { if (StringUtils.isEmpty(startPosition.getJournalName()) && startPosition.getTimestamp() != null) { erosaConnection.dump(startPosition.getTimestamp(), sinkHandler); } else { erosaConnection.dump(startPosition.getJournalName(), startPosition.getPosition(), sinkHandler); } }
在新版本中,增長了GTID的模式,因此這裏的dump須要判斷怎麼dump,發送什麼命令給mysql來獲取什麼樣的binlog。
若是開啓了GTID模式(在instance.properties開啓),那麼須要發送COM_BINLOG_DUMP_GTID命令,而後開始接受binlog信息,進行binlog處理。
public void dump(GTIDSet gtidSet, SinkFunction func) throws IOException { updateSettings(); sendBinlogDumpGTID(gtidSet); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } } }
調用LogDecoder.decode方法,對二進制進行解析,解析爲咱們須要的LogEvent,若是解析失敗,拋出異常。不然進行sink,若是sink返回的false,那麼直接跳過,不然加入到transactionBuffer中。
這塊有個邏輯判斷,若是找到的最後的位置信息中包含了時間戳,若是沒有binlog文件名,那麼在MysqlConnection中直接報錯,也就是必須既要有時間戳,又要有binlog文件名,才能進行dump操做。
這裏的dump分了兩步,第一步就是發送COM_REGISTER_SLAVE命令,假裝本身是一個slave,而後發送COM_BINLOG_DUMP命令接收binlog。
public void dump(String binlogfilename, Long binlogPosition, SinkFunction func) throws IOException { updateSettings(); sendRegisterSlave(); sendBinlogDump(binlogfilename, binlogPosition); DirectLogFetcher fetcher = new DirectLogFetcher(connector.getReceiveBufferSize()); fetcher.start(connector.getChannel()); LogDecoder decoder = new LogDecoder(LogEvent.UNKNOWN_EVENT, LogEvent.ENUM_END_EVENT); LogContext context = new LogContext(); while (fetcher.fetch()) { LogEvent event = null; event = decoder.decode(fetcher, context); if (event == null) { throw new CanalParseException("parse failed"); } if (!func.sink(event)) { break; } if (event.getSemival() == 1) { sendSemiAck(context.getLogPosition().getFileName(), binlogPosition); } } }
這裏有個mysql半同步的標識,semival。若是semival==1,說明須要進行ack,發送SEMI_SYNC_ACK給master(咱們這邊more都不開啓)。
若是整個過程當中發生了異常,有如下幾種處理方式:
} catch (TableIdNotFoundException e) { exception = e; // 特殊處理TableIdNotFound異常,出現這樣的異常,一種可能就是起始的position是一個事務當中,致使tablemap // Event時間沒解析過 needTransactionPosition.compareAndSet(false, true); logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); } catch (Throwable e) { processDumpError(e); exception = e; if (!running) { if (!(e instanceof java.nio.channels.ClosedByInterruptException || e.getCause() instanceof java.nio.channels.ClosedByInterruptException)) { throw new CanalParseException(String.format("dump address %s has an error, retrying. ", runningInfo.getAddress().toString()), e); } } else { logger.error(String.format("dump address %s has an error, retrying. caused by ", runningInfo.getAddress().toString()), e); sendAlarm(destination, ExceptionUtils.getFullStackTrace(e)); } } finally { // 從新置爲中斷狀態 Thread.interrupted(); // 關閉一下連接 afterDump(erosaConnection); try { if (erosaConnection != null) { erosaConnection.disconnect(); } } catch (IOException e1) { if (!running) { throw new CanalParseException(String.format("disconnect address %s has an error, retrying. ", runningInfo.getAddress().toString()), e1); } else { logger.error("disconnect address {} has an error, retrying., caused by ", runningInfo.getAddress().toString(), e1); } } } // 出異常了,退出sink消費,釋放一下狀態 eventSink.interrupt(); transactionBuffer.reset();// 重置一下緩衝隊列,從新記錄數據 binlogParser.reset();// 從新置位 if (running) { // sleep一段時間再進行重試 try { Thread.sleep(10000 + RandomUtils.nextInt(10000)); } catch (InterruptedException e) { } }