Canal的數據傳輸有兩塊,一塊是進行binlog訂閱時,binlog轉換爲咱們所定義的Message,第二塊是client與server進行TCP交互時,傳輸的TCP協議。mysql
這塊是binlog的一個存儲。主要的格式以下:git
Entry Header version [協議的版本號,default = 1] logfileName [binlog文件名] logfileOffset [binlog position] serverId [服務端serverId] serverenCode [變動數據的編碼] executeTime [變動數據的執行時間] sourceType [變動數據的來源,default = MYSQL] schemaName [變動數據的schemaname] tableName [變動數據的tablename] eventLength [每一個event的長度] eventType [insert/update/delete類型,default = UPDATE] props [預留擴展] gtid [當前事務的gitd] entryType [事務頭BEGIN/事務尾END/數據ROWDATA/HEARTBEAT/GTIDLOG] storeValue [byte數據,可展開,對應的類型爲RowChange] RowChange tableId [tableId,由數據庫產生] eventType [數據變動類型,default = UPDATE] isDdl [標識是不是ddl語句,好比create table/drop table] sql [ddl/query的sql語句] rowDatas [具體insert/update/delete的變動數據,可爲多條,1個binlog event事件可對應多條變動,好比批處理] beforeColumns [字段信息,增量數據(修改前,刪除前),Column類型的數組] afterColumns [字段信息,增量數據(修改後,新增後),Column類型的數組] props [預留擴展] props [預留擴展] ddlSchemaName [ddl/query的schemaName,會存在跨庫ddl,須要保留執行ddl的當前schemaName] Column index [字段下標] sqlType [jdbc type] name [字段名稱(忽略大小寫),在mysql中是沒有的] isKey [是否爲主鍵] updated [是否發生過變動] isNull [值是否爲null] props [預留擴展] value [字段值,timestamp,Datetime是一個時間格式的文本] length [對應數據對象原始長度] mysqlType [字段mysql類型]
這塊主要定義了client和server交互的協議。sql
Packet magic_number [default = 17] version [default = 1] type [PacketType,類型] compression [壓縮,default = NONE] body [具體內容]
主要的類型和對應的body,均可以在CanalProtocal.proto裏面查看到。數據庫
enum PacketType { HANDSHAKE = 1; CLIENTAUTHENTICATION = 2; ACK = 3; SUBSCRIPTION = 4; UNSUBSCRIPTION = 5; GET = 6; MESSAGES = 7; CLIENTACK = 8; // management part SHUTDOWN = 9; // integration DUMP = 10; HEARTBEAT = 11; CLIENTROLLBACK = 12; }
//心跳 message HeartBeat { optional int64 send_timestamp = 1; optional int64 start_timestamp = 2; } //握手 message Handshake { optional string communication_encoding = 1 [default = "utf8"]; optional bytes seeds = 2; repeated Compression supported_compressions = 3; } // client authentication message ClientAuth { optional string username = 1; optional bytes password = 2; // hashed password with seeds from Handshake message optional int32 net_read_timeout = 3 [default = 0]; // in seconds optional int32 net_write_timeout = 4 [default = 0]; // in seconds optional string destination = 5; optional string client_id = 6; optional string filter = 7; optional int64 start_timestamp = 8; } //服務端響應 message Ack { optional int32 error_code = 1 [default = 0]; optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it. } //客戶端提交 message ClientAck { optional string destination = 1; optional string client_id = 2; optional int64 batch_id = 3; } // subscription message Sub { optional string destination = 1; optional string client_id = 2; optional string filter = 7; } // Unsubscription message Unsub { optional string destination = 1; optional string client_id = 2; optional string filter = 7; } // PullRequest message Get { optional string destination = 1; optional string client_id = 2; optional int32 fetch_size = 3; optional int64 timeout = 4 [default = -1]; // 默認-1時表明不控制 optional int32 unit = 5 [default = 2];// 數字類型,0:納秒,1:毫秒,2:微秒,3:秒,4:分鐘,5:小時,6:天 optional bool auto_ack = 6 [default = false]; // 是否自動ack } //消息 message Messages { optional int64 batch_id = 1; repeated bytes messages = 2; } // TBD when new packets are required message Dump{ optional string journal = 1; optional int64 position = 2; optional int64 timestamp = 3 [default = 0]; } // 客戶端回滾 message ClientRollback{ optional string destination = 1; optional string client_id = 2; optional int64 batch_id = 3; }