Flink SQL CDC 上線!咱們總結了 13 條生產實踐經驗

摘要: 7月,Flink 1.11 新版發佈,在生態及易用性上有大幅提高,其中 Table & SQL 開始支持 Change Data Capture(CDC)。CDC 被普遍使用在複製數據、更新緩存、微服務間同步數據、審計日誌等場景,本文由社區由曾慶東同窗分享,主要介紹 Flink SQL CDC 在生產環境的落地實踐以及總結的實戰經驗,文章分爲如下幾部分:

  1. 項目背景css

  2. 解決方案html

  3. 項目運行環境與現狀前端

  4. 具體實現node

  5. 踩過的坑和學到的經驗mysql

  6. 總結git


Tips: 點擊下方連接可查看社區直播的 Flink SQL CDC 相關視頻~
https://flink-learning.org.cn/developers/flink-training-course3/

01 項目背景github


本人目前參與的項目屬於公司裏面數據密集、計算密集的一個重要項目,須要提供高效且準確的 OLAP 服務,提供靈活且實時的報表。業務數據存儲在 MySQL 中,經過主從複製同步到報表庫。做爲集團級公司,數據增加多並且快,出現了多個千萬級、億級的大表。爲了實現各個維度的各類複雜的報表業務,有些千萬級大表仍然須要進行 Join,計算規模很是驚人,常常不能及時響應請求。

隨着數據量的日益增加和實時分析的需求愈來愈大,急需對系統進行流式計算、實時化改造。正是在這個背景下,開始了咱們與 Flink SQL CDC 的故事。

02 解決方案web


針對平臺如今存在的問題,咱們提出了把報表的數據實時化的方案。該方案主要經過 Flink SQL CDC + Elasticsearch 實現。Flink SQL 支持 CDC 模式的數據同步,將 MySQL 中的全增量數據實時地採集、預計算、並同步到 Elasticsearch 中,Elasticsearch 做爲咱們的實時報表和即席分析引擎。項目總體架構圖以下所示:


實時報表實現具體思路是,使用 Flink CDC 讀取全量數據,全量數據同步完成後,Flink CDC 會無縫切換至 MySQL 的 binlog 位點繼續消費增量的變動數據,且保證不會多消費一條也不會少消費一條。讀取到的帳單和訂單的全增量數據會與產品表作關聯補全信息,並作一些預聚合,而後將聚合結果輸出到 Elasticsearch,前端頁面只須要到 Elasticsearch 經過精準匹配(terms)查找數據,或者再使用 agg 作高維聚合統計獲得多個服務中心的報表數據。

從總體架構中,能夠看到,Flink SQL 及其 CDC 功能在咱們的架構中扮演着核心角色。咱們採用 Flink SQL CDC,而不是 Canal + Kafka 的傳統架構,主要緣由仍是由於其依賴組件少,維護成本低,開箱即用,上手容易。具體來講 Flink SQL CDC 是一個集採集、計算、傳輸於一體的工具,其吸引咱們的優勢有:

① 減小維護的組件、簡化實現鏈路; 
② 減小端到端延遲; 
③ 減輕維護成本和開發成本; 
④ 支持 Exactly Once 的讀取和計算(因爲咱們是帳務系統,因此數據一致性很是重要); 
⑤ 數據不落地,減小存儲成本; 
⑥ 支持全量和增量流式讀取;

有關 Flink SQL CDC 的介紹和教程,能夠觀看 Apache Flink 社區發佈的相關視頻:

https://www.bilibili.com/video/BV1zt4y1D7kt/

項目使用的是 flink-cdc-connectors 中提供的 mysql-cdc 組件。這是一個 Flink 數據源,支持對 MySQL 數據庫的全量和增量讀取。它在掃描全表前會先加一個全局讀鎖,而後獲取此時的 binlog position,緊接着釋放全局讀鎖。隨後開始掃描全表,當全錶快照讀取完後,會從以前獲取的 binlog position 獲取增量的變動記錄。所以這個讀鎖是很是輕量的,持鎖時間很是短,不會對線上業務形成太大影響。更多信息能夠參考 flink-cdc-connectors 項目官網: https://github.com/ververica/flink-cdc-connectors。

03 項目運行環境與現狀sql


咱們在生產環境搭建了 Hadoop + Flink + Elasticsearch 分佈式環境,採用的 Flink on YARN 的 per-job 模式運行,使用 RocksDB 做爲 state backend,HDFS 做爲 checkpoint 持久化地址,而且作好了 HDFS 的容錯,保證 checkpoint 數據不丟失。咱們使用 SQL Client 提交做業,全部做業統一使用純 SQL,沒有寫一行 Java 代碼。

目前已上線了 3 個基於 Flink CDC 的做業,已穩定在線上運行了兩個星期,而且業務產生的訂單實收和帳單實收數據能實時聚合輸出到 Elasticsearch,輸出的數據準確無誤。如今也正在對其餘報表採用 Flink SQL CDC 進行實時化改造,替換舊的業務系統,讓系統數據更實時。

04 具體實現數據庫


① 進入 Flink/bin,使用 ./sql-client.sh embedded 啓動 SQL CLI 客戶端。 

② 使用 DDL 建立 Flink Source 和 Sink 表。這裏建立的表字段個數不必定要與 MySQL 的字段個數和順序一致,只須要挑選 MySQL 表中業務須要的字段便可,而且字段類型保持一致。

-- 在Flink建立帳單實收source表CREATE TABLE bill_info ( billCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11,2), status STRING, proc_time AS PROCTIME() -–使用維表時須要指定該字段) WITH ( 'connector' = 'mysql-cdc', -- 鏈接器 'hostname' = '******', --mysql地址 'port' = '3307', -- mysql端口 'username' = '******', --mysql用戶名 'password' = '******', -- mysql密碼 'database-name' = 'cdc', -- 數據庫名稱 'table-name' = '***');
-- 在Flink建立訂單實收source表CREATE TABLE order_info ( orderCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11, 2), status STRING, proc_time AS PROCTIME() -–使用維表時須要指定該字段) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '******', 'port' = '3307', 'username' = '******', 'password' = '******', 'database-name' = 'cdc', 'table-name' = '***',);
-- 建立科目維表CREATE TABLE subject_info ( code VARCHAR(32) NOT NULL, name VARCHAR(64) NOT NULL, PRIMARY KEY (code) NOT ENFORCED --指定主鍵) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = '***', 'username' = '******', 'password' = '******', 'lookup.cache.max-rows' = '3000', 'lookup.cache.ttl' = '10s', 'lookup.max-retries' = '3');
-- 建立實收分佈結果表,把結果寫到 ElasticsearchCREATE TABLE income_distribution ( serviceCode STRING, accountPeriod STRING, subjectCode STRING, subjectName STRING, amt DECIMAL(13,2), PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://xxxx:9200', 'index' = 'income_distribution', 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');

以上的建表 DDL 分別建立了訂單實收 source 表、帳單實收 source 表、產品科目維表和 Elasticsearch 結果表。建表完成後,Flink 是不會立刻去同步 MySQL 的數據,而是等到用戶提交了一個 insert 做業後纔會執行同步數據,而且 Flink 不會存儲數據。咱們的第一個做業是計算收入分佈,數據來源於 bill_info 和 order_info 兩張 MySQL 表,而且帳單實收表和訂單實收表都須要關聯維表數據獲取應收科目的最新中文名稱,按照服務中心、帳期、科目代碼和科目名稱進行分組計算實收金額的 sum 值,實收分佈具體 DML 以下:

INSERT INTO income_distributionSELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt FROM ( SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt  FROM bill_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.nameUNION ALL SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt FROM order_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code  GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name) AS t1GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;

Flink SQL 的維表 JOIN 和雙流 JOIN 寫法上不太同樣,對於維表,還須要在 Flink source table 上添加一個 proctime 字段 proc_time AS PROCTIME(),關聯的時候使用 FOR SYSTEM_TIME AS OF 的 SQL 語法查詢時態表,意思是關聯查詢最新版本的維表數據。關於維表 JOIN 的使用可參閱: https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/streaming/joins.html。

③ 在 SQL Client 執行以上做業後,YARN 會建立一個 Flink 集羣運行做業,而且用戶能夠在 Hadoop 上查看到執行做業的全部信息,而且能進入 Flink 的 Web UI 頁面查看 Flink 做業詳情,如下是 Hadoop 全部做業狀況。


④ 做業提交後,Flink SQL CDC 會掃描指定的 MySQL 表,在這期間 Flink 也會進行 checkpoint,因此須要按照上文所述的配置 checkpoint 的重試策略和重試次數。當數據被讀取進 Flink 後,Flink 會流式地進行做業邏輯的計算,實時統計出聚合結果輸出到 Elasticsearch(sink 端)。 至關於咱們使用 Flink 在 MySQL 的表上維護了一個實時的物化視圖,並將這個實時物化視圖的結果存在了 Elasticsearch 中。在 Elasticsearch 中使用 GET /income_distribution/_search{ "query": {"match_all": {}}} 命令查看輸出的實收分佈結果,以下圖:


經過圖中的結果能夠看出聚合結果被實時的計算出來,並寫到了 Elasticsearch 中了。

05 踩過的坑和學到的經驗


1. Flink 做業原來運行在 standalone session 模式下,提交多個 Flink 做業會致使做業失敗報錯。


  • 緣由:由於 standalone session 模式下啓動多個做業會致使多個做業的 Task 共享一個 JVM,可能會致使一些不穩定的問題。而且排查問題時,多個做業的日誌混在一個 TaskManager 中,增長了排查的難度。


  • 解決方法:採用 YARN 的 per-job 模式啓動多個做業,能有更好的隔離性。


2. SELECT elasticsearch table 報如下錯誤:



  • 緣由:Elasticsearch connector 目前只支持了 sink,不支持 source 。因此不能 SELECT elasticsearch table。


3. 在 flink-conf.yaml 裏修改默認並行度,可是在 Web UI 看到做業的並行度仍是 1,並行度修改不生效。

  • 解決辦法:在使用 SQL Client 時 sql-client-defaults.yaml 中的並行度配置的優先級更高。在 sql-client-defaults.yaml 中修改並行度,或者刪除 sql-client-defaults.yaml 中的並行度配置。更建議採用後者。


4. Flink 做業在掃描 MySQL 全量數據時,checkpoint 超時,出現做業 failover,以下圖:


  • 緣由:Flink CDC 在 scan 全表數據(咱們的實收表有千萬級數據)須要小時級的時間(受下游聚合反壓影響),而在 scan 全表過程當中是沒有 offset 能夠記錄的(意味着無法作 checkpoint),可是 Flink 框架任什麼時候候都會按照固定間隔時間作 checkpoint,因此此處 mysql-cdc source 作了比較取巧的方式,即在 scan 全表的過程當中,會讓執行中的 checkpoint 一直等待甚至超時。超時的 checkpoint 會被仍未認爲是 failed checkpoint,默認配置下,這會觸發 Flink 的 failover 機制,而默認的 failover 機制是不重啓。因此會形成上面的現象。


  • 解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數,以及失敗重啓策略,以下:


execution.checkpointing.interval: 10min # checkpoint間隔時間execution.checkpointing.tolerable-failed-checkpoints: 100 # checkpoint 失敗容忍次數restart-strategy: fixed-delay # 重試策略restart-strategy.fixed-delay.attempts: 2147483647 # 重試次數

目前 Flink 社區也有一個 issue(Flink-18578)來支持 source 主動拒絕 checkpoint 的機制,未來基於該機制,能比較優雅地解決這個問題。

5. Flink 怎麼樣開啓 YARN 的 per-job 模式?

  • 解決方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。


6. 進入 SQL Client 建立 table 後,在另一個節點進入 SQL Client 查詢不到 table。

  • 緣由:由於 SQL Client 默認的 Catalog 是在 in-memory 的,不是持久化  Catalog,因此這屬於正常現象,每次啓動 Catalog 裏面都是空的。


7. 做業在運行時 Elasticsearch 報以下錯誤:

Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]


  • 緣由:數據庫表的字段 amt 的類型是 decimal,DDL 建立輸出到 es 的 amt 字段的類型也是 decimal,由於輸出到 es 的第一條數據的amt若是是整數,好比是 10,輸出到 es 的類型是 long 類型的,es client 會自動建立 es 的索引而且設置 amt 字段爲 long 類型的格式,那麼若是下一次輸出到 es 的 amt 是非整數 10.1,那麼輸出到 es 的時候就會出現類型不匹配的錯誤。


  • 解決方法:手動生成 es 索引和 mapping 的信息,指定好 decimal 類型的數據格式是 saclefloat,可是在 DDL 處仍然能夠保留該字段類型是 decimal。


8. 做業在運行時 mysql cdc source 報以下錯誤:


  • 緣由:由於數據庫中別的表作了字段修改,CDC source 同步到了 ALTER DDL 語句,可是解析失敗拋出的異常。


  • 解決方法:在 flink-cdc-connectors 最新版本中已經修復該問題(跳過了沒法解析的 DDL)。升級 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。


9. 掃描全表階段慢,在 Web UI 出現以下現象:


  • 緣由:掃描全表階段慢不必定是 cdc source 的問題,多是下游節點處理太慢反壓了。


  • 解決方法:經過 Web UI 的反壓工具排查發現,瓶頸主要在聚合節點上。經過在 sql-client-defaults.yaml 文件配上 MiniBatch 相關參數和開啓 distinct 優化(咱們的聚合中有 count distinct),做業的 scan 效率獲得了很大的提高,從原先的 10 小時,提高到了 1 小時。關於性能調優的參數能夠參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。


configuration: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 2s table.exec.mini-batch.size: 5000 table.optimizer.distinct-agg.split.enabled: true

10. CDC source 掃描 MySQL 表期間,發現沒法往該表 insert 數據。

  • 緣由:因爲使用的 MySQL 用戶未受權 RELOAD 權限,致使沒法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會退化成表級讀鎖,而使用表級讀鎖須要等到全表 scan 完,才能釋放鎖,因此會發現持鎖時間過長的現象,影響其餘業務寫入數據。


  • 解決方法:給使用的 MySQL 用戶授予 RELOAD 權限便可。所需的權限列表詳見文檔:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。若是出於某些緣由沒法授予 RELOAD 權限,也能夠顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免全部鎖的獲取,但要注意只有當快照期間表的 schema 不會變動才安全。


11. 多個做業共用同一張 source table 時,沒有修改 server id 致使讀取出來的數據有丟失。

  • 緣由:MySQL binlog 數據同步的原理是,CDC source 會假裝成 MySQL 集羣的一個 slave(使用指定的 server id 做爲惟一 id),而後從 MySQL 拉取 binlog 數據。若是一個 MySQL 集羣中有多個 slave 有一樣的 id,就會致使拉取數據錯亂的問題。


  • 解決方法:默認會隨機生成一個 server id,容易有碰撞的風險。因此建議使用動態參數(table hint)在 query 中覆蓋 server id。以下所示:


SELECT *FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;

12. 在啓動做業時,YARN 接收了任務,但做業一直未啓動:


  • 緣由:Queue Resource Limit for AM 超過了限制資源限制。默認的最大內存是 30G (集羣內存) * 0.1 = 3G,而每一個 JM 申請 2G 內存,當提交第二個任務時,資源就不夠了。


  • 解決方法:調大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,表明AM的佔總資源的百分比,默認爲0.1,改爲0.3(根據服務器的性能靈活配置)。


13. AM 進程起不來,一直被 kill 掉。


  • 緣由:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默認物理內存是 1GB,動態申請到了 1GB,其中使用了386.9 MB。物理內存 x 2.1=虛擬內存,1GBx2.1≈2.1GB ,2.1GB 虛擬內存已經耗盡,當虛擬內存不夠時候,AM 的 container 就會自殺。


  • 解決方法:兩個解決方案,或調整 yarn.nodemanager.vmem-pmem-ratio 值大點,或 yarn.nodemanager.vmem-check-enabled=false,關閉虛擬內存檢查。參考:https://blog.csdn.net/lzxlfly/article/details/89175452。

06 總結


爲了提高了實時報表服務的可用性和實時性,一開始咱們採用了 Canal+Kafka+Flink 的方案,但是發現須要寫比較多的 Java 代碼,並且還須要處理好 DataStream 和 Table 的轉換以及 binlong 位置的獲取,開發難度相對較大。另外,須要維護 Kafka 和 Canal 這兩個組件的穩定運行,對於咱們小團隊來講成本也不小。因爲咱們公司已經有基於 Flink 的任務在線上運行,所以採用 Flink SQL CDC 就成了瓜熟蒂落的事情。基於 Flink SQL CDC 的方案只須要編寫 SQL ,不用寫一行 Java 代碼就能完成實時鏈路的打通和實時報表的計算,對於咱們來講很是的簡單易用,並且在線上運行的穩定性和性能表現也讓咱們滿意。

咱們正在公司內大力推廣 Flink SQL CDC 的使用,也正在着手改造其餘幾個實時鏈路的任務。很是感謝開源社區能爲咱們提供如此強大的工具,也但願 Flink CDC 愈來愈強大,支持更多的數據庫和功能。也再次感謝雲邪老師對於咱們項目上線的大力支持!

做者介紹:

曾慶東 ,金地物業中級開發工程師,負責聚合營業平臺實時計算開發及運維工做,從事過大數據開發,目前專一於 Apache Flink 實時計算,喜歡開源技術,喜歡分享。 



  福利來了  

Apache Flink 極客挑戰賽


萬衆矚目的第二屆 Apache Flink 極客挑戰賽來啦!本次大賽全面升級,重量級助陣嘉賓專業指導,強大的資源配置供你發揮創意,還有 30w 豐厚獎金等你帶走~聚焦  Flink 與 AI 技術的應用實踐,挑戰疫情防控的世界級難題,你準備好了麼?

(點擊圖片可瞭解更多大賽信息)

戳我報名!

本文分享自微信公衆號 - Flink 中文社區(gh_5efd76d10a8d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。

相關文章
相關標籤/搜索