簡介: 想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並非,藉助 Flink 能夠實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁鬆的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。數據庫
基於 Hive 的離線數倉每每是企業大數據生產系統中不可缺乏的一環。Hive 數倉有很高的成熟度和穩定性,但因爲它是離線的,延時很大。在一些對延時要求比較高的場景,須要另外搭建基於 Flink 的實時數倉,將鏈路延時下降到秒級。可是一套離線數倉加一套實時數倉的架構會帶來超過兩倍的資源消耗,甚至致使重複開發。緩存
想要搭建流式鏈路就必須得拋棄現有的 Hive 數倉嗎?並非,藉助 Flink 能夠實現已有的 Hive 離線數倉準實時化。本文整理自 Apache Flink Committer、阿里巴巴技術專家李勁鬆的分享,文章將分析當前離線數倉實時化的難點,詳解 Flink 如何解決 Hive 流批一體準實時數倉的難題,實現更高效、合理的資源配置。文章大綱以下:數據結構
上圖是一個典型的離線數倉,假設如今公司有一個需求,目前公司的數據量很大,須要天天出一個報表且輸出到業務數據庫中。首先是剛入庫的業務數據,大體分爲兩種,一種是 MySQL 的 binlog,另一種是業務系統中的業務打點,這個日誌打點信息能夠經過 Flume 等工具去採集,再離線入庫到數倉中。而後隨着業務愈來愈多,業務中的各個表能夠作一些抽象,抽象的好處是更好的管理和更高效的數據複用和計算複用。因此數倉就分紅了多層 (明細層、中間層、服務層等等),每一層存的是數據表,數據表之間經過 HiveSQL 的計算來實現 ETL 轉換。架構
不止是 HiveSQL ,Hive 只是靜態的批計算,而業務天天都要出報表,這意味着天天都要進行計算,這種狀況下會依賴於調度工具和血緣管理:併發
當任務十分龐大的時候,咱們得出結果每每須要很長的一段時間,也就是咱們常說的 T+1,H+1 ,這就是離線數倉的問題。框架
上面說過,離線數倉不只僅是簡單的 Hive 計算,它還依賴了其它的第三方工具,好比:運維
不管是離線數倉仍是第三方工具,其實主要的問題仍是「慢」,如何解決慢的問題,此時就該實時數倉出場了。分佈式
實時數倉實際上是從 Hive+HDFS 的組合換成了 Kafka,ETL 的功能經過 Flink 的流式處理解決。此時就不存在調度和血緣管理的問題了,經過實時不斷的增量更新,最終輸出到業務的 DB 中。工具
雖然延時下降了,但此時咱們會面臨另一些問題:學習
因此此時不少人就會選擇一套實時一套離線的作法,互不干擾,根據任務是否須要走實時的需求來對需求進行分離。
這套架構看似解決了全部問題,但實際帶來的問題也是很是多。首先,Lambda 架構形成了離線和實時的割裂問題,它們解決的業務問題都是同樣的,可是兩套方案讓一樣的數據源產生了不一樣的計算結果。不一樣層級的表結構可能不一致,而且當數據產生不一致的問題時,還須要去進行比對排查。
隨着這套 Lambda 架構越走越遠,開發團隊、表結構表依賴、計算模型等均可能會被割裂開,越到後面越會發現,成本愈來愈高,而統一的代價愈來愈大。
那麼問題來了,實時數倉會耗費如此大的資源,且還不能保留歷史數據,Lambda 架構存在如此多的問題,有什麼方案能夠解決呢?
數據湖擁有很多的優勢,原子性可讓咱們作到準實時的批流一體,而且支持已有數據的修改操做。可是畢竟數據湖是新一代數倉存儲架構,各方面都還不是很完美,目前已有的數據湖都強依賴於 Spark(固然 Flink 也正在擁抱數據湖),將數據遷移到數據湖須要團隊對遷移成本和人員學習成本進行考量。
若是沒有這麼大的決心遷移數據湖,那有沒有一個稍微緩和一些的方案加速已有的離線數倉呢?
Flink 一直持續致力於離線和實時的統一,首先是統一元數據。簡單來講就是把 Kafka 表的元數據信息存儲到 HiveMetaStore 中,作到離線和實時的表 Meta 的統一。(目前開源的實時計算並無一個較爲完善的持久化 MetaStore,Hive MetaStore 不只能保存離線表,也能夠承擔實時計算的 MetaStore 能力)。
一樣的元數據以後,實時和離線的表結構和層次能夠設計成同樣,接下來就是能夠共用:
分析了元數據和計算引擎的統一,更進一步,是否能統一實時和離線的數據,避免數據的不一致,避免數據的重複存儲和重複計算。ETL 計算是否能統一呢?既然實時表設計上能夠和離線表如出一轍,是否能夠乾脆只有實時表的 ETL 計算,離線表從實時表裏獲取數據?
而且,經過實時鏈路能夠加速離線鏈路的數據準備,批計算能夠把調度換成流輸入。
Flink Hive/File Streaming Sink 即爲解決這個問題,實時 Kafka 表能夠實時的同步到對於的離線表中:
此時離線的批計算也能夠交由實時調度,在實時任務處理中某個契機 (Partition Commit 見後續) 自行調度離線那塊的任務進行數據同步操做。
此時實時和離線的表已經基本統一,那麼問題來了,Kafka 中的表和 Hive 中的表可否就共用一張表呢?個人想法是以後可能會出現如下狀況,在數倉中定義一張表,分別對應着 Kafka 和 Hive+HDFS 兩種物理存儲:
Flink 1.11 前已經有了 StreamingFileSink,在 1.11 中不但把它集成到 SQL 中,讓這個 Hive Streaming Sink 能夠像離線的 Hive SQL 那樣,全部的業務邏輯都由 SQL 去處理,並且帶來了進一步的增量。
接下來介紹下 Hive/File Streaming Sink,分爲兩個組件,FileWriter 和 PartitionCommitter:
由於流式做業是不間斷的在運行的,如何設置分區提交的時間,某個分區何時提交它呢?
若是當前時間 Current time > 分區產生的時間 + commitDelay 延時,便是能夠開始進行分區提交的時間。一個簡單的例子是小時分區,好比當前已經 12 點過 1 分了,已通過了 11 點的分區 + 一個小時,因此咱們能夠說不會再有 11 點分區的數據過來了,就能夠提交 11 點的分區。(要是有 LateEvent 怎麼辦?因此也要求分區的提交是冪等的。)
接下來介紹分區的提交具體做用,最直接的就是寫 SuccessFile 和 Add partition 到 Hive metastore。
Flink 內置支持了 Hive-MetaStore 和 SuccessFile,只要配置"sink.partition-commit.policy.kind" 爲 "metastore,success-file",便可作到在 commit 分區的時候自動 add 分區到 Hive 中,並且寫 SuccessFile,當 add 操做完成的時候,這個 partition 才真正的對 Hive 可見。
Custom 機制容許自定義一個 Partition Commit Policy 的類,實現這個類能夠作到在這個分區的任務處理完成後:好比觸發下游的調度、Statistic Analysis、又或者觸發 Hive 的小文件合併。(固然觸發 Hive 的小文件合併不但須要啓動另外一個做業,並且作不到一致性保證,後續 Flink 也會有進一步的探索,在 Flink 做業中,主動完成小文件的合併)。
不止是準實時的數據攝入,Flink 也帶來了維表關聯 Hive 表和流實時消費 Hive 表。
咱們知道 Flink 是支持維表關聯查詢 MySQL 和 HBase 的,在計算中維護一個 LRU 的緩存,未命中查詢 MySQL 或 HBase。可是沒有 Lookup 的能力怎麼辦呢?數據通常是放在離線數倉中的,因此業務上咱們通常採用 Hive Table 按期同步到 HBase 或者 MySQL。Flink 也能夠容許直接維表關聯 Hive 表,目前的實現很簡單,須要在每一個併發中全量 Load Hive 表的全部數據,只能針對小表的關聯。
傳統的 Hive Table 只支持按照批的方式進行讀取計算,可是咱們如今可使用流的方式來監控 Hive 裏面的分區 / 文件生成,也就是每一條數據過來,均可以實時的進行消費計算,它也是徹底複用 Flink Streaming SQL 的方式,能夠和 HBase、MySQL、Hive Table 進行 Join 操做,最後再經過 FileWriter 實時寫入到 Hive Table 中。
案例以下:經過 Flume 採集日誌打點 Logs,計算各年齡層的 PV,此時咱們存在兩條鏈路:
這裏就是咱們剛剛提到的,雖然是對應兩個 database:realtime_db 和 offline_db,可是它們共用一份元數據。
對於 Hive 表咱們能夠經過 Flink SQL 提供的 Hive dialect 語法,而後經過 Hive 的 DDL 語法來在 Flink 中建立 Hive 表,這裏設置 PARTITION BY 天和小時,是與實時鏈路的不一樣之處,由於實時鏈路是沒有分區概念的。
如何在表結構裏避免分區引發的 Schema 差別?一個能夠解決的方案是考慮引入 Hidden Partition 的定義,Partition 的字段能夠是某個字段的 Computed Column,這也能夠與實際常見的狀況作對比,如天或小時是由時間字段計算出的,以後是下面的三個參數:
以後設置回默認的 Flink dialect,建立 Kafka 的實時表,經過 insert into 將 Kafka 中的數據同步到 Hive 之中。
這部分是關於 Kafka 中的表如何經過 Dim join 的方式,拿到 User 表的年齡字段。圖中須要關心的是 lookup.join.cache.ttl 這個參數,咱們會將 user 這張表用相似於 broadcast 的方式,廣播到每個 task 中,可是這個過程當中可能出現 Hive 中的 table 存在更新操做,這裏的 1h 就說明,數據有效期僅爲 1 小時。建立 view 的目的是將 Dim join 所須要的 process time 加上(Dim Join 須要定義 Process time 是個不太天然的過程,後續也在考慮如何在不破壞 SQL 語義的同時,簡化 DimJoin 的語法。)
經過實時 Pipeline 的手段消費 Hive Table,而不是經過調度或者以往手動觸發的 batch 做業,第一個參數 streaming-source.enable,打開流處理機制,而後使用 start-offset 參數指定從哪一個分區 / 文件開始消費。此時,整個流批一體準實時數倉應用基本算是完成啦。
Hive 做爲分區級別管理的 Table Format 在一些方便有比較大的限制,若是是新型的 Table Format 好比 Iceberg 會有更好的支持,將來 Flink 會在下面幾個方面增強:
做者介紹:
李勁鬆,花名之信,阿里巴巴技術專家,Apache Flink Committer。2014 年起專一於阿里內部 Galaxy 流計算框架;2017 年起開始 Flink 研發,主要專一於 Batch 計算、數據結構與類型。
原文連接本文爲阿里雲原創內容,未經容許不得轉載。