【Canal源碼分析】數據傳輸協議

Canal的數據傳輸有兩塊,一塊是進行binlog訂閱時,binlog轉換爲咱們所定義的Message,第二塊是client與server進行TCP交互時,傳輸的TCP協議。mysql

1、EntryProtocal

這塊是binlog的一個存儲。主要的格式以下:git

Entry
    Header
        version         [協議的版本號,default = 1]
        logfileName     [binlog文件名]
        logfileOffset   [binlog position]
        serverId        [服務端serverId]
        serverenCode    [變動數據的編碼]
        executeTime     [變動數據的執行時間]
        sourceType      [變動數據的來源,default = MYSQL]
        schemaName      [變動數據的schemaname]
        tableName       [變動數據的tablename]
        eventLength     [每一個event的長度]
        eventType       [insert/update/delete類型,default = UPDATE]
        props           [預留擴展]
        gtid            [當前事務的gitd]
    entryType           [事務頭BEGIN/事務尾END/數據ROWDATA/HEARTBEAT/GTIDLOG]
    storeValue          [byte數據,可展開,對應的類型爲RowChange]    
RowChange
    tableId             [tableId,由數據庫產生]
    eventType           [數據變動類型,default = UPDATE]
    isDdl               [標識是不是ddl語句,好比create table/drop table]
    sql                 [ddl/query的sql語句]
    rowDatas            [具體insert/update/delete的變動數據,可爲多條,1個binlog event事件可對應多條變動,好比批處理]
        beforeColumns   [字段信息,增量數據(修改前,刪除前),Column類型的數組]
        afterColumns    [字段信息,增量數據(修改後,新增後),Column類型的數組] 
        props           [預留擴展]
    props               [預留擴展]
    ddlSchemaName       [ddl/query的schemaName,會存在跨庫ddl,須要保留執行ddl的當前schemaName]
Column 
    index               [字段下標]      
    sqlType             [jdbc type]
    name                [字段名稱(忽略大小寫),在mysql中是沒有的]
    isKey               [是否爲主鍵]
    updated             [是否發生過變動]
    isNull              [值是否爲null]
    props               [預留擴展]
    value               [字段值,timestamp,Datetime是一個時間格式的文本]
    length              [對應數據對象原始長度]
    mysqlType           [字段mysql類型]

2、CanalProtocal

這塊主要定義了client和server交互的協議。sql

Packet
    magic_number    [default = 17]
    version         [default = 1]
    type            [PacketType,類型]
    compression     [壓縮,default = NONE]
    body            [具體內容]

主要的類型和對應的body,均可以在CanalProtocal.proto裏面查看到。數據庫

enum PacketType {
    HANDSHAKE = 1;
    CLIENTAUTHENTICATION = 2;
    ACK = 3;
    SUBSCRIPTION = 4;
    UNSUBSCRIPTION = 5;
    GET = 6;
    MESSAGES = 7;
    CLIENTACK = 8;
    // management part
    SHUTDOWN = 9;
    // integration
    DUMP = 10;
    HEARTBEAT = 11;
    CLIENTROLLBACK = 12;
}
//心跳
message HeartBeat {
    optional int64 send_timestamp = 1;
    optional int64 start_timestamp = 2;
}

//握手
message Handshake {
    optional string communication_encoding = 1 [default = "utf8"];
    optional bytes seeds = 2;
    repeated Compression supported_compressions = 3;
}

// client authentication
message ClientAuth {
    optional string username = 1;
    optional bytes password = 2; // hashed password with seeds from Handshake message
    optional int32 net_read_timeout = 3 [default = 0]; // in seconds
    optional int32 net_write_timeout = 4 [default = 0]; // in seconds
    optional string destination = 5;
    optional string client_id = 6;
    optional string filter = 7;
    optional int64 start_timestamp = 8;
}

//服務端響應
message Ack {
    optional int32 error_code = 1 [default = 0];
    optional string error_message = 2; // if something like compression is not supported, erorr_message will tell about it.
}

//客戶端提交
message ClientAck {
    optional string destination = 1;
    optional string client_id = 2;
    optional int64 batch_id = 3;
}

// subscription
message Sub {
    optional string destination = 1;
    optional string client_id = 2;
    optional string filter = 7;
}

// Unsubscription
message Unsub {
    optional string destination = 1;
    optional string client_id = 2;
    optional string filter = 7;
}

//  PullRequest
message Get {
    optional string destination = 1;
    optional string client_id = 2;
    optional int32 fetch_size = 3;
    optional int64 timeout = 4 [default = -1]; // 默認-1時表明不控制
    optional int32 unit = 5 [default = 2];// 數字類型,0:納秒,1:毫秒,2:微秒,3:秒,4:分鐘,5:小時,6:天
    optional bool auto_ack = 6 [default = false]; // 是否自動ack
}

//消息
message Messages {
	optional int64 batch_id = 1;
    repeated bytes messages = 2;
}

// TBD when new packets are required
message Dump{
    optional string journal = 1;
    optional int64  position = 2;
    optional int64 timestamp = 3 [default = 0];
}

// 客戶端回滾
message ClientRollback{
    optional string destination = 1;
    optional string client_id = 2;
    optional int64 batch_id = 3;
}
相關文章
相關標籤/搜索