Canal實戰之客戶端適配器
mysql
canal 1.1.1版本以後, 增長客戶端數據落地的適配及啓動功能, 目前支持功能:redis
客戶端啓動器spring
同步管理REST接口sql
日誌適配器, 做爲DEMOmongodb
關係型數據庫的數據同步(表對錶同步), ETL功能數據庫
HBase的數據同步(表對錶同步), ETL功能json
ElasticSearch多表數據同步,ETL功能(新)bootstrap
後續支持redis、mongodbapp
適配器總體結構curl
client-adapter分爲適配器和啓動器兩部分, 適配器爲多個fat jar, 每一個適配器會將本身所需的依賴打成一個包, 以SPI的方式讓啓動器動態加載, 目前全部支持的適配器都放置在plugin目錄下。
啓動器爲 SpringBoot 項目, 運行目錄結構爲:
詳細結構以下:
- bin restart.sh startup.bat startup.sh stop.sh- confbootstrap.yml application.yml - es biz_order.yml customer.yml mytest_user.yml - hbase mytest_person2.yml - rdb mytest_user.yml- lib...- logs- pluginclient-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar client-adapter.hbase-1.1.4-jar-with-dependencies.jar client-adapter.logger-1.1.4-jar-with-dependencies.jar client-adapter.rdb-1.1.4-jar-with-dependencies.jar
2
適配器配置介紹
總配置文件 application.yml:
canal.conf: canalServerHost: 127.0.0.1:11111 # 對應單機模式下的canal server的ip:port zookeeperHosts: slave1:2181 # 對應集羣模式下的zk地址, 若是配置了canalServerHost, 則以canalServerHost爲準 mqServers: slave1:6667 #or rocketmq # kafka或rocketMQ地址, 與canalServerHost不能並存 flatMessage: true # 扁平message開關, 是否以json字符串形式投遞數據, 僅在kafka/rocketMQ模式下有效 batchSize: 50 # 每次獲取數據的批大小, 單位爲K syncBatchSize: 1000 # 每次同步的批數量 retries: 0 # 重試次數, -1爲無限重試 timeout: # 同步超時時間, 單位毫秒 mode: tcp # kafka rocketMQ # canal client的模式: tcp直連 kafka rocketMQ srcDataSources: # 源數據庫 defaultDS: # 自定義名稱 url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true # jdbc url username: root # jdbc 帳號 password: 121212 # jdbc 密碼 canalAdapters: # 適配器列表 - instance: example # canal 實例名或者 MQ topic 名 groups: # 分組列表 - groupId: g1 # 分組id, 若是是MQ模式將用到該值 outerAdapters: # 分組內適配器列表 - name: logger # 日誌打印適配器......
說明:
1. 一份數據能夠被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個串行執行多個outerAdapters, 好比例子中logger和hbase
2. 目前client adapter數據訂閱的方式支持兩種,直連canal server 或者 訂閱kafka/RocketMQ的消息
3
適配器啓動
前提:啓動canal server (單機模式)
上傳canal.adapter-1.1.4.tar.gz到/opt目錄下
1. 在/opt目錄下建立canal-adapter目錄:mkdir canal-adapter
2. 解壓壓縮包:tar -zxvf canal.adapter-1.1.4.tar.gz -C canal-adapter
3. 修改conf/application.yml,以下
4. 啓動:bin/startup.sh
server: port: 8081logging: level: com.alibaba.otter.canal.client.adapter.hbase: DEBUGspring: jackson: date-format: yyyy-MM-dd HH:mm:ss time-zone: GMT+8 default-property-inclusion: non_null
canal.conf: canalServerHost: 127.0.0.1:11111 batchSize: 500 syncBatchSize: 1000 retries: 0 timeout: mode: tcp canalAdapters: - instance: example groups: - groupId: g1 outerAdapters: - name: logger
logger適配器:
最簡單的處理, 將受到的變動事件經過日誌打印的方式進行輸出, 如配置所示, 只須要定義name: logger便可。
adapter管理REST接口
查詢全部訂閱同步的canal instance或MQ topic:
curl http://127.0.0.1:8081/destinations
數據同步開關:
curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT
針對 example 這個canal instance/MQ topic 進行開關操做. off表明關閉, instance/topic下的同步將阻塞或者斷開鏈接再也不接收數據, on表明開啓
注: 若是在配置文件中配置了 zookeeperHosts 項, 則會使用分佈式鎖來控制HA中的數據同步開關, 若是是單機模式則使用本地鎖來控制開關
數據同步開關狀態:
curl http://127.0.0.1:8081/syncSwitch/example
查看指定 canal instance/MQ topic 的數據同步開關狀態