需求是要可以實時監聽數據庫表的變更,動態獲取。採用的方法是canal+kafka動態隊列創建websocket。以前安裝的canal是在apache集羣上的,此次須要安裝在CDH集羣上面,遇到很多問題。這裏把過程分享給你們,但願可以少走彎路。java
1、canal安裝及配置mysql
先說canal的安裝。canal有不少版本,我選擇的是1.1.4版本。這個版本有webUI界面,算是極大的優化,配置起來不用那麼麻煩。canal的下載地址在這裏,能夠本身選擇合適的版本。git
canal版本選擇下載github
選擇1.1.4以後,有簡單的配置說明和原理說明。上面雖然說mysql支持的版本爲5.5及如下,不過個人mysql是5.7版本,也仍是能夠獲取到數據的,這個沒必要擔憂。首先開啓mysql的binlog功能,簡單的配置如下就好,不須要太過複雜。web
1. mysql部分spring
假如mysql是默認安裝位置的話,修改C:\ProgramData\MySQL\MySQL Server 5.7下的my.ini文件。添加如下內容:sql
# Binary Logging. log-bin=mysql-bin #binlog日誌格式 binlog-format=ROW expire_logs_days = 30
binlog日誌格式要選擇row,通常狀況下 statement 模式也不會有問題,可以正常獲取到數據。可是一旦mysql表的變更比較複雜,statement 生成的日誌是極可能有bug,不能正確複製的。選擇row模式日誌的數據量會大一些,可是問題也會少一些。數據庫
下面兩個可配可不配,官方建議是強烈建議配置,假如只是嘗試canal的話那就無所謂了,正式使用環境仍是配置比較好。apache
binlog-do-db = epg #配置須要同步的庫 binlog-ignore-db = mysql #配置不須要同步的庫
配置完以後建立一個專門用來讀取binlog日誌的mysql用戶,並賦予相關的權限。bootstrap
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; GRANT SELECT,REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO canal@localhost ; FLUSH PRIVILEGES;
在建立用戶並賦予權限的時候可能會遇到各類各樣的問題,好比密碼太簡單、大寫不識別之類的。大寫不識別就換成小寫,密碼太簡單就修改密碼級別。
先查看mysql密碼策略:
SHOW VARIABLES LIKE 'validate_password%';
再修改密碼等級:
set global validate_password.policy=LOW;
這個列名有的是validate_password_policy有的是validate_password.policy,修改的時候請對應本身mysql數據庫中的字段名。
以上mysql的準備工做就作完了,如今開始配置canal。
2.canal部分
1)canal想要web界面的話要下載兩個壓縮包,一個是admin一個是deployer。一個是集羣管理包一個是單機部署包,假如不須要集羣的話那麼下載deployer就能夠了。在本身喜歡的位置使用wget下載。
wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz wget:https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
下載完以後建立文件夾準備將解壓的文件放入其中,我喜歡直接選擇壓縮包名字做爲解壓文件夾。
mkdir canal.admin-1.1.4 mkdir canal.deployer-1.1.4 tar -zxvf canal.deployer-1.1.4.tar.gz -C canal.deployer-1.1.4 tar -zxvf canal.admin-1.1.4.tar.gz -C canal.admin-1.1.4
解壓指定文件夾的時候-C要大寫。
2)先配置canal.admin。解壓以後有4個文件夾,進入conf文件夾
修改conf文件夾中的application.yml
server: port: 8089 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 spring.datasource: #canal_manager數據庫所在的mysql數據庫ip地址 address: 192.168.49.104:3306 database: canal_manager username: canal password: canal driver-class-name: com.mysql.jdbc.Driver url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false hikari: maximum-pool-size: 30 minimum-idle: 1 canal: adminUser: admin #密碼默認爲admin,實際在web界面時要求密碼大於等於6位 adminPasswd: 123456
要改的地方很少配置一下數據庫和canalweb的用戶密碼就能夠了。密碼要大於等於6位,不然沒法登陸。
在conf文件夾下面還有canal_manager的建立語句。 cat canal_manager.sql,再把sql文件的內容複製下來,在mysql中執行,建立canal_manager數據庫。
3)到這裏canal_admin的配置接就完成了,執行bin目錄下的start.sh文件,啓動canal_admin。
sh ../bin/startup.sh
在上面的配置文件中端口配置爲8089,因此頁面的訪問就是你所配置canal-admin主機的ip:8089,假如沒有問題的話就能看到canal-admin的登陸界面。
輸入用戶名密碼,沒有問題的話就可以進入管理界面。簡單說說我在上面兩個步驟中遇到的比較有價值的問題,遇到了問題能夠從log文件夾下面的日誌中讀取。
1.JDBC鏈接MySQL報錯Unknown system variable 'query_cache_size'。這是由於mysql的鏈接驅動包和個人mysql版本對應不上。將mysql-connecter-java換成高版本的就能夠看到webUi的界面了。mysql-connecter-java能夠到mavenrepository中下載。下載地址
成功登陸以後進入管理界面,點擊集羣管理->新建集羣,輸入集羣名稱和集羣的zookeeper地址,多個地址用,分隔。
建立集羣的點擊肯定的時候可能會報錯modified_time不能爲空,這應該是webUI的bug了...因爲項目源碼已經打包,因此不如直接去改數據庫,把canal_manager中的modified_time都修改成可爲空,避免相似的錯誤出現。
創建集羣以後就開始配置集羣信息,點擊右邊的操做->主配置,開始配置集羣信息(即conf/canal-template.properties文件)。
點擊載入模板,而後配置本身的信息,配置完後點擊保存。
################################################# ######### common argument ############# ################################################# # tcp bind ip canal.ip = # register ip to zookeeper canal.register.ip = canal.port = 11111 canal.metrics.pull.port = 11112 # canal instance user/passwd #canal_admin登陸的用戶名和密碼 canal.user = canal canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458 # canal admin config # canal.admin的管理地址 canal.admin.manager = 192.168.49.104:8089 canal.admin.port = 11110 # canal登陸的用戶名和密碼,密碼在canal_manager的canal_user表中 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # 配置zookpeer地址 canal.zkServers = 192.168.49.104:2181,192.168.49.105:2181,192.168.49.106:2181 # flush data to zk canal.zookeeper.flush.period = 1000 canal.withoutNetty = false # tcp, kafka, RocketMQ # 配置由tcp接收仍是kafka接收或者RocketMQ接收消息 canal.serverMode = kafka # flush meta cursor/parse position to file canal.file.data.dir = ${canal.conf.dir} canal.file.flush.period = 1000 ## memory store RingBuffer size, should be Math.pow(2,n) canal.instance.memory.buffer.size = 16384 ## memory store RingBuffer used memory unit size , default 1kb canal.instance.memory.buffer.memunit = 1024 ## meory store gets mode used MEMSIZE or ITEMSIZE canal.instance.memory.batch.mode = MEMSIZE canal.instance.memory.rawEntry = true ## detecing config canal.instance.detecting.enable = false #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now() canal.instance.detecting.sql = select 1 canal.instance.detecting.interval.time = 3 canal.instance.detecting.retry.threshold = 3 canal.instance.detecting.heartbeatHaEnable = false # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery canal.instance.transaction.size = 1024 # mysql fallback connected to new master should fallback times canal.instance.fallbackIntervalInSeconds = 60 # network config canal.instance.network.receiveBufferSize = 16384 canal.instance.network.sendBufferSize = 16384 canal.instance.network.soTimeout = 30 # binlog filter config canal.instance.filter.druid.ddl = true canal.instance.filter.query.dcl = false canal.instance.filter.query.dml = false canal.instance.filter.query.ddl = false canal.instance.filter.table.error = false canal.instance.filter.rows = false canal.instance.filter.transaction.entry = false # binlog format/image check canal.instance.binlog.format = ROW,STATEMENT,MIXED canal.instance.binlog.image = FULL,MINIMAL,NOBLOB # binlog ddl isolation canal.instance.get.ddl.isolation = false # parallel parser config canal.instance.parser.parallel = true ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors() #canal.instance.parser.parallelThreadSize = 16 ## disruptor ringbuffer size, must be power of 2 canal.instance.parser.parallelBufferSize = 256 # table meta tsdb info canal.instance.tsdb.enable = true canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:} canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL; #配置canal訪問mysql的用戶名和密碼 canal.instance.tsdb.dbUsername = canal canal.instance.tsdb.dbPassword = canal # dump snapshot interval, default 24 hour canal.instance.tsdb.snapshot.interval = 24 # purge snapshot expire , default 360 hour(15 days) canal.instance.tsdb.snapshot.expire = 360 # aliyun ak/sk , support rds/mq canal.aliyun.accessKey = canal.aliyun.secretKey = ################################################# ######### destinations ############# ################################################# #定義canal_deployer的instance列表,不一樣instance用,隔開 canal.destinations = nyhx_student # conf root dir canal.conf.dir = ../conf # auto scan instance dir add/remove and start/stop instance #自動掃描canal_deployer的conf目錄下面instance,因此上面的canal.destinations配不配、配置對不對都沒關係,可#以空在那裏 canal.auto.scan = true canal.auto.scan.interval = 5 canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml canal.instance.global.mode = manager canal.instance.global.lazy = false canal.instance.global.manager.address = ${canal.admin.manager} #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml canal.instance.global.spring.xml = classpath:spring/file-instance.xml #canal.instance.global.spring.xml = classpath:spring/default-instance.xml ################################################## ######### MQ ############# ################################################## #配置kafka地址 canal.mq.servers = 192.168.49.104:9092,192.168.49.105:9092,192.168.49.106:9092 canal.mq.retries = 0 canal.mq.batchSize = 16384 #canal發送信息的最大大小,要修改成268435456(8M) canal.mq.maxRequestSize = 268435456 canal.mq.lingerMs = 100 canal.mq.bufferMemory = 33554432 canal.mq.canalBatchSize = 50 canal.mq.canalGetTimeout = 100 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all #canal.mq.properties. = canal.mq.producerGroup = test # Set this value to "cloud", if you want open message trace feature in aliyun. canal.mq.accessChannel = local # aliyun mq namespace #canal.mq.namespace = ################################################## ######### Kafka Kerberos Info ############# ################################################## canal.mq.kafka.kerberos.enable = false canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf" canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"
這裏要說明一點,因爲canal1.1.4和以前的版本相比,一樣一條數據庫改動,發送的信息要多不少,因此默認的1M是不夠用的,須要canal.mq.maxRequestSize修改成本身想要的大小。除此以外,kafka也有隊列消息體最大值的限定,須要將 message.max.bytes 配置增大, fetch.max.bytes 也增大。
kafka的配置我是經過CDH Cloudera-Manager的webUI界面修改的,由於我沒找到它把kafka安裝在哪了...
吐槽一下,bin目錄下面stop.sh並不能中止canal_admin,在沒有啓動的狀況下也能登陸8089端口訪問web頁面。
4)以上canal_admin就安裝完了,下面是canal_deployer的安裝。回到canal_deployer1.1.4.tar.gz解壓後的位置,一樣有四個文件夾。
進入conf文件夾,將canal_local.properties的內容複製到canal.properties文件中,配置以下:
# register ip canal.register.ip = 192.168.49.106 # canal admin config canal.admin.manager = 192.168.49.104:8089 canal.admin.port = 11110 canal.admin.user = admin canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9 # admin auto register canal.admin.register.auto = true canal.admin.register.cluster = canal_cluster
修改完後將canal.deployer-1.1.4發送給其它從機,發送以後要修改canal.properties中的canal.register.ip爲各自主機ip。
cd .. scp canal.deployer-1.1.4 root@192.168.105:/usr/software
修改完ip以後,分別啓動canal.deployer,啓動方法和canal.admin相似。
sh bin/startup.sh
都修改完後,刷新web頁面,在server管理頁面中可以看到所配置的實例。
5)配置instance
在web界面進入instance管理,點擊新建instance。輸入instance名稱,選擇集羣,載入模板,開始修改instance。
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 #由於canal是假裝成mysql的一個slave,因此slaveId不要和mysql 的slaveId以及其它instance的slaveId重複 canal.instance.mysql.slaveId=654319 # enable gtid use true/false canal.instance.gtidon=false # position info #監聽的mysql地址 canal.instance.master.address=192.168.49.214:3306 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password #查看mysql binlog日誌所用的用戶密碼,因爲我建立的canal用戶所在的數據庫和監聽的數據庫不是同一個,這裏求方便用#的root用戶 canal.instance.dbUsername=root canal.instance.dbPassword=123456 canal.instance.connectionCharset = UTF-8 #查看的數據庫 canal.instance.defaultDatabaseName=nyhx # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex #canal.instance.filter.regex=.*\\..* #配置監聽nyhx數據庫下的全部表 canal.instance.filter.regex=nyhx\..* # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config #默認topic,在canal.mq.dynamicTopic沒有匹配到的表變更都會到這個topic下面來 canal.mq.topic=other # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* #配置動態topic,指定哪張表進入哪一個分區 canal.mq.dynamicTopic=student:nyhx.student;magicStone:nyhx.magicstone #canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 canal.mq.partitionsNum=3 #canal.mq.partitionHash=test.table:id^name,.*\\..* #配置主鍵id canal.mq.partitionHash=nyhx.student:uid,nyhx.magicstone:magic_id #################################################
在開始監聽以前,在kafka中須要有對應的topic才行。kafka相關指令:
#查看全部topic kafka-topics --list --zookeeper 192.168.49.104:2181 #建立topic kafka-topics --create --zookeeper 192.168.49.104:2181 --topic other --replication-factor 3 --partitions 1 #監聽topic消息 kafka-console-consumer --bootstrap-server 192.168.49.104:9092 --topic other
當全部topic都建立好以後,在web頁面將instance設置爲啓動,集羣會自動爲它分配一臺服務器。以後監聽topic,修改監聽表,看看是否有數據產生。像這樣:
假若有的話,那麼同步實時數據的任務就完成啦!沒有的話,能夠看看instance的日誌。有web界面也不用一臺虛擬機一臺虛擬機去翻了。
通常來講都是一些配置問題,看着日誌判斷問題在哪裏產生,再修改過來。這裏要注意canal發送的數據量大小和kafka接受的數據量大小,我就在這裏卡了很久,監聽kafka topic雖然不報錯,可是也沒有數據。回過頭來看整個配置過程並不困難,只要理解配置內容的格式和做用很快就能配好。
最後再附上看過的參考文檔:
canal-admin1.1.14界面化安裝配置canal集羣詳解