Flink集成Iceberg在同程藝龍的實踐

本文由同城藝龍大數據開發工程師張軍分享,主要介紹同城藝龍 Flink 集成 Iiceberg 的生產實踐。內容包括:git

  1. 背景及痛點
  2. Flink + Iceberg 的落地
  3. Iceberg 優化實踐
  4. 後續工做
  5. 收益及總結

1、背景及痛點

業務背景

同程藝龍是一個提供機票、住宿、交通等服務的在線旅遊服務平臺,目前我所在的部門屬於公司的研發部門,主要職責是爲公司內其餘業務部門提供一些基礎服務,咱們的大數據系統主要承接的業務是部門內的一些大數據相關的數據統計、分析工做等。數據來源有網關日誌數據、服務器監控數據、K8s 容器的相關日誌數據,App 的打點日誌, MySQL 的 binlog 日誌等。咱們主要的大數據任務是基於上述日誌構建實時報表,提供基於 Presto 的報表展現和即時查詢服務,同時也會基於 Flink 開發一些實時、批處理任務,爲業務方提供準確及時的數據支撐。github

原架構方案

因爲咱們全部的原始數據都是存儲在 Kafka 的,因此原來的技術架構就是首先是 Flink 任務消費 Kafka 的數據,通過 Flink SQL 或者 Flink jar 的各類處理以後實時寫入 Hive,其中絕大部分任務都是 Flink SQL 任務,由於我認爲 SQL 開發相對代碼要簡單的多,而且維護方便、好理解,因此能用 SQL 寫的都儘可能用 SQL 來寫。
提交 Flink 的平臺使用的是 Zeppelin,其中提交 Flink SQL 任務是 Zeppelin 自帶的功能,提交 jar 包任務是我本身基於 Application 模式開發的 Zeppelin 插件。
對於落地到 Hive 的數據,使用開源的報表系統 metabase (底層使用 Presto) 提供實時報表展現、定時發送郵件報表,以及自定義 SQL 查詢服務。因爲業務對數據的實時性要求比較高,但願數據能儘快的展現出來,因此咱們不少的 Flink 流式任務的 checkpoint 設置爲 1 分鐘,數據格式採用的是 orc 格式。sql

痛點

因爲採用的是列式存儲格式 ORC,沒法像行式存儲格式那樣進行追加操做,因此不可避免的產生了一個大數據領域很是常見且很是棘手的問題,即 HDFS 小文件問題。apache

開始的時候咱們的小文件解決方案是本身寫的一個小文件壓縮工具,按期去合併,咱們的 Hive 分區通常都是天級別的,因此這個工具的原理就是天天凌晨啓動一個定時任務去壓縮昨天的數據,首先把昨天的數據寫入一個臨時文件夾,壓縮完,和原來的數據進行記錄數的比對檢驗,數據條數一致以後,用壓縮後的數據覆蓋原來的數據,可是因爲沒法保證事務,因此出現了不少問題:服務器

  • 壓縮的同時因爲延遲數據的到來致使昨天的 Hive 分區又有數據寫入了,檢驗就會失敗,致使合併小文件失敗。
  • 替換舊數據的操做是沒有事務保證的,若是替換的過程當中舊分區有新的數據寫入,就會覆蓋新寫入的數據,形成數據丟失。
  • 沒有事務的支持,沒法實時合併當前分區的數據,只能合併壓縮前一個分區的,最新的分區數據仍然有小文件的問題,致使最新數據查詢性能提升不了。

2、Flink+Iceberg 的落地

Iceberg 技術調研

因此基於以上的 HDFS 小文件、查詢慢等問題,結合咱們的現狀,我調研了目前市面上的數據湖技術:Delta、Apache Iceberg 和 Apache Hudi,考慮了目前數據湖框架支持的功能和之後的社區規劃,最終咱們是選擇了 Iceberg,其中考慮的緣由有如下幾方面:架構

■ Iceberg 深度集成 Flink

前面講到,咱們的絕大部分任務都是 Flink 任務,包括批處理任務和流處理任務,目前這三個數據湖框架,Iceberg 是集成 Flink 作的最完善的,若是採用 Iceberg 替代 Hive 以後,遷移的成本很是小,對用戶幾乎是無感知的,
好比咱們原來的 SQL 是這樣的:併發

INSERT INTO hive_catalog.db.hive_table SELECT * FROM kafka_table

遷移到 Iceberg 之後,只須要修改 catalog 就行。框架

INSERT INTO iceberg_catalog.db.iIcebergceberg_table SELECT * FROM kafka_table

Presto 查詢也是和這個相似,只須要修改 catalog 就好了。運維

■ Iceberg 的設計架構使得查詢更快

image.png

在 Iceberg 的設計架構中,manifest 文件存儲了分區相關信息、data files 的相關統計信息(max/min)等,去查詢一些大的分區的數據,就能夠直接定位到所要的數據,而不是像 Hive 同樣去 list 整個 HDFS 文件夾,時間複雜度從 O(n) 降到了 O(1),使得一些大的查詢速度有了明顯的提高,在 Iceberg PMC Chair Ryan Blue 的演講中,咱們看到命中 filter 的任務執行時間從 61.5 小時降到了 22 分鐘。jvm

■ 使用 Flink SQL 將 CDC 數據寫入 Iceberg

Flink CDC 提供了直接讀取 MySQL binlog 的方式,相對之前須要使用 canal 讀取 binlog 寫入 Iceberg,而後再去消費 Iceberg 數據。少了兩個組件的維護,鏈路減小了,節省了維護的成本和出錯的機率。而且能夠實現導入全量數據和增量數據的完美對接,因此使用 Flink SQL 將 MySQL binlog 數據導入 Iceberg 來作 MySQL->Iceberg 的導入將會是一件很是有意義的事情。

此外對於咱們最初的壓縮小文件的需求,雖然 Iceberg 目前還沒法實現自動壓縮,可是它提供了一個批處理任務,已經能知足咱們的需求。

■ Hive 表遷移 Iceberg 表

遷移準備工做

目前咱們的全部數據都是存儲在 Hive 表的,在驗證完 Iceberg 以後,咱們決定將 Hive 的數據遷移到 Iceberg,因此我寫了一個工具,可使用 Hive 的數據,而後新建一個 Iceberg 表,爲其創建相應的元數據,可是測試的時候發現,若是採用這種方式,須要把寫入 Hive 的程序中止,由於若是 Iceberg 和 Hive 使用同一個數據文件,而壓縮程序會不斷地壓縮 Iceberg 表的小文件,壓縮完以後,不會立刻刪除舊數據,因此 Hive 表就會查到雙份的數據,故咱們採用雙寫的策略,原來寫入 Hive 的程序不動,新啓動一套程序寫入 Iceberg,這樣能對 Iceberg 表觀察一段時間。還能和原來 Hive 中的數據進行比對,來驗證程序的正確性。

通過一段時間觀察,天天將近幾十億條數據、壓縮後幾個 T 大小的 Hive 表和 Iceberg 表,一條數據也不差。因此在最終對比數據沒有問題以後,把 Hive 表中止寫入,使用新的 Iceberg 表。

遷移工具

我將這個 Hive 表遷移 Iceberg 表的工具作成了一個基於 Flink batch job 的 Iceberg Action,提交了社區,不過目前還沒合併:https://github.com/apache/ice...。這個功能的思路是使用 Hive 原始的數據不動,而後新建一個 Iceberg table,再爲這個新的 Iceberg table 生成對應的元數據,你們有須要的話能夠先看看。

此外,Iceberg 社區,還有一個把現有的數據遷移到已存在的 Iceberg table 的工具,相似 Hive 的 LOAD DATA INPATH ... INTO TABLE ,是用 Spark 的存儲過程作的,你們也能夠關注下:https://github.com/apache/ice...

3、Iceberg 優化實踐

壓縮小文件

目前壓縮小文件是採用的一個額外批任務來進行的,Iceberg 提供了一個 Spark 版本的 action,我在作功能測試的時候發現了一些問題,此外我對 Spark 也不是很是熟悉,擔憂出了問題很差排查,因此參照 Spark 版本的本身實現了一個 Flink 版本,並修復了一些 bug,進行了一些功能的優化。

因爲咱們的 Iceberg 的元數據都是存儲在 Hive 中的,也就是咱們使用了 HiveCatalog,因此壓縮程序的邏輯是把 Hive 中全部的 Iceberg 表所有都查出來,依次壓縮。壓縮沒有過濾條件,無論是分區表仍是非分區表,都進行全表的壓縮,這樣作是爲了處理某些使用 eventtime 的 Flink 任務。若是有延遲的數據的到來,就會把數據寫入之前的分區,若是不是全表壓縮只壓縮當天分區的話,新寫入的其餘天的數據就不會被壓縮。

之因此沒有開啓定時任務來壓縮,是由於好比定時五分鐘壓縮一個表,若是五分鐘以內這個壓縮任務沒完成,沒有提交新的 snapshot,下一個定時任務又開啓了,就會把上一個沒有完成的壓縮任務中的數據從新壓縮一次,因此每一個表依次壓縮的策略能夠保證某一時刻一個表只有一個任務在壓縮。

代碼示例參考:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();Actions.forTable(env, table) .rewriteDataFiles() //.maxParallelism(parallelism) //.filter(Expressions.equal("day", day)) //.targetSizeInBytes(targetSizeInBytes) .execute();

目前系統運行穩定,已經完成了幾萬次任務的壓縮。

image.png

注意:
不過目前對於新發布的 Iceberg 0.11 來講,還有一個已知的 bug,即當壓縮前的文件大小大於要壓縮的大小(targetSizeInBytes)時,會形成數據丟失,其實這個問題我在最開始測試小文件壓縮的時候就發現了,而且提了一個 pr,個人策略是大於目標文件的數據文件不參與壓縮,不過這個 pr 沒有合併到 0.11 版本中,後來社區另一個兄弟也發現了相同的問題,提交了一個 pr( https://github.com/apache/ice... ) ,策略是將這個大文件拆分到目標文件大小,目前已經合併到 master,會在下一個 bug fix 版本 0.11.1 中發佈。

查詢優化

■ 批處理定時任務

目前對於定時調度中的批處理任務,Flink 的 SQL 客戶端還沒 Hive 那樣作的很完善,好比執行 hive-f 來執行一個文件。並且不一樣的任務須要不一樣的資源,並行度等。

因此我本身封裝了一個 Flink 程序,經過調用這個程序來進行處理,讀取一個指定文件裏面的 SQL,來提交批任務。在命令行控制任務的資源和並行度等。

/home/flink/bin/fFlinklinklink run -p 10 -m yarn-cluster /home/work/iceberg-scheduler.jar my.sql

■ 優化

批任務的查詢這塊,我作了一些優化工做,好比 limit 下推,filter 下推,查詢並行度推斷等,能夠大大提升查詢的速度,這些優化都已經推回給社區,而且在 Iceberg 0.11 版本中發佈。

運維管理

■ 清理 orphan 文件

  1. 定時任務刪除

在使用 Iceberg 的過程當中,有時候會有這樣的狀況,我提交了一個 Flink 任務,因爲各類緣由,把它停了,這個時候 Iceberg 還沒提交相應的快照。此外因爲一些異常致使程序失敗,會產生一些不在 Iceberg 元數據裏面的孤立的數據文件,這些文件對 Iceberg 來講是不可達的,也是沒用的。因此咱們須要像 jvm 的垃圾回收同樣來清理這些文件。

目前 Iceberg 提供了一個 Spark 版本的 action 來處理這些沒用的文件,咱們採起的策略和壓縮小文件同樣,獲取 Hive 中的全部的 Iceberg 表。每隔一個小時執行一次定時任務來刪除這些沒用的文件。

SparkSession spark = ...... Actions.forTable(spark, table) .removeOrphanFiles() //.deleteWith(...) .execute();
  1. 踩坑

咱們在程序運行過程當中出現了正常的數據文件被刪除的問題,通過調研,因爲快照保留設置是一小時,這個清理程序清理時間也是設置一個小時,經過日誌發現是這個清理程序刪除了正常的數據。查了查代碼,應該是設置了同樣的時間,在清理孤立文件的時候,有其餘程序正在讀取要 expired 的 snapshot,致使刪除了正常的數據。最後把這個清理程序的清理時間改爲默認的三天,沒有再出現刪除數據文件的問題。
固然,爲了保險起見,咱們能夠覆蓋原來的刪除文件的方法,改爲將文件到一個備份文件夾,檢查沒有問題以後,手工刪除。

■ 快照過時處理

咱們的快照過時策略,是和壓縮小文件的批處理任務寫在一塊兒的,壓縮完小文件以後,進行表的快照過時處理,目前保留的時間是一個小時。這是由於對於有一些比較大的表,分區比較多,並且 checkpoint 比較短,若是保留的快照過長的話,仍是會保留過多小文件,咱們暫時沒有查詢歷史快照的需求,因此我將快照的保留時間設置了一個小時。

long olderThanTimestamp = System.currentTimeMillis() - TimeUnit.HOURS.toMillis(1);table.expireSnapshots()// .retainLast(20).expireOlderThan(olderThanTimestamp).commit();

■ 數據管理

寫入了數據以後,當想查看相應的快照有多少數據文件時,直接查詢 Spark 沒法知道哪一個是有用的,哪一個是沒用的。因此須要有對應的管理工具。目前 Flink 這塊還不太成熟,咱們可使用 Spark3 提供的工具來查看。

  1. DDL

目前 create table 這些操做咱們是經過 Flink SQL Client 來作的。其餘相關的 DDL 的操做可使用 Spark 來作:https://iceberg.apache.org/sp...

  1. DML

一些相關的數據的操做,好比刪除數據等能夠經過 MySQL 來實現,Presto 目前只支持分區級別的刪除功能。

  1. show partitions & show create table

在咱們操做 Hive 的時候,有一些很經常使用的操做,好比 show partitions、 show create table 等,這些目前 Flink 尚未支持,因此在操做 Iceberg 的時候就很不方便,咱們本身基於 Flink 1.12 作 了修改,不過目前尚未徹底提交到社區,後續有時間會提交到 Flink 和 Iceberg 社區。

4、後續工做

  • Flink SQL 接入 CDC 數據到 Iceberg

目前在咱們內部的版本中,我已經測試經過可使用 Flink SQL 將 CDC 數據(好比 MySQL binlog)寫入 Iceberg,社區的版本中實現該功能還須要作一些工做,我也提交了一些相關的 PR 來推動這個工做。

  • 使用 SQL 進行刪除和更新

對於 copy-on-write 表,咱們可使用 Spark SQL 來進行行級的刪除和更新。具體的支持的語法能夠參考源碼中的測試類:

org.apache.iceberg.spark.extensions.TestDelete & org.apache.iceberg.spark.extensions.TestUpdate,這些功能我在測試環境測試是能夠的,可是尚未來得及更新到生產。

  • 使用 Flink SQL 進行 streaming read

在工做中會有一些這樣的場景,因爲數據比較大,Iceberg 的數據只存了較短的時間,若是很不幸由於程序寫錯了等緣由,想從更早的時間來消費就無能爲力了。
當引入了 Iceberg 的 streaming read 以後,這些問題就能夠解決了,由於 Iceberg 存儲了全部的數據,固然這裏有一個前提就是對於數據沒有要求特別精確,好比達到秒級別,由於目前 Flink 寫入 Iceberg 的事務提交是基於 Flink Checkpoint 間隔的。

5、收益及總結

通過對 Iceberg 大概一個季度的調研,測試,優化和 bug 修復,咱們將現有的 Hive 表都遷移到了 Iceberg,完美解決了原來的全部的痛點問題,目前系統穩定運行,並且相對 Hive 獲得了不少的收益:

  • Flink 寫入的資源減小

舉一個例子,默認配置下,原來一個 flink 讀取 kafka 寫入 hive 的任務,須要60個並行度纔不會讓 Kafka 產生積壓。改爲寫入 iceberg 以後,只須要20個並行度就夠了。

  • 查詢速度變快

前面咱們講到 Iceberg 查詢的時候不會像 Hive 同樣去 list 整個文件夾來獲取分區數據,而是先從 manifest 文件中獲取相關數據,查詢的性能獲得了顯著的提高,一些大的報表的查詢速度從 50 秒提升到 30 秒。

  • 併發讀寫

因爲 Iceberg 的事務支持,咱們能夠實現對一個表進行併發讀寫,Flink 流式數據實時入湖,壓縮程序同時壓縮小文件,清理過時文件和快照的程序同時清理無用的文件,這樣就能更及時的提供數據,作到分鐘級的延遲,查詢最新分區數據的速度大大加快了,而且因爲 Iceberg 的 ACID 特性能夠保證數據的準確性。

  • time travel

能夠回溯查詢之前某一時刻的數據。

總結一下,咱們目前能夠實現使用 Flink SQL 對 Iceberg 進行批、流的讀寫,並能夠對小文件進行實時的壓縮,使用 Spark SQL 作一些 delete 和 update 工做以及一些 DDL 操做,後續可使用 Flink SQL 將 CDC 的數據寫入 Iceberg。目前對 Iceberg 的全部的優化和 bug fix,我已經貢獻給社區。因爲筆者水平有限,有時候也不免有錯誤,還請你們不吝賜教。

做者介紹:張軍,同程藝龍大數據開發工程師

相關文章
相關標籤/搜索