canal [kə'næl],譯意爲水道/管道/溝渠,主要用途是基於 MySQL 數據庫增量日誌解析,提供增量數據 訂閱 和 消費。應該是阿里雲DTS(Data Transfer Service)的開源版本。mysql
Canal與DTS提供的功能基本類似:git
1)基於Mysql的Slave協議實時dump binlog流,解析爲事件發送給訂閱方。github
2)單Canal instance,單DTS數據訂閱通道均只支持訂閱一個RDS,提供給一個消費者。sql
3)可使用canal-client客戶端進行消息消費。docker
4)也能夠經過簡單配置,也能夠不須要自行使用canal-client消費,能夠選擇直接投遞到kafka或者RocketMQ集羣,用戶只須要使用消息隊列的consumer消費便可。數據庫
5)成功消費消息後須要進行Ack,以確保一致性,服務端則會維護客戶端目前的消費位點。安全
MySQL的主從複製分紅三步:ruby
canal 就是模擬了這個過程。架構
canal 1.1.4開始支持admin管理,經過canal-admin爲canal提供總體配置管理、節點運維等面向運維的功能,提供相對友好的WebUI操做界面,方便更多用戶快速和安全的操做,替代了過去繁瑣的配置文件管理。運維
總體部署架構以下。
說明:
Instance模塊
EventParser模塊的類圖設計以下
每一個EventParser都會關聯兩個內部組件:CanalLogPositionManager , CanalHAController
EventParser根據HAController獲知連到哪裏,經過LogPositionManager獲知從哪一個位點開始解析,以後便經過Mysql Slave協議拉取binlog進行解析,推入EventSink
目前只提供了一個帶有實際做用的實現:GroupEventSink
GroupEventSink用於將多個instance上的數據進行歸併,經常使用於分庫後的多數據源歸併。
EventStore的類圖以下
官方提供的實現類是
MemoryEventStoreWIthBuffer,內部採用的是一個RingBuffer:
這些位點信息經過MetaManager進行管理。這也解釋了爲何一個canal instance只能支撐一個消費者:EventStore的RingBuffer只爲一個消費者維護信息。
數據格式已經在前文給出,Canal和DTS客戶端均採起:
拉取事件 -> 消費 -> 消費成功後ACK
這樣的消費模式,並支持消費不成功時進行rollback,從新消費該數據。
下面是一段簡單的客戶端調用實例(略去異常處理):
// 建立CanalConnector, 鏈接到localhost:11111
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");
connector.connect(); // 鏈接
connector.subscribe(); // 開始訂閱binlog
// 開始循環拉取
while (running) {
Message message = connector.getWithoutAck(1024); // 獲取指定數量的數據
long batchId = message.getId();
for (Entry entry : message.getEntries()){
// 對每條消息進行處理
}
connector.ack(batchId); // ack
}
5.1 優勢
1)性能優異、功能全面
2)運維方便
3)多語言支持
5.2 缺點
好了,花了10分鐘應該對canal有大體瞭解了,下一期,阿丸計劃手把手教你搭建canal集羣和admin管理平臺,記得關注哦。
都看到最後了,原創不易,點個關注,點個贊吧~
知識碎片從新梳理,構建Java知識圖譜: github.com/saigu/JavaK…(歷史文章查閱很是方便)