【轉載請註明出處】:https://segmentfault.com/a/1190000022767293html
基於數據庫增量日誌解析,提供增量數據訂閱&消費,目前主要支持了mysql。
早期,阿里巴巴B2B公司由於存在杭州和美國雙機房部署,存在跨機房同步的業務需求。不過早期的數據庫同步業務,主要是基於trigger的方式獲取增量變動,不過從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務,今後開啓了一段新紀元。java
ps. 目前內部版本已經支持mysql和oracle部分版本的日誌解析,當前的canal開源版本支持5.7及如下的版本(阿里內部mysql 5.7.13, 5.6.10, mysql 5.5.18和5.1.40/48)mysql
基於日誌增量訂閱&消費支持的業務:git
從上層來看,複製分紅三步:github
原理相對比較簡單:web
說明:正則表達式
instance模塊:spring
大體過程:
整個parser過程大體可分爲幾步:sql
// 0. write command number
// 1. write 4 bytes bin-log position to start at
// 2. write 2 bytes bin-log flags
// 3. write 4 bytes server id of the slave
// 4. write bin-log file name數據庫
// 補充字段名字,字段類型,主鍵信息,unsigned類型處理
mysql的Binlay Log網絡協議:
說明:
https://dev.mysql.com/doc/internals/en/event-structure.html
https://dev.mysql.com/doc/internals/en/binlog-event.html
說明:
數據1:n業務
爲了合理的利用數據庫資源, 通常常見的業務都是按照schema進行隔離,而後在mysql上層或者dao這一層面上,進行一個數據源路由,屏蔽數據庫物理位置對開發的影響,阿里系主要是經過cobar/tddl來解決數據源路由問題。
因此,通常一個數據庫實例上,會部署多個schema,每一個schema會有由1個或者多個業務方關注
數據n:1業務
一樣,當一個業務的數據規模達到必定的量級後,必然會涉及到水平拆分和垂直拆分的問題,針對這些拆分的數據須要處理時,就須要連接多個store進行處理,消費的位點就會變成多份,並且數據消費的進度沒法獲得儘量有序的保證。
因此,在必定業務場景下,須要將拆分後的增量數據進行歸併處理,好比按照時間戳/全局id進行排序歸併.
RingBuffer設計:
定義了3個cursor
借鑑Disruptor的RingBuffer的實現,將RingBuffer拉直來看:
實現說明:
instance表明了一個實際運行的數據隊列,包括了EventPaser,EventSink,EventStore等組件。
抽象了CanalInstanceGenerator,主要是考慮配置的管理方式:
server表明了一個canal的運行實例,爲了方便組件化使用,特地抽象了Embeded(嵌入式) / Netty(網絡訪問)的兩種實現
具體的協議格式,可參見:CanalProtocol.proto
get/ack/rollback協議介紹:
canal的get/ack/rollback協議和常規的jms協議有所不一樣,容許get/ack異步處理,好比能夠連續調用get屢次,後續異步按順序提交ack/rollback,項目中稱之爲流式api.
流式api設計的好處:
流式api設計:
canal的ha分爲兩部分,canal server和canal client分別有對應的ha實現
整個HA機制的控制主要是依賴了zookeeper的幾個特性,watcher和EPHEMERAL節點(和session生命週期綁定),能夠看下我以前zookeeper的相關文章。
Canal Server:
大體步驟:
Canal Client的方式和canal server方式相似,也是利用zookeeper的搶佔EPHEMERAL節點的方式進行控制.
開啓mysql的binlog寫入功能,而且配置binlog模式爲row
[mysqld] log-bin=mysql-bin binlog-format=ROW #選擇row模式 server_id=1 #配置mysql replaction須要定義,不能和canal的slaveId重複
檢查配置是否有效
#查看binlog的開啓狀態及文件名 mysql> show variables like '%log_bin%'; #查看binlog當前的格式 mysql> show variables like '%format%'; #查看binlog文件列表 mysql> show binary logs; #查看binlog的狀態 mysql> show master status;
canal的原理是模擬本身爲mysql slave,因此這裏必定須要作爲mysql slave的相關權限
mysql> CREATE USER canal IDENTIFIED BY 'canal'; mysql> GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; # -- mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; mysql> FLUSH PRIVILEGES;
針對已有的帳戶可經過grants查詢權限:
mysql> show grants for 'canal' ;
方法1: (直接下載)
訪問:https://github.com/alibaba/canal/releases,會列出全部歷史的發佈版本包
當前的最新版本是1.1.3
wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
方法2: (本身編譯)
git clone git@github.com:alibaba/canal.git git co canal-1.1.3 #切換到對應的版本上 mvn clean install -Denv=release
執行完成後,會在canal工程根目錄下生成一個target目錄,裏面會包含一個 canal.deployer-1.1.3.tar.gz
介紹配置以前,先了解下canal的配置加載方式:
canal配置方式有兩種:
spring配置的原理是將整個配置抽象爲兩部分:
經過spring的PropertyPlaceholderConfigurer經過機制將其融合,生成一份instance實例對象,每一個instance對應的組件都是相互獨立的,互不影響
properties配置文件
properties配置分爲兩部分:
canal.properties介紹:
canal配置主要分爲兩部分定義:
instance.properties介紹:
好比:
canal.destinations = example1,example2
這時須要建立example1和example2兩個目錄,每一個目錄裏各自有一份instance.properties.
ps. canal自帶了一份instance.properties demo,可直接複製conf/example目錄進行配置修改
server運行過程當中,會根據canal.auto.scan.interval定義的頻率,進行掃描
instance.xml配置文件
目前默認支持的instance.xml有如下幾種:
在介紹instance配置以前,先了解一下canal如何維護一份增量訂閱&消費的關係信息:
對應的兩個位點組件,目前都有幾種實現:
memory-instance.xml:
全部的組件(parser , sink , store)都選擇了內存版模式,記錄位點的都選擇了memory模式,重啓後又會回到初始位點進行解析
**特色:** 速度最快,依賴最少(不須要zookeeper)
場景:通常應用在quickstart,或者是出現問題後,進行數據分析的場景,不該該將其應用於生產環境
default-instance.xml:
store選擇了內存模式,其他的parser/sink依賴的位點管理選擇了持久化模式,目前持久化的方式主要是寫入zookeeper,保證數據集羣共享.
特色: 支持HA
場景: 生產環境,集羣化部署.
group-instance.xml:
主要針對須要進行多庫合併時,能夠將多個物理instance合併爲一個邏輯instance,提供客戶端訪問。
**場景:** 分庫業務。 好比產品數據拆分了4個庫,每一個庫會有一個instance,若是不用group,業務上要消費數據時,須要啓動4個客戶端,分別連接4個instance實例。使用group後,能夠在canal server上合併爲一個邏輯instance,只須要啓動1個客戶端,連接這個邏輯instance便可.
instance.xml設計初衷:
容許進行自定義擴展,好比實現了基於數據庫的位點管理後,能夠自定義一份本身的instance.xml,整個canal設計中最大的靈活性在於此
修改canal.properties
canal.zkServers =127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183 canal.destinations = example #當前server上部署的instance列表 canal.instance.global.spring.xml = classpath:spring/default-instance.xml
修改instance.properties
canal.instance.mysql.slaveId=1234 #mysql集羣配置中的serverId概念,須要保證和當前mysql集羣中id惟一 (v1.1.x版本以後canal會自動生成,不須要手工指定) canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.filter.regex=.*\\..* #mysql 數據解析關注的表,Perl正則表達式.多個正則之間以逗號(,)分隔,轉義符須要雙斜槓(\\)
注意: 其餘機器上的instance目錄的名字須要保證徹底一致,HA模式是依賴於instance name進行管理,同時必須都選擇default-instance.xml配置,canal.instance.mysql.slaveId應該惟一。
執行啓動腳本startup.sh
,啓動後,你能夠查看logs/example/example.log,只會看到一臺機器上出現了啓動成功的日誌,其餘的處於standby狀態。
建立mvn工程,修改pom.xml,添加依賴:
<dependency> <groupId>com.alibaba.otter</groupId> <artifactId>canal.client</artifactId> <version>1.1.3</version> </dependency>
CanalClientTest代碼
package com.stepper.canalclient; import java.net.InetSocketAddress; import java.util.List; import com.alibaba.otter.canal.client.CanalConnectors; import com.alibaba.otter.canal.client.CanalConnector; import com.alibaba.otter.canal.common.utils.AddressUtils; import com.alibaba.otter.canal.protocol.Message; import com.alibaba.otter.canal.protocol.CanalEntry.Column; import com.alibaba.otter.canal.protocol.CanalEntry.Entry; import com.alibaba.otter.canal.protocol.CanalEntry.EntryType; import com.alibaba.otter.canal.protocol.CanalEntry.EventType; import com.alibaba.otter.canal.protocol.CanalEntry.RowChange; import com.alibaba.otter.canal.protocol.CanalEntry.RowData; public class CanalClientTest { public static void main(String args[]) { String zkServers="127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183"; String destination="example"; CanalConnector connector = CanalConnectors.newClusterConnector(zkServers,destination,"",""); int batchSize = 1000; int emptyCount = 0; try { connector.connect(); connector.subscribe(".*\\..*"); // connector.rollback(); int totalEmptyCount = 120; while (emptyCount < totalEmptyCount) { Message message = connector.getWithoutAck(batchSize); // 獲取指定數量的數據 // System.out.println(message.toString()); long batchId = message.getId(); int size = message.getEntries().size(); if (batchId == -1 || size == 0) { emptyCount++; System.out.println("empty count : " + emptyCount); try { Thread.sleep(1000); } catch (InterruptedException e) { } } else { emptyCount = 0; // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size); printEntry(message.getEntries()); } connector.ack(batchId); // 提交確認 // connector.rollback(batchId); // 處理失敗, 回滾數據 } System.out.println("empty too many times, exit"); } finally { connector.disconnect(); } } private static void printEntry(List<Entry> entrys) { for (Entry entry : entrys) { if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) { continue; } RowChange rowChage = null; try { rowChage = RowChange.parseFrom(entry.getStoreValue()); } catch (Exception e) { throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(), e); } EventType eventType = rowChage.getEventType(); System.out.println(String.format("================ binlog[%s:%s] , name[%s,%s] , eventType : %s", entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName(), eventType)); for (RowData rowData : rowChage.getRowDatasList()) { if (eventType == EventType.DELETE) { printColumn(rowData.getBeforeColumnsList()); } else if (eventType == EventType.INSERT) { printColumn(rowData.getAfterColumnsList()); } else { System.out.println("-------> before"); printColumn(rowData.getBeforeColumnsList()); System.out.println("-------> after"); printColumn(rowData.getAfterColumnsList()); } } } } private static void printColumn(List<Column> columns) { for (Column column : columns) { System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated()); } } }
啓動Canal Client後,操做數據庫變動數據便可從控制檯從看到消息。
更多參數及介紹能夠參考官方wiki文檔.
注意:
【轉載請註明出處】: https://segmentfault.com/a/1190000022767293