基於Canal和Kafka實現MySQL的Binlog近實時同步

前提

近段時間,業務系統架構基本完備,數據層面的建設比較薄弱,由於筆者目前工做重心在於搭建一個小型的數據平臺。優先級比較高的一個任務就是須要近實時同步業務系統的數據(包括保存、更新或者軟刪除)到一個另外一個數據源,持久化以前須要清洗數據而且構建一個相對合理的便於後續業務數據統計、標籤系統構建等擴展功能的數據模型。基於當前團隊的資源和能力,優先調研了Alibaba開源中間件Canal的使用。java

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbf385d294f?w=781&h=524&f=png&s=31778

這篇文章簡單介紹一下如何快速地搭建一套Canal相關的組件。node

關於Canal

簡介

下面的簡介和下一節的原理均來自於Canal項目的READMEmysql

img

Canal[kə'næl],譯意爲水道/管道/溝渠,主要用途是基於MySQL數據庫增量日誌解析,提供增量數據訂閱和消費。早期阿里巴巴由於杭州和美國雙機房部署,存在跨機房同步的業務需求,實現方式主要是基於業務trigger獲取增量變動。從 2010 年開始,業務逐步嘗試數據庫日誌解析獲取增量變動進行同步,由此衍生出了大量的數據庫增量訂閱和消費業務。git

基於日誌增量訂閱和消費的業務包括:github

  • 數據庫鏡像
  • 數據庫實時備份
  • 索引構建和實時維護(拆分異構索引、倒排索引等)
  • 業務Cache刷新
  • 帶業務邏輯的增量數據處理

Canal的工做原理

MySQL主備複製原理:spring

img

  • MySQLMaster實例將數據變動寫入二進制日誌(binary log,其中記錄叫作二進制日誌事件binary log events,能夠經過show binlog events進行查看)
  • MySQLSlave實例將masterbinary log events拷貝到它的中繼日誌(relay log
  • MySQLSlave實例重放relay log中的事件,將數據變動反映它到自身的數據

Canal的工做原理以下:sql

  • Canal模擬MySQL Slave的交互協議,假裝本身爲MySQL Slave,向MySQL Master發送dump協議
  • MySQL Master收到dump請求,開始推送binary logSlave(即Canal
  • Canal解析binary log對象(原始爲byte流),而且能夠經過鏈接器發送到對應的消息隊列等中間件中

關於Canal的版本和部件

截止筆者開始編寫本文的時候(2020-03-05),Canal的最新發布版本是v1.1.5-alpha-12019-10-09發佈的),最新的正式版是v1.1.42019-09-02發佈的)。其中,v1.1.4主要添加了鑑權、監控的功能,而且作了一些列的性能優化,此版本集成的鏈接器是TcpKafkaRockerMQ。而v1.1.5-alpha-1版本已經新增了RabbitMQ鏈接器,可是此版本的RabbitMQ鏈接器暫時不能定義鏈接RabbitMQ的端口號,不過此問題已經在master分支中修復(具體能夠參看源碼中的CanalRabbitMQProducer類的提交記錄)。換言之,v1.1.4版本中目前能使用的內置鏈接器只有TcpKafkaRockerMQ三種,若是想嚐鮮使用RabbitMQ鏈接器,能夠選用下面的兩種方式之一:shell

  • 選用v1.1.5-alpha-1版本,可是沒法修改RabbitMQport屬性,默認爲5672
  • 基於master分支自行構建Canal

目前,Canal項目的活躍度比較高,可是考慮到功能的穩定性問題,筆者建議選用穩定版本在生產環境中實施,當前能夠選用v1.1.4版本,本文的例子用選用的就是v1.1.4版本,配合Kafka鏈接器使用Canal主要包括三個核心部件:數據庫

  • canal-admin:後臺管理模塊,提供面向WebUICanal管理能力。
  • canal-adapter:適配器,增長客戶端數據落地的適配及啓動功能,包括REST、日誌適配器、關係型數據庫的數據同步(表對錶同步)、HBase數據同步、ES數據同步等等。
  • canal-deployer:發佈器,核心功能所在,包括binlog解析、轉換和發送報文到鏈接器中等等功能都由此模塊提供。

通常狀況下,canal-deployer部件是必須的,其餘兩個部件按需選用便可。apache

部署所需的中間件

搭建一套能夠用的組件須要部署MySQLZookeeperKafkaCanal四個中間件的實例,下面簡單分析一下部署過程。選用的虛擬機系統是CentOS7

安裝MySQL

爲了簡單起見,選用yum源安裝(官方連接是https://dev.mysql.com/downloads/repo/yum):

img

mysql80-community-release-el7-3雖然包名帶了mysql80關鍵字,其實已經集成了MySQL主流版本5.六、5.7和8.x等等的最新安裝包倉庫

選用的是最新版的MySQL8.x社區版,下載CentOS7適用的rpm包

cd /data/mysql
wget https://dev.mysql.com/get/mysql80-community-release-el7-3.noarch.rpm
// 下載完畢以後
sudo rpm -Uvh mysql80-community-release-el7-3.noarch.rpm
複製代碼

此時列舉一下yum倉庫裏面的MySQL相關的包:

[root@localhost mysql]# yum repolist all | grep mysql
mysql-cluster-7.5-community/x86_64 MySQL Cluster 7.5 Community   disabled
mysql-cluster-7.5-community-source MySQL Cluster 7.5 Community - disabled
mysql-cluster-7.6-community/x86_64 MySQL Cluster 7.6 Community   disabled
mysql-cluster-7.6-community-source MySQL Cluster 7.6 Community - disabled
mysql-cluster-8.0-community/x86_64 MySQL Cluster 8.0 Community   disabled
mysql-cluster-8.0-community-source MySQL Cluster 8.0 Community - disabled
mysql-connectors-community/x86_64  MySQL Connectors Community    enabled:    141
mysql-connectors-community-source  MySQL Connectors Community -  disabled
mysql-tools-community/x86_64       MySQL Tools Community         enabled:    105
mysql-tools-community-source       MySQL Tools Community - Sourc disabled
mysql-tools-preview/x86_64         MySQL Tools Preview           disabled
mysql-tools-preview-source         MySQL Tools Preview - Source  disabled
mysql55-community/x86_64           MySQL 5.5 Community Server    disabled
mysql55-community-source           MySQL 5.5 Community Server -  disabled
mysql56-community/x86_64           MySQL 5.6 Community Server    disabled
mysql56-community-source           MySQL 5.6 Community Server -  disabled
mysql57-community/x86_64           MySQL 5.7 Community Server    disabled
mysql57-community-source           MySQL 5.7 Community Server -  disabled
mysql80-community/x86_64           MySQL 8.0 Community Server    enabled:    161
mysql80-community-source           MySQL 8.0 Community Server -  disabled
複製代碼

編輯/etc/yum.repos.d/mysql-community.repo文件([mysql80-community]塊中enabled設置爲1,其實默認就是這樣子,不用改,若是要選用5.x版本則須要修改對應的塊):

[mysql80-community]
name=MySQL 8.0 Community Server
baseurl=http://repo.mysql.com/yum/mysql-8.0-community/el/7/$basearch/
enabled=1
gpgcheck=1
gpgkey=file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql
複製代碼

而後安裝MySQL服務:

sudo yum install mysql-community-server
複製代碼

這個過程比較漫長,由於須要下載和安裝5個rpm安裝包(或者是全部安裝包組合的壓縮包mysql-8.0.18-1.el7.x86_64.rpm-bundle.tar)。若是網絡比較差,也能夠直接從官網手動下載後安裝:

img

// 下載下面5個rpm包 common --> libs --> libs-compat --> client --> server
mysql-community-common
mysql-community-libs
mysql-community-libs-compat
mysql-community-client
mysql-community-server

// 強制安裝
rpm -ivh mysql-community-common-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-libs-compat-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-client-8.0.18-1.el7.x86_64.rpm --force --nodeps
rpm -ivh mysql-community-server-8.0.18-1.el7.x86_64.rpm --force --nodeps
複製代碼

安裝完畢以後,啓動MySQL服務,而後搜索MySQL服務的root帳號的臨時密碼用於首次登錄(mysql -u root -p):

// 啓動服務,關閉服務就是service mysqld stop
service mysqld start
// 查看臨時密碼 cat /var/log/mysqld.log
[root@localhost log]# cat /var/log/mysqld.log 
2020-03-02T06:03:53.996423Z 0 [System] [MY-013169] [Server] /usr/sbin/mysqld (mysqld 8.0.18) initializing of server in progress as process 22780
2020-03-02T06:03:57.321447Z 5 [Note] [MY-010454] [Server] A temporary password is generated for root@localhost: >kjYaXENK6li
2020-03-02T06:04:00.123845Z 0 [System] [MY-010116] [Server] /usr/sbin/mysqld (mysqld 8.0.18) starting as process 22834
// 登陸臨時root用戶,使用臨時密碼
[root@localhost log]# mysql -u root -p
複製代碼

接下來作下面的操做:

  • 修改root用戶的密碼:ALTER USER 'root'@'localhost' IDENTIFIED BY 'QWqw12!@';(注意密碼規則必須包含大小寫字母、數字和特殊字符)
  • 更新roothost,切換數據庫use mysql;,指定host%以即可以讓其餘服務器遠程訪問UPDATE USER SET HOST = '%' WHERE USER = 'root';
  • 賦予'root'@'%'用戶,全部權限,執行GRANT ALL PRIVILEGES ON *.* TO 'root'@'%';
  • 改變root'@'%用戶的密碼校驗規則以即可以使用Navicat等工具訪問:ALTER USER 'root'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbf3d130414?w=934&h=606&f=png&s=61774

操做完成以後,就可使用root用戶遠程訪問此虛擬機上的MySQL服務。最後確認是否開啓了binlog(注意一點是MySQL8.x默認開啓binlogSHOW VARIABLES LIKE '%bin%';

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbff332497f?w=682&h=613&f=png&s=39387

最後在MySQLShell執行下面的命令,新建一個用戶名canal密碼爲QWqw12!@的新用戶,賦予REPLICATION SLAVEREPLICATION CLIENT權限:

CREATE USER canal IDENTIFIED BY 'QWqw12!@';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
ALTER USER 'canal'@'%' IDENTIFIED WITH mysql_native_password BY 'QWqw12!@';
複製代碼

切換回去root用戶,建立一個數據庫test

CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`;
複製代碼

安裝Zookeeper

CanalKafka集羣都依賴於Zookeeper作服務協調,爲了方便管理,通常會獨立部署Zookeeper服務或者Zookeeper集羣。筆者這裏選用2020-03-04發佈的3.6.0版本:

midkr /data/zk
# 建立數據目錄
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg
複製代碼

zoo.cfg文件中的dataDir設置爲/data/zk/data,而後啓動Zookeeper

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
複製代碼

這裏注意一點,要啓動此版本的Zookeeper服務必須本地安裝好JDK8+,這一點須要自行處理。啓動的默認端口是2181,啓動成功後的日誌以下:

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbff466e610?w=1191&h=518&f=png&s=97157

安裝Kafka

Kafka是一個高性能分佈式消息隊列中間件,它的部署依賴於Zookeeper。筆者在此選用2.4.0而且Scala版本爲2.13的安裝包:

mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz
複製代碼

因爲解壓後/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中對應的zookeeper.connect=localhost:2181已經符合須要,沒必要修改,須要修改日誌文件的目錄log.dirs/data/kafka/data。而後啓動Kafka服務:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties
複製代碼

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbff50d4a53?w=1103&h=569&f=png&s=109619

這樣啓動一旦退出控制檯就會結束Kafka進程,能夠添加-daemon參數用於控制Kafka進程後臺不掛斷運行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties
複製代碼

安裝和使用Canal

終於到了主角登場,這裏選用Canalv1.1.4穩定發佈版,只須要下載deployer模塊:

mkdir /data/canal
cd /data/canal
# 這裏注意一點,Github在國內被牆,下載速度極慢,能夠先用其餘下載工具下載完再上傳到服務器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
tar -zxvf canal.deployer-1.1.4.tar.gz
複製代碼

解壓後的目錄以下:

- bin   # 運維腳本
- conf  # 配置文件
  canal_local.properties  # canal本地配置,通常不須要動
  canal.properties        # canal服務配置
  logback.xml             # logback日誌配置
  metrics                 # 度量統計配置
  spring                  # spring-實例配置,主要和binlog位置計算、一些策略配置相關,能夠在canal.properties選用其中的任意一個配置文件
  example                 # 實例配置文件夾,通常認爲單個數據庫對應一個獨立的實例配置文件夾
    instance.properties   # 實例配置,通常指單個數據庫的配置
- lib   # 服務依賴包
- logs  # 日誌文件輸出目錄
複製代碼

在開發和測試環境建議把logback.xml的日誌級別修改成DEBUG方便定位問題。這裏須要關注canal.propertiesinstance.properties兩個配置文件。canal.properties文件中,須要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16這個配置項的註釋,也就是啓用此配置項,和實例解析器的線程數相關,不配置會表現爲阻塞或者不進行解析。
  • canal.serverMode配置項指定爲kafka,可選值有tcpkafkarocketmqmaster分支或者最新的的v1.1.5-alpha-1版本,能夠選用rabbitmq),默認是kafka
  • canal.mq.servers配置須要指定爲Kafka服務或者集羣Broker的地址,這裏配置爲127.0.0.1:9092

canal.mq.servers在不一樣的canal.serverMode有不一樣的意義。 kafka模式下,指Kafka服務或者集羣Broker的地址,也就是bootstrap.servers rocketmq模式下,指NameServer列表 rabbitmq模式下,指RabbitMQ服務的Host和Port

其餘配置項能夠參考下面兩個官方Wiki的連接:

instance.properties通常指一個數據庫實例的配置,Canal架構支持一個Canal服務實例,處理多個數據庫實例的binlog異步解析。instance.properties須要修改的配置項主要包括:

  • canal.instance.mysql.slaveId須要配置一個和Master節點的服務ID徹底不一樣的值,這裏筆者配置爲654321

  • 配置數據源實例,包括地址、用戶、密碼和目標數據庫:

    • canal.instance.master.address,這裏指定爲127.0.0.1:3306
    • canal.instance.dbUsername,這裏指定爲canal
    • canal.instance.dbPassword,這裏指定爲QWqw12!@
    • 新增canal.instance.defaultDatabaseName,這裏指定爲test(須要在MySQL中創建一個test數據庫,見前面的流程)。
  • Kafka相關配置,這裏暫時使用靜態topic和單個partition

  • canal.mq.topic,這裏指定爲test也就是解析完的binlog結構化數據會發送到Kafka的命名爲testtopic

    • canal.mq.partition,這裏指定爲0

配置工做作好以後,能夠啓動Canal服務:

sh /data/canal/bin/startup.sh 
# 查看服務日誌
tail -100f /data/canal/logs/canal/canal
# 查看實例日誌 -- 通常狀況下,關注實例日誌便可
tail -100f /data/canal/logs/example/example.log
複製代碼

啓動正常後,見實例日誌以下:

https://user-gold-cdn.xitu.io/2020/3/17/170e8dbff7ce4057?w=1702&h=231&f=png&s=55030

test數據庫建立一個訂單表,而且執行幾個簡單的DML

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',
    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '訂單表';

INSERT INTO `order`(order_id, amount) VALUES ('10086', 999);
UPDATE `order` SET amount = 10087 WHERE order_id = '10086';
DELETE  FROM `order` WHERE order_id = '10086';
複製代碼

這個時候,能夠利用Kafkakafka-console-consumer或者Kafka Tools查看test這個topic的數據:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test
複製代碼

https://user-gold-cdn.xitu.io/2020/3/17/170e8dc001ce6c66?w=1698&h=341&f=png&s=81813

具體的數據以下:

// test數據庫建庫腳本
{"data":null,"database":"`test`","es":1583143732000,"id":1,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE DATABASE `test` CHARSET `utf8mb4` COLLATE `utf8mb4_unicode_ci`","sqlType":null,"table":"","ts":1583143930177,"type":"QUERY"}

// order表建表DDL
{"data":null,"database":"test","es":1583143957000,"id":2,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"CREATE TABLE `order`\n(\n    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主鍵',\n    order_id    VARCHAR(64)    NOT NULL COMMENT '訂單ID',\n    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '訂單金額',\n    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '建立時間',\n    UNIQUE uniq_order_id (`order_id`)\n) COMMENT '訂單表'","sqlType":null,"table":"order","ts":1583143958045,"type":"CREATE"}

// INSERT
{"data":[{"id":"1","order_id":"10086","amount":"999.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143969000,"id":3,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143969460,"type":"INSERT"}

// UPDATE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143974000,"id":4,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":[{"amount":"999.0"}],"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143974870,"type":"UPDATE"}

// DELETE
{"data":[{"id":"1","order_id":"10086","amount":"10087.0","create_time":"2020-03-02 05:12:49"}],"database":"test","es":1583143980000,"id":5,"isDdl":false,"mysqlType":{"id":"BIGINT","order_id":"VARCHAR(64)","amount":"DECIMAL(10,2)","create_time":"DATETIME"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"order_id":12,"amount":3,"create_time":93},"table":"order","ts":1583143981091,"type":"DELETE"}
複製代碼

可見Kafka的名爲testtopic已經寫入了對應的結構化binlog事件數據,能夠編寫消費者監聽Kafka對應的topic而後對獲取到的數據進行後續處理。

小結

這篇文章大部分篇幅用於介紹其餘中間件是怎麼部署的,這個問題側面說明了Canal自己部署並不複雜,它的配置文件屬性項比較多,可是實際上須要自定義和改動的配置項是比較少的,也就是說明了它的運維成本和學習成本並不高。後面會分析基於結構化binlog事件作ELT和持久化相關工做以及Canal的生產環境可用級別HA集羣的搭建。

參考地址

若是你們喜歡個人文章,能夠關注我的訂閱號。歡迎隨時留言、交流。若是想加入微信羣的話一塊兒討論的話,請加管理員簡棧文化-小助手(lastpass4u),他會拉大家進羣。

簡棧文化服務訂閱號
相關文章
相關標籤/搜索