Installing and Using canal and canal-adapter

Official documentation

canal QuickStart: https://github.com/alibaba/canal/wiki/QuickStart

canal-adapter installation: https://github.com/alibaba/canal/wiki/ClientAdapter

Elasticsearch adapter: https://github.com/alibaba/canal/wiki/Sync-ES

canal

1. Download canal

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2. Configure MySQL

For a self-hosted MySQL instance, you first need to enable binlog writing and set binlog-format to ROW mode. Add the following to my.cnf:

[mysqld]
log-bin=mysql-bin   # enable binlog
binlog-format=ROW   # use ROW mode
server_id=1         # required for MySQL replication; must not clash with canal's slaveId
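As a sanity check, the three settings above can be verified with a small script. This is a hypothetical helper for illustration, not part of canal:

```python
def check_binlog_config(my_cnf_text: str) -> list:
    """Return a list of problems with the [mysqld] binlog settings."""
    problems = []
    settings = {}
    for line in my_cnf_text.splitlines():
        # drop trailing comments, then collect key=value pairs
        line = line.split("#", 1)[0].strip()
        if "=" in line:
            key, value = line.split("=", 1)
            settings[key.strip().replace("-", "_")] = value.strip()
    if "log_bin" not in settings:
        problems.append("log-bin is not set: binlog is disabled")
    if settings.get("binlog_format", "").upper() != "ROW":
        problems.append("binlog-format must be ROW for canal")
    if "server_id" not in settings:
        problems.append("server_id must be set and unique")
    return problems

cnf = """
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1
"""
print(check_binlog_config(cnf))  # → []
```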

Grant the canal account the privileges required to act as a MySQL slave; if the account already exists, just run the grants:

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3. Install and configure canal (requires a Java environment)

Adjust the following configuration according to the values provided by the pf team.

Extract the archive:

mkdir /yourpath/soft/canal.deployer-1.1.4
tar zxvf canal.deployer-1.1.4.tar.gz  -C /yourpath/soft/canal.deployer-1.1.4

Modify the configuration: copy the example folder to a new folder named basic (this name must match the adapter configuration later):

cd /yourpath/soft/canal.deployer-1.1.4
cp -r example basic
vi conf/basic/instance.properties

Changes to conf/basic/instance.properties (database address, account, and password):

canal.instance.master.address=172.23.201.55:3313    // database address and port
canal.instance.dbUsername=canal                // database account
canal.instance.dbPassword=canal                // database password
canal.instance.filter.regex=pf_basic\\..*      // match the pf_basic schema

After modification:

#################################################
## mysql serverId , v1.0.26+ will autoGen
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# position info
canal.instance.master.address=172.23.201.55:3313      // database address and port
canal.instance.master.journal.name=
canal.instance.master.position=
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal                    // database account
canal.instance.dbPassword=canal                    // database password
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=pf_basic\\..*          // match the pf_basic schema
# table black regex
canal.instance.filter.black.regex=
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=test.table:id^name,.*\\..*
#################################################
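canal matches filter.regex against schema.table names with Perl/Java-style regex semantics. A quick Python sketch of how `pf_basic\\..*` (which, after properties unescaping, is the regex `pf_basic\..*`) selects tables; Python's re module behaves the same way for this pattern:

```python
import re

# The regex canal sees after properties-file unescaping of pf_basic\\..*
table_filter = re.compile(r"pf_basic\..*")

tables = [
    "pf_basic.t_sms_status_report",   # in pf_basic: synced
    "pf_basic.t_user",                # in pf_basic: synced
    "other_db.t_sms_status_report",   # different schema: ignored
]
matched = [t for t in tables if table_filter.fullmatch(t)]
print(matched)  # → ['pf_basic.t_sms_status_report', 'pf_basic.t_user']
```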

4. Start

Start as the canal user:

cd /yourpath/soft/canal.deployer-1.1.4
su canal
sh bin/startup.sh
# to stop: sh bin/stop.sh

5. Check the logs

ls /yourpath/soft/canal.deployer-1.1.4/logs
# log of the basic instance
tail -f /yourpath/soft/canal.deployer-1.1.4/logs/basic/basic.log

canal-adapter

1. Download canal-adapter

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

2. Install canal-adapter

mkdir /yourpath/soft/canal.adapter-1.1.4
tar zxvf canal.adapter-1.1.4.tar.gz  -C /yourpath/soft/canal.adapter-1.1.4

3. Elasticsearch adapter: application.yml

cd /yourpath/soft/canal.adapter-1.1.4

cat conf/application.yml |grep -v "^#"
server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp # kafka rocketMQ
  canalServerHost: 172.23.201.59:11111              # canal server address and port
  batchSize: 500
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://172.23.201.55:3313/pf_basic?useUnicode=true        # source database
      username: canal
      password: canal
  canalAdapters:
  - instance: basic            # canal instance name or MQ topic name; matches the basic folder created above
    groups:
    - groupId: g1
      outerAdapters:
      - name: es
        hosts: 172.16.195.227:9200,172.16.195.226:9200,172.16.195.228:9200,172.16.195.232:9200,172.16.195.233:9200                     # es cluster addresses, comma separated
        properties:
          mode: rest # or transport
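The names must line up across the three configs: the basic instance folder on the canal side, `instance: basic` in application.yml, and `destination: basic` in each ES mapping file. A small hypothetical checker (the dicts stand in for the parsed config files) illustrates the relationship:

```python
# Hypothetical sketch: verify the names wired across canal, application.yml
# and the ES mapping files all agree.
canal_instances = {"basic"}       # folders under the canal deployer's conf/
adapter_instances = {"basic"}     # canalAdapters[].instance in application.yml
es_mappings = [
    {"file": "pf_basic_sms_report.yml", "destination": "basic", "groupId": "g1"},
]

def unmatched_destinations(mappings, instances):
    """Return mapping files whose destination has no matching instance."""
    return [m["file"] for m in mappings if m["destination"] not in instances]

# every adapter instance must point at a real canal instance folder
assert adapter_instances <= canal_instances
print(unmatched_destinations(es_mappings, adapter_instances))  # → []
```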

4. ES mapping configuration files

The adapter automatically loads every configuration file ending in .yml under conf/es.

Example: cat pf_basic_sms_report.yml
dataSourceKey: defaultDS
destination: basic               # matches the basic instance folder created for canal
groupId: g1
esMapping:
  _index: sms_20200602
  _type: report
  _id: _id
  upsert: true
#  pk: id

  sql: "SELECT 
    a.id _id,
    a.id ,
    a.mobile ,
    a.supplier ,
    a.msgid ,
    a.status ,
    a.status_desc statusDesc,
    a.uid ,
    a.charging_count chargingCount,
    a.caller ,
    a.report_time reportTime,
    a.create_time createTime
FROM t_sms_status_report a"

#  objFields:
#    _labels: array:;
#  etlCondition: "where a.c_time>={}"
  commitBatch: 3000
For reference, the annotated template from the official Sync-ES documentation:

dataSourceKey: defaultDS        # key of the source datasource, from srcDataSources above
outerAdapterKey: exampleKey     # key of the es adapter configured in application.yml
destination: example            # canal instance name or MQ topic
groupId:                        # groupId for MQ mode; only data for this groupId is synced
esMapping:
  _index: mytest_user           # es index name
  _type: _doc                   # es type name; not needed on es7
  _id: _id                      # es _id; if omitted, the pk item below must be set and es assigns the _id
#  pk: id                       # if _id is not needed, specify one column as the primary key
  # sql mapping
  sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
        a.c_time as _c_time, c.labels as _labels from user a
        left join role b on b.id=a.role_id
        left join (select user_id, group_concat(label order by id desc separator ';') as labels from label
        group by user_id) c on c.user_id=a.id"
#  objFields:
#    _labels: array:;           # array or object field; array:; means the value is ;-separated
#    _obj: object               # json object
  etlCondition: "where a.c_time>='{0}'"     # etl condition parameter
  commitBatch: 3000                         # commit batch size
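Conceptually, the adapter runs the esMapping SQL, takes the `_id` column as the Elasticsearch document id, and upserts the remaining columns as the document body. A simplified Python sketch of that mapping (not the adapter's actual code; the sample row is made up):

```python
def rows_to_es_actions(rows, index):
    """Turn SQL result rows (dicts with an _id column) into upsert actions."""
    actions = []
    for row in rows:
        # everything except _id becomes the document body
        doc = {k: v for k, v in row.items() if k != "_id"}
        actions.append({"_op": "upsert", "_index": index, "_id": row["_id"], "doc": doc})
    return actions

rows = [{"_id": 1, "id": 1, "mobile": "13800000000", "statusDesc": "DELIVRD"}]
actions = rows_to_es_actions(rows, "sms_20200602")
print(actions[0]["_id"], actions[0]["doc"]["statusDesc"])  # → 1 DELIVRD
```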

5. Start syncing data to ES

cd /yourpath/soft/canal.adapter-1.1.4
bin/startup.sh

mysql --> logstash --> es

1. Install logstash

2. Configuration file: config-jdbc_mysql-to-es.conf

cd /yourpath/soft/logstash/config/conf.d
cat config-jdbc_mysql-to-es.conf 
input {
    jdbc {
        codec => "json_lines"
        add_field => {"@doc_type" => "send"}

        # JDBC driver path
        jdbc_driver_library => "/yourpath/soft/logstash/driver/mysql-connector-java-5.1.46.jar"
        jdbc_driver_class => "com.mysql.jdbc.Driver"

        # database address
        jdbc_connection_string => "jdbc:mysql://172.23.201.55:3313/pf_basic"
        jdbc_user => "logstash"
        jdbc_password => "Logstash@yourpath"
        jdbc_paging_enabled => "true"
        jdbc_page_size => 10000
        # sql_last_value is a built-in variable holding the tracking column value from the last run
        #statement => "select * from t_sms_status_report where create_time > :sql_last_value"
        statement_filepath => "/yourpath/soft/logstash/config/conf.d/sql/sms_send.sql"
        # sync interval: once per minute
        schedule => "* * * * *"

        # whether to record the tracking state
        record_last_run => "true"
        # the starting :sql_last_value can be set by editing this file; defaults to 1970-01-01 for timestamps and 0 for numbers
        last_run_metadata_path => "/yourpath/soft/logstash/config/conf.d/state/last_run.pf-basic-sms-send"
        # whether to clear last_run_metadata_path on every run
        clean_run => "false"

        # whether to track a column value; if true, tracking_column must be set; the default tracking field is the timestamp
        use_column_value => "true"
        # the column to track
        tracking_column => "createTime"
        # convert column names to lowercase (default true)
        lowercase_column_names => "false"
        # type of the tracking column: only numeric and timestamp are supported (default numeric)
        tracking_column_type => "timestamp"
        # time zone
        jdbc_default_timezone => "Asia/Shanghai"
    }
}

filter {
    mutate {
        remove_field => ["@version", "@timestamp"]
        split => {"reqPhone" => "," "sendPhone" => ","}
    }
}

output {
  elasticsearch {
    hosts => ["172.16.195.227:9200", "172.16.195.226:9200", "172.16.195.228:9200", "172.16.195.232:9200", "172.16.195.233:9200"]
    index => "sms"
    document_type => "%{@doc_type}"
    document_id => "%{id}"
    # write action; string, one of: (index), delete, create, update
    action => "index"
    # for update mode: use the current document source as the upsert body to create missing documents; default false
    doc_as_upsert => "false"
  }
}
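The mutate/split filter above turns comma-joined phone-number strings into arrays before indexing. The transformation is equivalent to this Python sketch (the reqPhone/sendPhone field names come from the sync SQL, which is not shown here):

```python
def split_fields(event, fields, sep=","):
    """Mimic logstash's mutate split: turn delimited strings into arrays."""
    for f in fields:
        if isinstance(event.get(f), str):
            event[f] = event[f].split(sep)
    return event

# sendPhone is absent here, so only reqPhone is split
event = {"id": 7, "reqPhone": "13800000000,13900000000"}
print(split_fields(event, ["reqPhone", "sendPhone"]))
# → {'id': 7, 'reqPhone': ['13800000000', '13900000000']}
```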

3. Directory layout (sql, state)

/yourpath/soft/logstash/config/conf.d/state holds the sync-state files

/yourpath/soft/logstash/config/conf.d/sql holds the SQL files executed for the sync

Example: cat sms_report.sql
SELECT 
    a.id ,
    a.mobile ,
    a.supplier ,
    a.msgid ,
    a.status ,
    a.status_desc statusDesc,
    a.uid ,
    a.charging_count chargingCount,
    a.caller ,
    DATE_FORMAT(a.report_time, '%Y-%m-%dT%T+08:00') reportTime,
    DATE_FORMAT(a.create_time, '%Y-%m-%dT%T+08:00') createTime
FROM t_sms_status_report a
WHERE a.create_time > :sql_last_value
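Each scheduled run substitutes the stored tracking value for :sql_last_value and then advances it to the largest createTime seen, so only new rows are fetched. A simplified Python model of that loop (the rows here are made up):

```python
def incremental_fetch(rows, last_value):
    """Return rows newer than last_value, plus the advanced tracking value."""
    new_rows = [r for r in rows if r["createTime"] > last_value]
    if new_rows:
        last_value = max(r["createTime"] for r in new_rows)
    return new_rows, last_value

table = [{"id": 1, "createTime": "2020-06-01"},
         {"id": 2, "createTime": "2020-06-02"}]

# first run: default tracking value, everything is fetched
batch, last = incremental_fetch(table, "1970-01-01")
assert [r["id"] for r in batch] == [1, 2]

# next run: nothing newer than the advanced value
batch, last = incremental_fetch(table, last)
print(batch, last)  # → [] 2020-06-02
```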

4. Start logstash

a. register it as a pipeline and start logstash
b. or run the config file directly:

sudo /yourpath/soft/logstash/bin/logstash -f /yourpath/soft/logstash/config/conf.d/config-jdbc_mysql-to-es.conf

5. After the sync completes, the process can be stopped

This method performs the full sync; afterwards, use canal for incremental sync.
