canal 1.1.4集羣安裝並監聽mysql表的變更

        需求是要可以實時監聽數據庫表的變更,動態獲取。採用的方法是canal+kafka動態隊列創建websocket。以前安裝的canal是在apache集羣上的,此次須要安裝在CDH集羣上面,遇到很多問題。這裏把過程分享給你們,但願可以少走彎路。java

1、canal安裝及配置mysql

         先說canal的安裝。canal有不少版本,我選擇的是1.1.4版本。這個版本有webUI界面,算是極大的優化,配置起來不用那麼麻煩。canal的下載地址在這裏,能夠本身選擇合適的版本。git

         canal版本選擇下載github

        選擇1.1.4以後,有簡單的配置說明和原理說明。上面雖然說mysql支持的版本爲5.5及如下,不過個人mysql是5.7版本,也仍是能夠獲取到數據的,這個沒必要擔憂。首先開啓mysql的binlog功能,簡單的配置如下就好,不須要太過複雜。web

       1. mysql部分spring

        假如mysql是默認安裝位置的話,修改C:\ProgramData\MySQL\MySQL Server 5.7下的my.ini文件。添加如下內容:sql

# Binary Logging.
log-bin=mysql-bin
#binlog日誌格式
binlog-format=ROW
expire_logs_days = 30

    binlog日誌格式要選擇row,通常狀況下 statement  模式也不會有問題,可以正常獲取到數據。可是一旦mysql表的變更比較複雜,statement  生成的日誌是極可能有bug,不能正確複製的。選擇row模式日誌的數據量會大一些,可是問題也會少一些。數據庫

    下面兩個可配可不配,官方建議是強烈建議配置,假如只是嘗試canal的話那就無所謂了,正式使用環境仍是配置比較好。apache

binlog-do-db = epg #配置須要同步的庫

binlog-ignore-db = mysql #配置不須要同步的庫

    配置完以後建立一個專門用來讀取binlog日誌的mysql用戶,並賦予相關的權限。bootstrap

CREATE USER canal IDENTIFIED BY 'canal';      

GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';

GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost ;    

FLUSH PRIVILEGES;

在建立用戶並賦予權限的時候可能會遇到各類各樣的問題,好比密碼太簡單、大寫不識別之類的。大寫不識別就換成小寫,密碼太簡單就修改密碼級別。

    先查看mysql密碼策略:

SHOW VARIABLES LIKE 'validate_password%';

    再修改密碼等級:

set global validate_password.policy=LOW;

    這個列名有的是validate_password_policy有的是validate_password.policy,修改的時候請對應本身mysql數據庫中的字段名。

    以上mysql的準備工做就作完了,如今開始配置canal。

    2.canal部分

    1)canal想要web界面的話要下載兩個壓縮包,一個是admin一個是deployer。一個是集羣管理包一個是單機部署包,假如不須要集羣的話那麼下載deployer就能夠了。在本身喜歡的位置使用wget下載。

wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

    下載完以後建立文件夾準備將解壓的文件放入其中,我喜歡直接選擇壓縮包名字做爲解壓文件夾。

mkdir canal.admin-1.1.4
mkdir canal.deployer-1.1.4
tar -zxvf canal.deployer-1.1.4.tar.gz -C canal.deployer-1.1.4
tar -zxvf canal.admin-1.1.4.tar.gz -C canal.admin-1.1.4

    解壓指定文件夾的時候-C要大寫。

    2)先配置canal.admin。解壓以後有4個文件夾,進入conf文件夾

    修改conf文件夾中的application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8

spring.datasource:
  #canal_manager數據庫所在的mysql數據庫ip地址
  address: 192.168.49.104:3306
  database: canal_manager
  username: canal
  password: canal
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1

canal:
  adminUser: admin
  #密碼默認爲admin,實際在web界面時要求密碼大於等於6位
  adminPasswd: 123456

    要改的地方很少配置一下數據庫和canalweb的用戶密碼就能夠了。密碼要大於等於6位,不然沒法登陸。

    在conf文件夾下面還有canal_manager的建立語句。 cat canal_manager.sql,再把sql文件的內容複製下來,在mysql中執行,建立canal_manager數據庫。

    3)到這裏canal_admin的配置接就完成了,執行bin目錄下的start.sh文件,啓動canal_admin。

sh ../bin/startup.sh

在上面的配置文件中端口配置爲8089,因此頁面的訪問就是你所配置canal-admin主機的ip:8089,假如沒有問題的話就能看到canal-admin的登陸界面。

輸入用戶名密碼,沒有問題的話就可以進入管理界面。簡單說說我在上面兩個步驟中遇到的比較有價值的問題,遇到了問題能夠從log文件夾下面的日誌中讀取。

1.JDBC鏈接MySQL報錯Unknown system variable 'query_cache_size'。這是由於mysql的鏈接驅動包和個人mysql版本對應不上。將mysql-connecter-java換成高版本的就能夠看到webUi的界面了。mysql-connecter-java能夠到mavenrepository中下載。下載地址

成功登陸以後進入管理界面,點擊集羣管理->新建集羣,輸入集羣名稱和集羣的zookeeper地址,多個地址用,分隔。

建立集羣的點擊肯定的時候可能會報錯modified_time不能爲空,這應該是webUI的bug了...因爲項目源碼已經打包,因此不如直接去改數據庫,把canal_manager中的modified_time都修改成可爲空,避免相似的錯誤出現。

創建集羣以後就開始配置集羣信息,點擊右邊的操做->主配置,開始配置集羣信息(即conf/canal-template.properties文件)。

點擊載入模板,而後配置本身的信息,配置完後點擊保存。

#################################################
######### 		common argument		#############
#################################################
# tcp bind ip
canal.ip =
# register ip to zookeeper
canal.register.ip =
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
#canal_admin登陸的用戶名和密碼
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
# canal.admin的管理地址
canal.admin.manager = 192.168.49.104:8089
canal.admin.port = 11110
# canal登陸的用戶名和密碼,密碼在canal_manager的canal_user表中
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

# 配置zookpeer地址
canal.zkServers = 192.168.49.104:2181,192.168.49.105:2181,192.168.49.106:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
# 配置由tcp接收仍是kafka接收或者RocketMQ接收消息
canal.serverMode = kafka
# flush meta cursor/parse position to file
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
## memory store RingBuffer size, should be Math.pow(2,n)
canal.instance.memory.buffer.size = 16384
## memory store RingBuffer used memory unit size , default 1kb
canal.instance.memory.buffer.memunit = 1024 
## meory store gets mode used MEMSIZE or ITEMSIZE
canal.instance.memory.batch.mode = MEMSIZE
canal.instance.memory.rawEntry = true

## detecing config
canal.instance.detecting.enable = false
#canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
canal.instance.detecting.sql = select 1
canal.instance.detecting.interval.time = 3
canal.instance.detecting.retry.threshold = 3
canal.instance.detecting.heartbeatHaEnable = false

# support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
canal.instance.transaction.size =  1024
# mysql fallback connected to new master should fallback times
canal.instance.fallbackIntervalInSeconds = 60

# network config
canal.instance.network.receiveBufferSize = 16384
canal.instance.network.sendBufferSize = 16384
canal.instance.network.soTimeout = 30

# binlog filter config
canal.instance.filter.druid.ddl = true
canal.instance.filter.query.dcl = false
canal.instance.filter.query.dml = false
canal.instance.filter.query.ddl = false
canal.instance.filter.table.error = false
canal.instance.filter.rows = false
canal.instance.filter.transaction.entry = false

# binlog format/image check
canal.instance.binlog.format = ROW,STATEMENT,MIXED 
canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

# binlog ddl isolation
canal.instance.get.ddl.isolation = false

# parallel parser config
canal.instance.parser.parallel = true
## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
#canal.instance.parser.parallelThreadSize = 16
## disruptor ringbuffer size, must be power of 2
canal.instance.parser.parallelBufferSize = 256

# table meta tsdb info
canal.instance.tsdb.enable = true
canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
#配置canal訪問mysql的用戶名和密碼
canal.instance.tsdb.dbUsername = canal
canal.instance.tsdb.dbPassword = canal
# dump snapshot interval, default 24 hour
canal.instance.tsdb.snapshot.interval = 24
# purge snapshot expire , default 360 hour(15 days)
canal.instance.tsdb.snapshot.expire = 360

# aliyun ak/sk , support rds/mq
canal.aliyun.accessKey =
canal.aliyun.secretKey =

#################################################
######### 		destinations		#############
#################################################
#定義canal_deployer的instance列表,不一樣instance用,隔開
canal.destinations = nyhx_student
# conf root dir
canal.conf.dir = ../conf
# auto scan instance dir add/remove and start/stop instance
#自動掃描canal_deployer的conf目錄下面instance,因此上面的canal.destinations配不配、配置對不對都沒關係,可#以空在那裏
canal.auto.scan = true
canal.auto.scan.interval = 5

canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
#canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

canal.instance.global.mode = manager
canal.instance.global.lazy = false
canal.instance.global.manager.address = ${canal.admin.manager}
#canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
canal.instance.global.spring.xml = classpath:spring/file-instance.xml
#canal.instance.global.spring.xml = classpath:spring/default-instance.xml

##################################################
######### 		     MQ 		     #############
##################################################
#配置kafka地址
canal.mq.servers = 192.168.49.104:9092,192.168.49.105:9092,192.168.49.106:9092
canal.mq.retries = 0
canal.mq.batchSize = 16384
#canal發送信息的最大大小,要修改成268435456(8M)
canal.mq.maxRequestSize = 268435456
canal.mq.lingerMs = 100
canal.mq.bufferMemory = 33554432
canal.mq.canalBatchSize = 50
canal.mq.canalGetTimeout = 100
canal.mq.flatMessage = true
canal.mq.compressionType = none
canal.mq.acks = all
#canal.mq.properties. =
canal.mq.producerGroup = test
# Set this value to "cloud", if you want open message trace feature in aliyun.
canal.mq.accessChannel = local
# aliyun mq namespace
#canal.mq.namespace =

##################################################
#########     Kafka Kerberos Info    #############
##################################################
canal.mq.kafka.kerberos.enable = false
canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"
canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

這裏要說明一點,因爲canal1.1.4和以前的版本相比,一樣一條數據庫改動,發送的信息要多不少,因此默認的1M是不夠用的,須要canal.mq.maxRequestSize修改成本身想要的大小。除此以外,kafka也有隊列消息體最大值的限定,須要將 message.max.bytes 配置增大, fetch.max.bytes 也增大。

kafka的配置我是經過CDH Cloudera-Manager的webUI界面修改的,由於我沒找到它把kafka安裝在哪了...

吐槽一下,bin目錄下面stop.sh並不能中止canal_admin,在沒有啓動的狀況下也能登陸8089端口訪問web頁面。

4)以上canal_admin就安裝完了,下面是canal_deployer的安裝。回到canal_deployer1.1.4.tar.gz解壓後的位置,一樣有四個文件夾。

進入conf文件夾,將canal_local.properties的內容複製到canal.properties文件中,配置以下:

# register ip
canal.register.ip = 192.168.49.106

# canal admin config
canal.admin.manager = 192.168.49.104:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = canal_cluster

修改完後將canal.deployer-1.1.4發送給其它從機,發送以後要修改canal.properties中的canal.register.ip爲各自主機ip。

cd ..
scp canal.deployer-1.1.4 root@192.168.105:/usr/software

修改完ip以後,分別啓動canal.deployer,啓動方法和canal.admin相似。

sh bin/startup.sh

都修改完後,刷新web頁面,在server管理頁面中可以看到所配置的實例。

5)配置instance

在web界面進入instance管理,點擊新建instance。輸入instance名稱,選擇集羣,載入模板,開始修改instance。

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0
#由於canal是假裝成mysql的一個slave,因此slaveId不要和mysql 的slaveId以及其它instance的slaveId重複
canal.instance.mysql.slaveId=654319

# enable gtid use true/false
canal.instance.gtidon=false

# position info
#監聽的mysql地址
canal.instance.master.address=192.168.49.214:3306
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
#查看mysql binlog日誌所用的用戶密碼,因爲我建立的canal用戶所在的數據庫和監聽的數據庫不是同一個,這裏求方便用#的root用戶
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
#查看的數據庫
canal.instance.defaultDatabaseName=nyhx
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
#canal.instance.filter.regex=.*\\..*
#配置監聽nyhx數據庫下的全部表
canal.instance.filter.regex=nyhx\..*
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
#默認topic,在canal.mq.dynamicTopic沒有匹配到的表變更都會到這個topic下面來
canal.mq.topic=other
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
#配置動態topic,指定哪張表進入哪一個分區
canal.mq.dynamicTopic=student:nyhx.student;magicStone:nyhx.magicstone
#canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#配置主鍵id
canal.mq.partitionHash=nyhx.student:uid,nyhx.magicstone:magic_id
#################################################

在開始監聽以前,在kafka中須要有對應的topic才行。kafka相關指令:

#查看全部topic
kafka-topics --list --zookeeper 192.168.49.104:2181
#建立topic
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic other --replication-factor 3 --partitions 1
#監聽topic消息
kafka-console-consumer --bootstrap-server 192.168.49.104:9092 --topic other

當全部topic都建立好以後,在web頁面將instance設置爲啓動,集羣會自動爲它分配一臺服務器。以後監聽topic,修改監聽表,看看是否有數據產生。像這樣:

假若有的話,那麼同步實時數據的任務就完成啦!沒有的話,能夠看看instance的日誌。有web界面也不用一臺虛擬機一臺虛擬機去翻了。

通常來講都是一些配置問題,看着日誌判斷問題在哪裏產生,再修改過來。這裏要注意canal發送的數據量大小和kafka接受的數據量大小,我就在這裏卡了很久,監聽kafka topic雖然不報錯,可是也沒有數據。回過頭來看整個配置過程並不困難,只要理解配置內容的格式和做用很快就能配好。

最後再附上看過的參考文檔:

canal-admin1.1.14界面化安裝配置canal集羣詳解

如何使用Canal同步MySQL的Binlog到Kafka

CDC系列(一)、Canal 集羣部署及使用(帶WebUI)

Canal簡介及配置說明

canal系列—配置文件介紹

Kafka消息體大小設置的一些細節

記錄一次canal發送大請求至kafka失敗

相關文章
相關標籤/搜索