數據同步利器 - canal

前言

大約兩年之前,筆者在一個項目中遇到了數據同步的難題。mysql

當時,系統部署了幾十個實例,分爲1箇中心平臺和N個分中心平臺,而每個系統都對應一個單獨的數據庫實例。git

在數據庫層面,有這樣一個需求:github

  • 中心平臺數據庫要包含全部系統平臺的數據。
  • 分中心數據庫只包含本系統平臺的數據。
  • 在中心平臺能夠新增或修改 中心平臺的數據,但要講數據實時同步到對應的分中心平臺數據庫。

這幾十個數據庫實例之間,沒有明確的主從關係,是否同步還要看數據的來源,因此並不能用MySQL的主從同步來作。sql

當時,筆者實驗了幾種方式,最後採用的方式是基於Mybatis攔截器機制 + 消息隊列的方式來作的。數據庫

大概原理是經過Mybatis攔截器,攔截到事務操做,好比新增、修改和刪除,根據自定義的數據主鍵(標識數據來源和去向),封裝成對象,投遞到消息隊列對應的topic中去。而後,每一個系統監聽不一樣的topic,消費數據並同步到數據庫。緩存

在此後的一段時間裏,知道了canal這個開源組件。發現它更直接,它能夠從MySQL的binlog中解析數據,投遞到消息隊列或其它地方。bash

1、canal簡介

提及canal,也是阿里巴巴存在數據同步的業務需求。因此從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務。服務器

基於日誌增量訂閱&消費支持的業務:測試

  • 數據庫鏡像
  • 數據庫實時備份
  • 多級索引 (賣家和買家各自分庫索引)
  • search build
  • 業務cache刷新
  • 價格變化等重要業務消息

咱們正能夠基於canal的機制,來完成一系列如數據同步、緩存刷新等業務。ui

2、啓動canal

一、修改MySQL配置

對於自建的MySQL服務, 須要先開啓 Binlog 寫入功能,配置 binlog-format 爲 ROW 模式,my.cnf 中配置以下:

[mysqld]
log-bin=mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 須要定義,不要和 canal 的 slaveId 重複
複製代碼

而後建立一個帳戶,用來連接MySQL,做爲 MySQL slave 的權限。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
複製代碼

二、下載

下載canal很是簡單,訪問 releases頁面選擇須要的包下載,而後將下載的包解壓到指定的目錄便可。

tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal

解壓完成後,咱們能夠看到這樣一個目錄:

三、修改配置

在啓動以前,還須要修改一些配置信息。

首先,定位到canal/conf/example ,編輯instance.properties配置文件,重點有幾項:

canal.instance.mysql.slaveId=1234               # canal模擬slaveid
canal.instance.master.address=127.0.0.1:3306    # MySQL數據庫地址
canal.instance.dbUsername=canal                 # 做爲slave角色的帳戶
canal.instance.dbPassword=canal                 # 做爲slave角色的帳戶密碼
canal.instance.connectionCharset = UTF-8        # 數據庫編碼方式對應Java中的編碼類型
canal.instance.filter.regex=.*\\..*             # 表過濾的表達式
canal.mq.topic=example                          # MQ 主題名稱
複製代碼

咱們但願canal監聽到的數據,要發送到消息隊列中,還須要修改canal.properties文件,在這裏主要是MQ的配置。在這裏筆者使用的是阿里雲版RocketMQ,參數以下:

# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 內網接入點
canal.mq.producerGroup = GID_**group(在後臺建立)
canal.mq.namespace = rocketmq實例id
canal.mq.topic=(在後臺建立)
複製代碼

四、啓動

直接運行啓動腳本便可運行:./canal/bin/startup.sh 。 而後打開logs/canal/canal.log文件,能夠看到啓動效果。

2020-02-26 21:12:36.715 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
複製代碼

3、啓動MQ監聽

咱們把canal監聽到的數據,投送到了消息隊列中,那麼接下來就是寫個監聽程序來消費其中的數據。

爲了方便,筆者直接使用的是阿里雲版RocketMQ,測試代碼以下:

public static void main(String[] args) {
	Properties properties = new Properties();
	// 您在控制檯建立的 Group ID
	properties.put(PropertyKeyConst.GROUP_ID, "GID_CANAL");
	// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
	properties.put(PropertyKeyConst.AccessKey, "accessKey");
	// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
	properties.put(PropertyKeyConst.SecretKey, "secretKey");
	// 設置 TCP 接入域名,到控制檯的實例基本信息中查看
	properties.put(PropertyKeyConst.NAMESRV_ADDR,"http://MQ_INST_xxx.mq-internet.aliyuncs.com:80");
	// 集羣訂閱方式(默認)
	// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
	Consumer consumer = ONSFactory.createConsumer(properties);
	consumer.subscribe("example","*",new CanalListener());
	consumer.start();
	logger.info("Consumer Started");
}
複製代碼

4、測試

把環境都部署好以後,咱們進入測試階段來看一看實際效果。

咱們以一張t_account表爲例,這裏面記錄着帳戶id和帳戶餘額。

首先,咱們新增一條記錄,insert into t_account (id,user_id,amount) values (4,4,200);

此時,MQ消費到數據以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "200.0"
	}],
	"database": "seata",
	"es": 1582723607000,
	"id": 2,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582723607656,
	"type": "INSERT"
}
複製代碼

經過數據能夠看到,這裏面詳細記錄了數據庫的名稱、表的名稱、表的字段和新增數據的內容等。

而後,咱們還能夠把這條數據修改一下:update t_account set amount = 150 where id = 4;

此時,MQ消費到數據以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724016000,
	"id": 3,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": [{
		"amount": "200.0"
	}],
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724016353,
	"type": "UPDATE"
}
複製代碼

能夠看到,除了修改後的內容,canal還用old字段記錄了修改前字段的值。

最後,咱們刪除這條數據:delete from t_account where id = 4;

相應的,MQ消費到數據以下:

{
	"data": [{
		"id": "4",
		"user_id": "4",
		"amount": "150.0"
	}],
	"database": "seata",
	"es": 1582724155000,
	"id": 4,
	"isDdl": false,
	"mysqlType": {
		"id": "int(11)",
		"user_id": "varchar(255)",
		"amount": "double(14,2)"
	},
	"old": null,
	"pkNames": ["id"],
	"sql": "",
	"sqlType": {
		"id": 4,
		"user_id": 12,
		"amount": 8
	},
	"table": "t_account",
	"ts": 1582724155370,
	"type": "DELETE"
}
複製代碼

監聽到數據庫表的變化以後,就能夠根據本身的業務場景,對這些數據進行業務上的處理啦。

5、總結

能夠看到,利用canal組件能夠很方便的完成對數據變化的監聽。若是利用消息隊列來作數據同步的話,只有一點須要格外注意,即消息順序性的問題。

binlog自己是有序的,但寫入到mq以後如何保障順序是值得關注的問題。

mq順序性問題這裏,能夠看到canal的消費順序性相關解答。

相關文章
相關標籤/搜索