簡介:由汽車之家實時計算平臺負責人邸星星在 4 月 17 日上海站 Meetup 分享的,基於 Flink + Iceberg 的湖倉一體架構實踐。git
內容簡要:github
1、數據倉庫架構升級的背景算法
2、基於 Iceberg 的湖倉一體架構實踐apache
3、總結與收益json
4、後續規劃緩存
GitHub 地址
https://github.com/apache/flink
歡迎你們給 Flink 點贊送 star~架構
原有的數據倉庫徹底基於 Hive 建造而成,主要存在三大痛點:併發
痛點一:不支持 ACIDide
1)不支持 Upsert 場景;測試
2)不支持 Row-level delete,數據修正成本高。
痛點二:時效性難以提高
1)數據難以作到準實時可見;
2)沒法增量讀取,沒法實現存儲層面的流批統一;
3)沒法支持分鐘級延遲的數據分析場景。
痛點三:Table Evolution
1)寫入型 Schema,對 Schema 變動支持很差;
2)Partition Spec 變動支持不友好。
Iceberg 主要有四大關鍵特性:支持 ACID 語義、增量快照機制、開放的表格式和流批接口支持。
支持 ACID 語義
增量快照機制
開放的表格式
流批接口支持
湖倉一體的意義就是說我不須要看見湖和倉,數據有着打通的元數據的格式,它能夠自由的流動,也能夠對接上層多樣化的計算生態。
——賈揚清(阿里雲計算平臺高級研究員)
上圖爲日誌類數據入湖的鏈路,日誌類數據包含客戶端日誌、用戶端日誌以及服務端日誌。這些日誌數據會實時錄入到 Kafka,而後經過 Flink 任務寫到 Iceberg 裏面,最終存儲到 HDFS。
咱們的 Flink SQL 入湖鏈路打通是基於 「Flink 1.11 + Iceberg 0.11」 完成的,對接 Iceberg Catalog 咱們主要作了如下內容:
1)Meta Server 增長對 Iceberg Catalog 的支持;
2)SQL SDK 增長 Iceberg Catalog 支持。
而後在這基礎上,平臺開放 Iceberg 表的管理功能,使得用戶能夠本身在平臺上建 SQL 的表。
第二步是內部的實踐,對接現有預算體系、權限體系。
由於以前平臺作實時做業的時候,平臺都是默認爲 Flink 用戶去運行的,以前存儲不涉及 HDFS 存儲,所以可能沒有什麼問題,也就沒有思考預算劃分方面的問題。
可是如今寫 Iceberg 的話,可能就會涉及一些問題。好比數倉團隊有本身的集市,數據就應該寫到他們的目錄下面,預算也是劃到他們的預算下,同時權限和離線團隊帳號的體系打通。
如上所示,這塊主要是在平臺上作了代理用戶的功能,用戶能夠去指定用哪一個帳號去把這個數據寫到 Iceberg 裏面,實現過程主要有如下三個。
增長 Table 級別配置:'iceberg.user.proxy' = 'targetUser’
1)啓用 Superuser
2)團隊帳號鑑權
訪問 HDFS 時啓用代理用戶:
訪問 Hive Metastore 時指定代理用戶
1)參考 Spark 的相關實現:
org.apache.spark.deploy.security.HiveDelegationTokenProvider
2)動態代理 HiveMetaStoreClient,使用代理用戶訪問 Hive metastore
DDL + DML
如上所示,咱們有一個 AutoDTS 平臺,負責業務庫數據的實時接入。咱們會把這些業務庫的數據接入到 Kafka 裏面,同時它還支持在平臺上配置分發任務,至關於把進 Kafka 的數據分發到不一樣的存儲引擎裏,在這個場景下是分發到 Iceberg 裏。
下面是咱們基於 「Flink1.11 + Iceberg 0.11」 支持 CDC 入湖所作的改動:
改進 Iceberg Sink:
Flink 1.11 版本爲 AppendStreamTableSink,沒法處理 CDC 流,修改並適配。
表管理
1)支持 Primary key(PR1978)
2)開啓 V2 版本:'iceberg.format.version' = '2'
1. 支持 Bucket
Upsert 場景下,須要確保同一條數據寫入到同一 Bucket 下,這又如何實現?
目前 Flink SQL 語法不支持聲明 bucket 分區,經過配置的方式聲明 Bucket:
'partition.bucket.source'='id', // 指定 bucket 字段
'partition.bucket.num'='10', // 指定 bucket 數量
2. Copy-on-write sink
作 Copy-on-Write 的緣由是本來社區的 Merge-on-Read 不支持合併小文件,因此咱們臨時去作了 Copy-on-write sink 的實現。目前業務一直在測試使用,效果良好。
上方爲 Copy-on-Write 的實現,其實跟原來的 Merge-on-Read 比較相似,也是有 StreamWriter 多並行度寫入和 FileCommitter 單並行度順序提交。
在 Copy-on-Write 裏面,須要根據表的數據量合理設置 Bucket 數,無需額外作小文件合併。
StreamWriter 在 snapshotState 階段多並行度寫入
1)增長 Buffer;
2)寫入前須要判斷上次 checkpoint 已經 commit 成功;
3)按 bucket 分組、合併,逐個 Bucket 寫入。
FileCommitter 單並行度順序提交
1)table.newOverwrite()
2)Flink.last.committed.checkpoint.id
如上圖所示,在實際使用中,業務方能夠在 DTS 平臺上建立或配置分發任務便可。
實例類型選擇 Iceberg 表,而後選擇目標庫,代表要把哪一個表的數據同步到 Iceberg 裏,而後能夠選原表和目標表的字段的映射關係是什麼樣的,配置以後就能夠啓動分發任務。啓動以後,會在實時計算平臺 Flink 裏面提交一個實時任務,接着用 Copy-on-write sink 去實時地把數據寫到 Iceberg 表裏面。
實踐一:減小 empty commit
問題描述:
在上游 Kafka 長期沒有數據的狀況下,每次 Checkpoint 依舊會生成新的 Snapshot,致使大量的空文件和沒必要要的 Snapshot。
解決方案(PR - 2042):
增長配置 Flink.max-continuousempty-commits,在連續指定次數 Checkpoint 都沒有數據後才真正觸發 Commit,生成 Snapshot。
實踐二:記錄 watermark
問題描述:
目前 Iceberg 表自己沒法直接反映數據寫入的進度,離線調度難以精準觸發下游任務。
解決方案( PR - 2109 ):
在 Commit 階段將 Flink 的 Watermark 記錄到 Iceberg 表的 Properties 中,可直觀的反映端到端的延遲狀況,同時能夠用來判斷分區數據完整性,用於調度觸發下游任務。
實踐三:刪表優化
問題描述:
刪除 Iceberg 可能會很慢,致使平臺接口相應超時。由於 Iceberg 是面向對象存儲來抽象 IO 層的,沒有快速清除目錄的方法。
解決方案:
擴展 FileIO,增長 deleteDir 方法,在 HDFS 上快速刪除表數據。
按期爲每一個表執行批處理任務(spark 3),分爲如下三個步驟:
1. 按期合併新增分區的小文件:
rewriteDataFilesAction.execute(); 僅合併小文件,不會刪除舊文件。
2. 刪除過時的 snapshot,清理元數據及數據文件:
table.expireSnapshots().expireOld erThan(timestamp).commit();
3. 清理 orphan 文件,默認清理 3 天前,且沒法觸及的文件:
removeOrphanFilesAction.older Than(timestamp).execute();
Flink 是實時平臺的核心計算引擎,目前主要支持數據入湖場景,主要有如下幾個方面的特色。
數據準實時入湖:
Flink 和 Iceberg 在數據入湖方面集成度最高,Flink 社區主動擁抱數據湖技術。
平臺集成:
AutoStream 引入 IcebergCatalog,支持經過 SQL 建表、入湖 AutoDTS 支持將 MySQL、SQLServer、TiDB 表配置入湖。
流批一體:
在流批一體的理念下,Flink 的優點會逐漸體現出來。
Hive 在 SQL 批處理層面 Iceberg 和 Spark 3 集成度更高,主要提供如下三個方面的功能。
按期小文件合併及 meta 信息查詢:
SELECT * FROM prod.db.table.history 還可查看 snapshots, files, manifests。
離線數據寫入:
1)Insert into 2)Insert overwrite 3)Merge into
分析查詢:
主要支持平常的準實時分析查詢場景。
AutoBI 已經和 Presto 集成,用於報表、分析型查詢場景。
Trino
1)直接將 Iceberg 做爲報表數據源
2)須要增長元數據緩存機制:https://github.com/trinodb/trino/issues/7551
Presto
1. 訪問 Hive Metastore 異常
問題描述:HiveConf 的構造方法的誤用,致使 Hive 客戶端中聲明的配置被覆蓋,致使訪問 Hive metastore 時異常
解決方案(PR-2075):修復 HiveConf 的構造,顯示調用 addResource 方法,確保配置不會被覆蓋:hiveConf.addResource(conf);
2.Hive metastore 鎖未釋放
問題描述:「CommitFailedException: Timed out after 181138 ms waiting for lock xxx.」 緣由是 hiveMetastoreClient.lock 方法,在未得到鎖的狀況下,也須要顯示 unlock,不然會致使上面異常。
解決方案(PR-2263):優化 HiveTableOperations#acquireLock 方法,在獲取鎖失敗的狀況下顯示調用 unlock 來釋放鎖。
3. 元數據文件丟失
問題描述:Iceberg 表沒法訪問,報 「NotFoundException Failed to open input stream for file : xxx.metadata.json」
解決方案(PR-2328):當調用 Hive metastore 更新 iceberg 表的 metadata\_location 超時後,增長檢查機制,確認元數據未保存成功後再刪除元數據文件。
經過對湖倉一體、流批融合的探索,咱們分別作了總結。
湖倉一體
1)Iceberg 支持 Hive Metastore;
2)整體使用上與 Hive 表相似:相同數據格式、相同的計算引擎。
流批融合
準實時場景下實現流批統一:同源、同計算、同存儲。
數據時效性提高:
入倉延遲從 2 小時以上下降到 10 分鐘之內;算法核心任務 SLA 提早 2 小時完成。
準實時的分析查詢:
結合 Spark 3 和 Trino,支持準實時的多維分析查詢。
特徵工程提效:
提供準實時的樣本數據,提升模型訓練時效性。
CDC 數據準實時入倉:
能夠在數倉針對業務表作準實時分析查詢。
上方也提到了,咱們支持準實時的入倉和分析,至關因而爲後續的準實時數倉建設提供了基礎的架構驗證。準實時數倉的優點是一次開發、口徑統1、統一存儲,是真正的批流一體。劣勢是實時性較差,原來多是秒級、毫秒級的延遲,如今是分鐘級的數據可見性。
可是在架構層面上,這個意義仍是很大的,後續咱們能看到一些但願,能夠把整個原來 「T + 1」 的數倉,作成準實時的數倉,提高數倉總體的數據時效性,而後更好地支持上下游的業務。
1. 跟進 Iceberg 版本
全面開放 V2 格式,支持 CDC 數據的 MOR 入湖。
2. 建設準實時數倉
基於 Flink 經過 Data pipeline 模式對數倉各層表全面提速。
3. 流批一體
隨着 upsert 功能的逐步完善,持續探索存儲層面流批一體。
4. 多維分析
基於 Presto/Spark3 輸出準實時多維分析。
本文內容由阿里雲實名註冊用戶自發貢獻,版權歸原做者全部,阿里雲開發者社區不擁有其著做權,亦不承擔相應法律責任。具體規則請查看《阿里雲開發者社區用戶服務協議》和《阿里雲開發者社區知識產權保護指引》。若是您發現本社區中有涉嫌抄襲的內容,填寫侵權投訴表單進行舉報,一經查實,本社區將馬上刪除涉嫌侵權內容。