昨日瀏覽公衆文章時,偶爾發現阿里開源的這款軟件,初步瞭解,是基於mysql binarylog的增量發佈訂閱服務。網上也有客戶端針對.net平臺的支持。java
下面關於canal的介紹,蒐集了網上的一些資料,可供你們參考學習之用。mysql
canal的介紹git
canal是阿里巴巴旗下的一款開源項目,純Java開發。基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了MySQL(也支持mariaDB)。github
起源:早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。web
基於日誌增量訂閱&消費支持的業務:redis
- 數據庫鏡像
- 數據庫實時備份
- 多級索引 (賣家和買家各自分庫索引)
- search build
- 業務cache刷新
- 價格變化等重要業務消息
工做原理
mysql主備複製實現:spring
從上層來看,複製分紅三步:sql
- master將改變記錄到二進制日誌(binary log)中(這些記錄叫作二進制日誌事件,binary log events,能夠經過show binlog events進行查看);
- slave將master的binary log events拷貝到它的中繼日誌(relay log);
- slave重作中繼日誌中的事件,將改變反映它本身的數據。
canal的工做原理
原理相對比較簡單:數據庫
- canal模擬mysql slave的交互協議,假裝本身爲mysql slave,向mysql master發送dump協議
- mysql master收到dump請求,開始推送binary log給slave(也就是canal)
- canal解析binary log對象(原始爲byte流)
架構設計
我的理解,數據增量訂閱與消費應當有以下幾個點:api
- 增量訂閱和消費模塊應當包括binlog日誌抓取,binlog日誌解析,事件分發過濾(EventSink),存儲(EventStore)等主要模塊。
- 若是須要確保HA能夠採用Zookeeper保存各個子模塊的狀態,讓整個增量訂閱和消費模塊實現無狀態化,固然做爲consumer(客戶端)的狀態也能夠保存在zk之中。
- 總體上經過一個Manager System進行集中管理,分配資源。
能夠參考下圖:
canal架構設計
說明:
- server表明一個canal運行實例,對應於一個jvm
- instance對應於一個數據隊列 (1個server對應1..n個instance)
instance模塊:
- eventParser (數據源接入,模擬slave協議和master進行交互,協議解析)
- eventSink (Parser和Store連接器,進行數據過濾,加工,分發的工做)
- eventStore (數據存儲)
- metaManager (增量訂閱&消費信息管理器)
EventParser
整個parser過程大體可分爲幾部:
- Connection獲取上一次解析成功的位置(若是第一次啓動,則獲取初始制定的位置或者是當前數據庫的binlog位點)
- Connection創建鏈接,發生BINLOG_DUMP命令
- Mysql開始推送Binary Log
- 接收到的Binary Log經過Binlog parser進行協議解析,補充一些特定信息
- 傳遞給EventSink模塊進行數據存儲,是一個阻塞操做,直到存儲成功
- 存儲成功後,定時記錄Binary Log位置
EventSink設計
說明:
- 數據過濾:支持通配符的過濾模式,表名,字段內容等
- 數據路由/分發:解決1:n (1個parser對應多個store的模式)
- 數據歸併:解決n:1 (多個parser對應1個store)
- 數據加工:在進入store以前進行額外的處理,好比join
1 數據1:n業務 :
爲了合理的利用數據庫資源, 通常常見的業務都是按照schema進行隔離,而後在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是經過cobar/tddl來解決數據源路由問題。 因此,通常一個數據庫實例上,會部署多個schema,每一個schema會有由1個或者多個業務方關注。
2 數據n:1業務:
一樣,當一個業務的數據規模達到必定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據須要處理時,就須要連接多個store進行處理,消費的位點就會變成多份,並且數據消費的進度沒法獲得儘量有序的保證。 因此,在必定業務場景下,須要將拆分後的增量數據進行歸併處理,好比按照時間戳/全局id進行排序歸併.
EventStore設計
目前實現了Memory內存、本地file存儲以及持久化到zookeeper以保障數據集羣共享。
Memory內存的RingBuffer設計:
定義了3個cursor
- Put : Sink模塊進行數據存儲的最後一次寫入位置
- Get : 數據訂閱獲取的最後一次提取位置
- Ack : 數據消費成功的最後一次消費位置
借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:
實現說明:
- Put/Get/Ack cursor用於遞增,採用long型存儲
- buffer的get操做,經過取餘或者與操做。(與操做: cusor & (size – 1) , size須要爲2的指數,效率比較高)
Instance設計
instance表明了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
1. manager方式: 和你本身的內部web console/manager系統進行對接。(alibaba內部使用方式)
2. spring方式:基於spring xml + properties進行定義,構建spring配置.
- spring/memory-instance.xml 全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析。特色:速度最快,依賴最少
- spring/file-instance.xml 全部的組件(parser , sink , store)都選擇了基於file持久化模式,注意,不支持HA機制.支持單機持久化
- spring/default-instance.xml 全部的組件(parser , sink , store)都選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享. 支持HA
- spring/group-instance.xml 主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。場景:分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可.
Server設計
server表明了一個canal的運行實例,爲了方便組件化使用,特地抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現:
- Embeded : 對latency和可用性都有比較高的要求,本身又能hold住分佈式的相關技術(好比failover)
- Netty : 基於netty封裝了一層網絡協議,由canal server保證其可用性,採用的pull模型,固然latency會稍微打點折扣,不過這個也視狀況而定。
增量訂閱/消費設計
具體的協議格式,可參見:CanalProtocol.proto
get/ack/rollback協議介紹:
- Message getWithoutAck(int batchSize),容許指定batchSize,一次能夠獲取多條,每次返回的對象爲Message,包含的內容爲:
- a. batch id 惟一標識
- b. entries 具體的數據對象,對應的數據對象格式:EntryProtocol.proto
- void rollback(long batchId),顧命思議,回滾上次的get請求,從新獲取數據。基於get獲取的batchId進行提交,避免誤操做
- void ack(long batchId),顧命思議,確認已經消費成功,通知server刪除數據。基於get獲取的batchId進行提交,避免誤操做
- canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.
- 流式api設計的好處:
- get/ack異步化,減小因ack帶來的網絡延遲和操做成本 (99%的狀態都是處於正常狀態,異常的rollback屬於個別狀況,不必爲個別的case犧牲整個性能)
- get獲取數據後,業務消費存在瓶頸或者須要多進程/多線程消費時,能夠不停的輪詢get數據,不停的日後發送任務,提升並行化. (做者在實際業務中的一個case:業務數據消費須要跨中美網絡,因此一次操做基本在200ms以上,爲了減小延遲,因此須要實施並行化)
流式api設計:
- 每次get操做都會在meta中產生一個mark,mark標記會遞增,保證運行過程當中mark的惟一性
- 每次的get操做,都會在上一次的mark操做記錄的cursor繼續日後取,若是mark不存在,則在last ack cursor繼續日後取
- 進行ack時,須要按照mark的順序進行數序ack,不能跳躍ack. ack會刪除當前的mark標記,並將對應的mark位置更新爲last ack cusor
- 一旦出現異常狀況,客戶端可發起rollback狀況,從新置位:刪除全部的mark, 清理get請求位置,下次請求會從last ack cursor繼續日後取
數據同步方案選擇
針對上文的需求,通過思考,初步有以下的一些方案:
- 代碼實現 針對代碼中進行數據庫的增刪改操做時,同時進行elasticsearch的增刪改操做。
- mybatis實現 經過mybatis plugin進行實現,截取sql語句進行分析, 針對insert、update、delete的語句進行處理。顯然,這些操做若是都是單條數據的操做,是很容易處理的。可是,實際開發中,老是會有一些批量的更新或者刪除操做,這時候,就很難進行處理了。
- Aop實現 無論是經過哪一種Aop方式,根據制定的規則,如規範方法名,註解等進行切面處理,但依然仍是會出現沒法處理批量操做數據的問題。
- logstash logstash相似的同步組件提供的文件和數據同步的功能,能夠進行數據的同步,只須要簡單的配置就能將mysql數據同步到elasticsearch,可是logstash的原理是每秒進行一次增量數據查詢,將結果同步到elasticsearch,實時性要求特別高的,可能沒法知足要求。且此方案的性能不是很好,形成資源的浪費。
實現方式 |
優缺點 |
代碼實現 |
技術難度低,侵入性強,實時性高 |
基於mybatis |
有必定的技術難度,可是沒法覆蓋全部的場景 |
Aop實現 |
技術難度低,半侵入性,須要規範代碼,依然沒法覆蓋全部的場景 |
logstash |
技術難度低,無侵入性,無需開發,但會形成資源浪費。 |
那麼是否有什麼更好的方式進行處理嗎?mysql binlog同步,實時性強,對於應用無任何侵入性,且性能更好,不會形成資源浪費,那麼就有了我今天的主角——canal
canal
介紹
canal 是阿里巴巴的一個開源項目,基於java實現,總體已經在不少大型的互聯網項目生產環境中使用,包括阿里、美團等都有普遍的應用,是一個很是成熟的數據庫同步方案,基礎的使用只須要進行簡單的配置便可。 canal是經過模擬成爲mysql 的slave的方式,監聽mysql 的binlog日誌來獲取數據,binlog設置爲row模式之後,不只能獲取到執行的每個增刪改的腳本,同時還能獲取到修改前和修改後的數據,基於這個特性,canal就能高性能的獲取到mysql數據數據的變動。
使用
canal的介紹在官網有很是詳細的說明,若是想了解更多,你們能夠移步官網(https://github.com/alibaba/canal)瞭解。我這裏補充下使用中不太容易理解部分。 canal的部署主要分爲server端和client端。 server端部署好之後,能夠直接監聽mysql binlog,由於server端是把本身模擬成了mysql slave,因此,只能接受數據,沒有進行任何邏輯的處理,具體的邏輯處理,須要client端進行處理。 client端通常是須要你們進行簡單的開發。https://github.com/alibaba/canal/wiki/ClientAPI 有一個簡單的示例,很容易理解。
canal Adapter
爲了便於你們的使用,官方作了一個獨立的組件Adapter,Adapter是能夠將canal server端獲取的數據轉換成幾個經常使用的中間件數據源,如今支持kafka、rocketmq、hbase、elasticsearch,針對這幾個中間件的支持,直接配置便可,無需開發。上文中,若是須要將mysql的數據同步到elasticsearch,直接運行 canal Adapter,修改相關的配置便可。
常見問題
- 沒法接收到數據,程序也沒有報錯? 必定要確保mysql的binlog模式爲row模式,canal原理是解析Binlog文件,而且直接中文件中獲取數據的。
- Adapter 使用沒法同步數據? 按照官方文檔,檢查配置項,如sql的大小寫,字段的大小寫可能都會有影響,若是還沒法搞定,能夠本身獲取代碼調試下,Adapter的代碼仍是比較容易看懂的。
canal Adapter elasticsearch 改造
由於有了canal和canal Adapter這個神器,同步到elasticsearch、hbase等問題都解決了,可是本身的開發的過程當中發現,Adapter使用仍是有些問題,由於先使用的是elasticsearch同步功能,因此對elasticsearch進行了一些改造:
elasticsearch初始化
一個全新的elasticsearch沒法使用,由於沒有建立elasticsearch index和mapping,增長了對應的功能。 elasticsearch配置文件mapping節點增長兩個參數:
enablefieldmap: true
fieldmap:
id: "text"
BuildingId: "text"
HouseNum: "text"
Floors: "text"
IdProjectInfo: "text"
HouseDigitNum: "text"
BuildingNum: "text"
BuildingName: "text"
Name: "text"
projectid: "text"
bIdProjectInfo: "text"
cinitid: "text"
pCommunityId: "text"
enablefieldmap 是否須要自動生成fieldmap,默認爲false,若是須要啓動的時候就生成這設置爲true,而且設置 fieldmap,相似elasticsearch mapping中每一個字段的類型。
esconfig bug處理
代碼中獲取binlog的日誌處理時,必需要獲取數據庫名,可是當獲取binlog爲type query時,是沒法獲取 數據庫名的,此處有bug,致使出現 "Outer adapter write failed" ,且未輸出錯誤日誌,修復此bug.
後續計劃
- 增長rabbit MQ的支持
- 增長redis的支持
源碼
源碼地址:https://github.com/itmifen/canal