近段時間,業務系統架構基本完備,數據層面的建設比較薄弱,由於筆者目前工做重心在於搭建一個小型的數據平臺。優先級比較高的一個任務就是須要近實時同步業務系統的數據(包括保存、更新或者軟刪除)到一個另外一個數據源,持久化以前須要清洗數據而且構建一個相對合理的便於後續業務數據統計、標籤系統構建等擴展功能的數據模型。基於當前團隊的資源和能力,優先調研了Alibaba
開源中間件Canal
的使用。java
這篇文章簡單介紹一下如何快速地搭建一套Canal
相關的組件。node
下面的簡介和下一節的原理均來自於Canal項目的README
:mysql
Canal[kə'næl]
,譯意爲水道/管道/溝渠,主要用途是基於MySQL
數據庫增量日誌解析,提供增量數據訂閱和消費。早期阿里巴巴由於杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger
獲取增量變動。從 2010 年開始,業務逐步嘗試數據庫日誌解析獲取增量變動進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。git
基於日誌增量訂閱和消費的業務包括:github
Cache
刷新MySQL
主備複製原理:spring
MySQL
的Master
實例將數據變動寫入二進制日誌(binary log
,其中記錄叫作二進制日誌事件binary log events
,能夠經過show binlog events
進行查看)MySQL
的Slave
實例將master
的binary log events
拷貝到它的中繼日誌(relay log
)MySQL
的Slave
實例重放relay log
中的事件,將數據變動反映它到自身的數據Canal
的工做原理以下:sql
Canal
模擬MySQL Slave
的交互協議,假裝本身爲MySQL Slave
,向MySQL Master
發送dump
協議MySQL Master
收到dump
請求,開始推送binary log
給Slave
(即Canal
)Canal
解析binary log
對象(原始爲byte
流),而且能夠經過鏈接器發送到對應的消息隊列等中間件中截止筆者開始編寫本文的時候(2020-03-05
),Canal
的最新發布版本是v1.1.5-alpha-1
(2019-10-09
發佈的),最新的正式版是v1.1.4
(2019-09-02
發佈的)。其中,v1.1.4
主要添加了鑑權、監控的功能,而且作了一些列的性能優化,此版本集成的鏈接器是Tcp
、Kafka
和RockerMQ
。而v1.1.5-alpha-1
版本已經新增了RabbitMQ
鏈接器,可是此版本的RabbitMQ
鏈接器暫時不能定義鏈接RabbitMQ
的端口號,不過此問題已經在master
分支中修復(具體能夠參看源碼中的CanalRabbitMQProducer
類的提交記錄)。換言之,v1.1.4
版本中目前能使用的內置鏈接器只有Tcp
、Kafka
和RockerMQ
三種,若是想嚐鮮使用RabbitMQ
鏈接器,能夠選用下面的兩種方式之一:shell
v1.1.5-alpha-1
版本,可是沒法修改RabbitMQ
的port
屬性,默認爲5672
。master
分支自行構建Canal
。目前,Canal
項目的活躍度比較高,可是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前能夠選用v1.1.4
版本,本文的例子用選用的就是v1.1.4
版本,配合Kafka
鏈接器使用。Canal
主要包括三個核心部件:數據庫
canal-admin
:後臺管理模塊,提供面向WebUI
的Canal
管理能力。canal-adapter
:適配器,增長客戶端數據落地的適配及啓動功能,包括REST
、日誌適配器、關係型數據庫的數據同步(表對錶同步)、HBase
數據同步、ES
數據同步等等。canal-deployer
:發佈器,核心功能所在,包括binlog
解析、轉換和發送報文到鏈接器中等等功能都由此模塊提供。通常狀況下,canal-deployer
部件是必須的,其餘兩個部件按需選用便可。apache
搭建一套能夠用的組件須要部署MySQL
、Zookeeper
、Kafka
和Canal
四個中間件的實例,下面簡單分析一下部署過程。選用的虛擬機系統是CentOS7
。
爲了簡單起見,選用yum
源安裝(官方連接是https://dev.mysql.com/downloads/repo/yum
):
mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.六、5.7和8.x等等的最新安裝包倉庫
選用的是最新版的MySQL8.x
社區版,下載CentOS7
適用的rpm包
:
cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢以後
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm
複製代碼
此時列舉一下yum
倉庫裏面的MySQL
相關的包:
[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64 MySQL Connectors Community enabled: 141
mysql-connectors-community-source MySQL Connectors Community - disabled
mysql-tools-community/x86_64 MySQL Tools Community enabled: 105
mysql-tools-community-source MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64 MySQL Tools Preview disabled
mysql-tools-preview-source MySQL Tools Preview - Source disabled
mysql55-community/x86_64 MySQL 5.5 Community Server disabled
mysql55-community-source MySQL 5.5 Community Server - disabled
mysql56-community/x86_64 MySQL 5.6 Community Server disabled
mysql56-community-source MySQL 5.6 Community Server - disabled
mysql57-community/x86_64 MySQL 5.7 Community Server disabled
mysql57-community-source MySQL 5.7 Community Server - disabled
mysql80-community/x86_64 MySQL 8.0 Community Server enabled: 161
mysql80-community-source MySQL 8.0 Community Server - disabled
複製代碼
編輯/etc/yum.repos.d/mysql-community.repo
文件([mysql80-community]
塊中enabled設置爲1
,其實默認就是這樣子,不用改,若是要選用5.x
版本則須要修改對應的塊):
[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
複製代碼
而後安裝MySQL
服務:
sudo yum install mysql-community-server
複製代碼
這個過程比較漫長,由於須要下載和安裝5個rpm
安裝包(或者是全部安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar
)。若是網絡比較差,也能夠直接從官網手動下載後安裝:
// 下載下面5個rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server
// 強制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps
複製代碼
安裝完畢以後,啓動MySQL
服務,而後搜索MySQL
服務的root
帳號的臨時密碼用於首次登錄(mysql -u root -p
):
// 啓動服務,關閉服務就是service mysqld stop
service mysqld start
// 查看臨時密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登陸臨時root用戶,使用臨時密碼
[root@localhost log]# mysql -u root -p
複製代碼
接下來作下面的操做:
root
用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';
(注意密碼規則必須包含大小寫字母、數字和特殊字符)root
的host
,切換數據庫use mysql;
,指定host
爲%
以即可以讓其餘服務器遠程訪問UPDATE USER SET HOST = '%' WHERE USER = 'root';
'root'@'%'
用戶,全部權限,執行GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
root'@'%
用戶的密碼校驗規則以即可以使用Navicat
等工具訪問:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
操做完成以後,就可使用root
用戶遠程訪問此虛擬機上的MySQL
服務。最後確認是否開啓了binlog
(注意一點是MySQL8.x
默認開啓binlog
)SHOW VARIABLES LIKE '%bin%';
:
最後在MySQL
的Shell
執行下面的命令,新建一個用戶名canal
密碼爲QWqw12!@
的新用戶,賦予REPLICATION SLAVE
和 REPLICATION CLIENT
權限:
CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
複製代碼
切換回去root
用戶,建立一個數據庫test
:
CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
複製代碼
Canal
和Kafka
集羣都依賴於Zookeeper
作服務協調,爲了方便管理,通常會獨立部署Zookeeper
服務或者Zookeeper
集羣。筆者這裏選用2020-03-04
發佈的3.6.0
版本:
midkr /data/zk
# 建立數據目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
複製代碼
把zoo.cfg
文件中的dataDir
設置爲/data/zk/data
,而後啓動Zookeeper
:
[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
複製代碼
這裏注意一點,要啓動此版本的Zookeeper
服務必須本地安裝好JDK8+
,這一點須要自行處理。啓動的默認端口是2181
,啓動成功後的日誌以下:
Kafka
是一個高性能分佈式消息隊列中間件,它的部署依賴於Zookeeper
。筆者在此選用2.4.0
而且Scala
版本爲2.13
的安裝包:
mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
複製代碼
因爲解壓後/data/kafka/kafka_2.13-2.4.0/config/server.properties
配置中對應的zookeeper.connect=localhost:2181
已經符合須要,沒必要修改,須要修改日誌文件的目錄log.dirs
爲/data/kafka/data
。而後啓動Kafka
服務:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
複製代碼
這樣啓動一旦退出控制檯就會結束Kafka
進程,能夠添加-daemon
參數用於控制Kafka
進程後臺不掛斷運行。
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
複製代碼
終於到了主角登場,這裏選用Canal
的v1.1.4
穩定發佈版,只須要下載deployer
模塊:
mkdir /data/canal
cd /data/canal
# 這裏注意一點,Github在國內被牆,下載速度極慢,能夠先用其餘下載工具下載完再上傳到服務器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz
複製代碼
解壓後的目錄以下:
- bin # 運維腳本
- conf # 配置文件
canal_local.properties # canal本地配置,通常不須要動
canal.properties # canal服務配置
logback.xml # logback日誌配置
metrics # 度量統計配置
spring # spring-實例配置,主要和binlog位置計算、一些策略配置相關,能夠在canal.properties選用其中的任意一個配置文件
example # 實例配置文件夾,通常認爲單個數據庫對應一個獨立的實例配置文件夾
instance.properties # 實例配置,通常指單個數據庫的配置
- lib # 服務依賴包
- logs # 日誌文件輸出目錄
複製代碼
在開發和測試環境建議把logback.xml
的日誌級別修改成DEBUG
方便定位問題。這裏須要關注canal.properties
和instance.properties
兩個配置文件。canal.properties
文件中,須要修改:
canal.instance.parser.parallelThreadSize = 16
這個配置項的註釋,也就是啓用此配置項,和實例解析器的線程數相關,不配置會表現爲阻塞或者不進行解析。canal.serverMode
配置項指定爲kafka
,可選值有tcp
、kafka
和rocketmq
(master
分支或者最新的的v1.1.5-alpha-1
版本,能夠選用rabbitmq
),默認是kafka
。canal.mq.servers
配置須要指定爲Kafka
服務或者集羣Broker
的地址,這裏配置爲127.0.0.1:9092
。canal.mq.servers在不一樣的canal.serverMode有不一樣的意義。 kafka模式下,指Kafka服務或者集羣Broker的地址,也就是bootstrap.servers rocketmq模式下,指NameServer列表 rabbitmq模式下,指RabbitMQ服務的Host和Port
其餘配置項能夠參考下面兩個官方Wiki
的連接:
instance.properties
通常指一個數據庫實例的配置,Canal
架構支持一個Canal
服務實例,處理多個數據庫實例的binlog
異步解析。instance.properties
須要修改的配置項主要包括:
canal.instance.mysql.slaveId
須要配置一個和Master
節點的服務ID
徹底不一樣的值,這裏筆者配置爲654321
。
配置數據源實例,包括地址、用戶、密碼和目標數據庫:
canal.instance.master.address
,這裏指定爲127.0.0.1:3306
。canal.instance.dbUsername
,這裏指定爲canal
。canal.instance.dbPassword
,這裏指定爲QWqw12!@
。canal.instance.defaultDatabaseName
,這裏指定爲test
(須要在MySQL
中創建一個test
數據庫,見前面的流程)。Kafka
相關配置,這裏暫時使用靜態topic
和單個partition
:
canal.mq.topic
,這裏指定爲test
,也就是解析完的binlog
結構化數據會發送到Kafka
的命名爲test
的topic
中。
canal.mq.partition
,這裏指定爲0
。配置工做作好以後,能夠啓動Canal
服務:
sh /data/canal/bin/startup.sh
# 查看服務日誌
tail -100f /data/canal/logs/canal/canal
# 查看實例日誌 -- 通常狀況下,關注實例日誌便可
tail -100f /data/canal/logs/example/example.log
複製代碼
啓動正常後,見實例日誌以下:
在test
數據庫建立一個訂單表,而且執行幾個簡單的DML
:
use `test`;
CREATE TABLE `order`
(
id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',
amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';
INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE FROM `order` WHERE order_id = '10086';
複製代碼
這個時候,能夠利用Kafka
的kafka-console-consumer
或者Kafka Tools
查看test
這個topic
的數據:
sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
複製代碼
具體的數據以下:
// test數據庫建庫腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}
// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n id BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n order_id VARCHAR(64) NOT NULL COMMENT '訂單ID',\n amount DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n create_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',\n UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}
// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}
// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}
// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}
複製代碼
可見Kafka
的名爲test
的topic
已經寫入了對應的結構化binlog
事件數據,能夠編寫消費者監聽Kafka
對應的topic
而後對獲取到的數據進行後續處理。
這篇文章大部分篇幅用於介紹其餘中間件是怎麼部署的,這個問題側面說明了Canal
自己部署並不複雜,它的配置文件屬性項比較多,可是實際上須要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本並不高。後面會分析基於結構化binlog
事件作ELT
和持久化相關工做以及Canal
的生產環境可用級別HA
集羣的搭建。
若是你們喜歡個人文章,能夠關注我的訂閱號。歡迎隨時留言、交流。若是想加入微信羣的話一塊兒討論的話,請加管理員簡棧文化-小助手(lastpass4u),他會拉大家進羣。