canal https://github.com/alibaba/canal/wiki/QuickStart
canal-adapter 安裝 https://github.com/alibaba/canal/wiki/ClientAdapter
ElasticSearch適配器 https://github.com/alibaba/canal/wiki/Sync-ES
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
對於自建 MySQL , 須要先開啓 Binlog 寫入功能,配置 binlog-format 爲 ROW 模式,my.cnf 中配置以下
[mysqld] log-bin=mysql-bin # 開啓 binlog binlog-format=ROW # 選擇 ROW 模式 server_id=1 # 配置 MySQL replication 須要定義,不要和 canal 的 slaveId 重複
受權 canal 連接 MySQL 帳號具備做爲 MySQL slave 的權限, 若是已有帳戶可直接 grant
CREATE USER canal IDENTIFIED BY 'canal'; GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%'; -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ; FLUSH PRIVILEGES;
如下配置根據pf部門提供進行修改
解壓縮
mkdir /yourpath/soft/canal.deployer-1.1.4 tar zxvf canal.deployer-1.1.4.tar.gz -C /yourpath/soft/canal.deployer-1.1.4
配置修改,複製 example 文件夾爲 basic 文件夾 (basic 這個名稱與後面 adapter 配置對應)
cd /yourpath/soft/canal.deployer-1.1.4 cp -r example basic vi conf/basic/instance.properties
conf/basic/instance.properties 修改(數據庫,鏈接數據庫帳戶密碼)
canal.instance.master.address=172.23.201.55:3313 //數據庫地址端口 canal.instance.dbUsername=canal // 數據庫帳戶 canal.instance.dbPassword=canal // 數據庫密碼 canal.instance.filter.regex=pf_basic\\..* // 匹配 pf_basic庫
修改後
################################################# ## mysql serverId , v1.0.26+ will autoGen # canal.instance.mysql.slaveId=0 # enable gtid use true/false canal.instance.gtidon=false # position info canal.instance.master.address=172.23.201.55:3313 //數據庫地址端口 canal.instance.master.journal.name= canal.instance.master.position= canal.instance.master.timestamp= canal.instance.master.gtid= # rds oss binlog canal.instance.rds.accesskey= canal.instance.rds.secretkey= canal.instance.rds.instanceId= # table meta tsdb info canal.instance.tsdb.enable=true #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb #canal.instance.tsdb.dbUsername=canal #canal.instance.tsdb.dbPassword=canal #canal.instance.standby.address = #canal.instance.standby.journal.name = #canal.instance.standby.position = #canal.instance.standby.timestamp = #canal.instance.standby.gtid= # username/password canal.instance.dbUsername=canal // 數據庫帳戶 canal.instance.dbPassword=canal // 數據庫密碼 canal.instance.connectionCharset = UTF-8 # enable druid Decrypt database password canal.instance.enableDruid=false #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ== # table regex canal.instance.filter.regex=pf_basic\\..* // 匹配 pf_basic庫 # table black regex canal.instance.filter.black.regex= # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2) #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch # mq config canal.mq.topic=example # dynamic topic route by schema or table regex #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..* canal.mq.partition=0 # hash partition config #canal.mq.partitionsNum=3 
#canal.mq.partitionHash=test.table:id^name,.*\\..* #################################################
使用canal用戶啓動
cd /yourpath/soft/canal.deployer-1.1.4 su canal sh bin/startup.sh --- sh bin/stop.sh // 關閉
ls /yourpath/soft/canal.deployer-1.1.4/logs basic 日誌 ls /yourpath/soft/canal.deployer-1.1.4/logs/basic/basic.log
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz
mkdir /yourpath/soft/canal.adapter-1.1.4 tar zxvf canal.adapter-1.1.4.tar.gz -C /yourpath/soft/canal.adapter-1.1.4
cd /yourpath/soft/canal.adapter-1.1.4 cat conf/application.yml |grep -v "^#" server: port: 8081 spring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null canal.conf: mode: tcp # kafka rocketMQ canalServerHost: 172.23.201.59:11111 //canal 地址和端口 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: accessKey: secretKey: srcDataSources: defaultDS: url: jdbc:mysql://172.23.201.55:3313/pf_basic?useUnicode=true //數據庫 username: canal password: canal canalAdapters: - instance: basic # canal instance Name or mq topic name 對應canal建立的文件夾 groups: - groupId: g1 outerAdapters: - name: es hosts: 172.16.195.227:9200,172.16.195.226:9200,172.16.195.228:9200,172.16.195.232:9200,172.16.195.233:9200 # es 集羣地址, 逗號分隔 properties: mode: rest # transport # or rest
adapter將會自動加載 conf/es 下的全部.yml結尾的配置文件
示例: cat pf_basic_sms_report.yml dataSourceKey: defaultDS destination: basic # basic 對應canal建立的文件夾 groupId: g1 esMapping: _index: sms_20200602 _type: report _id: _id upsert: true # pk: id sql: "SELECT a.id _id, a.id , a.mobile , a.supplier , a.msgid , a.status , a.status_desc statusDesc, a.uid , a.charging_count chargingCount, a.caller , a.report_time reportTime, a.create_time createTime FROM t_sms_status_report a" # objFields: # _labels: array:; # etlCondition: "where a.c_time>={}" commitBatch: 3000
dataSourceKey: defaultDS # 源數據源的key, 對應上面配置的srcDataSources中的值 outerAdapterKey: exampleKey # 對應application.yml中es配置的key destination: example # cannal的instance或者MQ的topic groupId: # 對應MQ模式下的groupId, 只會同步對應groupId的數據 esMapping: _index: mytest_user # es 的索引名稱 _type: _doc # es 的type名稱, es7下無需配置此項 _id: _id # es 的_id, 若是不配置該項必須配置下面的pk項_id則會由es自動分配 # pk: id # 若是不須要_id, 則須要指定一個屬性爲主鍵屬性 # sql映射 sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name, a.c_time as _c_time, c.labels as _labels from user a left join role b on b.id=a.role_id left join (select user_id, group_concat(label order by id desc separator ';') as labels from label group by user_id) c on c.user_id=a.id" # objFields: # _labels: array:; # 數組或者對象屬性, array:; 表明以;字段裏面是以;分隔的 # _obj: object # json對象 etlCondition: "where a.c_time>='{0}'" # etl 的條件參數 commitBatch: 3000 # 提交批大小
cd /yourpath/soft/canal.adapter-1.1.4 bin/startup.sh
cd /yourpath/soft/logstash/config/conf.d cat config-jdbc_mysql-to-es.conf input { jdbc { codec => "json_lines" add_field => {"@doc_type" => "send"} #數據庫驅動路徑 jdbc_driver_library => "/yourpath/soft/logstash/driver/mysql-connector-java-5.1.46.jar" jdbc_driver_class => "com.mysql.jdbc.Driver" #數據庫地址 jdbc_connection_string => "jdbc:mysql://172.23.201.55:3313/pf_basic" jdbc_user => "logstash" jdbc_password => "Logstash@yourpath" jdbc_paging_enabled => "true" jdbc_page_size => 10000 # sql_last_value 是一個內置的變量,記錄了上次執行的跟蹤字段值 #statement => "select * from t_sms_status_report where create_time > :sql_last_value" statement_filepath => "/yourpath/soft/logstash/config/conf.d/sql/sms_send.sql" #多久同步一次,一分鐘一次 schedule => "* * * * *" #是否開啓記錄追蹤 record_last_run => "true" #這裏能夠手動設置:sql_last_value的值,默認時間是1970-01-01,默認數字是0 last_run_metadata_path => "/yourpath/soft/logstash/config/conf.d/state/last_run.pf-basic-sms-send" #是否每次清除last_run_metadata_path的內容 clean_run => "false" #是否須要追蹤字段,若是爲true,則須要指定tracking_column,默認是timestamp use_column_value => "true" #指定追蹤的字段 tracking_column => "createTime" #將字段名稱轉換爲小寫形式,默認爲true lowercase_column_names => "false" #追蹤字段的類型,目前只有數字和時間類型,默認是數字類型 tracking_column_type => "timestamp" #設置時區 jdbc_default_timezone => "Asia/Shanghai" } } filter { mutate { remove_field => ["@version", "@timestamp"] split => {"reqPhone" => "," "sendPhone" => ","} } } output { elasticsearch { hosts => ["172.16.195.227:9200", "172.16.195.226:9200", "172.16.195.228:9200", "172.16.195.232:9200", "172.16.195.233:9200"] index => "sms" document_type => "%{@doc_type}" document_id => "%{id}" # 寫入動做,string類型,可能的值: (index), delete, create, update action => "index" # update模式下使用,以當前更新的文檔source看成upsert的內容,從而建立新文檔。默認值爲false doc_as_upsert => "false" } }
/yourpath/soft/logstash/config/conf.d/state 數據同步的狀態文件
/yourpath/soft/logstash/config/conf.d/sql 執行數據同步的sql文件
示例: cat sms_report.sql SELECT a.id , a.mobile , a.supplier , a.msgid , a.status , a.status_desc statusDesc, a.uid , a.charging_count chargingCount, a.caller , DATE_FORMAT(a.report_time, '%Y-%m-%dT%T+08:00') reportTime, DATE_FORMAT(a.create_time, '%Y-%m-%dT%T+08:00') createTime FROM t_sms_status_report a WHERE a.create_time > :sql_last_value
sudo /yourpath/soft/logstash/bin/logstash -f /yourpath/soft/logstash/config/conf.d/config-jdbc_mysql-to-es.conf
該方法進行的全量同步,以後使用canal 進行增量同步