kafka connector 監聽sqlserver 新手嘗試

        以前拿canal監聽mysql的binlog並將消息遞給kafka topic,可是canal只能監聽mysql,假如數據庫是sqlserver\orcale\mongodb那麼徹底無能爲力.看了一下網上的資料,主流是用kafka connect來監聽sqlserver,下面分享一下我嘗試的過程.html

        最開始我看博客,上面寫的比較亂,介紹了不少東西,可是我不清楚這些東西之間有什麼關係,誰和誰通信,誰又起到什麼做用,和當初配置canal徹底不一樣.如今簡單說說,配置過程當中涉及到kafka connector,confluent,kafka.    kafka connector是kafka自帶特性,用來建立和管理數據流管道,是個和其它系統交換數據的簡單模型;java

confluent是一家圍繞kafka作產品的公司,不但提供數據傳輸的系統,也提供數據傳輸的工具,內部封裝了kafka.在這裏咱們只用它下載kafka連接sqlserver的connector組件.mysql

    我使用的kafka是用CDH cloudera manager安裝的,所以kafka的bin目錄\配置目錄\日誌什麼的都不在一塊兒,也沒有$KAFKA_HOME.雖然此次是測試功能,可是爲了之後下載更多connector組件考慮,我仍是下載了confluent.建議在官網下載,沒翻&牆,網速還能夠.git

confluent下載地址 https://www.confluent.io/download/    選擇下面的Download Confluent Platform,填寫郵件地址和用途下載.github

5.2版本下載地址:  http://packages.confluent.io/archive/5.2/web

在準備下載和解壓的位置,開始下載和解壓:sql

wget http://packages.confluent.io/archive/5.2/confluent-5.2.3-2.11.zip
tar -zxvf confluent-5.2.3-2.11.zip  confluent-5.2.3-2.11

解壓出來應該是有一下幾個文件夾(usr是我本身建立的,用來存儲用戶的配置文件和語句):mongodb

將CONFLUENT_HOME配置進環境變量裏:數據庫

vi /etc/profile
export CONFLUENT_HOME=/usr/software/confluent-5.2.3
export PATH=$PATH:$JAVA_HOME/bin:$JRE_HOME:$CONFLUENT_HOME/bin

路徑是我本身的,你們改爲本身的文件路徑.express

下載connector鏈接器組件,每一個組件鏈接jdbc的配置文件均可能不同,注意看官方文檔.我選擇的是 debezium-connector-sqlserver .先進入bin目錄,可以看到有confluent-hub  指令,咱們靠它來下載組件.

[root@centos04 bin]# confluent-hub install debezium/debezium-connector-sqlserver:latest
The component can be installed in any of the following Confluent Platform installations: 
  1. /usr/software/confluent-5.2.3 (based on $CONFLUENT_HOME) 
  2. /usr/software/confluent-5.2.3 (where this tool is installed) 
Choose one of these to continue the installation (1-2): 2
Do you want to install this into /usr/software/confluent-5.2.3/share/confluent-hub-components? (yN) y^H
Do you want to install this into /usr/software/confluent-5.2.3/share/confluent-hub-components? (yN) y

 
Component's license: 
Apache 2.0 
https://github.com/debezium/debezium/blob/master/LICENSE.txt 
I agree to the software license agreement (yN) y

輸入指令後先問你安裝組件位置,是$CONFLUENT_HOME目錄下仍是confluent目錄下,再問你組件是否安裝在{$confluent}/share/confluent-hub-components這個默認位置,選擇n的話能夠本身輸入文件位置,再問是否贊成許可,以及是否更新組件.假如沒有特別需求的話,直接選擇y就能夠了.

其它組件能夠在https://www.confluent.io/hub/裏面挑選,還有官方文檔教你如何配置,很重要.光看網上教程怎麼作沒有理解爲何這麼作很容易走彎路,根本不知道哪裏作錯了.我看了不少篇都是如出一轍,用的組件是 Confluent MSSQL Connector .可是這個組件已經沒有了,換其它組件的話配置須要更改.我就在這裏花費了很長時間.注意看官方文檔.

Debezium SQL Server的說明文檔地址:https://docs.confluent.io/current/connect/debezium-connect-sqlserver/index.html#sqlserver-source-connector

下載完成後就能夠在{$confluent}/share/confluent-hub-components目錄下面看見下載好的組件了.接下來配置kafka.

進入kafka的配置目錄,kafka單獨安裝的話位置是$KAFKA_HOME/config,CDH版本的配置文件在/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/etc/kafka/conf.dist下面.不知道安裝位置的話直接搜文件名connect-distributed.properties.假如這都沒有那說明你的kafka可能版本過低,沒有這個特性.

修改其中的connect-distributed.properties文件.

##
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements.  See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License.  You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
##

# This file contains some of the configurations for the Kafka Connect distributed worker. This file is intended
# to be used with the examples, and some settings may differ from those used in a production system, especially
# the `bootstrap.servers` and those specifying replication factors.

# A list of host/port pairs to use for establishing the initial connection to the Kafka cluster.
#kafka集羣位置,須要配置
bootstrap.servers=centos04:9092,centos05:9092,centos06:9092

# unique name for the cluster, used in forming the Connect cluster group. Note that this must not conflict with consumer group IDs
#group.id,默認都是connect-cluster,保持一致就行
group.id=connect-cluster

# The converters specify the format of data in Kafka and how to translate it into Connect data. Every Connect user will
# need to configure these based on the format they want their data in when loaded from or stored into Kafka
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# Converter-specific settings can be passed in by prefixing the Converter's setting with the converter we want to apply
# it to
key.converter.schemas.enable=true
value.converter.schemas.enable=true

# Topic to use for storing offsets. This topic should have many partitions and be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
offset.storage.topic=connect-offsets
offset.storage.replication.factor=3
offset.storage.partitions=1

# Topic to use for storing connector and task configurations; note that this should be a single partition, highly replicated,
# and compacted topic. Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
config.storage.topic=connect-configs
config.storage.replication.factor=3

# Topic to use for storing statuses. This topic can have multiple partitions and should be replicated and compacted.
# Kafka Connect will attempt to create the topic automatically when needed, but you can always manually create
# the topic before starting Kafka Connect if a specific topic configuration is needed.
# Most users will want to use the built-in default replication factor of 3 or in some cases even specify a larger value.
# Since this means there must be at least as many brokers as the maximum replication factor used, we'd like to be able
# to run this example on a single-broker cluster and so here we instead set the replication factor to 1.
status.storage.topic=connect-status
status.storage.replication.factor=3
#status.storage.partitions=1

offset.storage.file.filename=/var/log/confluent/offset-storage-file

# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000

# These are provided to inform the user about the presence of the REST host and port configs 
# Hostname & Port for the REST API to listen on. If this is set, it will bind to the interface used to listen to requests.
#rest.host.name=
#kafka connector端口號,能夠修改
rest.port=8083

# The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers.
#rest.advertised.host.name=
#rest.advertised.port=

# Set to a list of filesystem paths separated by commas (,) to enable class loading isolation for plugins
# (connectors, converters, transformations). The list should consist of top level directories that include 
# any combination of: 
# a) directories immediately containing jars with plugins and their dependencies
# b) uber-jars with plugins and their dependencies
# c) directories immediately containing the package directory structure of classes of plugins and their dependencies
# Examples: 
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
# Replace the relative path below with an absolute path if you are planning to start Kafka Connect from within a
# directory other than the home directory of Confluent Platform.
#組件位置,把confluent組件下載位置加上去
plugin.path=/usr/software/confluent-5.2.3/share/java/confluent-hub-client,,/usr/software/confluent-5.2.3/share/confluent-hub-client,/usr/software/confluent-5.2.3/share/confluent-hub-components

先建立使用connector要用到的特殊topic,避免在啓動kafka connector的時候建立失敗致使kafka connector啓動失敗.特殊topic有三個:

kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-offsets --replication-factor 3 --partitions 1
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-configs --replication-factor 3 --partitions 1
kafka-topics --create --zookeeper 192.168.49.104:2181 --topic connect-status --replication-factor 3 --partitions 1

再進入kafka的bin目錄,CDH版本的是/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/lib/kafka/bin.

執行connect-distributed.sh指令:

sh connect-distributed.sh  /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/etc/kafka/conf.dist/connect-distributed.properties

說一點,CDH安裝的kafka在執行指令的時候會報錯找不到日誌文件,緣由是CDH安裝的kafka各個部分都不在一塊兒.直接修改connect-distributed.sh ,把裏面的地址寫死就行了.

vi connect-distributed.sh

#修改的地方
base_dir=$(dirname $0)

if [ "x$KAFKA_LOG4J_OPTS" = "x" ]; then
    export KAFKA_LOG4J_OPTS="-Dlog4j.configuration=file:/opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/etc/kafka/conf.dist/connect-log4j.properties"
fi

這樣執行起來就沒有問題了.

以上執行的時候是在前臺執行,前臺中止退出的話kafka connector也就中止了,這種狀況適合調試.在後臺運行須要加上 -daemon 參數.

sh connect-distributed.sh -daemon /opt/cloudera/parcels/CDH-6.3.0-1.cdh6.3.0.p0.1279813/etc/kafka/conf.dist/connect-distributed.properties

使用Debezium SQL Server來監聽的話須要開啓sqlserver的CDC功能.CDC功能要先開啓庫的捕獲,再開啓表的捕獲,才能監聽到表的變化.

我使用的是navicat來鏈接數據庫,你們用本身合適的工具來就能夠了.

開啓庫的捕獲:

use database;
EXEC sys.sp_cdc_enable_db

這一步後數據庫會多出一個叫cdc的模式,下面有5張表.

查詢哪些數據庫開啓了CDC功能:

select * from sys.databases where is_cdc_enabled = 1

啓用表的CDC功能:

use database; 
EXEC sys.sp_cdc_enable_table  
    @source_schema = 'dbo',  
    @source_name = 'table_name',  
    @role_name = null;

查看哪些表啓用了CDC功能:

use database;
select name, is_tracked_by_cdc from sys.tables where is_tracked_by_cdc = 1

以上就開啓了對錶監聽的CDC功能.

當咱們啓動KafkaConnector後,就可以經過接口的形式來訪問和提交信息.

查看kafka connector信息:

[root@centos04 huishui]# curl -s centos04:8083 | jq
{
  "version": "2.2.1-cdh6.3.0",
  "commit": "unknown",
  "kafka_cluster_id": "GwdoyDpbT5uP4k2CN6zbrw"
}

8083是上面配置的端口號,一樣也能夠經過web頁面來訪問.

查看安裝了哪些connector鏈接器:

[root@centos04 huishui]# curl -s centos04:8083 | jq
{
  "version": "2.2.1-cdh6.3.0",
  "commit": "unknown",
  "kafka_cluster_id": "GwdoyDpbT5uP4k2CN6zbrw"
}
[root@centos04 huishui]# curl -s centos04:8083/connector-plugins | jq
[
  {
    "class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "type": "sink",
    "version": "10.0.2"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "type": "sink",
    "version": "5.5.1"
  },
  {
    "class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "type": "source",
    "version": "5.5.1"
  },
  {
    "class": "io.debezium.connector.sqlserver.SqlServerConnector",
    "type": "source",
    "version": "1.2.2.Final"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "type": "sink",
    "version": "2.2.1-cdh6.3.0"
  },
  {
    "class": "org.apache.kafka.connect.file.FileStreamSourceConnector",
    "type": "source",
    "version": "2.2.1-cdh6.3.0"
  }
]

我安裝了不少,有io.debezium.connector.sqlserver.SqlServerConnector就說明沒問題.

查看當前運行的任務/Task:

[root@centos04 huishui]# curl -s centos04:8083/connectors | jq
[]

因爲咱們尚未提交任何用戶配置,因此也就沒有任務,返回就是一個空的json.到這裏說明kafka connector啓動成功,可以正常進行用戶配置.接下來纔是有關業務的操做,編寫一個用戶配置的json,經過接口進行提交:

#我選擇把用戶配置保存下來.因爲個人kafka都不在一個文件夾下面,因此我把配置文件都存在confluent/usr中.其實存不存都無所謂的.按照官方文檔,我選擇存下來.
#當建立好kafka connector以後,會自動建立kafka topic.名稱爲 ${server.name}.$tableName.debezium不能監聽單獨一張表,全部表都會有對應的topic.
cd $CONFLUENT
mkdir usr
cd usr
vi register-sqlserver.json
{
 "name": "inventory-connector",
 "config": {
     "connector.class" : "io.debezium.connector.sqlserver.SqlServerConnector",
     "tasks.max" : "1",
     "database.server.name" : "server.name",
     "database.hostname" : "localhost",
     "database.port" : "1433",
     "database.user" : "sa",
     "database.password" : "password!",
     "database.dbname" : "rcscounty_quannan",
     "database.history.kafka.bootstrap.servers" : "centos04:9092",
     "database.history.kafka.topic": "schema-changes.inventory"
     }
 }

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://centos04:8083/connectors/ -d @register-sqlserver.json

提交失敗會有錯誤信息.看看錯誤信息是什麼而後跟着改就能夠了.當提交成功後,再查看當前運行的Task,就會出現有一個connector:

[root@centos04 huishui]# curl -s centos04:8083/connectors | jq
[
  "inventory-connector"
]

查看kafka topic:

kafka-topics --list --zookeeper centos04:2181

會看見kafka建立好了topic,假如沒有對應的topic,那麼多是connector在運行時出現了問題.查看當時建立的connector狀態:

[root@centos04 usr]# curl -s centos04:8083/connectors/inventory-connector/status | jq
{
  "name": "inventory-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "192.168.49.104:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "192.168.49.104:8083"
    }
  ],
  "type": "source"
}

我這個是運行良好的狀態.運行沒有問題,就開始監聽開啓了CDC功能的表對應的topic,看看是否可以成功監聽表的改動:

kafka-console-consumer --bootstrap-server centos04:9092 --topic server.name.tableName

能夠看到一次Debezium connector 建立的topic傳遞的消息很是多,可能須要修改kafka最大消息體.我以前設置的是9M,因此這裏沒遇到問題.

Debezium 傳遞的數據庫變更,新增\修改\刪除\模式更改的json都有所不一樣,具體詳情請看用於SQL Server的Debezium鏈接器.

總之能看到變更就說明調試成功,至於以後如何利用kafka topic,就以後再說.

但願這篇文章可以對您有所幫助!

 

 

附上學習中看過的文章:

sqlserver數據實時同步至kafka

啓動Debezium SQL Server鏈接器

Kafka Connect簡介

confluent介紹

confluent本地安裝

SQLServer 啓用 CDC

相關文章
相關標籤/搜索