Canal支持MySQL增量數據訂閱&消費

https://github.com/alibaba/canalnode

概述

canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。mysql

起源:早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。git

基於日誌增量訂閱&消費支持的業務:github

  1. 數據庫鏡像
  2. 數據庫實時備份
  3. 多級索引 (賣家和買家各自分庫索引)
  4. search build
  5. 業務cache刷新
  6. 價格變化等重要業務消息

工做原理

mysql主備複製實現:web

從上層來看,複製分紅三步:spring

  1. master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
  2. slave將master的binary log events拷貝到它的中繼日誌(relay log);
  3. slave重作中繼日誌中的事件,將改變反映它本身的數據。

canal的工做原理

原理相對比較簡單:sql

  1. canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
  2. mysql master收到dump請求,開始推送binary log給slave(也就是canal)
  3. canal解析binary log對象(原始爲byte流)

架構設計

我的理解,數據增量訂閱與消費應當有以下幾個點:docker

  1. 增量訂閱和消費模塊應當包括binlog日誌抓取,binlog日誌解析,事件分發過濾(EventSink),存儲(EventStore)等主要模塊。
  2. 若是須要確保HA能夠採用Zookeeper保存各個子模塊的狀態,讓整個增量訂閱和消費模塊實現無狀態化,固然做爲consumer(客戶端)的狀態也能夠保存在zk之中。
  3. 總體上經過一個Manager System進行集中管理,分配資源。

能夠參考下圖:數據庫

canal架構設計

說明:api

  • server表明一個canal運行實例,對應於一個jvm
  • instance對應於一個數據隊列 (1個server對應1..n個instance)

instance模塊:

  • eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
  • eventSink (Parser和Store連接器,進行數據過濾,加工,分發的工做)
  • eventStore (數據存儲)
  • metaManager (增量訂閱&消費信息管理器)

EventParser

整個parser過程大體可分爲幾部:

  1. Connection獲取上一次解析成功的位置(若是第一次啓動,則獲取初始制定的位置或者是當前數據庫的binlog位點)
  2. Connection創建鏈接,發生BINLOG_DUMP命令
  3. Mysql開始推送Binary Log
  4. 接收到的Binary Log經過Binlog parser進行協議解析,補充一些特定信息
  5. 傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功
  6. 存儲成功後,定時記錄Binary Log位置

EventSink設計

說明:

  • 數據過濾:支持通配符的過濾模式,表名,字段內容等
  • 數據路由/分發:解決1:n (1個parser對應多個store的模式)
  • 數據歸併:解決n:1 (多個parser對應1個store)
  • 數據加工:在進入store以前進行額外的處理,好比join

1 數據1:n業務 :

爲了合理的利用數據庫資源, 通常常見的業務都是按照schema進行隔離,而後在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是經過cobar/tddl來解決數據源路由問題。 因此,通常一個數據庫實例上,會部署多個schema,每一個schema會有由1個或者多個業務方關注。

2 數據n:1業務:

一樣,當一個業務的數據規模達到必定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據須要處理時,就須要連接多個store進行處理,消費的位點就會變成多份,並且數據消費的進度沒法獲得儘量有序的保證。 因此,在必定業務場景下,須要將拆分後的增量數據進行歸併處理,好比按照時間戳/全局id進行排序歸併.

EventStore設計

目前實現了Memory內存、本地file存儲以及持久化到zookeeper以保障數據集羣共享。
Memory內存的RingBuffer設計:

定義了3個cursor

  • Put : Sink模塊進行數據存儲的最後一次寫入位置
  • Get : 數據訂閱獲取的最後一次提取位置
  • Ack : 數據消費成功的最後一次消費位置

借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:

實現說明:

  • Put/Get/Ack cursor用於遞增,採用long型存儲
  • buffer的get操做,經過取餘或者與操做。(與操做: cusor & (size – 1) , size須要爲2的指數,效率比較高)

Instance設計

instance表明了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:

1. manager方式: 和你本身的內部web console/manager系統進行對接。(alibaba內部使用方式)

2. spring方式:基於spring xml + properties進行定義,構建spring配置.

  • spring/memory-instance.xml 全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析。特色:速度最快,依賴最少
  • spring/file-instance.xml 全部的組件(parser , sink , store)都選擇了基於file持久化模式,注意,不支持HA機制.支持單機持久化
  • spring/default-instance.xml 全部的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享. 支持HA
  • spring/group-instance.xml 主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。場景:分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可.

Server設計

server表明了一個canal的運行實例,爲了方便組件化使用,特地抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現:

  • Embeded : 對latency和可用性都有比較高的要求,本身又能hold住分佈式的相關技術(好比failover)
  • Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,採用的pull模型,固然latency會稍微打點折扣,不過這個也視狀況而定。

增量訂閱/消費設計

具體的協議格式,可參見:CanalProtocol.proto
get/ack/rollback協議介紹:

  • Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:
  • a. batch id 惟一標識
  • b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
  • void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
  • void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做
  • canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.
  • 流式api設計的好處:
  • get/ack異步化,減小因ack帶來的網絡延遲和操做成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別狀況,不必爲個別的case犧牲整個性能)
  • get獲取數據後,業務消費存在瓶頸或者須要多進程/多線程消費時,能夠不停的輪詢get數據,不停的日後發送任務,提升並行化. (做者在實際業務中的一個case:業務數據消費須要跨中美網絡,因此一次操做基本在200ms以上,爲了減小延遲,因此須要實施並行化)

流式api設計:

  • 每次get操做都會在meta中產生一個mark,mark標記會遞增,保證運行過程當中mark的惟一性
  • 每次的get操做,都會在上一次的mark操做記錄的cursor繼續日後取,若是mark不存在,則在last ack cursor繼續日後取
  • 進行ack時,須要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新爲last ack cusor
  • 一旦出現異常狀況,客戶端可發起rollback狀況,從新置位:刪除全部的mark, 清理get請求位置,下次請求會從last ack cursor繼續日後取

數據格式

canal採用protobuff:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Entry
     Header
         logfileName [binlog文件名]
         logfileOffset [binlog position]
         executeTime [發生的變動]
         schemaName
         tableName
         eventType [insert/update/delete類型]
     entryType   [事務頭BEGIN/事務尾END/數據ROWDATA]
     storeValue  [ byte 數據,可展開,對應的類型爲RowChange]   
RowChange
     isDdl       [是不是ddl變動操做,好比create table/drop table]
     sql     [具體的ddl sql]
     rowDatas    [具體insert/update/delete的變動數據,可爲多條, 1 個binlog event事件可對應多條變動,好比批處理]
         beforeColumns [Column類型的數組]
         afterColumns [Column類型的數組]     
Column
     index      
     sqlType     [jdbc type]
     name        [column name]
     isKey       [是否爲主鍵]
     updated     [是否發生過變動]
     isNull      [值是否爲 null ]
     value       [具體的內容,注意爲文本]

canal-message example:

好比數據庫中的表:

1
2
3
4
5
6
7
8
9
mysql> select * from person;
+----+------+------+------+
| id | name | age  | sex  |
+----+------+------+------+
1  | zzh  |   10  | m    |
3  | zzh3 |   12  | f    |
4  | zzh4 |    5  | m    |
+----+------+------+------+
3  rows in  set  ( 0.00  sec)

更新一條數據(update person set age=15 where id=4):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
****************************************************
* Batch Id: [ 2 ] ,count : [ 3 ] , memsize : [ 165 ] , Time : 2016 - 09 - 07  15 : 54 : 18
* Start : [mysql-bin. 000003 : 6354 : 1473234846000 ( 2016 - 09 - 07  15 : 54 : 06 )]
* End : [mysql-bin. 000003 : 6550 : 1473234846000 ( 2016 - 09 - 07  15 : 54 : 06 )]
****************************************************
 
================> binlog[mysql-bin. 000003 : 6354 ] , executeTime : 1473234846000  , delay : 12225ms
  BEGIN ----> Thread id: 67
----------------> binlog[mysql-bin. 000003 : 6486 ] , name[canal_test,person] , eventType : UPDATE , executeTime : 1473234846000  , delay : 12225ms
id : 4     type= int ( 11 )
name : zzh4    type=varchar( 100 )
age : 15     type= int ( 11 )    update= true
sex : m    type= char ( 1 )
----------------
  END ----> transaction id: 308
================> binlog[mysql-bin. 000003 : 6550 ] , executeTime : 1473234846000  , delay : 12240ms

HA機制設計

canal的HA分爲兩部分,canal server和canal client分別有對應的ha實現:

  • canal server: 爲了減小對mysql dump的請求,不一樣server上的instance要求同一時間只能有一個處於running,其餘的處於standby狀態.
  • canal client: 爲了保證有序性,一份instance同一時間只能由一個canal client進行get/ack/rollback操做,不然客戶端接收沒法保證有序。

整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命週期綁定),能夠看下我以前zookeeper的相關文章。

Canal Server:

大體步驟:

  1. canal server要啓動某個canal instance時都先向zookeeper進行一次嘗試啓動判斷 (實現:建立EPHEMERAL節點,誰建立成功就容許誰啓動)
  2. 建立zookeeper節點成功後,對應的canal server就啓動對應的canal instance,沒有建立成功的canal instance就會處於standby狀態
  3. 一旦zookeeper發現canal server A建立的節點消失後,當即通知其餘的canal server再次進行步驟1的操做,從新選出一個canal server啓動instance.
  4. canal client每次進行connect時,會首先向zookeeper詢問當前是誰啓動了canal instance,而後和其創建連接,一旦連接不可用,會從新嘗試connect.
  5. Canal Client的方式和canal server方式相似,也是利用zokeeper的搶佔EPHEMERAL節點的方式進行控制.

HA配置架構圖(舉例)以下所示:

canal其餘連接方式

canal還有幾種鏈接方式:

1. 單連

2. 兩個client+兩個instance+1個mysql

當mysql變更時,兩個client都能獲取到變更

3. 一個server+兩個instance+兩個mysql+兩個client

4. instance的standby配置

總體架構

從總體架構上來講canal是這種架構的(canal中沒有包含一個運維的console web來對接,但要運用於分佈式環境中確定須要一個Manager來管理):

一個整體的manager system對應於n個Canal Server(物理上來講是一臺服務器), 那麼一個Canal Server對應於n個Canal Instance(destinations). 大致上是三層結構,第二層也須要Manager統籌運維管理。

那麼隨着Docker技術的興起,是否能夠試一下下面的架構呢?

  • 一個docker中跑一個instance服務,至關於略去server這一層的概念。
  • Manager System中配置一個instance,直接調取一個docker發佈這個instance,其中包括向這個instance發送配置信息,啓動instance服務.
  • instance在運行過程當中,定時刷新binlog filename+ binlog position的信息至zk。
  • 若是一個instance出現故障,instance自己報錯或者zk感知此node消失,則根據相應的信息,好比上一步保存的binlog filename+binlog position從新開啓一個docker服務,固然這裏能夠適當的加一些重試機制。
  • 當要更新時,相似AB test, 先關閉一個docker,而後開啓新的已更新的替換,按部就班的進行。
  • 當涉及到分表分庫時,多個物理表對應於一個邏輯表,能夠將結果存於一個公共的模塊(好比MQ),或者單獨存取也能夠,具體狀況具體分析
  • 存儲能夠參考canal的多樣化:內存,文件,zk,或者加入至MQ中
  • docker由此以外的工具管理,好比kubernetes
  • 也能夠進一步添加HA的功能,兩個docker對應一個mysql,互爲主備,相似Canal的HA架構。若是時效性不是貼彆強的場景,考慮到成本,此功能能夠不採用。

總結

這裏總結了一下Canal的一些點,僅供參考:

  1. 原理:模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議;mysql master收到dump請求,開始推送binary log給slave(也就是canal);解析binary log對象(原始爲byte流)
  2. 重複消費問題:在消費端解決。
  3. 採用開源的open-replicator來解析binlog
  4. canal須要維護EventStore,能夠存取在Memory, File, zk
  5. canal須要維護客戶端的狀態,同一時刻一個instance只能有一個消費端消費
  6. 數據傳輸格式:protobuff
  7. 支持binlog format 類型:statement, row, mixed. 屢次附加功能只能在row下使用,好比otter
  8. binlog position能夠支持保存在內存,文件,zk中
  9. instance啓動方式:rpc/http; 內嵌
  10. 有ACK機制
  11. 無告警,無監控,這兩個功能都須要對接外部系統
  12. 方便快速部署。
相關文章
相關標籤/搜索