在處理實時數據時,須要即時地得到數據庫表中數據的變化,而後將數據變化發送到Kafka中。這篇文章將介紹如何使用Kafka Connector完成這一工做。當獲取實時數據時,數據源須要支持對數據變化進行反饋。不一樣的數據源採用了不一樣的技術和方法實現該功能,由於咱們的業務數據庫是MS SQL Server,所以這篇文章採用MSQL做爲數據源。html
1. 選擇Connector
首先須要選擇Connector,不一樣的數據源有不一樣的Connector,例如ActiveMQ Connector、MySql Connector、MSSQL Connector等。即使是同一數據源,也可能有不一樣的第三方提供。我一共嘗試了下面兩個MSSQL Connector:java
比較遺憾的是:這兩個Connector,debezium的是Alpha版本,confluent的是Preview版本,反正都不是正式版,而MySql都已經有正式版本了,可見開源社區對MS真的不友好呀 >_<、 它們兩個一個是使用MSSQL Server的 Change Data Capture 獲取數據變動,一個是使用 Change Tracking。sql
由於Change Tracking相比Change Data Capture來講,更輕量一些,所以我選用了confluent的Connector。其下載地址是:<https://www.confluent.io/hub/>;shell
下載後,將其解壓縮至 $KAFKA_HOME/connectors 文件夾下,以下圖所示:數據庫
說明:$KAFKA_HOME是你的kafka安裝目錄,若是是集羣,要安裝在集羣下每臺機器的connectors目錄下。apache
在上面的截圖中,能夠看到我還安裝了confluentinc-kafka-connect-hdfs-5.0.0和debezium-connector-sqlserver兩個connector。json
3. 配置Connector
接下來要對Connector進行配置,此時能夠回顧一下 Kafka Connect 基本概念。Connector是一組獨立的集羣,而且是做爲Kafka集羣的客戶端,咱們首先須要對Connector進行配置,配置文件位於 $KAFKA_HOME/config/connect-distributed.properties:bootstrap
# kafka集羣地址 bootstrap.servers=kafka1:9092,kafka2:9092,kafka3:9092 # Connector集羣的名稱,同一集羣內的Connector須要保持此group.id一致 group.id=connect-cluster # 存儲到kafka的數據格式 key.converter=org.apache.kafka.connect.json.JsonConverter value.converter=org.apache.kafka.connect.json.JsonConverter key.converter.schemas.enable=false value.converter.schemas.enable=false # 內部轉換器的格式,針對offsets、config和status,通常不須要修改 internal.key.converter=org.apache.kafka.connect.json.JsonConverter internal.value.converter=org.apache.kafka.connect.json.JsonConverter internal.key.converter.schemas.enable=false internal.value.converter.schemas.enable=false # 用於保存offsets的topic,應該有多個partitions,而且擁有副本(replication) # Kafka Connect會自動建立這個topic,可是你能夠根據須要自行建立 # 若是kafka單機運行,replication.factor設置爲1;當kafka爲集羣時,能夠設置不大於集羣中主機數 # 由於我這裏的環境是3主機的集羣,所以設爲2 offset.storage.topic=connect-offsets offset.storage.replication.factor=2 offset.storage.partitions=12 # 保存connector和task的配置,應該只有1個partition,而且有多個副本 config.storage.topic=connect-configs config.storage.replication.factor=2 # 用於保存狀態,能夠擁有多個partition和replication status.storage.topic=connect-status status.storage.replication.factor=2 status.storage.partitions=6 # Flush much faster than normal, which is useful for testing/debugging offset.flush.interval.ms=10000 # RESET主機名,默認爲本機 #rest.host.name= # REST端口號 rest.port=18083 # The Hostname & Port that will be given out to other workers to connect to i.e. URLs that are routable from other servers. #rest.advertised.host.name= #rest.advertised.port= # 保存connectors的路徑 # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors, plugin.path=/opt/kafka/kafka_2.11-1.1.0/connectors注意到connect-distributed.properties中的distributed。Kafka Connector有兩種運行模式,單機(Standalone)和分佈式(Distrubited)。由於單機一般做爲測試運行,所以這篇文章只演示分佈式運行模式。在config文件夾下,還有一個單機運行的配置文件,叫作connect-standalone.properties,內容大同小異。4. 建立Topic
儘管首次運行Kafka connector時,會自動建立上面的topic,可是若是建立出錯,那麼Connector就會啓動失敗。保險起見,能夠在運行Connector以前,手動建立好上面的三個特殊topic。vim
# bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-offsets --replication-factor 2 --partitions 12 # bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-configs --replication-factor 2 --partitions 1 # bin/kafka-topics.sh --zookeeper zookeeper1:2181/kafka --create --topic connect-status --replication-factor 2 --partitions 65. 運行Connector
接下來就能夠運行Connctor了,此時尚未涉及到任何業務或者數據庫相關的配置和操做(即 Kafka Connect 基本概念 中提到的用戶配置)。數組
執行下面的代碼以運行Connector:
# bin/connect-distributed.sh config/connect-distributed.properties上面這樣是前臺運行,當退出shell後進程也就結束了,前臺運行的好處就是在開始運行時便於調試。若是想要後臺運行,則需加上-daemon選項:
# bin/connect-distributed.sh -daemon config/connect-distributed.properties運行connect時,會看到不停地涌現大量INFO信息,此時能夠修改一下connect-log4j.properties,只顯示WARN信息。
# vim config/connect-log4j.properties log4j.rootLogger=WARN, stdout6. 開啓MSSQL數據庫的Change Tracking
在繼續進行以前,咱們在數據庫中建立表test_online,而且開啓Change Tracking功能:
Go CREATE TABLE [dbo].[test_online]( [Id] [int] IDENTITY(1,1) NOT NULL, [UserName] [varchar](50) NOT NULL, [IsOnline] [bit] NOT NULL, [LastLogin] [int] NOT NULL, CONSTRAINT [PK_test_online] PRIMARY KEY CLUSTERED ( [Id] ASC ) Go ALTER DATABASE db_name SET CHANGE_TRACKING = ON (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON) Go ALTER TABLE [db_name].dbo.[test_online] ENABLE CHANGE_TRACKING WITH (TRACK_COLUMNS_UPDATED = ON)7. Kafka Connector REST API
當Kafka Connector運行起來之後,它就開啓了REST API端口,像咱們上面配置的是:18083。若是咱們須要運行Task,好比實時捕捉數據庫數據變化並寫入Kafka,那麼就須要像這個REST API提交用戶配置(User Config)。在提交用戶配置以前,咱們先看看Kafka Connector REST API都包含哪些常見功能:
7.1 獲取Worker的信息
由於個人kafka(主機名分別爲kafka一、kafka二、kafka3)和kafka connector集羣是共用主機的,所以可使用下面的命令獲取(你須要將下面的kafka1改爲ip或者相應的主機名):
# curl -s kafka1:18083/ | jq { "version": "1.1.0", "commit": "fdcf75ea326b8e07", "kafka_cluster_id": "N93UISCxTS-SYZPfM8p1sQ" }7.2 獲取Worker上已經安裝的Connector
此時的Connector是靜態概念,即上面第一節安裝的Confluent MSSQL Connector,從下面的顯示能夠看到,我安裝了好幾個Connector:
# curl -s kafka1:18083/connector-plugins | jq [ { "class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "type": "source", "version": "0.0.1.9" }, { "class": "io.confluent.connect.hdfs.HdfsSinkConnector", "type": "sink", "version": "5.0.0" }, { "class": "io.confluent.connect.hdfs.tools.SchemaSourceConnector", "type": "source", "version": "1.1.0" }, { "class": "io.confluent.connect.storage.tools.SchemaSourceConnector", "type": "source", "version": "1.1.0" }, { "class": "io.debezium.connector.sqlserver.SqlServerConnector", "type": "source", "version": "0.9.0.Alpha1" }, { "class": "org.apache.kafka.connect.file.FileStreamSinkConnector", "type": "sink", "version": "1.1.0" }, { "class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "type": "source", "version": "1.1.0" } ]對於你來講,可能就只有io.confluent.connect.cdc.mssql.MsSqlSourceConnector這一個connector。
7.3 列出當前運行的connector(task)
# curl -s kafka1:18083/connectors | jq []由於咱們當前Connector中沒有提交過任何的用戶配置(即沒有啓動Task),所以上面返回空數組。
7.4 提交Connector用戶配置
當提交用戶配置時,就會啓動一個Connector Task,Connector Task執行實際的做業。用戶配置是一個Json文件,一樣經過REST API提交:
# curl -s -X POST -H "Content-Type: application/json" --data '{ "name": "connector-mssql-online", "config": { "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "tasks.max": 1, "server.name": "192.168.0.21", "server.port" : "1433", "username": "user_id", "password": "your_password", "initial.database": "db_name", "change.tracking.tables": "dbo.test_online" } }' http://kafka1:18083/connectors | jq注意上面的配置要修改爲你的本地配置。提交完成後,再次執行上一小節的命令,會看到已經有一個connector在運行了,其名稱爲connector-mssql-online:
curl -s kafka1:18083/connectors | jq [ "connector-mssql-online" ]7.5 查看connector的信息
# curl -s kafka1:18083/connectors/connector-mssql-online | jq { "name": "connector-mssql-online", "config": { "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "password": "your_password", "initial.database": "db_name", "server.name": "192.168.0.21", "tasks.max": "1", "server.port": "1433", "name": "connector-mssql-online", "change.tracking.tables": "dbo.test_online", "username": "user_id" }, "tasks": [ { "connector": "connector-mssql-online", "task": 0 } ], "type": "source" }注意:上面task:0,不是說有0個task,是task的id是0。
7.6 查看connector下運行的task信息
使用下面的命令,能夠查看connector下運行的task的信息:
# curl -s kafka1:18083/connectors/connector-mssql-online/tasks | jq [ { "id": { "connector": "connector-mssql-online", "task": 0 }, "config": { "connector.class": "io.confluent.connect.cdc.mssql.MsSqlSourceConnector", "password": "your_password", "initial.database": "db_name", "task.class": "io.confluent.connect.cdc.mssql.MsSqlSourceTask", "server.name": "192.168.0.21", "tasks.max": "1", "server.port": "1433", "name": "connector-mssql-online", "change.tracking.tables": "dbo.test_online", "username": "user_id" } } ]這裏task的配置信息繼承自connector的配置。
7.7 查看connector當前狀態
# curl -s kafka1:18083/connectors/connector-mssql-online/status | jq { "name": "connector-mssql-online", "connector": { "state": "RUNNING", "worker_id": "192.168.0.31:18083" }, "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "192.168.0.31:18083" } ], "type": "source" }7.8 暫停/重啓 Connector
# curl -s -X PUT kafka1:18083/connectors/connector-mssql-online/pause # curl -s -X PUT kafka1:18083/connectors/connector-mssql-online/resume7.9 刪除 Connector
# curl -s -X DELETE kafka1:18083/connectors/connector-mssql-online8. 從Kafka中讀取變更數據
默認狀況下,MSSQL Connector會將表的變更寫入到:${databaseName}.${tableName} 這個topic中,這個topic的名稱能夠經過 topic.format 這個用戶配置參數中進行設置,由於咱們並無配置,所以,topic的名稱爲db_name.test_online。
運行下面的控制檯腳本,從Kafka中實時讀取topic的內容:
# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic db_name.test_online --from-beginning此時由於沒有任何數據,所以控制檯會阻塞。
9. 對test_online表進行修改
依次執行下面的增刪改語句,對test_online表進行修改:
insert into [test_online](UserName,IsOnline,LastLogin) values('子陽', 1, DATEDIFF(s, '19700101',GETDATE())) update test_online Set UserName='吉米' where UserName='子陽' Delete test_online Where UserName='吉米'如今查看Kafka讀取端控制檯,能夠看到以Json格式實時收到了數據庫變更的消息:
# bin/kafka-console-consumer.sh --bootstrap-server kafka1:9092 --topic tgstat_ddztest.test_online {"Id":5,"UserName":"子陽","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"I","sys_change_creation_version":"13","sys_change_version":"13","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}} {"Id":5,"UserName":"吉米","IsOnline":true,"LastLogin":1540635666,"_cdc_metadata":{"sys_change_operation":"U","sys_change_creation_version":"0","sys_change_version":"14","databaseName":"tgstat_ddztest","schemaName":"dbo","tableName":"test_online"}} null從上面的消息能夠看到,對於delete操做,收到了null。對於insert和update操做,收到了詳細的變更信息。
至此,咱們就配置完了Kafka Connector,而且實時獲取到了數據庫變動的消息。後續可使用Spark Stream鏈接至此Topic,進行實時的數據運算和分析。之後有機會再進行掩飾。