大約兩年之前,筆者在一個項目中遇到了數據同步的難題。mysql
當時,系統部署了幾十個實例,分爲1箇中心平臺和N個分中心平臺,而每個系統都對應一個單獨的數據庫實例。git
在數據庫層面,有這樣一個需求:github
這幾十個數據庫實例之間,沒有明確的主從關係,是否同步還要看數據的來源,因此並不能用MySQL的主從同步來作。sql
當時,筆者實驗了幾種方式,最後採用的方式是基於Mybatis攔截器機制 + 消息隊列的方式來作的。數據庫
大概原理是經過Mybatis攔截器,攔截到事務操做,好比新增、修改和刪除,根據自定義的數據主鍵(標識數據來源和去向),封裝成對象,投遞到消息隊列對應的topic中去。而後,每一個系統監聽不一樣的topic,消費數據並同步到數據庫。緩存
在此後的一段時間裏,知道了canal這個開源組件。發現它更直接,它能夠從MySQL的binlog中解析數據,投遞到消息隊列或其它地方。bash
提及canal,也是阿里巴巴存在數據同步的業務需求。因此從2010年開始,阿里系公司開始逐步的嘗試基於數據庫的日誌解析,獲取增量變動進行同步,由此衍生出了增量訂閱&消費的業務。服務器
基於日誌增量訂閱&消費支持的業務:測試
咱們正能夠基於canal的機制,來完成一系列如數據同步、緩存刷新等業務。ui
對於自建的MySQL服務, 須要先開啓 Binlog 寫入功能,配置 binlog-format 爲 ROW 模式,my.cnf 中配置以下:
[mysqld]
log-bin=mysql-bin # 開啓 binlog
binlog-format=ROW # 選擇 ROW 模式
server_id=1 # 配置 MySQL replaction 須要定義,不要和 canal 的 slaveId 重複
複製代碼
而後建立一個帳戶,用來連接MySQL,做爲 MySQL slave 的權限。
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
複製代碼
下載canal很是簡單,訪問 releases頁面選擇須要的包下載,而後將下載的包解壓到指定的目錄便可。
tar -zxvf canal.deployer-1.1.4.tar.gz -C /canal
解壓完成後,咱們能夠看到這樣一個目錄:
在啓動以前,還須要修改一些配置信息。
首先,定位到canal/conf/example
,編輯instance.properties
配置文件,重點有幾項:
canal.instance.mysql.slaveId=1234 # canal模擬slaveid
canal.instance.master.address=127.0.0.1:3306 # MySQL數據庫地址
canal.instance.dbUsername=canal # 做爲slave角色的帳戶
canal.instance.dbPassword=canal # 做爲slave角色的帳戶密碼
canal.instance.connectionCharset = UTF-8 # 數據庫編碼方式對應Java中的編碼類型
canal.instance.filter.regex=.*\\..* # 表過濾的表達式
canal.mq.topic=example # MQ 主題名稱
複製代碼
咱們但願canal監聽到的數據,要發送到消息隊列中,還須要修改canal.properties
文件,在這裏主要是MQ的配置。在這裏筆者使用的是阿里雲版RocketMQ,參數以下:
# 配置ak/sk
canal.aliyun.accessKey = XXX
canal.aliyun.secretKey = XXX
# 配置topic
canal.mq.accessChannel = cloud
canal.mq.servers = 內網接入點
canal.mq.producerGroup = GID_**group(在後臺建立)
canal.mq.namespace = rocketmq實例id
canal.mq.topic=(在後臺建立)
複製代碼
直接運行啓動腳本便可運行:./canal/bin/startup.sh
。 而後打開logs/canal/canal.log
文件,能夠看到啓動效果。
2020-02-26 21:12:36.715 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2020-02-26 21:12:36.746 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.44.128(192.168.44.128):11111]
2020-02-26 21:12:37.406 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
複製代碼
咱們把canal監聽到的數據,投送到了消息隊列中,那麼接下來就是寫個監聽程序來消費其中的數據。
爲了方便,筆者直接使用的是阿里雲版RocketMQ,測試代碼以下:
public static void main(String[] args) {
Properties properties = new Properties();
// 您在控制檯建立的 Group ID
properties.put(PropertyKeyConst.GROUP_ID, "GID_CANAL");
// AccessKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.AccessKey, "accessKey");
// SecretKey 阿里雲身份驗證,在阿里雲服務器管理控制檯建立
properties.put(PropertyKeyConst.SecretKey, "secretKey");
// 設置 TCP 接入域名,到控制檯的實例基本信息中查看
properties.put(PropertyKeyConst.NAMESRV_ADDR,"http://MQ_INST_xxx.mq-internet.aliyuncs.com:80");
// 集羣訂閱方式(默認)
// properties.put(PropertyKeyConst.MessageModel, PropertyValueConst.CLUSTERING);
Consumer consumer = ONSFactory.createConsumer(properties);
consumer.subscribe("example","*",new CanalListener());
consumer.start();
logger.info("Consumer Started");
}
複製代碼
把環境都部署好以後,咱們進入測試階段來看一看實際效果。
咱們以一張t_account
表爲例,這裏面記錄着帳戶id和帳戶餘額。
首先,咱們新增一條記錄,insert into t_account (id,user_id,amount) values (4,4,200);
此時,MQ消費到數據以下:
{
"data": [{
"id": "4",
"user_id": "4",
"amount": "200.0"
}],
"database": "seata",
"es": 1582723607000,
"id": 2,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"user_id": "varchar(255)",
"amount": "double(14,2)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"user_id": 12,
"amount": 8
},
"table": "t_account",
"ts": 1582723607656,
"type": "INSERT"
}
複製代碼
經過數據能夠看到,這裏面詳細記錄了數據庫的名稱、表的名稱、表的字段和新增數據的內容等。
而後,咱們還能夠把這條數據修改一下:update t_account set amount = 150 where id = 4;
此時,MQ消費到數據以下:
{
"data": [{
"id": "4",
"user_id": "4",
"amount": "150.0"
}],
"database": "seata",
"es": 1582724016000,
"id": 3,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"user_id": "varchar(255)",
"amount": "double(14,2)"
},
"old": [{
"amount": "200.0"
}],
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"user_id": 12,
"amount": 8
},
"table": "t_account",
"ts": 1582724016353,
"type": "UPDATE"
}
複製代碼
能夠看到,除了修改後的內容,canal還用old
字段記錄了修改前字段的值。
最後,咱們刪除這條數據:delete from t_account where id = 4;
相應的,MQ消費到數據以下:
{
"data": [{
"id": "4",
"user_id": "4",
"amount": "150.0"
}],
"database": "seata",
"es": 1582724155000,
"id": 4,
"isDdl": false,
"mysqlType": {
"id": "int(11)",
"user_id": "varchar(255)",
"amount": "double(14,2)"
},
"old": null,
"pkNames": ["id"],
"sql": "",
"sqlType": {
"id": 4,
"user_id": 12,
"amount": 8
},
"table": "t_account",
"ts": 1582724155370,
"type": "DELETE"
}
複製代碼
監聽到數據庫表的變化以後,就能夠根據本身的業務場景,對這些數據進行業務上的處理啦。
能夠看到,利用canal組件能夠很方便的完成對數據變化的監聽。若是利用消息隊列來作數據同步的話,只有一點須要格外注意,即消息順序性的問題。
binlog自己是有序的,但寫入到mq以後如何保障順序是值得關注的問題。
在mq順序性問題這裏,能夠看到canal的消費順序性相關解答。