背景:html
1. 一些項目的基礎功能會有Audit Trace, 以記錄系統用戶所作過的全部記錄。mysql
2. 實時備份數據,好比mysql主從複製,一個用於面向應用,一個用於對應用數據庫的實時備份。git
3. 實時收集關係型數據庫變動,將數據保存在nosql數據庫中,以提供快速檢索,一個較爲實用的場景就是實現地將mysql數據變動同步到elastic search 或者 mongo db。github
下面,將介紹如何經過canal,將mysql 數據變動同步到elastic search 。面試
首先咱們瞭解一下什麼是canal?sql
原理相對比較簡單:api
安裝步驟:nosql
訪問:https://github.com/alibaba/canal/releases ,會列出全部歷史的發佈版本包 下載方式,好比以1.0.17版本爲例子:elasticsearch
wget https://github.com/alibaba/canal/releases/download/canal-1.0.17/canal.deployer-1.0.17.tar.gz mkdir /tmp/canal tar zxvf canal.deployer-1.0.17.tar.gz -C /tmp/canal
配置修改
vi conf/example/instance.properties
################################################# ## mysql serverId canal.instance.mysql.slaveId = 1234 #position info,須要改爲本身的數據庫信息 canal.instance.master.address = 127.0.0.1:3306 canal.instance.master.journal.name = canal.instance.master.position = canal.instance.master.timestamp = #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #username/password,須要改爲本身的數據庫信息 canal.instance.dbUsername = canal canal.instance.dbPassword = canal canal.instance.defaultDatabaseName = canal.instance.connectionCharset = UTF-8 #table regex canal.instance.filter.regex = .\.. #################################################
準備啓動
sh bin/startup.sh
查看日誌
vi logs/canal/canal.log
關閉
sh bin/stop.sh
下面試下在代碼中,獲取到mysql變動:
首先安裝下 canal 客戶端 nuget包
Install-Package CanalSharp.Client
static void Main(string[] args) { //canal 配置的 destination,默認爲 example var destination = "example"; //建立一個簡單 CanalClient 鏈接對象(此對象不支持集羣)傳入參數分別爲 canal 地址、端口、destination、用戶名、密碼 var connector = CanalConnectors.NewSingleConnector("192.168.1.23", 11111, destination, "", ""); //鏈接 Canal connector.Connect(); //訂閱,同時傳入 Filter。Filter是一種過濾規則,經過該規則的表數據變動纔會傳遞過來 //容許全部數據 .*\\..* //容許某個庫數據 庫名\\..* //容許某些表 庫名.表名,庫名.表名 connector.Subscribe(".*\\..*"); while (true) { //獲取數據 1024表示數據大小 單位爲字節 var message = connector.Get(1024); //批次id 可用於回滾 var batchId = message.Id; if (batchId == -1 || message.Entries.Count <= 0) { Thread.Sleep(300); continue; } PrintEntry(message.Entries); } } /// <summary> /// 輸出數據 /// </summary> /// <param name="entrys">一個entry表示一個數據庫變動</param> private static void PrintEntry(List<Entry> entrys) { foreach (var entry in entrys) { if (entry.EntryType == EntryType.Transactionbegin || entry.EntryType == EntryType.Transactionend) { continue; } RowChange rowChange = null; try { //獲取行變動 rowChange = RowChange.Parser.ParseFrom(entry.StoreValue); } catch (Exception e) { Console.WriteLine(e.Message); } if (rowChange != null) { //by the changed entry's table name and record id. get the changed order(full info with any children records) form mysql and save it to es. //to do it, boys ! //變動類型 insert/update/delete 等等 EventType eventType = rowChange.EventType; //輸出binlog信息 表名 數據庫名 變動類型 Console.WriteLine( $"================> binlog[{entry.Header.LogfileName}:{entry.Header.LogfileOffset}] , name[{entry.Header.SchemaName},{entry.Header.TableName}] , eventType :{eventType}"); //輸出 insert/update/delete 變動類型列數據 foreach (var rowData in rowChange.RowDatas) { if (eventType == EventType.Delete) { PrintColumn(rowData.BeforeColumns.ToList()); } else if (eventType == EventType.Insert) { PrintColumn(rowData.AfterColumns.ToList()); } else { Console.WriteLine("-------> before"); PrintColumn(rowData.BeforeColumns.ToList()); Console.WriteLine("-------> after"); PrintColumn(rowData.AfterColumns.ToList()); } } } } } /// <summary> /// 輸出每一個列的詳細數據 /// </summary> /// <param name="columns"></param> private static void PrintColumn(List<Column> columns) { foreach (var column in columns) { //輸出列明 列值 是否變動 Console.WriteLine($"{column.Name} : {column.Value} update= {column.Updated}"); } } }
運行代碼,去到數據庫中改一下某行數據:
能夠看到咱們代碼收集到變動信息:
本篇就介紹到這裏了, 至於如何將變動同步到es,那是屬於es操做的範疇,可參考 https://www.elastic.co/guide/en/elasticsearch/client/net-api/current/elasticsearch-net.html
以上內容源於: