還在爲數據同步而苦惱嗎?手把手教你實現canal數據同步(三)

Canal實戰之客戶端適配器
mysql


canal 1.1.1版本以後, 增長客戶端數據落地的適配及啓動功能, 目前支持功能:redis

  • 客戶端啓動器spring

  • 同步管理REST接口sql

  • 日誌適配器, 做爲DEMOmongodb

  • 關係型數據庫的數據同步(表對錶同步), ETL功能數據庫

  • HBase的數據同步(表對錶同步), ETL功能json

  • ElasticSearch多表數據同步,ETL功能(新)bootstrap

  • 後續支持redis、mongodbapp

1

適配器總體結構curl


client-adapter分爲適配器啓動器兩部分, 適配器爲多個fat jar, 每一個適配器會將本身所需的依賴打成一個包, 以SPI的方式讓啓動器動態加載, 目前全部支持的適配器都放置在plugin目錄下。

啓動器爲 SpringBoot 項目, 運行目錄結構爲:

圖片


詳細結構以下:



- bin    restart.sh    startup.bat    startup.sh    stop.sh- confbootstrap.yml    application.yml    - es    biz_order.yml    customer.yml    mytest_user.yml    - hbase        mytest_person2.yml    - rdb    mytest_user.yml- lib...- logs- pluginclient-adapter.elasticsearch-1.1.4-jar-with-dependencies.jar    client-adapter.hbase-1.1.4-jar-with-dependencies.jar    client-adapter.logger-1.1.4-jar-with-dependencies.jar    client-adapter.rdb-1.1.4-jar-with-dependencies.jar



2

適配器配置介紹


總配置文件 application.yml:



canal.conf:  canalServerHost: 127.0.0.1:11111     # 對應單機模式下的canal server的ip:port  zookeeperHosts: slave1:2181          # 對應集羣模式下的zk地址, 若是配置了canalServerHost, 則以canalServerHost爲準  mqServers: slave1:6667 #or rocketmq  # kafka或rocketMQ地址, 與canalServerHost不能並存  flatMessage: true                    # 扁平message開關, 是否以json字符串形式投遞數據, 僅在kafka/rocketMQ模式下有效  batchSize: 50                        # 每次獲取數據的批大小, 單位爲K  syncBatchSize: 1000                  # 每次同步的批數量  retries: 0                           # 重試次數, -1爲無限重試  timeout:                             # 同步超時時間, 單位毫秒  mode: tcp # kafka rocketMQ           # canal client的模式: tcp直連 kafka rocketMQ  srcDataSources:                      # 源數據庫    defaultDS:                         # 自定義名稱      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true   # jdbc url       username: root                                            # jdbc 帳號      password: 121212                                          # jdbc 密碼  canalAdapters:                       # 適配器列表  - instance: example                  # canal 實例名或者 MQ topic 名    groups:                            # 分組列表    - groupId: g1                      # 分組id, 若是是MQ模式將用到該值      outerAdapters:                   # 分組內適配器列表      - name: logger                   # 日誌打印適配器......

說明:


1. 一份數據能夠被多個group同時消費, 多個group之間會是一個並行執行, 一個group內部是一個串行執行多個outerAdapters, 好比例子中logger和hbase


2. 目前client adapter數據訂閱的方式支持兩種,直連canal server 或者 訂閱kafka/RocketMQ的消息



3

適配器啓動


前提:啓動canal server (單機模式)


上傳canal.adapter-1.1.4.tar.gz到/opt目錄下


1. 在/opt目錄下建立canal-adapter目錄:mkdir canal-adapter

2. 解壓壓縮包:tar -zxvf canal.adapter-1.1.4.tar.gz -C canal-adapter

3. 修改conf/application.yml,以下

4. 啓動:bin/startup.sh



server:  port: 8081logging:  level:    com.alibaba.otter.canal.client.adapter.hbase: DEBUGspring:  jackson:    date-format: yyyy-MM-dd HH:mm:ss    time-zone: GMT+8    default-property-inclusion: non_null

canal.conf:  canalServerHost: 127.0.0.1:11111  batchSize: 500                              syncBatchSize: 1000                        retries: 0                                timeout:                                  mode: tcp  canalAdapters:                              - instance: example                          groups:                                    - groupId: g1                                outerAdapters:                              - name: logger

logger適配器:


最簡單的處理, 將受到的變動事件經過日誌打印的方式進行輸出, 如配置所示, 只須要定義name: logger便可。


4

adapter管理REST接口


查詢全部訂閱同步的canal instance或MQ topic:


curl http://127.0.0.1:8081/destinations



圖片


數據同步開關:


curl http://127.0.0.1:8081/syncSwitch/example/off -X PUT


針對 example 這個canal instance/MQ topic 進行開關操做. off表明關閉, instance/topic下的同步將阻塞或者斷開鏈接再也不接收數據, on表明開啓


注: 若是在配置文件中配置了 zookeeperHosts 項, 則會使用分佈式鎖來控制HA中的數據同步開關, 若是是單機模式則使用本地鎖來控制開關


圖片


數據同步開關狀態:


curl http://127.0.0.1:8081/syncSwitch/example


查看指定 canal instance/MQ topic 的數據同步開關狀態


圖片

相關文章
相關標籤/搜索