MySQL協議和canal實現

前言

前面的文章裏,咱們瞭解到 canal 能夠從 MySQL 中感知數據的變化。這是由於它模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,從而實現了主從複製。mysql

正是瞭解到這一點,筆者有兩個問題便一直縈繞於心:sql

  • 它是如何模擬 MySQL slave 交互協議的?
  • 它又是怎麼解析 binlog 日誌的呢?

今天,筆者準備就着這兩個問題,扒拉扒拉 canal 的代碼,一探究竟。數據庫

1、MySQL 主從複製

在談 canal 以前,咱們有必要再重溫下 MySQL 主從複製的原理。數組

總結上圖的流程以下:bash

  • MySQL master 將數據變動寫入二進制日誌 (binary log , 其中記錄叫作二進制日誌事件binary log events);
  • MySQL slave 將 master 的 binary log events 拷貝到它的中繼日誌 (relay log);
  • MySQL slave 重放 relay log 中的事件,將數據變動反映到本身的數據庫。

2、canal 原理

上圖就很形象的描述了 canal 的角色。它的原理也很簡單:服務器

  • canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議;
  • mysql master收到dump請求,開始推送binary log給slave(也就是canal);
  • canal解析binary log對象(原始爲byte流);
  • canal將解析後的對象,根據業務場景,分發到好比 MySQL 、RocketMQ 或者 ES 中。

3、源碼啓動

看完了 MySQL 主從複製和 canal 原理以後,爲了方便 debug ,筆者已經在 GitHub Fork 了源碼,並導入本地。網絡

能夠找到 com.alibaba.otter.canal.deployer.CanalLauncher 類,它就是 canal 獨立版本啓動的入口類。數據結構

在這裏,直接運行 main 方法便可運行 canal ,和在 /canal/bin/startup.sh 中效果同樣。架構

事實上,canal 的代碼比較多,在架構上又分了不少模塊設計,好比事件解析器、事件消費、內存存儲、服務實例、元數據、高可用等。併發

本文不打算面面俱到介紹每個的實現,那就得正兒八經寫一個 canal 系列才行。主要仍是爲了開頭咱們提出的那兩個問題。

4、如何模擬slave ?

上面咱們已經說到,CanalLauncher是canal 啓動的入口類。

運行 main 方法以後, canal 會先作不少準備工做。好比加載配置文件、初始化消息隊列、啓動 canal Admin、加載Spring配置、註冊鉤子程序等。

canal 模擬 slave 協議,是在EventParser模塊中開始進行的。

在 canal 代碼中,整個流程簡化以下:

// 開始執行replication
// 1. 構造Erosa鏈接
ErosaConnection erosaConnection = buildErosaConnection();
// 2. 啓動一個心跳線程
startHeartBeat(erosaConnection);
// 3. 執行dump前的準備工做
preDump(erosaConnection);
erosaConnection.connect();// 連接
// 查詢master serverId
long queryServerId = erosaConnection.queryServerId();
if (queryServerId != 0) {
    serverId = queryServerId;
}
// 4. 獲取binlog最後的位置信息
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
// 加載元數據
processTableMeta(startPosition);
// 從新連接,由於在找position過程當中可能有狀態,須要斷開後重建
erosaConnection.reconnect();
// 4. 開始dump數據
erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);
複製代碼

一、握手、驗證

在開始以前,canal 必須先要和 MySQL 服務器創建鏈接,並完成客戶端身份驗證。

在 MySQL 中,鏈接過程協議以下:

在代碼中,咱們看一下它的鏈接方法:

其中,negotiate方法是握手協議和客戶端驗證的具體實現。就是按照 MySQL 的協議規範,經過上面建立的Socket channel來讀寫網絡數據。

二、dump前的準備

正確鏈接到 MySQL 後,在開始執行 dump 指令以前,還要初始化一些配置信息。

思路就是經過 MySQL 執行器,執行 SQL 語句,獲取信息。

代碼就不粘了,不過它們執行的語句以下:

show variables like 'binlog_format'      #獲取binlog format格式
show variables like 'binlog_row_image'   #獲取binlog image格式
show variables like 'server_id'          #獲取matser serverId
show master status                       #獲取binlog名稱和position
複製代碼

三、註冊slave

如今開始調用 erosaConnection.dump(binlogfilename,binlogPosition,func)方法,來註冊slave和發送dump命令。

在使用COM_BINLOG_DUMP請求binlog事件以前發送,在主服務器上註冊一個從服務器,它的指令是COM_REGISTER_SLAVE

註冊完以後,就是發送dump請求,它的指令是COM_BINLOG_DUMP

在執行完這段代碼後,咱們經過show processlist;查看進程,就能夠看到這個dump線程的狀態。

id user host db command time state
139 canal localhost:62901 null Binlog Dump 3 Master has sent all binlog to slave; waiting for more updates

5、如何解析binlog數據 ?

在上面章節中,咱們已經看到,MySQL主服務器已經接受了 canal 這個從服務器,那麼當canal拿到binlog內容後, 又是怎麼解析它的呢?

首先,還記得在配置MySQL服務器的時候,咱們將binlog-format設置爲ROW模式,它是基於行的複製。

binlog中每個數據變動能夠叫作事件,在ROW模式下,有幾個主要的事件類型:

事件 SQL命令 rows 內容
TABLE_MAP_EVENT null 定義將要更改的表。
WRITE_ROWS_EVENT 插入 要插入的行數據
DELETE_ROWS_EVENT 刪除 被刪除的數據
UPDATE_ROWS_EVENT 更新 原數據+要更改的數據

每一次數據的變動,都會觸發2個事件,先把要更改的表信息告訴你,而後再告訴你更改的row內容。

好比TABLE_MAP_EVENT + WRITE_ROWS_EVENT

canal在接收到binlog數據後,並不會立刻把它解析成咱們熟悉的JSON數據,而是在發送的時候纔開始。

好比咱們選擇使用RocketMQ,那麼在發送以前纔開始將binlog裏面的byte數組轉化爲對象。

// 併發構造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
// 串行分區
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
複製代碼

在這兩個方法裏,就完成了byte數組到對象的轉化。轉化成的FlatMessage對象,就成了咱們在消息隊列中消費到的數據結構。

public class FlatMessage implements Serializable {
    private long                      id;
    private String                    database;
    private String                    table;
    private List<String>              pkNames;
    private Boolean                   isDdl;
    private String                    type;
    // binlog executeTime
    private Long                      es;
    // dml build timeStamp
    private Long                      ts;
    private String                    sql;
    private Map<String, Integer>      sqlType;
    private Map<String, String>       mysqlType;
    private List<Map<String, String>> data;
    private List<Map<String, String>> old;
}
複製代碼

總結

正如本文開頭所言,筆者在剛瞭解到canal機制的時候,確實以爲很難以想象。

咦,它是怎麼模擬MySQL slave的呢 ? 總以爲是否是有啥黑科技在裏面。。。

事實上,這是源於筆者對MySQL的無知。

MySQL早就制定好了各類接口協議,怎麼鏈接、驗證、註冊和dump都明明白白的寫在那兒啦。

正是應了那句話:花開正好,只待君來~

相關文章
相關標籤/搜索