本文主要研究一下canal的EventTransactionBuffer。
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java
public class EventTransactionBuffer extends AbstractCanalLifeCycle { private static final long INIT_SQEUENCE = -1; private int bufferSize = 1024; private int indexMask; private CanalEntry.Entry[] entries; private AtomicLong putSequence = new AtomicLong(INIT_SQEUENCE); // 表明當前put操做最後一次寫操做發生的位置 private AtomicLong flushSequence = new AtomicLong(INIT_SQEUENCE); // 表明知足flush條件後最後一次數據flush的時間 private TransactionFlushCallback flushCallback; public EventTransactionBuffer(){ } public EventTransactionBuffer(TransactionFlushCallback flushCallback){ this.flushCallback = flushCallback; } public void start() throws CanalStoreException { super.start(); if (Integer.bitCount(bufferSize) != 1) { throw new IllegalArgumentException("bufferSize must be a power of 2"); } Assert.notNull(flushCallback, "flush callback is null!"); indexMask = bufferSize - 1; entries = new CanalEntry.Entry[bufferSize]; } public void stop() throws CanalStoreException { putSequence.set(INIT_SQEUENCE); flushSequence.set(INIT_SQEUENCE); entries = null; super.stop(); } public void add(List<CanalEntry.Entry> entrys) throws InterruptedException { for (CanalEntry.Entry entry : entrys) { add(entry); } } public void add(CanalEntry.Entry entry) throws InterruptedException { switch (entry.getEntryType()) { case TRANSACTIONBEGIN: flush();// 刷新上一次的數據 put(entry); break; case TRANSACTIONEND: put(entry); flush(); break; case ROWDATA: put(entry); // 針對非DML的數據,直接輸出,不進行buffer控制 EventType eventType = entry.getHeader().getEventType(); if (eventType != null && !isDml(eventType)) { flush(); } break; case HEARTBEAT: // master過來的heartbeat,說明binlog已經讀完了,是idle狀態 put(entry); flush(); break; default: break; } } public void reset() { putSequence.set(INIT_SQEUENCE); flushSequence.set(INIT_SQEUENCE); } private void put(CanalEntry.Entry data) throws InterruptedException { // 首先檢查是否有空位 if (checkFreeSlotAt(putSequence.get() + 1)) { long current = putSequence.get(); long next = current + 1; // 
先寫數據,再更新對應的cursor,併發度高的狀況,putSequence會被get請求可見,拿出了ringbuffer中的老的Entry值 entries[getIndex(next)] = data; putSequence.set(next); } else { flush();// buffer區滿了,刷新一下 put(data);// 繼續加一下新數據 } } private void flush() throws InterruptedException { long start = this.flushSequence.get() + 1; long end = this.putSequence.get(); if (start <= end) { List<CanalEntry.Entry> transaction = new ArrayList<CanalEntry.Entry>(); for (long next = start; next <= end; next++) { transaction.add(this.entries[getIndex(next)]); } flushCallback.flush(transaction); flushSequence.set(end);// flush成功後,更新flush位置 } } //...... }
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/EventTransactionBuffer.java
public static interface TransactionFlushCallback { public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException; }
canal-1.1.4/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto
syntax = "proto3";
package com.alibaba.otter.canal.protocol;
option java_package = "com.alibaba.otter.canal.protocol";
option java_outer_classname = "CanalEntry";
option optimize_for = SPEED;

/****************************************************************
 * message model
 * When adding new values to an enum, keep the numbers of all existing values unchanged.
 ****************************************************************/
message Entry {
    /**Protocol header information**/
    Header header = 1;
    ///**Entry type after splitting the binlog into events**/ [default = ROWDATA]
    // wrapped in a oneof, presumably so that presence can be detected under proto3
    oneof entryType_present{
        EntryType entryType = 2;
    }
    /**Serialized payload bytes**/
    bytes storeValue = 3;
}

/**message Header**/
message Header {
    /**Protocol version number**/
    //[default = 1]
    oneof version_present {
        int32 version = 1;
    }
    /**binlog/redolog file name**/
    string logfileName = 2;
    /**Offset within the binlog/redolog file**/
    int64 logfileOffset = 3;
    /**serverId of the source server**/
    int64 serverId = 4;
    /** Encoding of the changed data **/
    string serverenCode = 5;
    /**Execution time of the change **/
    int64 executeTime = 6;
    /** Source (database type) of the change**/
    //[default = MYSQL]
    oneof sourceType_present {
        Type sourceType = 7;
    }
    /** Schema name of the changed data**/
    string schemaName = 8;
    /**Table name of the changed data**/
    string tableName = 9;
    /**Length of this event**/
    int64 eventLength = 10;
    /**Type of data change**/
    // [default = UPDATE]
    oneof eventType_present {
        EventType eventType = 11;
    }
    /**Reserved for extensions**/
    repeated Pair props = 12;
    /**GTID of the current transaction**/
    string gtid = 13;
}

/**Structure describing a single column**/
message Column {
    /**Column index**/
    int32 index = 1;
    /**Java SQL type of the column**/
    int32 sqlType = 2;
    /**Column name (case-insensitive; original note: not present in MySQL)**/
    string name = 3;
    /**Whether the column is part of the primary key**/
    bool isKey = 4;
    /**When EventType=UPDATE, marks whether this column's value was changed**/
    bool updated = 5;
    /** Whether the value is null **/
    //[default = false]
    oneof isNull_present {
        bool isNull = 6;
    }
    /**Reserved for extensions**/
    repeated Pair props = 7;
    /** Column value; timestamp and Datetime values are formatted time text **/
    string value = 8;
    /** Original length of the corresponding data object **/
    int32 length = 9;
    /**MySQL type of the column**/
    string mysqlType = 10;
}

message RowData {
    /** Column values of the row image before the change (before update / before delete) **/
    repeated Column beforeColumns = 1;
    /** Column values of the row image after the change (after update / after insert) **/
    repeated Column afterColumns = 2;
    /**Reserved for extensions**/
    repeated Pair props = 3;
}

/**message row: structure of the change data carried by one entry**/
message RowChange {
    /**tableId, generated by the database**/
    int64 tableId = 1;
    /**Type of data change**/
    //[default = UPDATE]
    oneof eventType_present {
        EventType eventType = 2;
    }
    /** Whether this is a DDL statement **/
    // [default = false]
    oneof isDdl_present {
        bool isDdl = 10;
    }
    /** SQL text of the ddl/query **/
    string sql = 11;
    /** One database change may affect multiple rows **/
    repeated RowData rowDatas = 12;
    /**Reserved for extensions**/
    repeated Pair props = 13;
    /** Current schemaName in which the ddl/query ran; needed because a DDL can reference another schema **/
    string ddlSchemaName = 14;
}

/**Information about the start of a transaction**/
message TransactionBegin{
    /**Deprecated: use executeTime in the header**/
    int64 executeTime = 1;
    /**Deprecated: Begin does not carry a transaction id**/
    string transactionId = 2;
    /**Reserved for extensions**/
    repeated Pair props = 3;
    /**Id of the executing thread**/
    int64 threadId = 4;
}

/**Information about the end of a transaction**/
message TransactionEnd{
    /**Deprecated: use executeTime in the header**/
    int64 executeTime = 1;
    /**Transaction id**/
    string transactionId = 2;
    /**Reserved for extensions**/
    repeated Pair props = 3;
}

/**Key/value pair reserved for extensions**/
message Pair{
    string key = 1;
    string value = 2;
}

/**Entry type after splitting; mainly marks transaction begin, change data and transaction end**/
enum EntryType{
    // zero value; proto3 requires the first enum value to be 0
    ENTRYTYPECOMPATIBLEPROTO2 = 0;
    TRANSACTIONBEGIN = 1;
    ROWDATA = 2;
    TRANSACTIONEND = 3;
    /** Heartbeat type: internal use only, not exposed externally for now, can be ignored **/
    HEARTBEAT = 4;
    GTIDLOG = 5;
}

/** Event type **/
enum EventType {
    // zero value; proto3 requires the first enum value to be 0
    EVENTTYPECOMPATIBLEPROTO2 = 0;
    INSERT = 1;
    UPDATE = 2;
    DELETE = 3;
    CREATE = 4;
    ALTER = 5;
    ERASE = 6;
    QUERY = 7;
    TRUNCATE = 8;
    RENAME = 9;
    /**CREATE INDEX**/
    CINDEX = 10;
    DINDEX = 11;
    GTID = 12;
    /** XA **/
    XACOMMIT = 13;
    XAROLLBACK = 14;
    /** MASTER HEARTBEAT **/
    MHEARTBEAT = 15;
}

/**Database type**/
enum Type {
    // zero value; proto3 requires the first enum value to be 0
    TYPECOMPATIBLEPROTO2 = 0;
    ORACLE = 1;
    MYSQL = 2;
    PGSQL = 3;
}
canal-1.1.4/parse/src/main/java/com/alibaba/otter/canal/parse/inbound/AbstractEventParser.java
public abstract class AbstractEventParser<EVENT> extends AbstractCanalLifeCycle implements CanalEventParser<EVENT> { //...... public AbstractEventParser(){ // 初始化一下 transactionBuffer = new EventTransactionBuffer(new TransactionFlushCallback() { public void flush(List<CanalEntry.Entry> transaction) throws InterruptedException { boolean successed = consumeTheEventAndProfilingIfNecessary(transaction); if (!running) { return; } if (!successed) { throw new CanalParseException("consume failed!"); } LogPosition position = buildLastTransactionPosition(transaction); if (position != null) { // 可能position爲空 logPositionManager.persistLogPosition(AbstractEventParser.this.destination, position); } } }); } protected boolean consumeTheEventAndProfilingIfNecessary(List<CanalEntry.Entry> entrys) throws CanalSinkException, InterruptedException { long startTs = -1; boolean enabled = getProfilingEnabled(); if (enabled) { startTs = System.currentTimeMillis(); } boolean result = eventSink.sink(entrys, (runningInfo == null) ? null : runningInfo.getAddress(), destination); if (enabled) { this.processingInterval = System.currentTimeMillis() - startTs; } if (consumedEventCount.incrementAndGet() < 0) { consumedEventCount.set(0); } return result; } //...... }
EventTransactionBuffer繼承了AbstractCanalLifeCycle。其start方法校驗bufferSize必須是2的冪,並創建bufferSize大小的CanalEntry.Entry數組;其stop方法將putSequence及flushSequence設置為INIT_SQEUENCE,並將entries設置為null;其add方法根據entry.getEntryType()的不同類型做不同的處理,基本上都是執行put及flush方法;其reset方法將putSequence及flushSequence設置為INIT_SQEUENCE;put方法給entries賦值的同時更新putSequence,如果buffer滿了則先執行flush再重新put(因此一個較大的事務可能會被拆成多批flush);flush方法則執行flushCallback.flush(transaction),並在成功後更新flushSequence。