前面的文章裏,咱們瞭解到 canal 能夠從 MySQL 中感知數據的變化。這是由於它模擬 MySQL slave 的交互協議,假裝本身爲 MySQL slave ,從而實現了主從複製。mysql
正是瞭解到這一點,筆者有兩個問題便一直縈繞於心:sql
今天,筆者準備就着這兩個問題,扒拉扒拉 canal 的代碼,一探究竟。數據庫
在談 canal 以前,咱們有必要再重溫下 MySQL 主從複製的原理。數組
總結上圖的流程以下:bash
上圖就很形象的描述了 canal 的角色。它的原理也很簡單:服務器
看完了 MySQL 主從複製和 canal 原理以後,爲了方便 debug ,筆者已經在 GitHub Fork 了源碼,並導入本地。網絡
能夠找到 com.alibaba.otter.canal.deployer.CanalLauncher
類,它就是 canal 獨立版本啓動的入口類。數據結構
在這裏,直接運行 main 方法便可運行 canal ,和在 /canal/bin/startup.sh
中效果同樣。架構
事實上,canal 的代碼比較多,在架構上又分了不少模塊設計,好比事件解析器、事件消費、內存存儲、服務實例、元數據、高可用等。併發
本文不打算面面俱到介紹每個的實現,那就得正兒八經寫一個 canal 系列才行。主要仍是爲了開頭咱們提出的那兩個問題。
上面咱們已經說到,CanalLauncher
是canal 啓動的入口類。
運行 main 方法以後, canal 會先作不少準備工做。好比加載配置文件、初始化消息隊列、啓動 canal Admin、加載Spring配置、註冊鉤子程序等。
canal 模擬 slave 協議,是在EventParser
模塊中開始進行的。
在 canal 代碼中,整個流程簡化以下:
// 開始執行replication
// 1. 構造Erosa鏈接
ErosaConnection erosaConnection = buildErosaConnection();
// 2. 啓動一個心跳線程
startHeartBeat(erosaConnection);
// 3. 執行dump前的準備工做
preDump(erosaConnection);
erosaConnection.connect();// 連接
// 查詢master serverId
long queryServerId = erosaConnection.queryServerId();
if (queryServerId != 0) {
serverId = queryServerId;
}
// 4. 獲取binlog最後的位置信息
EntryPosition position = findStartPosition(erosaConnection);
final EntryPosition startPosition = position;
// 加載元數據
processTableMeta(startPosition);
// 從新連接,由於在找position過程當中可能有狀態,須要斷開後重建
erosaConnection.reconnect();
// 4. 開始dump數據
erosaConnection.dump(startPosition.getJournalName(),startPosition.getPosition(),sinkHandler);
複製代碼
在開始以前,canal 必須先要和 MySQL 服務器創建鏈接,並完成客戶端身份驗證。
在 MySQL 中,鏈接過程協議以下:
在代碼中,咱們看一下它的鏈接方法:
其中,negotiate
方法是握手協議和客戶端驗證的具體實現。就是按照 MySQL 的協議規範,經過上面建立的Socket channel
來讀寫網絡數據。
正確鏈接到 MySQL 後,在開始執行 dump 指令以前,還要初始化一些配置信息。
思路就是經過 MySQL 執行器,執行 SQL 語句,獲取信息。
代碼就不粘了,不過它們執行的語句以下:
show variables like 'binlog_format' #獲取binlog format格式
show variables like 'binlog_row_image' #獲取binlog image格式
show variables like 'server_id' #獲取matser serverId
show master status #獲取binlog名稱和position
複製代碼
如今開始調用 erosaConnection.dump(binlogfilename,binlogPosition,func)
方法,來註冊slave和發送dump命令。
在使用COM_BINLOG_DUMP
請求binlog事件以前發送,在主服務器上註冊一個從服務器,它的指令是COM_REGISTER_SLAVE
。
註冊完以後,就是發送dump請求,它的指令是COM_BINLOG_DUMP
。
在執行完這段代碼後,咱們經過show processlist;
查看進程,就能夠看到這個dump線程的狀態。
id | user | host | db | command | time | state |
---|---|---|---|---|---|---|
139 | canal | localhost:62901 | null | Binlog Dump | 3 | Master has sent all binlog to slave; waiting for more updates |
在上面章節中,咱們已經看到,MySQL主服務器已經接受了 canal 這個從服務器,那麼當canal拿到binlog內容後, 又是怎麼解析它的呢?
首先,還記得在配置MySQL服務器的時候,咱們將binlog-format
設置爲ROW模式,它是基於行的複製。
binlog中每個數據變動能夠叫作事件,在ROW模式下,有幾個主要的事件類型:
事件 | SQL命令 | rows 內容 |
---|---|---|
TABLE_MAP_EVENT | null | 定義將要更改的表。 |
WRITE_ROWS_EVENT | 插入 | 要插入的行數據 |
DELETE_ROWS_EVENT | 刪除 | 被刪除的數據 |
UPDATE_ROWS_EVENT | 更新 | 原數據+要更改的數據 |
每一次數據的變動,都會觸發2個事件,先把要更改的表信息告訴你,而後再告訴你更改的row內容。
好比TABLE_MAP_EVENT + WRITE_ROWS_EVENT
。
canal在接收到binlog數據後,並不會立刻把它解析成咱們熟悉的JSON數據,而是在發送的時候纔開始。
好比咱們選擇使用RocketMQ
,那麼在發送以前纔開始將binlog裏面的byte數組轉化爲對象。
// 併發構造
EntryRowData[] datas = MQMessageUtils.buildMessageData(message, executor);
// 串行分區
List<FlatMessage> flatMessages = MQMessageUtils.messageConverter(datas, message.getId());
複製代碼
在這兩個方法裏,就完成了byte數組到對象的轉化。轉化成的FlatMessage
對象,就成了咱們在消息隊列中消費到的數據結構。
public class FlatMessage implements Serializable {
private long id;
private String database;
private String table;
private List<String> pkNames;
private Boolean isDdl;
private String type;
// binlog executeTime
private Long es;
// dml build timeStamp
private Long ts;
private String sql;
private Map<String, Integer> sqlType;
private Map<String, String> mysqlType;
private List<Map<String, String>> data;
private List<Map<String, String>> old;
}
複製代碼
正如本文開頭所言,筆者在剛瞭解到canal機制的時候,確實以爲很難以想象。
咦,它是怎麼模擬MySQL slave的呢 ? 總以爲是否是有啥黑科技在裏面。。。
事實上,這是源於筆者對MySQL的無知。
MySQL早就制定好了各類接口協議,怎麼鏈接、驗證、註冊和dump都明明白白的寫在那兒啦。
正是應了那句話:花開正好,只待君來~