Flink JDBC Connector:Flink 與數據庫集成最佳實踐

整理:陳政羽(Flink 社區志願者)

摘要:Flink 1.11 引入了 CDC,在此基礎上, JDBC Connector 也發生比較大的變化,本文由 Apache Flink Contributor,阿里巴巴高級開發工程師徐榜江(雪盡)分享,主要介紹 Flink 1.11 JDBC Connector 的最佳實踐。大綱以下:javascript


  1. JDBC connectorhtml

  2. JDBC Catalogjava

  3. JDBC Dialectmysql

  4. Demogit


Tips:點擊下方連接可查看做者原版 PPT 及分享視頻:
https://flink-learning.org.cn/developers/flink-training-course3/
github

 

JDBC-Connector 的重構web


JDBC Connector 在 Flink 1.11 版本發生了比較大的變化,咱們先從如下幾個 Feature 來具體瞭解一下 Flink 社區在這個版本上對 JDBC 所作的改進。

  • FLINK-15782 :Rework JDBC Sinks[1] (重寫 JDBC Sink)sql


這個 issue 主要爲 DataStream API 新增了 JdbcSink,對於使用 DataStream 編程的用戶會更加方便地把數據寫入到 JDBC;而且規範了一些命名規則,之前命名使用的是 JDBC 加上鍊接器名稱,目前命名規範爲 Jdbc+ 鏈接器名稱

  • FLINK-17537:Refactor flink-jdbc connector structure[2] (重構 flink-jdbc 鏈接器的結構)數據庫


這個 issue 將 flink-jdbc 包名重命名爲 flink-connector-jdbc,與 Flink 的其餘 connector 統一,將全部接口和類從 org.apache.flink.java.io.jdbc(舊包)規範爲新包路徑 org.apache.flink.connector.jdbc(新包),經過這種重命名用戶在對底層源代碼的閱讀上面會更加容易的理解和統一。

  • FLIP-95: New TableSource and TableSink interfaces[3] (新的 TableSource 和 TableSink 接口)apache


因爲早期數據類型系統並非很完善,致使了比較多的 Connector 在使用上會常常報數據類型相關的異常,例如 DECIMAL 精度類型,在以往的 Flink 1.10 版本中有可能出現下圖問題:


基於 FLIP-95 新的 TableSource 和 TableSink 在精度支持方面作了重構,目前數據精度方面的支持已經很完善了。

  • FLIP-122:New Connector Property Keys for New Factory[4](新的鏈接器參數)


在 Flink 1.11 版本中,咱們對 DDL 的 WITH 參數相對於 1.10 版本作了簡化,從用戶視角看上就是簡化和規範了參數,如表格所示:

Old Key (Flink 1.10)
New Key (Flink 1.11)
connector.type
connector.type
connector.url
url
connector.table
table-name
connector.driver
driver
connector.username
username
connector.password
password
connector.read.partition.column
scan.partition.column
connector.read.partition.num
scan.partition.num
connector.read.partition.lower-bound
scan.partition.lower-bound
connector.read.partition.upper-bound
scan.partition.upper-bound
connector.read.fetch-size
scan.fetch-size
connector.lookup.cache.max-rows
lookup.cache.max-rows
connector.lookup.cache.ttl
lookup.cache.ttl
connector.lookup.max-retries
lookup.max-retries
connector.write.flush.max-rows
sink.buffer-flush.max-rows
connector.write.flush.interval
sink.buffer-flush.interval
connector.write.max-retries
sink.max-retries

你們能夠看到表格中有 3 個標紅的地方,這個是相對於 1.10 有發生變化比較多的地方。此次 FLIP 但願進一步簡化鏈接器屬性,以便使屬性更加簡潔和可讀,並更好地與 FLIP-107 協做。若是須要了解更多的 Connector 參數能夠進一步參考官方文檔和 FLIP-122 中提到的改變,這樣有助於從舊版本遷移到新版本並瞭解參數的變化。

  • FLIP-87:Primary key Constraints in Table API[5] (Table API 接口中的主鍵約束問題)


Flink 1.10 存在某些 Query 沒法推斷出主鍵致使沒法進行 Upsert 更新操做(以下圖所示錯誤)。因此在 FLIP-87 中爲 Flink SQL 引入的 Primary Key 約束。Flink 的主鍵約束遵循 SQL 標準,主鍵約束分爲 PRIMARY KEY NOT ENFORCED 和 PRIMARY KEY ENFORCED, ENFORCED 表示是否對數據進行校驗。咱們常見數據庫的主鍵約束屬於 PRIMARY KEY ENFORCED,會對數據進行校驗。由於 Flink 並不持有數據,所以 Flink 支持的主鍵模式是 PRIMARY KEY NOT ENFORCED,  這意味着 Flink 不會校驗數據,而是由用戶確保主鍵的完整性。例如 HBase 裏面對應的主鍵應該是 RowKey,在 MySQL 中對應的主鍵是在用戶數據庫的表中對應的主鍵。


JDBC Catalog


目前 Flink 支持 Catalog 主要有 JDBC Catalog 和 Hive Catalog 。在關係數據庫中的表,若是要在 Flink 中使用,用戶須要手動寫表的 DDL,一旦表的 Schema 發生改變,用戶須要手動修改, 這是比較繁瑣的事情。JDBC Catalog 提供了接口用於鏈接到各類關係型數據庫,使得 Flink 可以自動檢索表,不用用戶手動輸入和修改。目前 JDBC Catalog 內置目前實現了 Postgres Catalog。Postgres catalog 是一個 read-only (只讀)的 Catalog,只支持讀取 Postgres 表,支持的功能比較有限。下面代碼展現了目前 Postgres catalog 支持的 6 個功能:數據庫是否存在、數據庫列表、獲取數據庫、根據數據庫名獲取表列表、得到表、表是否存在。


// The supported methods by Postgres Catalog.PostgresCatalog.databaseExists(String databaseName)PostgresCatalog.listDatabases()PostgresCatalog.getDatabase(String databaseName)PostgresCatalog.listTables(String databaseName)PostgresCatalog.getTable(ObjectPath tablePath)PostgresCatalog.tableExists(ObjectPath tablePath)

若是須要支持其餘 DB (如 MySQL),須要用戶根據 FLIP-93 的 JdbcCatalog 接口實現對應不一樣的 JDBC Catalog。

JDBC Dialect


什麼是 Dialect?

Dialect (方言)對各個數據庫來講,Dialect 體現各個數據庫的特性,好比語法、數據類型等。若是須要查看詳細的差別,能夠點擊這裏[6]查看詳細差別。下面經過對比 MySQL 和 Postgres 的一些常見場景舉例:

Dialect
MySQL
Postgres
場景描述
Grammar(語法)
LIMIT 0,30
WITH LIMIT 30 OFFSET 0
分頁
Data Type (數據類型)
BINARY
BYTEA,ARRAY
字段類型
Command (命令)
show tables
\dt
查看全部表

在數據類型上面,Flink SQL 的數據類型目前映射規則以下:

MySQL type
PostgreSQL type
Flink SQL type
TINYINT

TINYINT
SMALLINT
TINYINT UNSIGNED
SMALLINT
INT2
SMALLSERIAL
SERIAL2
SMALLINT
INT
MEDIUMINT
SMALLINT
UNSIGNED
INTEGER
SERIAL
INT
BIGINT
INT
UNSIGNED
BIGINT
BIGSERIAL
BIGINT
BIGINT
UNSIGNED

DECIMAL(20, 0)

Flink 目前支持三種 Dialect: Derby、MySQL、PostgreSQL,Derby 主要用於測試,更多的類型映射能夠點擊下方連接前往官方文檔查看。

https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/connectors/jdbc.html#data-type-mapping


如何保證 Dialect Upsert 的冪等性?

若是定義了主鍵,JDBC 寫入時是可以保證 Upsert 語義的, 若是 DB 不支持 Upsert 語法,則會退化成 DELETE + INSERT 語義。Upsert query 是原子執行的,能夠保證冪等性。這個在官方文檔中也詳細描述了更新失敗或者存在故障時候如何作出的處理,下面的表格是不一樣的 DB 對應不一樣的 Upsert 語法:

Database
Upsert Grammar
MySQL
INSERT .. ON DUPLICATE KEY UPDATE ..
PostgreSQL
INSERT .. ON CONFLICT .. DO UPDATE SET ..

如何自定義 Dialect?

目前若是要實現自定義 Dialect (好比 SQL Server、Oracle 等), 須要用戶本身實現對應 Dialect 修改源碼並從新打包 flink-connector-jdbc。社區正在討論提供一種插件化 dialect 的機制, 讓用戶能夠不用修改源碼打包就能實現自定義 Dialect,這個機制須要把 Dialect 接口暴露給用戶。目前的 Dialect 接口不夠清晰,沒有考慮 DataStream API 的使用場景,也沒有考慮到 一些複雜的 SQL 場景,因此這個接口目前不太穩定(後續版本會修改) 。

社區目前之因此沒有把這個 API 開放給用戶,是從用戶使用的體驗角度考慮,但願把這種頂級 API 設計得儘可能穩定、簡潔後再開放出來給用戶使用,避免用戶在後續 Flink 版本的迭代中屢次修改代碼。目前社區已經有相應的計劃去作了,你們能夠留意 FLINK-16833[7] 提出的 JDBCDialect 插件化設計。

實踐 Demo


你們看完上述 Flink 1.11 在 JDBC 所作的改動後,你們能夠嘗試下面這個關於商品表 CDC 同步和 ETL 的小案例,有助於理解 JDBC Catalog 和 CDC 的同步機制。

環境與版本:Flink 1.11.一、Docker、Kafka 1.11.一、MySQL Driver 5.1.4八、PostgreSQL Driver 42.2.14

流程以下:

  1. Flink standalone 環境準備並在提供的地址下載好對應的安裝包和 connector jar。
  2. 測試數據準備,經過拉起容器運行已經打包好的鏡像。其中 Kafka 中的 changelog 數據是經過 debezium connector 抓取的 MySQL orders表 的 binlog。
  3. 經過 SQL Client 編寫 SQL 做業,分別建立 Flink 訂單表,維表,用戶表,產品表,並建立 Function UDF。從 PG Catalog 獲取結果表信息以後,把做業提交至集羣執行運行。
  4. 測試 CDC 數據同步和維表 join,經過新增訂單、修改訂單、刪除訂單、維表數據更新等一系列操做驗證 CDC 在 Flink 上如何運行以及寫入結果表。


上圖爲業務流程總體圖,項目 Demo 地址:

https://github.com/leonardBang/flink-sql-etl


問答環節


1.Flink SQL Client 上面執行的 use default,是使用哪一個 catlog 呢?

答:Flink 內部有一個內置 Catlog,它把 meta 數據存於內存中。在 SQL Client 上沒有顯式指定 Hive catlog 或者 jdbc catlog 時會使用內置的 Catalog,剛剛的案例給你們演示的是 Postgres Catalog,裏面有結果表。在內置 Catlog 能夠看到咱們剛剛建立 Kafka 的表,MySQL 的維度表。

2.Flink MySQL DDL 鏈接 8 小時後就會自動斷開的問題是否已經解決?

這個問題會在 1.12 版本解決此問題,目前 master 分支已經合併,具體能夠參考如下地址 ,描述了相關問題的討論和解決辦法。

https://issues.apache.org/jira/browse/FLINK-16681 


3.什麼是 CDC?能大概講下目前 Flink 支持的 CDC 嗎?

經過 Change Data Capture 機制(CDC)來將外部系統的動態數據(如 Mysql BinLog、Kafka Compacted Topic)導入 Flink,以及將 Flink 的 Update/Retract 流寫出到外部系統中是用戶一直但願的功能。

Flink 1.11 實現了對 CDC 數據讀取和寫出的支持。目前 Flink 能夠支持 Debezium(Demo 中所用的工具) 和 Canal(阿里巴巴開源同步工具) 兩種 CDC 格式。Debezium 在國外用得比較多,Canal 在國內用得比較多,二者格式會有所區別,詳細能夠參考官方使用文檔。


總結


本文從 JDBC Connector 的重構、數據精度、主鍵約束、命名規範等方面詳細介紹,分享了社區目前實現的 Postgres Catalog 功能點;介紹了 Flink 如何實現 JDBC Dialect 的統一以及目前社區對 Dialect 作的一些計劃;最後的實踐 Demo 環節演示了經過 SQL Client 進行維表 JOIN 和 ETL 操做以及解答了你們在實際生產中所遇到的問題,但願對你們進一步瞭解 Flink CDC 新功能有所幫助。

參考連接:


[1]https://issues.apache.org/jira/browse/FLINK-15782
[2]https://issues.apache.org/jira/browse/FLINK-17537
[3]https://cwiki.apache.org/confluence/display/FLINK/FLIP-95%3A+New+TableSource+and+TableSink+interfaces
[4]https://cwiki.apache.org/confluence/display/FLINK/FLIP-122%3A+New+Connector+Property+Keys+for+New+Factory
[5]https://cwiki.apache.org/confluence/display/FLINK/FLIP+87%3A+Primary+key+constraints+in+Table+API
[6]https://www.postgresqltutorial.com/postgresql-vs-mysql/
[7]https://issues.apache.org/jira/browse/FLINK-16833



  Flink Forward Asia 2020 議題徵集中  


洞察先機,智見將來,  Flink Forward Asia 2020 盛大開啓!誠邀開源社區的各方力量與咱們一塊兒,探討新型數字化技術下的將來趨勢,共同打造 2020 年大數據領域的這場頂級盛會!

在 Flink Forward Asia 2020,您可與全球開發者分享您的真知灼見,同各技術領域頂級專家面對面交流,探索數字化技術下的將來趨勢若是您對以上任意技術方向有積累洞察,歡迎投遞議題!每位嘉賓最多能夠投遞三個Topic, 投遞日期截止至 10 月 12 日。


(點擊可瞭解更多議題投遞詳情)


戳我投議題!

本文分享自微信公衆號 - Flink 中文社區(gh_5efd76d10a8d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索