摘要:Flink 1.11 引入了 CDC,在此基礎上, JDBC Connector 也發生比較大的變化,本文由 Apache Flink Contributor,阿里巴巴高級開發工程師徐榜江(雪盡)分享,主要介紹 Flink 1.11 JDBC Connector 的最佳實踐。大綱以下:javascript
JDBC connectorhtml
JDBC Catalogjava
JDBC Dialectmysql
Demogit
Tips:點擊下方連接可查看做者原版 PPT 及分享視頻:
https://flink-learning.org.cn/developers/flink-training-course3/github
JDBC-Connector 的重構web
JDBC-Connector 的重構web
FLINK-15782 :Rework JDBC Sinks[1] (重寫 JDBC Sink)sql
FLINK-17537:Refactor flink-jdbc connector structure[2] (重構 flink-jdbc 鏈接器的結構)數據庫
FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)apache
FLIP-122:New Connector Property Keys for New Factory[4](新的鏈接器參數)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主鍵約束問題)
JDBC Catalog
JDBC Catalog
// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)
JDBC Dialect
JDBC Dialect
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping
|
|
|
|
|
|
實踐 Demo
實踐 Demo
-
Flink standalone 環境準備並在提供的地址下載好對應的安裝包和 connector jar。 -
測試數據準備,經過拉起容器運行已經打包好的鏡像。其中 Kafka 中的 changelog 數據是經過 debezium connector 抓取的 MySQL orders表 的 binlog。 -
經過 SQL Client 編寫 SQL 做業,分別建立 Flink 訂單表,維表,用戶表,產品表,並建立 Function UDF。從 PG Catalog 獲取結果表信息以後,把做業提交至集羣執行運行。 -
測試 CDC 數據同步和維表 join,經過新增訂單、修改訂單、刪除訂單、維表數據更新等一系列操做驗證 CDC 在 Flink 上如何運行以及寫入結果表。
https://github.com/leonardBang/flink-sql-etl
問答環節
問答環節
https://issues.apache.org/jira/browse/FLINK-16681
總結
總結
參考連接:
Flink Forward Asia 2020 議題徵集中
(點擊可瞭解更多議題投遞詳情)
本文分享自微信公衆號 - Flink 中文社區(gh_5efd76d10a8d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。