基於 Flink + Hive 構建流批一體準實時數倉

簡介: 想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並非,藉助 Flink 能夠實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁鬆的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。數據庫

基於 Hive 的離線數倉每每是企業大數據生產系統中不可缺乏的一環。Hive 數倉有很高的成熟度和穩定性,但因爲它是離線的,延時很大。在一些對延時要求比較高的場景,須要另外搭建基於 Flink 的實時數倉,將鏈路延時下降到秒級。可是一套離線數倉加一套實時數倉的架構會帶來超過兩倍的資源消耗,甚至致使重複開發。緩存

想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並非,藉助 Flink 能夠實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁鬆的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。文章大綱以下:數據結構

  1. 離線數倉實時化的難點
  2. Flink 在流批一體的探索
  3. 構建流批一體準實時數倉應用實踐

離線數倉實時化的難點

離線數倉

1.jpg

上圖是一個典型的離線數倉,假設如今公司有一個需求,目前公司的數據量很大,須要天天出一個報表且輸出到業務數據庫中。首先是剛入庫的業務數據,大體分爲兩種,一種是 MySQL 的 binlog,另一種是業務系統中的業務打點,這個日誌打點信息能夠經過 Flume 等工具去採集,再離線入庫到數倉中。而後隨着業務愈來愈多,業務中的各個表能夠作一些抽象,抽象的好處是更好的管理和更高效的數據複用和計算複用。因此數倉就分紅了多層 (明細層、中間層、服務層等等),每一層存的是數據表,數據表之間經過 HiveSQL 的計算來實現 ETL 轉換。架構

不止是 HiveSQL ,Hive 只是靜態的批計算,而業務天天都要出報表,這意味着天天都要進行計算,這種狀況下會依賴於調度工具和血緣管理:併發

  • 調度工具:按照某個策略把批計算調度起來。
  • 血緣管理:一個任務是由許多個做業組合而成,可能有很是複雜的表結構層次,整個計算是一個很是複雜的拓撲,做業間的依賴關係很是複雜 (減小冗餘存儲和計算,也能夠有較好的容錯),只有當一級結束後才能進行下一級的計算。

當任務十分龐大的時候,咱們得出結果每每須要很長的一段時間,也就是咱們常說的 T+1,H+1 ,這就是離線數倉的問題。框架

第三方工具

2.jpg

上面說過,離線數倉不只僅是簡單的 Hive 計算,它還依賴了其它的第三方工具,好比:運維

  • 使用 Flume 來入庫,但存在必定的問題,首先,它的容錯可能沒法保證 Exactly-Once 效果,須要下游再次進行去重操做。其次,自定義邏輯須要經過一些手段,好比腳原本控制。第三,離線數倉並不具有良好的擴展能力,當數據劇增時,增長本來的併發數就比較困難了。
  • 基於調度工具的做業調度會帶來級聯的計算延遲,好比凌晨 1 點開始計算昨天的數據,可能須要到早上 六、7 點才能作完,而且沒法保證在設置的調度時間內數據能夠徹底 ready 。此外,級聯的計算還會帶來複雜的血緣管理問題,大任務的 Batch 計算可能會忽然打滿集羣的資源,因此也要求咱們對於負載管理進行考量,這些都會給業務增長負擔。

不管是離線數倉仍是第三方工具,其實主要的問題仍是「慢」,如何解決慢的問題,此時就該實時數倉出場了。分佈式

實時數倉

3.jpg

實時數倉實際上是從 Hive+HDFS 的組合換成了 Kafka,ETL 的功能經過 Flink 的流式處理解決。此時就不存在調度和血緣管理的問題了,經過實時不斷的增量更新,最終輸出到業務的 DB 中。工具

雖然延時下降了,但此時咱們會面臨另一些問題:學習

  • 歷史數據丟失,由於 Kafka 只是臨時的存儲介質,數據會有一個超時的時間 (好比只保存 7 天的數據),這會致使咱們的歷史數據丟失。
  • 成本相對較高,實時計算的成本要大於離線計算。

Lambda 架構

4-1.jpg

因此此時不少人就會選擇一套實時一套離線的作法,互不干擾,根據任務是否須要走實時的需求來對需求進行分離。

這套架構看似解決了全部問題,但實際帶來的問題也是很是多。首先,Lambda 架構形成了離線和實時的割裂問題,它們解決的業務問題都是同樣的,可是兩套方案讓一樣的數據源產生了不一樣的計算結果。不一樣層級的表結構可能不一致,而且當數據產生不一致的問題時,還須要去進行比對排查。

隨着這套 Lambda 架構越走越遠,開發團隊、表結構表依賴、計算模型等均可能會被割裂開,越到後面越會發現,成本愈來愈高,而統一的代價愈來愈大。

4.jpg

那麼問題來了,實時數倉會耗費如此大的資源,且還不能保留歷史數據,Lambda 架構存在如此多的問題,有什麼方案能夠解決呢?

數據湖

5.jpg

數據湖擁有很多的優勢,原子性可讓咱們作到準實時的批流一體,而且支持已有數據的修改操做。可是畢竟數據湖是新一代數倉存儲架構,各方面都還不是很完美,目前已有的數據湖都強依賴於 Spark(固然 Flink 也正在擁抱數據湖),將數據遷移到數據湖須要團隊對遷移成本和人員學習成本進行考量。

若是沒有這麼大的決心遷移數據湖,那有沒有一個稍微緩和一些的方案加速已有的離線數倉呢?

Flink 在批流一體上的探索

統一元數據

6.jpg

Flink 一直持續致力於離線和實時的統一,首先是統一元數據。簡單來講就是把 Kafka 表的元數據信息存儲到 HiveMetaStore 中,作到離線和實時的表 Meta 的統一。(目前開源的實時計算並無一個較爲完善的持久化 MetaStore,Hive MetaStore 不只能保存離線表,也能夠承擔實時計算的 MetaStore 能力)。

統一計算引擎

7.jpg

一樣的元數據以後,實時和離線的表結構和層次能夠設計成同樣,接下來就是能夠共用:

  • 同一套 SQL,Flink 自身提供批流一體的 ANSI-SQL 語法,能夠大大減少用戶 SQL 開發者和運維者的負擔,讓用戶專一於業務邏輯。
  • 同一個引擎,Flink 的流和批覆用一套優化和 Runtime 框架,現階段的大數據引擎還遠遠達不到徹底穩定的狀況,因此仍然有不少時候須要咱們去深刻的分析和優化,一套引擎可讓開發者專一單個技術棧,避免須要接觸多個技術棧,而只有技術廣度,沒有技術深度。

統一數據

8.jpg

分析了元數據和計算引擎的統一,更進一步,是否能統一實時和離線的數據,避免數據的不一致,避免數據的重複存儲和重複計算。ETL 計算是否能統一呢?既然實時表設計上能夠和離線表如出一轍,是否能夠乾脆只有實時表的 ETL 計算,離線表從實時表裏獲取數據?

而且,經過實時鏈路能夠加速離線鏈路的數據準備,批計算能夠把調度換成流輸入。

8.jpg

Flink Hive/File Streaming Sink 即爲解決這個問題,實時 Kafka 表能夠實時的同步到對於的離線表中:

  • 離線表做爲實時的歷史數據,填補了實時數倉不存在歷史數據的空缺。
  • 數據批量準實時攝入爲 Ad-hoc 查詢離線表提供了準實時輸入。

此時離線的批計算也能夠交由實時調度,在實時任務處理中某個契機 (Partition Commit 見後續) 自行調度離線那塊的任務進行數據同步操做。

此時實時和離線的表已經基本統一,那麼問題來了,Kafka 中的表和 Hive 中的表可否就共用一張表呢?個人想法是以後可能會出現如下狀況,在數倉中定義一張表,分別對應着 Kafka 和 Hive+HDFS 兩種物理存儲:

  • 用戶在進行 insert 操做時,就天然插入到了 Kafka 的實時 table 當中,同時生成另一條鏈路,自動同步到 Hive Table 當中。這樣這一張表就很是的完整,不只知足實時的需求,並且擁有歷史的數據。
  • 一個 SQL 讀取這樣的一個 Hybrid Source ,根據你的查詢語句後面的 where 條件,自動路由到 Hive 的歷史數據,或者是 Kafka 的實時數據。根據必定的規則先讀 Hive 歷史數據,再讀 Kafka 實時數據,固然這裏有一個問題,它們之間經過什麼標識來切換呢?一個想法是數據中或者 Kafka 的 Timestamp。

Hive Streaming Sink 的實現

10.jpg

Flink 1.11 前已經有了 StreamingFileSink,在 1.11 中不但把它集成到 SQL 中,讓這個 Hive Streaming Sink 能夠像離線的 Hive SQL 那樣,全部的業務邏輯都由 SQL 去處理,並且帶來了進一步的增量。

接下來介紹下 Hive/File Streaming Sink,分爲兩個組件,FileWriter 和 PartitionCommitter:

  • FileWriter 組件能夠作到分區感知,經過 checkpoint 機制能夠保證 Exactly-Once(分佈式場景是不可靠的,須要經過兩階段提交 + 文件 Rename 的冪等性),FileWriter 也提供了 Rolling 相關的參數,這個 Rolling 指的是咱們的流式處理過程,它能夠經過兩個參數來控制執行頻率,file-size 就是每一個數據流的大小,rollover-interval 就是時長間隔。可是須要注意,checkpoint 不宜設置太頻繁,以避免產生過多的小文件。
  • Partition Committer,經過一系列的業務邏輯處理後獲得的 Finished Flies 就直接可用了嗎?由於咱們典型的 Hive 表都是分區表,當一個分區就緒後,還須要通知下游,Partition 已經處理完成,能夠同步到 Hive metastore 中了。咱們須要在合適的時機來有效的 trigger 特定的 Partition commit。Partition committer 總的來講,就是完成了 Hive 分區表的數據及元數據的寫入,甚至能夠完成通知調度系統開始執行以後的 Batch 做業。

11.jpg

由於流式做業是不間斷的在運行的,如何設置分區提交的時間,某個分區何時提交它呢?

  • 第一種是默認策略 Process time ,也就是咱們所說的事件被處理時的當前系統時間,可是缺點也比較明顯,可能出現各類各樣的數據不完整。
  • 推薦策略就是 partition-time,這種策略能夠作到提交時的語義明確且數據完整,partition 字段就是由 event time ,也就是事件產生的時間所獲得的。

若是當前時間 Current time > 分區產生的時間 + commitDelay 延時,便是能夠開始進行分區提交的時間。一個簡單的例子是小時分區,好比當前已經 12 點過 1 分了,已通過了 11 點的分區 + 一個小時,因此咱們能夠說不會再有 11 點分區的數據過來了,就能夠提交 11 點的分區。(要是有 LateEvent 怎麼辦?因此也要求分區的提交是冪等的。)

12.jpg

接下來介紹分區的提交具體做用,最直接的就是寫 SuccessFile 和 Add partition 到 Hive metastore。

Flink 內置支持了 Hive-MetaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 爲 "metastore,success-file",便可作到在 commit 分區的時候自動 add 分區到 Hive 中,並且寫 SuccessFile,當 add 操做完成的時候,這個 partition 才真正的對 Hive 可見。

Custom 機制容許自定義一個 Partition Commit Policy 的類,實現這個類能夠作到在這個分區的任務處理完成後:好比觸發下游的調度、Statistic Analysis、又或者觸發 Hive 的小文件合併。(固然觸發 Hive 的小文件合併不但須要啓動另外一個做業,並且作不到一致性保證,後續 Flink 也會有進一步的探索,在 Flink 做業中,主動完成小文件的合併)。

實時消費

13.jpg

不止是準實時的數據攝入,Flink 也帶來了維表關聯 Hive 表和流實時消費 Hive 表。

咱們知道 Flink 是支持維表關聯查詢 MySQL 和 HBase 的,在計算中維護一個 LRU 的緩存,未命中查詢 MySQL 或 HBase。可是沒有 Lookup 的能力怎麼辦呢?數據通常是放在離線數倉中的,因此業務上咱們通常採用 Hive Table 按期同步到 HBase 或者 MySQL。Flink 也能夠容許直接維表關聯 Hive 表,目前的實現很簡單,須要在每一個併發中全量 Load Hive 表的全部數據,只能針對小表的關聯。

傳統的 Hive Table 只支持按照批的方式進行讀取計算,可是咱們如今可使用流的方式來監控 Hive 裏面的分區 / 文件生成,也就是每一條數據過來,均可以實時的進行消費計算,它也是徹底複用 Flink Streaming SQL 的方式,能夠和 HBase、MySQL、Hive Table 進行 Join 操做,最後再經過 FileWriter 實時寫入到 Hive Table 中。

構建流批一體準實時數倉應用實踐

14.jpg

案例以下:經過 Flume 採集日誌打點 Logs,計算各年齡層的 PV,此時咱們存在兩條鏈路:

  • 一條是實時鏈路,經過輸入訪問日誌,關聯 Hive 的 User 表來計算出所須要的結果到業務 DB 中。
  • 而另外一條則是離線鏈路,咱們須要 Hive 提供小時分區表,來實現對歷史數據的 Ad-hoc 查詢。

15.jpg

這裏就是咱們剛剛提到的,雖然是對應兩個 database:realtime_db 和 offline_db,可是它們共用一份元數據。

對於 Hive 表咱們能夠經過 Flink SQL 提供的 Hive dialect 語法,而後經過 Hive 的 DDL 語法來在 Flink 中建立 Hive 表,這裏設置 PARTITION BY 天和小時,是與實時鏈路的不一樣之處,由於實時鏈路是沒有分區概念的。

如何在表結構裏避免分區引發的 Schema 差別?一個能夠解決的方案是考慮引入 Hidden Partition 的定義,Partition 的字段能夠是某個字段的 Computed Column,這也能夠與實際常見的狀況作對比,如天或小時是由時間字段計算出的,以後是下面的三個參數:

  • sink.partition-commit.trigger,指定何時進行 partition 的 commit,這裏設置了 partition-time,用於保證 exactly-once;
  • partition.time-extractor.timestamp-pattern,怎樣從 partition 中提取時間,至關於設置了一個提取格式;
  • sink.partition-commit.policy.kind,既 partition commit 所要進行的操做,也就是剛剛提到的 metastore,success-file。

以後設置回默認的 Flink dialect,建立 Kafka 的實時表,經過 insert into 將 Kafka 中的數據同步到 Hive 之中。

16.jpg

這部分是關於 Kafka 中的表如何經過 Dim join 的方式,拿到 User 表的年齡字段。圖中須要關心的是 lookup.join.cache.ttl 這個參數,咱們會將 user 這張表用相似於 broadcast 的方式,廣播到每個 task 中,可是這個過程當中可能出現 Hive 中的 table 存在更新操做,這裏的 1h 就說明,數據有效期僅爲 1 小時。建立 view 的目的是將 Dim join 所須要的 process time 加上(Dim Join 須要定義 Process time 是個不太天然的過程,後續也在考慮如何在不破壞 SQL 語義的同時,簡化 DimJoin 的語法。)

17.jpg

經過實時 Pipeline 的手段消費 Hive Table,而不是經過調度或者以往手動觸發的 batch 做業,第一個參數 streaming-source.enable,打開流處理機制,而後使用 start-offset 參數指定從哪一個分區 / 文件開始消費。此時,整個流批一體準實時數倉應用基本算是完成啦。

將來規劃

Hive 做爲分區級別管理的 Table Format 在一些方便有比較大的限制,若是是新型的 Table Format 好比 Iceberg 會有更好的支持,將來 Flink 會在下面幾個方面增強:

  • Flink Hive/File Streaming Sink 的 Auto Compaction(Merging) 能力,小文件是實時的最大阻礙之一。
  • Flink 擁抱 Iceberg,目前在社區中已經開發完畢 Iceberg Sink,Iceberg Source 正在推動中,能夠看見在不遠的未來,能夠直接將 Iceberg 當作一個消息隊列,且,它保存了全部的歷史數據,達到真正的流批統一。
  • 加強 Flink Batch 的 Shuffle,目前徹底的 Hash Shuffle 帶來了不少問題,好比小文件、隨機 IO、Buffer 管理帶來的 OOM,後續開源 Flink (1.12) 會增強力量引入 SortedShuffle 以及 ShuffleService。
  • Flink Batch BoundedStream 支持,舊的 Dataset API 已經不能知足流批統一的架構,社區 (1.12) 會在 DataStream 上提供 Batch 計算的能力。

做者介紹:

李勁鬆,花名之信,阿里巴巴技術專家,Apache Flink Committer。2014 年起專一於阿里內部 Galaxy 流計算框架;2017 年起開始 Flink 研發,主要專一於 Batch 計算、數據結構與類型。

原文連接本文爲阿里雲原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索