騰訊大牛教你ClickHouse實時同步MySQL數據

| 做者   史鵬宙,CSIG雲與智慧產業事業羣研發工程師


ClickHouse做爲OLAP分析引擎已經被普遍使用,數據的導入導出是用戶面臨的第一個問題。因爲ClickHouse自己沒法很好地支持單條大批量的寫入,所以在實時同步數據方面須要藉助其餘服務協助。本文給出一種結合Canal+Kafka的方案,而且給出在多個MySQL實例分庫分表的場景下,如何將多張MySQL數據表寫入同一張ClickHouse表的方法,歡迎你們批評指正。java

 

首先來看看咱們的需求背景:mysql

 

1. 實時同步多個MySQL實例數據到ClickHouse,天天規模500G,記錄數目億級別,能夠接受分鐘級別的同步延遲;正則表達式

 

2. 某些數據庫表存在分庫分表的操做,用戶須要跨MySQL實例跨數據庫的表同步到ClickHouse的一張表中;spring

 

3. 現有的MySQL binlog開源組件(Canal),沒法作到多張源數據表到一張目的表的映射關係。sql

 

基本原理數據庫

 

1、使用JDBC方式同步

 

1. 使用Canal組件完成binlog的解析和數據同步;json

 

2. Canal-Server進程會假裝成MySQL的slave,使用MySQL的binlog同步協議完成數據同步;bootstrap

 

3. Canal-Adapter進程負責從canal-server獲取解析後的binlog,而且經過jdbc接口寫入到ClickHouse;緩存

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

優勢:網絡

 

1. Canal組件原生支持; 

 

缺點:

 

1. Canal-Adpater寫入時源表和目的表一一對應,靈活性不足;

 

2. 須要維護兩個Canal組件進程;

 

2、Kafka+ClickHouse物化視圖方式同步

 

1. Canal-Server完成binlog的解析,而且將解析後的json寫入Kafka;

 

2. Canal-Server能夠根據正則表達式過濾數據庫和表名,而且根據規則寫入Kafka的topic;

 

3. ClickHouse使用KafkaEngine和Materialized View完成消息消費,並寫入本地表;

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

優勢:

 

1. Kafka支持水平擴展,能夠根據數據規模調整partition數目;

 

2. Kafka引入後將寫入請求合併,防止ClickHouse生成大量的小文件,從而影響查詢性能;

 

3. Canal-Server支持規則過濾,能夠靈活配置上游的MySQL實例的數據庫名和表名,而且指明寫入的Kafka topic名稱;

 

缺點:

 

1. 須要維護Kafka和配置規則;

 

2. ClickHouse須要新建相關的視圖、Kafka Engine的外表等;

 

具體步驟

 

1、準備工做

 

1. 若是使用TencentDB,則在控制檯確認binlog_format爲ROW,無需多餘操做。

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

 若是是自建MySQL,則在客戶端中查詢變量:

 

>   show variables like '%binlog%';

+-----------------------------------------+----------------------+

| Variable_name                           | Value                |

+-----------------------------------------+----------------------+

| binlog_format                           | ROW                  |

+-----------------------------------------+----------------------+

 

> show variables like '%log_bin%';

+---------------------------------+--------------------------------------------+

| Variable_name                   | Value                                      |

+---------------------------------+--------------------------------------------+

| log_bin                         | ON                                         |

| log_bin_basename                |  /data/mysql_root/log/20146/mysql-bin        |

| log_bin_index                   |  /data/mysql_root/log/20146/mysql-bin.index |

+---------------------------------+--------------------------------------------+

 

2. 建立帳號canal,用於同步binlog

 

CREATE USER canal IDENTIFIED BY  'canal'; 

GRANT SELECT, REPLICATION SLAVE,  REPLICATION CLIENT ON *.* TO 'canal'@'%';

FLUSH PRIVILEGES;

 

2、Canal組件部署

 

前置條件:

 

Canal組件部署的機器須要跟ClickHouse服務和MySQL網絡互通;

 

須要在機器上部署java8,配置JAVA_HOME、PATH等環境變量;

 

基本概念:

 

watermark,size_16,text_QDUxQ1RP5Y2a5a6i,color_FFFFFF,t_100,g_se,x_10,y_10,shadow_90,type_ZmFuZ3poZW5naGVpdGk=

 

1. Canal-Server組件部署

 

Canal-Server的主要做用是訂閱binlog信息並解析和定義instance相關信息,建議每一個Canal-Server進程對應一個MySQL實例;

 

1)下載canal.deployer-1.1.4.tar.gz,解壓

 

2)修改配置文件conf/canal.properties,須要關注的配置以下:

 

...

# 端口相關信息,若是同一臺機器部署多個進程須要修改

canal.port = 11111

canal.metrics.pull.port = 11112

canal.admin.port = 11110

...

# 服務模式

canal.serverMode = tcp

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息隊列時 這兩個值必須爲true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目錄下必須有同名的目錄

canal.destinations = example,example2

 

3)配置instance

 

能夠參照example新增新的instance,主要修改配置文件conf/${instance_name}/instance.properties文件。    

 

樣例1:  同步某個數據庫的以XX前綴開頭的表

訂閱 172.21.48.35的MySQL的testdb數據庫中的以tb_開頭的表的數據變動(例如tb_20200801 、 tb_20200802等),主要的步驟以下:

步驟1:建立example2實例:cddeployer/conf && cp -r example example2

步驟2:修改deployer/conf/example2/instance.properties文件

 

...

# 上游MySQL實例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步帳戶信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 過濾數據庫名稱和表名

canal.instance.filter.regex=testdb\\.tb_.*,

步驟3:在conf/canal.properties中修改 canal.destinations ,新增example2

 

樣例2:  同步多個數據庫的以XX前綴開頭的表,且輸出到Kafka

 

訂閱 172.21.48.35的MySQL的empdb_0數據庫的employees_20200801表,empdb_1數據庫的employees_20200802表,而且數據寫入Kafka;

 

步驟1:建立example2實例:cddeployer/conf && cp -r example example3

 

步驟2:修改deployer/conf/example3/instance.properties文件

 

...

# 上游MySQL實例地址

canal.instance.master.address=172.21.48.35:3306

...

# 同步帳戶信息

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

# 過濾數據庫名稱和表名

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

# Kafka的topic名稱和匹配的規則

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

 

# Kafka topic的分區數目(即partition數目)

canal.mq.partitionsNum=3

 

# 根據employees_開頭的表中的 emp_no字段來進行數據hash,分佈到不一樣的partition

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

 

步驟3:在Kafka中新建topic employees_topic,指定分區數目爲3

 

步驟4:在conf/canal.properties中修改 canal.destinations ,新增example3;修改服務模式爲kafka,配置kafka相關信息;

 

 

# 服務模式

canal.serverMode = kafka

...

# Kafka地址

canal.mq.servers = 172.21.48.11:9092

# 使用消息隊列時 這兩個值必須爲true

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

...

# instance列表,conf目錄下必須有同名的目錄

canal.destinations =  example,example2,example3

 

2. Canal-Adapter組件部署(只針對方案一)

 

Canal-Adapter的主要做用是經過JDBC接口寫入ClickHouse數據,能夠配置多個表的寫入;

 

1)下載canal.adapter-1.1.4.tar.gz,解壓;

 

2)在lib目錄下新增clickhouse驅動jar包及httpclient的jar包 httpcore-4.4.13.jar、httpclient-4.3.3.jar、clickhouse-jdbc-0.2.4.jar;

 

3)修改配置文件conf/application.yml文件,修改canalServerHost、srcDataSources、canalAdapters的配置;

 

server:

   port: 8081

spring:

   jackson:

     date-format: yyyy-MM-dd HH????????ss

     time-zone: GMT+8

     default-property-inclusion: non_null

 

canal.conf:

   mode: tcp

   canalServerHost: 127.0.0.1:11111   # canal-server的服務地址

   batchSize: 500

   syncBatchSize: 1000

   retries: 0

   timeout:

   accessKey:

  secretKey:

  #  MySQL的配置,修改用戶名密碼及制定數據庫

   srcDataSources:

     defaultDS:

       url: jdbc:mysql://172.21.48.35:3306

       username: root

       password: yourpasswordhere

   canalAdapters:

  -  instance: example

     groups:

     - groupId: g1

       outerAdapters:

       - name: logger

       - name: rdb

         key: mysql1

         # clickhouse的配置,修改用戶名密碼數據庫

         properties:

           jdbc.driverClassName: ru.yandex.clickhouse.ClickHouseDriver

           jdbc.url: jdbc:clickhouse://172.21.48.18:8123

           jdbc.username: default

           jdbc.password:

 

4)修改配置文件conf/rdb/mytest_user.yml文件

 

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

  database:  testdb

   mirrorDb: true

上述的配置文件中,因爲開啓了mirrorDb: true,目的端的ClickHouse必須有相同的數據庫名和表名。

 

樣例1:源數據庫與目標數據庫名字不一樣,源表名與目標表名不一樣

 

修改adapter的conf/rdb/mytest_user.yml配置文件,指定源數據庫和目標數據庫

 

dataSourceKey: defaultDS

destination: example

groupId: g1

outerAdapterKey: mysql1

concurrent: true

dbMapping:

   database: source_database_name

   table: source_table

   targetTable: destination_database_name.destination_table

   targetColumns:

     id:

     name:

  commitBatch:  3000 # 批量提交的大小

 

樣例2:多個源數據庫表寫入目的端的同一張表

 

在conf/rdb 目錄配置多個yml文件,分別指明不一樣的table名稱。


 

Kafka 服務配置

 

1、調整合理的producer參數

 

確認Canal-Server裏的canal.properties文件,重要參數見下表;

 

配置項

Kafka SDK配置項

配置說明

canal.mq.servers

bootstrap.servers

Kafka服務地址

canal.mq.retries

retries

producer在發送失敗的時候會重試次數,默認爲0

canal.mq.batchSize

batch.size

producer嘗試發送同一個partition的請求數據量,默認爲16K

canal.mq.maxRequestSize

max.request.size

producer請求的最大大小,默認爲1M

canal.mq.lingerMs

linger.ms

producer等待發送的延遲,默認爲100ms

canal.mq.bufferMemory

buffer.memory

producer使用的緩存消息的最大內存,默認爲30M

canal.mq.flatMessage

-

Canal-Server 將binlog解析結果轉爲json;下游爲ClickHouse Kafka Engine表時必須爲true

canal.mq.flatMessage.onlyData

-

Canal-Server 只發送binlog解析結果中的data部分;下游爲ClickHouse Kafka Engine表時必須爲true

canal.mq.acks

acks

producer但願leader返回的用於確認請求完成的確認數量. 可選值 all, -1, 0 1. 默認值爲all

 

2、新建相關的topic名稱

 

根據Canal-Server裏instance裏配置文件instance.properties,注意分區數目與canal.mq.partitionsNum 保持一致;

 

partition數目須要考慮如下因素:

 

1. 上游的MySQL的數據量。原則上數據寫入量越大,應該分配更多的partition數目;

 

2. 考慮下游ClickHouse的實例數目。topic的partition分區總數 最好 不大於 下游ClickHouse的總實例數目,保證每一個ClickHouse實例都能至少分配到一個partition;


 

ClickHouse服務配置

 

根據上游MySQL實例的表的schema新建數據表;

 

引入Kafka時須要額外新建Engine=Kafka的外表以及相關的物化視圖表;

 

建議:

 

1. 爲每一個外表新增不一樣的 kafka_group_name,防止相互影響;

 

2. 設置kafka_skip_broken_messages 參數爲合理值,遇到沒法解析數據會跳過;

 

3. 設置合理的kafka_num_consumers值,最好保證全部ClickHouse實例該值的總和大於 topic的partition數目;

 

新建相關的分佈式查詢表;


 

服務啓動

 

啓動相關的Canal組件進程;

 

1. canal-server:  sh bin/startup.sh

 

2. canal-adapter: sh bin/startup.sh

 

在MySQL中插入數據,觀察日誌是否能夠正常運行;

 

若是使用Kafka,能夠經過kafka-console-consumer.sh腳本觀察binlog數據解析;

 

觀察ClickHouse數據表中是否正常寫入數據;


 

實際案例

 

需求:實時同步MySQL實例的empdb_0.employees_20200801表和empdb_1.employees_20200802數據表

 

方案:使用方案二

 

環境及參數:

 

MySQL地址

172.21.48.35:3306

CKafka地址

172.21.48.11:9092

Canal instance名稱

employees

Kafka目的topic

employees_topic 

 

1.在MySQL新建相關表

 

# MySQL表的建表語句

CREATE DATABASE `empdb_0`;

CREATE DATABASE `empdb_1`;

 

CREATE TABLE  `empdb_0`.`employees_20200801` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

 

CREATE TABLE  `empdb_1`.`employees_20200802` (

   `emp_no` int(11) NOT NULL,

   `birth_date` date NOT NULL,

   `first_name` varchar(14) NOT NULL,

   `last_name` varchar(16) NOT NULL,

   `gender` enum('M','F') NOT NULL,

   `hire_date` date NOT NULL,

   PRIMARY KEY (`emp_no`)

);

 

2. Canal-Server配置

 

步驟1. 修改conf/canal.properties文件

 

canal.serverMode = kafka

...

canal.destinations = example,employees

...

canal.mq.servers = 172.21.48.11:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.flatMessage.onlyData = true

canal.mq.compressionType = none

canal.mq.acks = all

canal.mq.producerGroup = cdbproducer

canal.mq.accessChannel = local

...

 

步驟2. 新增employees實例,修改employees/instances.properties配置

 

...

canal.instance.master.address=172.21.48.35:3306

...

canal.instance.dbUsername=canal

canal.instance.dbPassword=canal

...

canal.instance.filter.regex=empdb_.*\\.employees_.*

...

canal.mq.dynamicTopic=employees_topic:empdb_.*\\.employees_.*

canal.mq.partition=0

canal.mq.partitionsNum=3

canal.mq.partitionHash=empdb_.*\\.employees_.*:emp_no

 

3. Kafka配置

 

4. 新增topic employees_topic,分區數爲3

 

5. ClickHouse建表

 

CREATE DATABASE testckdb ON CLUSTER  default_cluster;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE=MergeTree() ORDER BY (emp_no)

SETTINGS index_granularity = 8192;

 

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_stream ON CLUSTER default_cluster (

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

   `hire_date` String

) ENGINE = Kafka()

SETTINGS

   kafka_broker_list = '172.21.48.11:9092',

   kafka_topic_list = 'employees_topic',

   kafka_group_name = 'employees_group',

   kafka_format = 'JSONEachRow',

   kafka_skip_broken_messages = 1024,

  kafka_num_consumers  = 1;

 

 

CREATE MATERIALIZED VIEW IF NOT EXISTS  testckdb.ck_employees_mv ON CLUSTER default_cluster TO testckdb.ck_employees(

   `emp_no` Int32,

   `birth_date` String,

   `first_name` String,

   `last_name` String,

   `gender` String,

  `hire_date`  String

) AS SELECT

   `emp_no`,

   `birth_date`,

   `first_name`,

   `last_name`,

   `gender`,

   `hire_date`

FROM

   testckdb.ck_employees_stream;

 

CREATE TABLE IF NOT EXISTS  testckdb.ck_employees_dis ON CLUSTER default_cluster AS testckdb.ck_employees  

ENGINE=Distributed(default_cluster,  testckdb, ck_employees);

 

6. 啓動Canal-Server服務

 

MySQL實例上游插入數據,觀察數據是否在Canal-Server解析正常,是否在ClickHouse中完成同步。

相關文章
相關標籤/搜索