背景
早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增 量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後 開啓了一段新紀元。ps. 目前內部使用的同步,已經支持mysql5.x和oracle部分版本的日誌解析html
基於日誌增量訂閱&消費支持的業務:java
- 數據庫鏡像
- 數據庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- search build
- 業務cache刷新
- 價格變化等重要業務消息
項目介紹
名稱:canal [kə'næl]mysql
譯意: 水道/管道/溝渠 git
語言: 純java開發github
定位: 基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了mysqlweb
工做原理
mysql主備複製實現
從上層來看,複製分紅三步:spring
- master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日誌(relay log);
- slave重作中繼日誌中的事件,將改變反映它本身的數據。
canal的工做原理:
原理相對比較簡單:sql
- canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始爲byte流)
架構
說明:數據庫
- server表明一個canal運行實例,對應於一個jvm
- instance對應於一個數據隊列 (1個server對應1..n個instance)
instance模塊:api
- eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
- eventSink (Parser和Store連接器,進行數據過濾,加工,分發的工做)
- eventStore (數據存儲)
- metaManager (增量訂閱&消費信息管理器)
知識科普
mysql的Binlay Log介紹
簡單點說:
- mysql的binlog是多文件存儲,定位一個LogEvent須要經過binlog filename + binlog position,進行定位
- mysql的binlog數據格式,按照生成的方式,主要分爲:statement-based、row-based、mixed。
- mysql> show variables like 'binlog_format';
- +---------------+-------+
- | Variable_name | Value |
- +---------------+-------+
- | binlog_format | ROW |
- +---------------+-------+
- 1 row in set (0.00 sec)
目前canal只能支持row模式的增量訂閱(statement只有sql,沒有數據,因此沒法獲取原始的變動日誌)
EventParser設計
大體過程:
整個parser過程大體可分爲幾步:
- Connection獲取上一次解析成功的位置 (若是第一次啓動,則獲取初始指定的位置或者是當前數據庫的binlog位點)
- Connection創建連接,發送BINLOG_DUMP指令
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name
- Mysql開始推送Binaly Log
- 接收到的Binaly Log的經過Binlog parser進行協議解析,補充一些特定信息
// 補充字段名字,字段類型,主鍵信息,unsigned類型處理
- 傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功
- 存儲成功後,定時記錄Binaly Log位置
mysql的Binlay Log網絡協議:
說明:
EventSink設計
說明:
- 數據過濾:支持通配符的過濾模式,表名,字段內容等
- 數據路由/分發:解決1:n (1個parser對應多個store的模式)
- 數據歸併:解決n:1 (多個parser對應1個store)
- 數據加工:在進入store以前進行額外的處理,好比join
數據1:n業務
爲了合理的利用數據庫資源, 通常常見的業務都是按照schema進行隔離,而後在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是經過cobar/tddl來解決數據源路由問題。
因此,通常一個數據庫實例上,會部署多個schema,每一個schema會有由1個或者多個業務方關注
數據n:1業務
一樣,當一個業務的數據規模達到必定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據須要處理時,就須要連接多個store進行處理,消費的位點就會變成多份,並且數據消費的進度沒法獲得儘量有序的保證。
因此,在必定業務場景下,須要將拆分後的增量數據進行歸併處理,好比按照時間戳/全局id進行排序歸併.
EventStore設計
- 1. 目前僅實現了Memory內存模式,後續計劃增長本地file存儲,mixed混合模式
- 2. 借鑑了Disruptor的RingBuffer的實現思路
RingBuffer設計:
定義了3個cursor
- Put : Sink模塊進行數據存儲的最後一次寫入位置
- Get : 數據訂閱獲取的最後一次提取位置
- Ack : 數據消費成功的最後一次消費位置
借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:
實現說明:
- Put/Get/Ack cursor用於遞增,採用long型存儲
- buffer的get操做,經過取餘或者與操做。(與操做: cusor & (size - 1) , size須要爲2的指數,效率比較高)
Instance設計
instance表明了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
- manager方式: 和你本身的內部web console/manager系統進行對接。(目前主要是公司內部使用)
- spring方式:基於spring xml + properties進行定義,構建spring配置.
Server設計
server表明了一個canal的運行實例,爲了方便組件化使用,特地抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現
- Embeded : 對latency和可用性都有比較高的要求,本身又能hold住分佈式的相關技術(好比failover)
- Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,採用的pull模型,固然latency會稍微打點折扣,不過這個也視狀況而定。(阿里系的notify和metaq,典型的 push/pull模型,目前也逐步的在向pull模型靠攏,push在數據量大的時候會有一些問題)
增量訂閱/消費設計
具體的協議格式,可參見:CanalProtocol.proto
get/ack/rollback協議介紹:
- Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:
a. batch id 惟一標識
b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
- void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
- void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做
canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.
流式api設計的好處:
- get/ack異步化,減小因ack帶來的網絡延遲和操做成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別狀況,不必爲個別的case犧牲整個性能)
- get獲取數據後,業務消費存在瓶頸或者須要多進程/多線程消費時,能夠不停的輪詢get數據,不停的日後發送任務,提升並行化. (做者在實際業務中的一個case:業務數據消費須要跨中美網絡,因此一次操做基本在200ms以上,爲了減小延遲,因此須要實施並行化)
流式api設計:
- 每次get操做都會在meta中產生一個mark,mark標記會遞增,保證運行過程當中mark的惟一性
- 每次的get操做,都會在上一次的mark操做記錄的cursor繼續日後取,若是mark不存在,則在last ack cursor繼續日後取
- 進行ack時,須要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新爲last ack cusor
- 一旦出現異常狀況,客戶端可發起rollback狀況,從新置位:刪除全部的mark, 清理get請求位置,下次請求會從last ack cursor繼續日後取
- Entry
- Header
- logfileName [binlog文件名]
- logfileOffset [binlog position]
- executeTime [發生的變動]
- schemaName
- tableName
- eventType [insert/update/delete類型]
- entryType [事務頭BEGIN/事務尾END/數據ROWDATA]
- storeValue [byte數據,可展開,對應的類型爲RowChange]
-
- RowChange
- isDdl [是不是ddl變動操做,好比create table/drop table]
- sql [具體的ddl sql]
- rowDatas [具體insert/update/delete的變動數據,可爲多條,1個binlog event事件可對應多條變動,好比批處理]
- beforeColumns [Column類型的數組]
- afterColumns [Column類型的數組]
-
- Column
- index
- sqlType [jdbc type]
- name [column name]
- isKey [是否爲主鍵]
- updated [是否發生過變動]
- isNull [值是否爲null]
- value [具體的內容,注意爲文本]
說明:
- 能夠提供數據庫變動前和變動後的字段內容,針對binlog中沒有的name,isKey等信息進行補全
- 能夠提供ddl的變動語句
HA機制設計
canal的ha分爲兩部分,canal server和canal client分別有對應的ha實現
- canal server: 爲了減小對mysql dump的請求,不一樣server上的instance要求同一時間只能有一個處於running,其餘的處於standby狀態.
- canal client: 爲了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操做,不然客戶端接收沒法保證有序。
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命週期綁定),能夠看下我以前zookeeper的相關文章。
Canal Server:
大體步驟:
- canal server要啓動某個canal instance時都先向zookeeper進行一次嘗試啓動判斷 (實現:建立EPHEMERAL節點,誰建立成功就容許誰啓動)
- 建立zookeeper節點成功後,對應的canal server就啓動對應的canal instance,沒有建立成功的canal instance就會處於standby狀態
- 一旦zookeeper發現canal server A建立的節點消失後,當即通知其餘的canal server再次進行步驟1的操做,從新選出一個canal server啓動instance.
- canal client每次進行connect時,會首先向zookeeper詢問當前是誰啓動了canal instance,而後和其創建連接,一旦連接不可用,會從新嘗試connect.
Canal Client的方式和canal server方式相似,也是利用zokeeper的搶佔EPHEMERAL節點的方式進行控制.
最後
項目的代碼: https://github.com/alibabatech/canal
這裏給出瞭如何快速啓動Canal Server和Canal Client的例子,若有問題可隨時聯繫
Quick Start
Client Example