項目背景css
解決方案html
項目運行環境與現狀前端
具體實現node
踩過的坑和學到的經驗mysql
總結git
https://flink-learning.org.cn/developers/flink-training-course3/
01 項目背景github
02 解決方案web
03 項目運行環境與現狀sql
04 具體實現數據庫
-- 在Flink建立帳單實收source表CREATE TABLE bill_info ( billCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11,2), status STRING, proc_time AS PROCTIME() -–使用維表時須要指定該字段) WITH ( 'connector' = 'mysql-cdc', -- 鏈接器 'hostname' = '******', --mysql地址 'port' = '3307', -- mysql端口 'username' = '******', --mysql用戶名 'password' = '******', -- mysql密碼 'database-name' = 'cdc', -- 數據庫名稱 'table-name' = '***');
-- 在Flink建立訂單實收source表CREATE TABLE order_info ( orderCode STRING, serviceCode STRING, accountPeriod STRING, subjectName STRING , subjectCode STRING, occurDate TIMESTAMP, amt DECIMAL(11, 2), status STRING, proc_time AS PROCTIME() -–使用維表時須要指定該字段) WITH ( 'connector' = 'mysql-cdc', 'hostname' = '******', 'port' = '3307', 'username' = '******', 'password' = '******', 'database-name' = 'cdc', 'table-name' = '***',);
-- 建立科目維表CREATE TABLE subject_info ( code VARCHAR(32) NOT NULL, name VARCHAR(64) NOT NULL, PRIMARY KEY (code) NOT ENFORCED --指定主鍵) WITH ( 'connector' = 'jdbc', 'url' = 'jdbc:mysql://xxxx:xxxx/spd?useSSL=false&autoReconnect=true', 'driver' = 'com.mysql.cj.jdbc.Driver', 'table-name' = '***', 'username' = '******', 'password' = '******', 'lookup.cache.max-rows' = '3000', 'lookup.cache.ttl' = '10s', 'lookup.max-retries' = '3');
-- 建立實收分佈結果表,把結果寫到 ElasticsearchCREATE TABLE income_distribution ( serviceCode STRING, accountPeriod STRING, subjectCode STRING, subjectName STRING, amt DECIMAL(13,2), PRIMARY KEY (serviceCode, accountPeriod, subjectCode) NOT ENFORCED) WITH ( 'connector' = 'elasticsearch-7', 'hosts' = 'http://xxxx:9200', 'index' = 'income_distribution', 'sink.bulk-flush.backoff.strategy' = 'EXPONENTIAL');
INSERT INTO income_distributionSELECT t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName, SUM(amt) AS amt FROM ( SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt FROM bill_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.nameUNION ALL SELECT b.serviceCode, b.accountPeriod, b.subjectCode, s.name AS subjectName, SUM(amt) AS amt FROM order_info AS b JOIN subject_info FOR SYSTEM_TIME AS OF b.proc_time s ON b.subjectCode = s.code GROUP BY b.serviceCode, b.accountPeriod, b.subjectCode, s.name) AS t1GROUP BY t1.serviceCode, t1.accountPeriod, t1.subjectCode, t1.subjectName;
![](http://static.javashuo.com/static/loading.gif)
![](http://static.javashuo.com/static/loading.gif)
05 踩過的坑和學到的經驗
1. Flink 做業原來運行在 standalone session 模式下,提交多個 Flink 做業會致使做業失敗報錯。
緣由:由於 standalone session 模式下啓動多個做業會致使多個做業的 Task 共享一個 JVM,可能會致使一些不穩定的問題。而且排查問題時,多個做業的日誌混在一個 TaskManager 中,增長了排查的難度。
解決方法:採用 YARN 的 per-job 模式啓動多個做業,能有更好的隔離性。
2. SELECT elasticsearch table 報如下錯誤:
![](http://static.javashuo.com/static/loading.gif)
緣由:Elasticsearch connector 目前只支持了 sink,不支持 source 。因此不能 SELECT elasticsearch table。
解決辦法:在使用 SQL Client 時 sql-client-defaults.yaml 中的並行度配置的優先級更高。在 sql-client-defaults.yaml 中修改並行度,或者刪除 sql-client-defaults.yaml 中的並行度配置。更建議採用後者。
![](http://static.javashuo.com/static/loading.gif)
緣由:Flink CDC 在 scan 全表數據(咱們的實收表有千萬級數據)須要小時級的時間(受下游聚合反壓影響),而在 scan 全表過程當中是沒有 offset 能夠記錄的(意味着無法作 checkpoint),可是 Flink 框架任什麼時候候都會按照固定間隔時間作 checkpoint,因此此處 mysql-cdc source 作了比較取巧的方式,即在 scan 全表的過程當中,會讓執行中的 checkpoint 一直等待甚至超時。超時的 checkpoint 會被仍未認爲是 failed checkpoint,默認配置下,這會觸發 Flink 的 failover 機制,而默認的 failover 機制是不重啓。因此會形成上面的現象。
解決辦法:在 flink-conf.yaml 配置 failed checkpoint 容忍次數,以及失敗重啓策略,以下:
execution.checkpointing.interval: 10min execution.checkpointing.tolerable-failed-checkpoints: 100 restart-strategy: fixed-delay restart-strategy.fixed-delay.attempts: 2147483647
解決方法:在 flink-conf.yaml 中配置 execution.target: yarn-per-job。
緣由:由於 SQL Client 默認的 Catalog 是在 in-memory 的,不是持久化 Catalog,因此這屬於正常現象,每次啓動 Catalog 裏面都是空的。
Caused by: org.apache.Flink.elasticsearch7.shaded.org.elasticsearch.ElasticsearchException: Elasticsearch exception [type=illegal_argument_exception, reason=mapper [amt] cannot be changed from type [long] to [float]]
緣由:數據庫表的字段 amt 的類型是 decimal,DDL 建立輸出到 es 的 amt 字段的類型也是 decimal,由於輸出到 es 的第一條數據的amt若是是整數,好比是 10,輸出到 es 的類型是 long 類型的,es client 會自動建立 es 的索引而且設置 amt 字段爲 long 類型的格式,那麼若是下一次輸出到 es 的 amt 是非整數 10.1,那麼輸出到 es 的時候就會出現類型不匹配的錯誤。
解決方法:手動生成 es 索引和 mapping 的信息,指定好 decimal 類型的數據格式是 saclefloat,可是在 DDL 處仍然能夠保留該字段類型是 decimal。
![](http://static.javashuo.com/static/loading.gif)
緣由:由於數據庫中別的表作了字段修改,CDC source 同步到了 ALTER DDL 語句,可是解析失敗拋出的異常。
解決方法:在 flink-cdc-connectors 最新版本中已經修復該問題(跳過了沒法解析的 DDL)。升級 connector jar 包到最新版本 1.1.0:flink-sql-connector-mysql-cdc-1.1.0.jar,替換 flink/lib 下的舊包。
![](http://static.javashuo.com/static/loading.gif)
緣由:掃描全表階段慢不必定是 cdc source 的問題,多是下游節點處理太慢反壓了。
解決方法:經過 Web UI 的反壓工具排查發現,瓶頸主要在聚合節點上。經過在 sql-client-defaults.yaml 文件配上 MiniBatch 相關參數和開啓 distinct 優化(咱們的聚合中有 count distinct),做業的 scan 效率獲得了很大的提高,從原先的 10 小時,提高到了 1 小時。關於性能調優的參數能夠參閱:https://ci.apache.org/projects/flink/flink-docs-release-1.11/zh/dev/table/tuning/streaming_aggregation_optimization.html。
configuration: table.exec.mini-batch.enabled: true table.exec.mini-batch.allow-latency: 2s table.exec.mini-batch.size: 5000 table.optimizer.distinct-agg.split.enabled: true
緣由:因爲使用的 MySQL 用戶未受權 RELOAD 權限,致使沒法獲取全局讀鎖(FLUSH TABLES WITH READ LOCK), CDC source 就會退化成表級讀鎖,而使用表級讀鎖須要等到全表 scan 完,才能釋放鎖,因此會發現持鎖時間過長的現象,影響其餘業務寫入數據。
解決方法:給使用的 MySQL 用戶授予 RELOAD 權限便可。所需的權限列表詳見文檔:https://github.com/ververica/flink-cdc-connectors/wiki/mysql-cdc-connector#setup-mysql-server。若是出於某些緣由沒法授予 RELOAD 權限,也能夠顯式配上 'debezium.snapshot.locking.mode' = 'none'來避免全部鎖的獲取,但要注意只有當快照期間表的 schema 不會變動才安全。
緣由:MySQL binlog 數據同步的原理是,CDC source 會假裝成 MySQL 集羣的一個 slave(使用指定的 server id 做爲惟一 id),而後從 MySQL 拉取 binlog 數據。若是一個 MySQL 集羣中有多個 slave 有一樣的 id,就會致使拉取數據錯亂的問題。
解決方法:默認會隨機生成一個 server id,容易有碰撞的風險。因此建議使用動態參數(table hint)在 query 中覆蓋 server id。以下所示:
SELECT *FROM bill_info /*+ OPTIONS('server-id'='123456') */ ;
![](http://static.javashuo.com/static/loading.gif)
緣由:Queue Resource Limit for AM 超過了限制資源限制。默認的最大內存是 30G (集羣內存) * 0.1 = 3G,而每一個 JM 申請 2G 內存,當提交第二個任務時,資源就不夠了。
解決方法:調大 AM 的 resource limit,在 capacity-scheduler.xml 配置 yarn.scheduler.capacity.maximum-am-resource-percent,表明AM的佔總資源的百分比,默認爲0.1,改爲0.3(根據服務器的性能靈活配置)。
![](http://static.javashuo.com/static/loading.gif)
緣由:386.9 MB of 1 GB physical memory used; 2.1 GB of 2.1 GB virtual memory use。默認物理內存是 1GB,動態申請到了 1GB,其中使用了386.9 MB。物理內存 x 2.1=虛擬內存,1GBx2.1≈2.1GB ,2.1GB 虛擬內存已經耗盡,當虛擬內存不夠時候,AM 的 container 就會自殺。
解決方法:兩個解決方案,或調整 yarn.nodemanager.vmem-pmem-ratio 值大點,或 yarn.nodemanager.vmem-check-enabled=false,關閉虛擬內存檢查。參考:https://blog.csdn.net/lzxlfly/article/details/89175452。
06 總結
福利來了 ![](http://static.javashuo.com/static/loading.gif)
Apache Flink 極客挑戰賽
![](http://static.javashuo.com/static/loading.gif)
本文分享自微信公衆號 - Flink 中文社區(gh_5efd76d10a8d)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。