在現實業務中,Kafka常常會遇到的一個集成場景就是,從數據庫獲取數據,由於關係數據庫是一個很是豐富的事件源。數據庫中的現有數據以及對該數據的任何更改均可以流式傳輸到Kafka主題中,在這裏這些事件可用於驅動應用,也能夠流式傳輸到其它數據存儲(好比搜索引擎或者緩存)用於分析等。html
實現這個需求有不少種作法,可是在本文中,會聚焦其中的一個解決方案,即Kafka鏈接器中的JDBC鏈接器,講述如何進行配置,以及一些問題排查的技巧,至於更多的細節,請參見Kafka的文檔。java
Kafka鏈接器中的JDBC鏈接器包含在Confluent Platform中,也能夠與Confluent Hub分開安裝。它能夠做爲源端從數據庫提取數據到Kafka,也能夠做爲接收端從一個Kafka主題中將數據推送到數據庫。幾乎全部關係數據庫都提供JDBC驅動,包括Oracle、Microsoft SQL Server、DB二、MySQL和Postgres。mysql
下面將從最簡單的Kafka鏈接器配置開始,而後進行構建。本文中的示例是從MySQL數據庫中提取數據,該數據庫有兩個模式,每一個模式都有幾張表:git
mysql> SELECT table_schema, table_name FROM INFORMATION_SCHEMA.tables WHERE TABLE_SCHEMA != 'information_schema'; +--------------+--------------+ | TABLE_SCHEMA | TABLE_NAME | +--------------+--------------+ | demo | accounts | | demo | customers | | demo | transactions | | security | firewall | | security | log_events | +--------------+--------------+
在進行配置以前,要確保Kafka鏈接器能夠實際鏈接到數據庫,即確保JDBC驅動可用。若是使用的是SQLite或Postgres,那麼驅動已經包含在內,就能夠跳過此步驟。對於全部其它數據庫,須要將相關的JDBC驅動JAR文件放在和kafka-connect-jdbc
JAR相同的文件夾中。此文件夾的標準位置爲:github
share/java/kafka-connect-jdbc/
;/usr/share/java/kafka-connect-jdbc/
,關於如何將JDBC驅動添加到Kafka鏈接器的Docker容器,請參閱此處;kafka-connect-jdbc
JAR位於其它位置,則可使用plugin.path
指向包含它的文件夾,並確保JDBC驅動位於同一文件夾中。還能夠在啓動Kafka鏈接器時指定CLASSPATH
,設置爲能夠找到JDBC驅動的位置。必定要將其設置爲JAR自己,而不只僅是包含它的文件夾,例如:正則表達式
CLASSPATH=/u01/jdbc-drivers/mysql-connector-java-8.0.13.JAR ./bin/connect-distributed ./etc/kafka/connect-distributed.properties
兩個事情要注意一下:sql
kafka-connect-jdbc
JAR位於其它位置,則Kafka鏈接器的plugin.path
選項將沒法直接指向JDBC驅動JAR文件 。根據文檔,每一個JDBC驅動JAR必須與kafka-connect-jdbc
JAR位於同一目錄;找不到合適的驅動docker
與JDBC鏈接器有關的常見錯誤是No suitable driver found
,好比:數據庫
{"error_code":400,"message":"Connector configuration is invalid and contains the following 2 error(s):\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nInvalid value java.sql.SQLException: No suitable driver found for jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=admin for configuration Couldn't open connection to jdbc:mysql://X.X.X.X:3306/test_db?user=root&password=pwd\nYou can also find the above list of errors at the endpoint `/{connectorType}/config/validate`"}
這可能有2個緣由:apache
確認是否已加載JDBC驅動
Kafka鏈接器會加載與kafka-connect-jdbc
JAR文件在同一文件夾中的全部JDBC驅動,還有在CLASSPATH
上找到的任何JDBC驅動。若是要驗證一下,能夠將鏈接器工做節點的日誌級別調整爲DEBUG
,而後會看到以下信息:
1.DEBUG Loading plugin urls
:包含kafka-connect-jdbc-5.1.0.jar
(或者對應當前正在運行的版本號)的一組JAR文件:
DEBUG Loading plugin urls: [file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/audience-annotations-0.5.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/common-utils-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jline-0.9.94.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/jtds-1.3.1.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/kafka-connect-jdbc-5.1.0.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/mysql-connector-java-8.0.13.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/netty-3.10.6.Final.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/postgresql-9.4-1206-jdbc41.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/slf4j-api-1.7.25.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/sqlite-jdbc-3.25.2.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zkclient-0.10.jar, file:/Users/Robin/cp/confluent-5.1.0/share/java/kafka-connect-jdbc/zookeeper-3.4.13.jar] (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
在這個JAR列表中,應該有JDBC驅動JAR。在上面的輸出中,能夠看到MySQL、Postgres和SQLite的JAR。若是指望的JDBC驅動JAR不在,能夠將驅動放入kafka-connect-jdbc
JAR所在的文件夾中。
2.INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector'
:在此以後,在記錄任何其它插件以前,能夠看到JDBC驅動已註冊:
INFO Added plugin 'io.confluent.connect.jdbc.JdbcSourceConnector' (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: jTDS 1.3.1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: com.mysql.cj.jdbc.Driver@7bbbb6a8 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: org.postgresql.Driver@ea9e141 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader) DEBUG Registered java.sql.Driver: org.sqlite.JDBC@236134a1 to java.sql.DriverManager (org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader)
確認JDBC驅動包含在已註冊的列表中。若是沒有,那麼就是安裝不正確。
注意,雖然可能會在日誌的其它地方看到驅動的Registered java.sql.Driver
信息,但若是要確認其對於JDBC鏈接器可用,那麼它必須直接
出如今INFO Added plugin 'io.confluent.connect.jdbc
消息的後面。
JDBC URL
對於源數據庫來講JDBC URL必須是正確的,若是搞錯了,那麼Kafka鏈接器即便驅動正確,也是不行。如下是一些常見的JDBC URL格式:
數據庫 | 下載地址 | JDBC URL |
---|---|---|
IBM DB2 | 下載 | jdbc:db2://<host>:<port50000>/<database> |
IBM Informix | jdbc:informix-sqli://:/:informixserver=<debservername> |
|
MS SQL | 下載 | jdbc:sqlserver://<host>[:<port1433>];databaseName=<database> |
MySQL | 下載 | jdbc:mysql://<host>:<port3306>/<database> |
Oracle | 下載 | jdbc:oracle:thin://<host>:<port>/<service> or jdbc:oracle:thin:<host>:<port>:<SID> |
Postgres | Kafka鏈接器自帶 | jdbc:postgresql://<host>:<port5432>/<database> |
Amazon Redshift | 下載 | jdbc:redshift://<server>:<port5439>/<database> |
Snowflake | jdbc:snowflake://<account_name>.snowflakecomputing.com/?<connection_params> |
注意,雖然JDBC URL一般容許嵌入身份驗證信息,但這些內容將以明文形式記錄在Kafka鏈接器日誌中。所以應該使用單獨的connection.user
和connection.password
配置項,這樣在記錄時會被合理地處理。
JDBC驅動安裝完成以後,就能夠配置Kafka鏈接器從數據庫中提取數據了。下面是最小的配置,不過它不必定是最有用的,由於它是數據的批量導入,在本文後面會討論如何進行增量加載。
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-01-", "mode":"bulk" } }'
使用此配置,每一個表(用戶有權訪問)將徹底複製到Kafka,經過使用KSQL列出Kafka集羣上的主題,咱們能夠看到:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- mysql-01-accounts | false | 1 | 1 | 0 | 0 mysql-01-customers | false | 1 | 1 | 0 | 0 mysql-01-firewall | false | 1 | 1 | 0 | 0 mysql-01-log_events | false | 1 | 1 | 0 | 0 mysql-01-transactions | false | 1 | 1 | 0 | 0
注意mysql-01
前綴,表格內容的完整副本將每五秒刷新一次,能夠經過修改poll.interval.ms
進行調整,例如每小時刷新一次:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_02", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-02-", "mode":"bulk", "poll.interval.ms" : 3600000 } }'
找個主題確認一下,顯示完整的數據,看看是否是本身想要的:
ksql> PRINT 'mysql-02-accounts' FROM BEGINNING; Format:AVRO 12/20/18 3:18:44 PM UTC, null, {"id": 1, "first_name": "Hamel", "last_name": "Bly", "username": "Hamel Bly", "company": "Erdman-Halvorson", "created_date": 17759} 12/20/18 3:18:44 PM UTC, null, {"id": 2, "first_name": "Scottie", "last_name": "Geerdts", "username": "Scottie Geerdts", "company": "Mante Group", "created_date": 17692} 12/20/18 3:18:44 PM UTC, null, {"id": 3, "first_name": "Giana", "last_name": "Bryce", "username": "Giana Bryce", "company": "Wiza Inc", "created_date": 17627} 12/20/18 3:18:44 PM UTC, null, {"id": 4, "first_name": "Allen", "last_name": "Rengger", "username": "Allen Rengger", "company": "Terry, Jacobson and Daugherty", "created_date": 17746} 12/20/18 3:18:44 PM UTC, null, {"id": 5, "first_name": "Reagen", "last_name": "Volkes", "username": "Reagen Volkes", "company": "Feeney and Sons", "created_date": 17798} …
目前會展現全部可用的表,這可能不是實際的需求,可能只但願包含特定模式的表,這個可使用catalog.pattern/schema.pattern
(具體哪個取決於數據庫)配置項進行控制:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_03", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-03-", "mode":"bulk", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo" } }'
這樣就只會從demo
模式中取得3張表:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- […] mysql-03-accounts | false | 1 | 1 | 0 | 0 mysql-03-customers | false | 1 | 1 | 0 | 0 mysql-03-transactions | false | 1 | 1 | 0 | 0 […]
也可使用table.whitelist
(白名單)或table.blacklist
(黑名單)來控制鏈接器提取的表,下面的示例顯式地列出了但願拉取到Kafka中的表清單:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_04", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-04-", "mode":"bulk", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo", "table.whitelist" : "accounts" } }'
這時就只有一個表從數據庫流式傳輸到Kafka:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- mysql-04-accounts | false | 1 | 1 | 0 | 0
由於只有一個表,下面的配置:
"catalog.pattern" : "demo", "table.whitelist" : "accounts",
等同於:
"table.whitelist" : "demo.accounts",
也能夠在一個模式中指定多個表,好比:
"catalog.pattern" : "demo", "table.whitelist" : "accounts, customers",
或者也能夠跨越多個模式:
"table.whitelist" : "demo.accounts, security.firewall",
還可使用其它的表過濾選項,好比table.types
能夠選擇表以外的對象,例如視圖。
過濾表時要注意,由於若是最終沒有對象匹配該模式(或者鏈接到數據庫的已認證用戶沒有權限訪問),那麼鏈接器將報錯:
INFO After filtering the tables are: (io.confluent.connect.jdbc.source.TableMonitorThread) … ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder) java.lang.IllegalArgumentException: Number of groups must be positive
在經過table.whitelist/table.blacklist
進行過濾以前,能夠將日誌級別調整爲DEBUG
,查看用戶能夠訪問的表清單:
DEBUG Got the following tables: ["demo"."accounts", "demo"."customers"] (io.confluent.connect.jdbc.source.TableMonitorThread)
而後,鏈接器會根據提供的白名單/黑名單過濾此列表,所以要確認指定的列表位於鏈接器可用的列表中,還要注意鏈接用戶要有權限訪問這些表,所以還要檢查數據庫端的GRANT
語句。
到目前爲止,已經按計劃將整張表都拉取到Kafka,這雖然對於轉存數據很是有用,不過都是批量而且並不老是適合將源數據庫集成到Kafka流系統中。
JDBC鏈接器還有一個流式傳輸到Kafka的選項,它只會傳輸上次拉取後的數據變動,具體能夠基於自增列(例如自增主鍵)和/或時間戳(例如最後更新時間戳)來執行此操做。在模式設計中的常見作法是使用這些中的一個或兩個,例如,事務表ORDERS
可能有:
ORDER_ID
:一個惟一鍵(多是主鍵),每一個新訂單遞增;UPDATE_TS
:每次數據變動時更新的時間戳列。可使用mode
參數配置該選項,好比使用timestamp
:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_08", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-08-", "mode":"timestamp", "table.whitelist" : "demo.accounts", "timestamp.column.name": "UPDATE_TS", "validate.non.null": false } }'
下面會獲取表的所有數據,外加源數據後續的更新和插入:
注意:
時間戳+自增列
選項爲識別新行和更新行提供了最大的覆蓋範圍;CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP );
CREATE TABLE foo ( … UPDATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP ); -- Courtesy of https://techblog.covermymeds.com/databases/on-update-timestamps-mysql-vs-postgres/ CREATE FUNCTION update_updated_at_column() RETURNS trigger LANGUAGE plpgsql AS $$ BEGIN NEW.update_ts = NOW(); RETURN NEW; END; $$; CREATE TRIGGER t1_updated_at_modtime BEFORE UPDATE ON foo FOR EACH ROW EXECUTE PROCEDURE update_updated_at_column();
CREATE TABLE foo ( … CREATE_TS TIMESTAMP DEFAULT CURRENT_TIMESTAMP , ); CREATE OR REPLACE TRIGGER TRG_foo_UPD BEFORE INSERT OR UPDATE ON foo REFERENCING NEW AS NEW_ROW FOR EACH ROW BEGIN SELECT SYSDATE INTO :NEW_ROW.UPDATE_TS FROM DUAL; END; /
有時可能想從RDBMS中提取數據,但但願有比整個表更靈活的方式,緣由可能包括:
這可使用JDBC鏈接器的query
模式。在瞭解如何實現以前,須要注意如下幾點:
query
模式可能不那麼靈活,所以從源中簡單地刪除列的另外一種方法(不管是簡單地減小數量,仍是由於敏感信息)都是在鏈接器自己中使用ReplaceField
單消息轉換;後處理
的絕佳方式,使管道盡量簡單。下面將展現如何將transactions
表,再加上customers
表中的數據流式傳輸到Kafka:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_09", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-09", "mode":"bulk", "query":"SELECT t.txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;", "poll.interval.ms" : 3600000 } }'
可能注意到已切換回bulk
模式,可使用主鍵或者時間戳其中一個增量選項,但要確保在SELECT子句中包含相應的主鍵/時間戳列(例如txn_id
):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_10", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-10", "mode":"incrementing", "query":"SELECT txn_id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id", "incrementing.column.name": "txn_id", "validate.non.null": false } }'
若是不包括該列(即便它存在於源表中),那麼鏈接器會報錯並顯示org.apache.kafka.connect.errors.DataException
異常(#561)或java.lang.NullPointerException
異常(#560),這是由於鏈接器須要在返回的數據中獲取值,以即可以存儲相應偏移量的最新值。
若是使用query
選項,除非使用mode: bulk
(#566),不然沒法指定本身的WHERE子句,也就是說,在查詢中使用本身的謂詞和使用Kafka進行增量提取之間是互斥的。
若是須要不一樣的參數設定,能夠建立新的鏈接器,例如,可能但願有不一樣的參數:
簡單來講,若是全部表參數都同樣,則可使用單個鏈接器。
建立鏈接器以後,可能在目標Kafka主題中看不到任何數據。下面會一步步進行診斷:
1.查詢/connectors
端點,可確認鏈接器是否建立成功:
$ curl -s「http:// localhost:8083 / connectors」 [ 「jdbc_source_mysql_10」]
應該看到鏈接器列表,若是沒有,則須要按照以前的步驟進行建立,而後關注Kafka鏈接器返回的任何錯誤。
2.檢查鏈接器及其任務的狀態:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/status"|jq '.' { "name": "jdbc_source_mysql_10", "connector": { "state": "RUNNING", "worker_id": "kafka-connect:8083" }, "tasks": [ { "state": "RUNNING", "id": 0, "worker_id": "kafka-connect:8083" } ], "type": "source" }
正常應該看到全部的鏈接器和任務的state
都是RUNNING
,不過RUNNING
不老是意味着正常。
3.若是鏈接器或任務的狀態是FAILED
,或者即便狀態是RUNNING
可是沒有按照預期行爲運行,那麼能夠轉到Kafka鏈接器工做節點的輸出(這裏有相關的說明),這裏會顯示是否存在任何實際的問題。以上面的鏈接器爲例,其狀態爲RUNNING
,可是鏈接器工做節點日誌中實際上全是重複的錯誤:
ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask) java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE `t.id` > -1 ORDER BY `t.id` ASC' at line 1 at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:120)
4.在這裏,問題是什麼並不明確,須要調出鏈接器的配置來檢查指定的查詢是否正確:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_10/config"|jq '.' { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "mode": "incrementing", "incrementing.column.name": "t.id", "topic.prefix": "mysql-10", "connection.password": "asgard", "validate.non.null": "false", "connection.user": "connect_user", "query": "SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;", "name": "jdbc_source_mysql_10", "connection.url": "jdbc:mysql://mysql:3306/demo" }
5.在MySQL中運行此查詢發現能正常執行:
mysql> SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; +------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+ | id | customer_id | amount | currency | txn_timestamp | first_name | last_name | email | gender | comments | +------+-------------+--------+----------+----------------------+------------+-----------+----------------------------+--------+------------------------------------------------------+ | 1 | 5 | -72.97 | RUB | 2018-12-12T13:58:37Z | Modestia | Coltart | mcoltart4@scribd.com | Female | Reverse-engineered non-volatile success
6.因此確定是Kafka鏈接器在執行時作了什麼。鑑於錯誤消息引用t.id
,這是在incrementing.column.name
參數中指定的,可能問題與此有關。經過將Kafka鏈接器的日誌級別調整爲DEBUG
,能夠看到執行的完整SQL語句:
DEBUG TimestampIncrementingTableQuerier{table=null, query='SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id;', topicPrefix='mysql-10', incrementingColumn='t.id', timestampColumns=[]} prepared SQL query: SELECT t.id, t.customer_id, t.amount, t.currency, t.txn_timestamp, c.first_name, c.last_name, c.email, c.gender, c.comments FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier)
7.看一下該prepared SQL query
部分,可能會發現:
[…] FROM demo.transactions t LEFT OUTER JOIN demo.customers c on t.customer_id = c.id; WHERE `t.id` > ? ORDER BY `t.id` ASC
8.注意在JOIN
子句的c.id
後面有語句終止符(;),後面有WHERE子句。該WHERE
子句由Kafka鏈接器附加,用於實現所要求的incrementing
模式,但建立了一個無效的SQL語句; 9.而後在GitHub中查找與看到的錯誤相關的問題,由於有時它其實是一個已知的問題,例如這個問題; 10.若是鏈接器存在而且是RUNNING
,而且Kafka鏈接器工做節點日誌中也沒有錯誤,還應該檢查:
- 鏈接器的提取間隔是多少?也許它徹底按照配置運行,而且源表中的數據已經更改,但就是沒有拉取到新數據。要檢查這一點,能夠在Kafka鏈接器工做節點的輸出中查找`JdbcSourceTaskConfig`的值和`poll.interval.ms`的值; - 若是正在使用的是增量攝取,Kafka鏈接器關於偏移量是如何存儲的?若是刪除並重建相同名稱的鏈接器,則將保留前一個實例的偏移量。考慮這樣的場景,建立完鏈接器以後,成功地將全部數據提取到源表中的給定主鍵或時間戳值,而後刪除並從新建立了它,新版本的鏈接器將得到以前版本的偏移量,所以僅提取比先前處理的數據更新的數據,具體能夠經過查看保存在其中的`offset.storage.topic`值和相關表來驗證這一點。
當Kafka鏈接器以分佈式模式運行時,它會在Kafka主題(經過offset.storage.topic
配置)中存儲有關它在源系統中讀取的位置(稱爲偏移量)的信息,當鏈接器任務重啓時,它能夠從以前的位置繼續進行處理,具體能夠在鏈接器工做節點日誌中看到:
INFO Found offset {{protocol=1, table=demo.accounts}={timestamp_nanos=0, timestamp=1547030056000}, {table=accounts}=null} for partition {protocol=1, table=demo.accounts} (io.confluent.connect.jdbc.source.JdbcSourceTask)
每次鏈接器輪詢時,都會使用這個偏移量,它會使用預編譯的SQL語句,而且使用Kafka鏈接器任務傳遞的值替換?
佔位符:
DEBUG TimestampIncrementingTableQuerier{table="demo"."accounts", query='null', topicPrefix='mysql-08-', incrementingColumn='', timestampColumns=[UPDATE_TS]} prepared SQL query: SELECT * FROM `demo`.`accounts` WHERE `demo`.`accounts`.`UPDATE_TS` > ? AND `demo`.`accounts`.`UPDATE_TS` < ? ORDER BY `demo`.`accounts`.`UPDATE_TS` ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier) DEBUG Executing prepared statement with timestamp value = 2019-01-09 10:34:16.000 end time = 2019-01-09 13:23:40.000 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
這裏,第一個時間戳值就是存儲的偏移量,第二個時間戳值是當前時間戳。
雖然沒有文檔記載,但能夠手動更改鏈接器使用的偏移量,由於是在JDBC源鏈接器的上下文中,因此能夠跨多個源鏈接器類型,這意味着更改時間戳或主鍵,鏈接器會將後續記錄視爲未處理的狀態。
首先要作的是確保Kafka鏈接器已經刷新了週期性的偏移量,能夠在工做節點日誌中看到什麼時候執行此操做:
INFO WorkerSourceTask{id=jdbc_source_mysql_08-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
看下Kafka的主題,能夠看到Kafka鏈接器建立的內部主題,而且負責偏移量的主題也是其中之一,名字可能有所不一樣:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- docker-connect-configs | false | 1 | 1 | 0 | 0 docker-connect-offsets | false | 1 | 1 | 0 | 0 docker-connect-status | false | 5 | 1 | 0 | 0 ksql> PRINT 'docker-connect-offsets' FROM BEGINNING; Format:JSON {"ROWTIME":1547038346644,"ROWKEY":"[\"jdbc_source_mysql_08\",{\"protocol\":\"1\",\"table\":\"demo.customers\"}]","timestamp_nanos":0,"timestamp":1547030057000}
當Kafka鏈接器任務啓動時,它會讀取此主題並使用適當主鍵的最新值。要更改偏移量,只需插入一個新值便可。最簡單的方法是轉存當前主題內容,修改內容並從新執行,由於一致性和簡單,能夠考慮使用kafkacat:
$ kafkacat -b kafka:29092 -t docker-connect-offsets -C -K# -o-1 % Reached end of topic docker-connect-offsets [0] at offset 0 ["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547030056000} );
若是是多個鏈接器,可能複雜些,可是這裏只有一個,因此使用了-o-1
標誌,它定義了返回的偏移量。
mode=timestamp
來監測表中的變化。時間戳值是1547030056000
,使用相關的時間戳轉換之類的工具,能夠很容易地轉換和操做,好比將其提早一小時(1547026456000
)。接下來,使用更新後的timestamp
值準備新消息:["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}
echo '["jdbc_source_mysql_08",{"protocol":"1","table":"demo.accounts"}]#{"timestamp_nanos":0,"timestamp":1547026456000}' | \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
NULL
消息值:echo'[「jdbc_source_mysql_08」,{「protocol」:「1」,「table」:「demo.accounts」}]#'| \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
curl -i -X POST -H "Accept:application/json" \ -H "Content-Type:application/json" http://localhost:8083/connectors/jdbc_source_mysql_08/tasks/0/restart
從指定的時間戳或者主鍵處開啓表的捕獲
當使用時間戳或自增主鍵模式建立JDBC源鏈接器時,它會從主鍵爲-1
和/或時間戳爲1970-01-01 00:00:00.00
開始,這意味着會得到表的所有內容,而後在後續的輪詢中獲取任何插入/更新的數據。
可是若是不想要表的完整副本,只是但願鏈接器從如今開始,該怎麼辦呢?這在目前的Kafka鏈接器中還不支持,但可使用前述的方法。不須要獲取現有的偏移量消息並對其進行定製,而是本身建立。消息的格式依賴於正在使用的鏈接器和表的名稱,一種作法是先建立鏈接器,肯定格式,而後刪除鏈接器,另外一種作法是使用具備相同源表名和結構的環境,除非在該環境中沒有可供鏈接器提取的數據,不然一樣也能獲得所需的消息格式。
在建立鏈接器以前,使用適當的值配置偏移量主題。在這裏,但願從demo.transactions
表中提取自增主鍵大於42的全部行:
echo '["jdbc_source_mysql_20",{"protocol":"1","table":"demo.transactions"}]#{"incrementing":42}' | \ kafkacat -b kafka:29092 -t docker-connect-offsets -P -Z -K#
下面建立鏈接器:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_20", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-20-", "mode":"incrementing", "table.whitelist" : "demo.transactions", "incrementing.column.name": "txn_id", "validate.non.null": false } }'
在生成的Kafka鏈接器工做日誌中,能夠看到:
INFO Found offset {{protocol=1, table=demo.transactions}={incrementing=42}, {table=transactions}=null} for partition {protocol=1, table=demo.transactions} (io.confluent.connect.jdbc.source.JdbcSourceTask) … DEBUG Executing prepared statement with incrementing value = 42 (io.confluent.connect.jdbc.source.TimestampIncrementingCriteria)
和預期同樣,Kafka主題中只注入了txn_id
大於42的行:
ksql> PRINT 'mysql-20x-transactions' FROM BEGINNING; Format:AVRO 1/9/19 1:44:07 PM UTC, null, {"txn_id": 43, "customer_id": 3, "amount": {"bytes": "ús"}, "currency": "CNY", "txn_timestamp": "2018-12-15T08:23:24Z"} 1/9/19 1:44:07 PM UTC, null, {"txn_id": 44, "customer_id": 5, "amount": {"bytes": "\f!"}, "currency": "CZK", "txn_timestamp": "2018-10-04T13:10:17Z"} 1/9/19 1:44:07 PM UTC, null, {"txn_id": 45, "customer_id": 3, "amount": {"bytes": "çò"}, "currency": "USD", "txn_timestamp": "2018-04-03T03:40:49Z"}
Kafka消息是鍵/值對,其中值是有效內容
。在JDBC鏈接器的上下文中,值是要被提取的錶行的內容。Kafka消息中的鍵對於分區和下游處理很是重要,其中任何關聯(好比KSQL)都將在數據中完成。
JDBC鏈接器默認不設置消息鍵,可是使用Kafka鏈接器的單消息轉換(SMT)機制能夠輕鬆實現。假設想要提取accounts
表並將其ID
列用做消息鍵。只需簡單地將其添加到下面的配置中便可:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_06", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-06-", "poll.interval.ms" : 3600000, "table.whitelist" : "demo.accounts", "mode":"bulk", "transforms":"createKey,extractInt", "transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey", "transforms.createKey.fields":"id", "transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key", "transforms.extractInt.field":"id" } }'
這時若是使用諸如kafka-avro-console-consumer
之類的工具檢查數據,就會看到鍵(JSON內容以前的最左列)與id
值匹配:
kafka-avro-console-consumer \ --bootstrap-server kafka:29092 \ --property schema.registry.url=http://schema-registry:8081 \ --topic mysql-06-accounts --from-beginning --property print.key=true 1 {"id":{"int":1},"first_name":{"string":"Hamel"},"last_name":{"string":"Bly"},"username":{"string":"Hamel Bly"},"company":{"string":"Erdman-Halvorson"},"created_date":{"int":17759}} 2 {"id":{"int":2},"first_name":{"string":"Scottie"},"last_name":{"string":"Geerdts"},"username":{"string":"Scottie Geerdts"},"company":{"string":"Mante Group"},"created_date":{"int":17692}}
若是要在數據中設置鍵以便與KSQL一塊兒使用,則須要將其建立爲字符串類型,由於KSQL目前不支持其它鍵類型,具體能夠在鏈接器配置中添加以下內容:
"key.converter": "org.apache.kafka.connect.storage.StringConverter"
而後就能夠在KSQL中使用了:
ksql> CREATE STREAM ACCOUNTS WITH (KAFKA_TOPIC='mysql-06X-accounts', VALUE_FORMAT='AVRO'); ksql> SELECT ROWKEY, ID, FIRST_NAME + ' ' + LAST_NAME FROM ACCOUNTS; 1 | 1 | Hamel Bly 2 | 2 | Scottie Geerdts 3 | 3 | Giana Bryce
JDBC鏈接器要求指定topic.prefix
,但若是不想要,或者想將主題名更改成其它模式,SMT能夠實現。
假設要刪除mysql-07-
前綴,那麼須要一點正則表達式的技巧:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_07", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-07-", "poll.interval.ms" : 3600000, "catalog.pattern" : "demo", "table.whitelist" : "accounts", "mode":"bulk", "transforms":"dropTopicPrefix", "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter", "transforms.dropTopicPrefix.regex":"mysql-07-(.*)", "transforms.dropTopicPrefix.replacement":"$1" } }'
這樣主題名就和表名一致了:
ksql> LIST TOPICS; Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups ---------------------------------------------------------------------------------------------------- accounts | false | 1 | 1 | 0 | 0
這是話題比較深刻。
numeric.mapping
: best_fit
若是源中包含NUMERIC/NUMBER
類型的數據,則可能須要這個配置項;query
選項,用於對源表中的數據進行轉換;DECIMAL
類型暴露,則numeric.mapping
沒法處理:
DECIMAL
;DECIMAL
和NUMERIC
原生存儲,所以必須將DECIMAL
字段轉換爲NUMERIC
;NUMBER
字段中指定長度和標度,例如NUMBER(5,0)
,不能是NUMBER
;NUMERIC
和DECIMAL
都被視爲NUMBER,INT
也是;完成以後,下面會作一個解釋:
Kafka鏈接器是一個能夠將數據注入Kafka、與特定源技術無關的框架。不管是來自SQL Server、DB二、MQTT、文本文件、REST仍是Kafka鏈接器支持的任何其它數十種來源,它發送給Kafka的數據格式都爲Avro
或JSON
,這一般是一個透明的過程,只是在處理數值數據類型時有些特別,好比DECIMAL
,NUMBER
等等,如下面的MySQL查詢爲例:
mysql> SELECT * FROM transactions LIMIT 1; +--------+-------------+--------+----------+----------------------+ | txn_id | customer_id | amount | currency | txn_timestamp | +--------+-------------+--------+----------+----------------------+ | 1 | 5 | -72.97 | RUB | 2018-12-12T13:58:37Z | +--------+-------------+--------+----------+----------------------+
挺正常是吧?其實,amount
列是DECIMAL(5,2)
:
mysql> describe transactions; +---------------+--------------+------+-----+---------+-------+ | Field | Type | Null | Key | Default | Extra | +---------------+--------------+------+-----+---------+-------+ | txn_id | int(11) | YES | | NULL | | | customer_id | int(11) | YES | | NULL | | | amount | decimal(5,2) | YES | | NULL | | | currency | varchar(50) | YES | | NULL | | | txn_timestamp | varchar(50) | YES | | NULL | | +---------------+--------------+------+-----+---------+-------+ 5 rows in set (0.00 sec)
可是當使用JDBC鏈接器的默認設置提取到Kafka中時,最終會是這樣:
ksql> PRINT 'mysql-02-transactions' FROM BEGINNING; Format:AVRO 1/4/19 5:38:45 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": {"bytes": "ã\u007F"}, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}
DECIMAL
變成了一個看似亂碼的bytes
值,鏈接器默認會使用本身的DECIMAL
邏輯類型,該類型在Avro中被序列化爲字節,能夠經過查看Confluent Schema Registry中的相關條目來看到這一點:
$ curl -s "http://localhost:8081/subjects/mysql-02-transactions-value/versions/1"|jq '.schema|fromjson.fields[] | select (.name == "amount")' { "name": "amount", "type": [ "null", { "type": "bytes", "scale": 2, "precision": 64, "connect.version": 1, "connect.parameters": { "scale": "2" }, "connect.name": "org.apache.kafka.connect.data.Decimal", "logicalType": "decimal" } ], "default": null }
當鏈接器使用AvroConverter
消費時,這會正常處理並保存爲DECIMAL
(而且在Java中也能夠反序列化爲BigDecimal
),但對於反序列化Avro的其它消費者,它們只會獲得字節。在使用啓用了模式的JSON時,也會看到這一點,amount
值會是Base64編碼的字節字符串:
{ "schema": { "type": "struct", "fields": [ { "type": "bytes", "optional": true, "name": "org.apache.kafka.connect.data.Decimal", "version": 1, "parameters": { "scale": "2" }, "field": "amount" }, }, "payload": { "txn_id": 1000, "customer_id": 5, "amount": "Cv8=" } }
所以,無論使用的是JSON仍是Avro,這都是numeric.mapping配置項的來源。它默認設置爲none
(即便用鏈接器的DECIMAL
類型),但一般但願鏈接器將類型實際轉換爲更兼容的類型,以適合數字的精度,更具體的說明,能夠參見相關的文檔。
此選項目前不支持DECIMAL
類型,所以這裏是在Postgres中具備NUMERIC
類型的相同原理的示例:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_postgres_12", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:postgresql://postgres:5432/postgres", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "postgres-12-", "numeric.mapping": "best_fit", "table.whitelist" : "demo.transactions", "mode":"bulk", "poll.interval.ms" : 3600000 } }'
結果以下所示:
ksql> PRINT 'postgres-12-transactions' FROM BEGINNING; Format:AVRO 1/7/19 6:27:16 PM UTC, null, {"txn_id": 1, "customer_id": 5, "amount": -72.97, "currency": "RUB", "txn_timestamp": "2018-12-12T13:58:37Z"}
能夠在這裏看到有關此內容的更多詳細信息,以及Postgres、Oracle和MS SQL Server中的示例。
若是須要從多個表中提取數據,則能夠經過並行處理來減小總提取時間,這在Kafka的JDBC鏈接器有兩種方法:
前者具備更高的管理開銷,但確實提供了每一個表自定義設置的靈活性。若是可使用相同的鏈接器配置提取全部表,則增長單個鏈接器中的任務數是一種好方法。
當增長從數據庫中提取數據的併發性時,要從總體上考慮。由於運行一百個併發任務雖然可能會更快,但數百個與數據庫的鏈接可能會對數據庫產生負面影響。
如下是同一鏈接器的兩個示例。二者都將從數據庫中提取全部表,總共6個。在第一個鏈接器中,未指定最大任務數,所以爲默認值1。在第二個中,指定了最多運行三個任務("tasks.max":3
):
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_01", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-01-", "mode":"bulk" } }'
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{ "name": "jdbc_source_mysql_11", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector", "connection.url": "jdbc:mysql://mysql:3306/demo", "connection.user": "connect_user", "connection.password": "asgard", "topic.prefix": "mysql-11-", "mode":"bulk", "tasks.max":3 } }'
當查詢鏈接器的Kafka鏈接器RESTAPI時,能夠看到每一個鏈接器正在運行的任務數以及它們已分配的表。第一個鏈接器有一個任務負責全部6張表:
$ curl -s "http://localhost:8083/connectors/jdbc_source_mysql_01/tasks"|jq '.' [ { "id": { "connector": "jdbc_source_mysql_01", "task": 0 }, "config": { "tables": "`demo`.`NUM_TEST`,`demo`.`accounts`,`demo`.`customers`,`demo`.`transactions`,`security`.`firewall`,`security`.`log_events`", … } } ]
第二個鏈接器有3個任務,每一個任務分配2張表:
$ curl -s「http:// localhost:8083 / connectors / jdbc_source_mysql_11 / tasks」| jq'。' [ { 「ID」: { 「connector」:「jdbc_source_mysql_11」,「任務」:0 }, 「config」:{ 「tables」:「`demo` .NUM_TEST`,`demo` .accounts`」, ... } }, { 「ID」: { 「connector」:「jdbc_source_mysql_11」,「任務」:1 }, 「config」:{ 「tables」:「`demo``customers`,`demo` .transactions`」, ... } }, { 「ID」: { 「connector」:「jdbc_source_mysql_11」,「任務」:2 }, 「config」:{ 「tables」:「`security``firewall`,`security``log_events`」, ... } } ]