Apache Flink 是大數據領域很是流行的流批統一的計算引擎,數據湖是順應雲時代發展潮流的新型技術架構。那麼當 Apache Flink 碰見數據湖時,會碰撞出什麼樣的火花呢?本次分享主要包括如下核心內容:git
- 數據湖的相關背景介紹;
- 經典業務場景介紹;
- 爲何選擇 Apache Iceberg;
- 如何經過 Flink+Iceberg 實現流式入湖
- 社區將來規劃工做。
視頻回顧:https://www.bilibili.com/vide...github
數據湖是個什麼概念呢?通常來講咱們把一家企業產生的數據都維護在一個平臺內,這個平臺咱們就稱之爲「數據湖」。數據庫
看下面這幅圖,這個湖的數據來源多種多樣,有的多是結構化數據,有的多是非結構數據,有的甚至是二進制數據。有一波人站在湖的入口,用設備在檢測水質,這對應着數據湖上的流處理做業;有一批抽水機從湖裏面抽水,這對應着數據湖的批處理做業;還有一批人在船頭釣魚或者在岸上捕魚,這對應着數據科學家從數據湖中經過機器學習的手段來提取數據價值。apache
那麼開源數據湖架構通常是啥樣的呢?這裏我畫了一個架構圖,主要分爲四層:緩存
那麼,Flink 和數據湖結合能夠有哪些經典的應用場景呢?這裏咱們探討業務場景時默認選型了 Apache Iceberg 來做爲咱們的數據湖選型,後面一節會詳細闡述選型背後的理由。架構
首先,Flink+Iceberg 最經典的一個場景就是構建實時的 Data Pipeline。業務端產生的大量日誌數據,被導入到 Kafka 這樣的消息隊列。運用 Flink 流計算引擎執行 ETL後,導入到 Apache Iceberg 原始表中。有一些業務場景須要直接跑分析做業來分析原始表的數據,而另一些業務須要對數據作進一步的提純。那麼咱們能夠再新起一個 Flink 做業從 Apache Iceberg 表中消費增量數據,通過處理以後寫入到提純以後的 Iceberg 表中。此時,可能還有業務須要對數據作進一步的聚合,那麼咱們繼續在iceberg 表上啓動增量 Flink 做業,將聚合以後的數據結果寫入到聚合表中。併發
有人會想,這個場景好像經過 Flink+Hive 也能實現。 Flink+Hive 的確能夠實現,但寫入到 Hive 的數據更多地是爲了實現數倉的數據分析,而不是爲了作增量拉取。通常來講,Hive 的增量寫入以 partition 爲單位,時間是 15min 以上,Flink 長期高頻率地寫入會形成 partition 膨脹。而 Iceberg 允許實現 1 分鐘甚至 30秒的增量寫入,這樣就能夠大大提升了端到端數據的實時性,上層的分析做業能夠看到更新的數據,下游的增量做業能夠讀取到更新的數據。機器學習
第二個經典的場景,就是能夠用 Flink+Iceberg 來分析來自 MySQL 等關係型數據庫的 binlog 等。一方面,Apache Flink 已經原生地支持 CDC 數據解析,一條 binlog 數據經過 ververica flink-cdc-connector 拉取以後,自動轉換成 Flink Runtime 能識別的 INSERT、DELETE、UPDATE_BEFORE、UPDATE_AFTER 四種消息,供用戶作進一步的實時計算。分佈式
另一方面,Apache Iceberg 已經較爲完善地實現了 equality delete 功能,也就是用戶定義好待刪除的 Record,直接寫到 Apache Iceberg 表內就能夠刪除對應的行,自己就是爲了實現數據湖的流式刪除。在 Iceberg 將來的版本中,用戶將不須要設計任何額外的業務字段,不用寫幾行代碼就能夠完成 binlog 流式入湖到 Apache Iceberg(社區的這個 Pull Request 已經提供了一個 flink 寫入 CDC 數據的原型)。ide
此外,CDC 數據成功入湖 Iceberg 以後,咱們還會打一般見的計算引擎,例如 Presto、Spark、Hive 等,他們均可以實時地讀取到 Iceberg 表中的最新數據。
第三個經典場景是近實時場景的流批統一。在經常使用的 lambda 架構中,咱們有一條實時鏈路和一條離線鏈路。實時鏈路通常由 Flink、Kafka、HBase 這些組件構建而成,而離線鏈路通常會用到 Parquet、Spark 等組件構建。這裏面涉及到計算組件和存儲組件都很是多,系統維護成本和業務開發成本都很是高。有不少場景,他們的實時性要求並無那麼苛刻,例如能夠放鬆到分鐘級別,這種場景咱們稱之爲近實時場景。那麼,咱們是否是能夠經過 Flink + Iceberg 來優化咱們經常使用的 lambda 架構呢?
咱們能夠用 Flink+Iceberg 把整個架構優化成上圖所示。實時的數據經過 Flink 寫入到 Iceberg 表中,近實時鏈路依然能夠經過flink計算增量數據,離線鏈路也能夠經過 flink 批計算讀取某個快照作全局分析,獲得對應的分析結果,供不一樣場景下的用戶讀取和分析。通過這種改進以後,咱們把計算引擎統一成了 Flink,把存儲組件統一成了 Iceberg,整個系統的維護開發成本大大下降。
第四個場景,是採用 Iceberg 全量數據和 Kafka 的增量數據來 Bootstrap 新的 Flink 做業。咱們現有的流做業在線上跑着,忽然有一天某個業務方跑過來講,他們遇到一個新的計算場景,須要設計一個新的 Flink 做業,跑一遍去年一年的歷史數據,跑完以後再對接到正在產生的 Kafka 增量數據。那麼這時候應該怎麼辦呢?
咱們依然能夠採用常見的 lambda 架構,離線鏈路經過 kafka->flink->iceberg 同步寫入到數據湖,因爲 Kafka 成本較高,保留最近 7 天數據便可,Iceberg 存儲成本較低,能夠存儲全量的歷史數據(按照 checkpoint 拆分紅多個數據區間)。啓動新 Flink 做業的時候,只須要去拉 Iceberg 的數據,跑完以後平滑地對接到 kafka 數據便可。
第五個場景和第四個場景有點相似。一樣是在 lambda 架構下,實時鏈路因爲事件丟失或者到達順序的問題,可能致使流計算端結果不必定徹底準確,這時候通常都須要全量的歷史數據來訂正實時計算的結果。而咱們的 Iceberg 能夠很好地充當這個角色,由於它能夠高性價比地管理好歷史數據。
回到上一節遺留的一個問題,爲何當時 Flink 在衆多開源數據湖項目中會選擇 Apache Iceberg 呢?
咱們當時詳細地調研了 Delta、Hudi、Iceberg 三個開源項目,並寫了一篇調研報告。咱們發現 Delta 和 Hudi 跟 Spark 的代碼路徑綁定太深,尤爲是寫入路徑。畢竟當時這兩個項目設計之初,都多多少少把 Spark 做爲的他們默認的計算引擎了。而Apache Iceberg 的方向很是堅決,宗旨就是要作一個通用化設計的 Table Format。所以,它完美地解耦了計算引擎和底下的存儲系統,便於接入多樣化計算引擎和文件格式,能夠說正確地完成了數據湖架構中的 Table Format 這一層的實現。咱們認爲它也更容易成爲 Table Format 層的開源事實標準。
另一方面,Apache Iceberg 正在朝着流批一體的數據湖存儲層發展,manifest 和snapshot 的設計,有效地隔離不一樣 transaction 的變動,很是方便批處理和增量計算。而咱們知道 Apache Flink 已是一個流批一體的計算引擎,能夠說這兩者的長遠規劃完美匹配,將來兩者將協力打造流批一體的數據湖架構。
最後,咱們還發現 Apache Iceberg 這個項目背後的社區資源很是豐富。在國外, Netflix、Apple、Linkedin、Adobe 等公司都有 PB 級別的生產數據運行在 Apache Iceberg 上;在國內,騰訊這樣的巨頭也有很是龐大的數據跑在 Apache Iceberg 之上,他們最大的一個業務天天有幾十T的增量數據寫入到 Apache Iceberg。社區成員一樣很是資深和多樣化,擁有來自其餘項目的 7 位 Apache PMC,1 爲 VP。體如今代碼和設計的 review 上,就變得很是苛刻,一個稍微大一點的 PR 涉及 100+ 的comment 很常見。在我我的看來,這些都使得 Apache Iceberg 的設計+代碼質量比較高。
正式基於以上考慮,Apache Flink 最終選擇了 Apache Iceberg 做爲第一個數據湖接入項目。
目前,咱們已經在 Apache Iceberg 0.10.0 版本上實現 Flink 流批入湖功能,同時還支持 Flink 批做業查詢 Iceberg 數據湖的數據。具體關於 Flink 如何讀寫 Apache Iceberg 表,能夠參考 Apache Iceberg 社區的使用文檔,這裏再也不贅述。
https://github.com/apache/ice...
下面來簡要闡述下 Flink iceberg sink 的設計原理:因爲 Iceberg 採用樂觀鎖的方式來實現 Transaction 的提交,也就是說兩我的同時提交更改事務到 Iceberg 時,後開始的一方會不斷重試,等先開始的一方順利提交以後再從新讀取 metadata 信息提交 transaction。考慮到這一點,採用多個併發算子去提交 transaction 是不合適的,容易形成大量事務衝突,致使重試。
因此,咱們把 Flink 寫入流程拆成了兩個算子,一個叫作 IcebergStreamWriter,主要用來寫入記錄到對應的 avro、parquet、orc 文件,生成一個對應的 Iceberg DataFile,併發送給下游算子;另一個叫作 IcebergFilesCommitter,主要用來在 checkpoint 到來時把全部的 DataFile 文件收集起來,並提交 Transaction 到 Apache iceberg,完成本次 checkpoint 的數據寫入。
理解了 Flink Sink 算子的設計後,下一個比較重要的問題就是:如何正確地設計兩個算子的 state ?
首先,IcebergStreamWriter 的設計比較簡單,主要任務是把記錄轉換成 DataFile,並無複雜的 State 須要設計。IcebergFilesCommitter 相對複雜一點,它爲每一個checkpointId 維護了一個 DataFile 文件列表,即 map<Long, List<DataFile>>,這樣即便中間有某個 checkpoint的transaction 提交失敗了,它的 DataFile 文件仍然維護在 State 中,依然能夠經過後續的 checkpoint 來提交數據到 Iceberg 表中。
Apache Iceberg 0.10.0 版本的發佈,已經拉開集成 Flink 和 Iceberg 的序幕。在將來的 Apache Iceberg 0.11.0 和 0.12.0 版本中,咱們規劃了更多高級功能及特性。
對於 Apache 0.11.0 版原本說,主要解決兩個問題:
第一個事情是小文件合併的問題,固然 Apache Iceberg 0.10.0 版本已經支持了Flink 批做業定時去合併小文件,這個功能還相對較爲初級。在 0.11.0 版本中,咱們將設計自動合併小文件功能,簡單來講就是在 Flink checkpoint 到達,觸發 Apache Iceberg transaction 提交後,有一個專門的算子,專門負責處理小文件的合併工做。
第二個事情是 Flink streaming reader 的開發,目前咱們已經在私有倉庫作了一些 PoC 工做,在將來的時間內咱們將貢獻到 Apache Iceberg 社區。
對於 0.12.0 版原本說,主要解決 row-level delete 的問題。如前面提到,咱們已經在 PR 1663 中實現 Flink UPSERT 更新數據湖的全鏈路打通。後續在社區達成一致以後,將逐步推進該功能到社區版本。到時候用戶將能經過 Flink 完成 CDC 數據的實時寫入和分析,也能夠方便地把 Flink 的聚合結果 upsert 到 Apache Iceberg 內。
做者介紹:
胡爭(子毅),阿里巴巴技術專家,目前主要負責 Flink 數據湖方案的設計和開發工做,Apache Iceberg 及 Apache Flink 項目的長期活躍貢獻者,《HBase 原理與實踐》做者。