一篇由Apache Hudi PMC Bhavani Sudha Saktheeswaran和AWS Presto團隊工程師Brandon Scheller分享Apache Hudi和Presto集成的一篇文章。git
1. 概述
Apache Hudi 是一個快速迭代的數據湖存儲系統,能夠幫助企業構建和管理PB級數據湖,Hudi經過引入upserts
、deletes
和增量查詢等原語將流式能力帶入了批處理。這些特性使得統一服務層可提供更快、更新鮮的數據。Hudi表可存儲在Hadoop兼容的分佈式文件系統或者雲上對象存儲中,而且很好的集成了 Presto, Apache Hive, Apache Spark 和Apache Impala。Hudi開創了一種新的模型(數據組織形式),該模型將文件寫入到一個更受管理的存儲層,該存儲層能夠與主流查詢引擎進行互操做,同時在項目演變方面有了一些有趣的經驗。github
本博客討論Presto和Hudi集成的演變,同時討論Presto-Hudi查詢即將到來的文件Listing和查詢計劃優化。web
2. Apache Hudi
Apache Hudi(簡稱Hudi)提供在DFS上存儲超大規模數據集,同時使得流式處理若是批處理同樣,該實現主要是經過以下兩個原語實現。數據庫
-
Update/Delete記錄: Hudi支持更新/刪除記錄,使用文件/記錄級別索引,同時對寫操做提供事務保證。查詢可獲取最新提交的快照來產生結果。 -
Change Streams: Hudi也支持增量獲取表中全部更新/插入/刪除的記錄,從指定時間點開始進行增量查詢。
上圖說明了Hudi的原語,配合這些原語能夠直接在DFS抽象之上解鎖流/增量處理功能。這和直接從Kafka Topic消費事件,而後使用狀態存儲來增量計算臨時結果相似,該架構有不少優勢。apache
-
提高效率: 攝取數據常常須要處理更新(例如CDC),刪除(法律隱私條例)以及強制主鍵約束來確保數據質量。然而因爲缺少標準工具,數據工程師每每須要使用批處理做業來從新處理成天的事件或者每次運行時從新加載上游全部數據,這會致使浪費大量的資源。因爲Hudi支持記錄級別更新,只須要從新處理表中更新/刪除的記錄,大大提高了處理效率,而無需重寫表的全部分區或事件。 -
更快的ETL/派生管道: 還有一種廣泛狀況,即一旦從外部源攝取數據,就使用Apache Spark/Apache Hive或任何其餘數據處理框架構建派生的數據管道,以便爲各類用例(如數據倉庫、機器學習功能提取,甚至僅僅是分析)構建派生數據管道。一般該過程再次依賴於以代碼或SQL表示的批處理做業,批量處理全部輸入數據並從新計算全部輸出結果。經過使用增量查詢(而不是常規快照查詢)查詢一個或多個輸入表,從而只處理來自上游表的增量更改,而後對目標派生表執行upsert或delete操做,能夠顯著加快這種數據管道的速度,如第一個圖所示。 -
更新鮮的數據訪問: 一般咱們會添加更多的資源(例如內存)來提升性能指標(例如查詢延遲)。Hudi從根本上改變了數據集的傳統管理方式,這多是大數據時代出現以來的第一次。增量地進行批處理可使得管道運行時間少得多。相比之前的數據湖,如今數據可更快地被查詢。 -
統一存儲: 基於以上三個優勢,在現有數據湖上進行更快、更輕的處理意味着不須要僅爲了得到接近實時數據的訪問而使用專門存儲或數據集市。
2.1 Hudi表和查詢類型
2.1.1 表類型
Hudi支持以下兩種類型表微信
Copy On Write (COW): 使用列式存儲格式(如parquet)存儲數據,在寫入時同步更新版本/重寫數據。架構
Merge On Read (MOR): 使用列式存儲格式(如parquet)+ 行存(如Avro)存儲數據。更新被增量寫入delta文件,後續會進行同步/異步壓縮產生新的列式文件版本。app
下表總結了兩種表類型的trade-off。框架
Trade-off | CopyOnWrite | MergeOnRead |
---|---|---|
數據延遲 | 更高 | 更低 |
更新開銷 (I/O) | 高(重寫整個parquet文件) | 更低 (寫入增量日誌文件) |
Parquet文件大小 | 更小(高update (I/0) 開銷) | 更大 (低updaet開銷) |
寫放大 | 更低 (決定與Compaction策略) |
2.1.2 查詢類型
Hudi支持以下查詢類型運維
快照查詢: 查詢給定commit/compaction的表的最新快照。對於Merge-On-Read表,經過合併基礎文件和增量文件來提供近實時數據(分鐘級);對於Copy-On-Write表,對現有Parquet表提供了一個可插拔替換,同時提供了upsert/delete和其餘特性。
增量查詢: 查詢給定commit/compaction以後新寫入的數據,可爲增量管道提供變動流。
讀優化查詢: 查詢給定commit/compaction的表的最新快照。只提供最新版本的基礎/列式數據文件,並可保證與非Hudi表相同的列式查詢性能。
下表總結了不一樣查詢類型之間的trade-off。
Trade-off | 快照 | 讀優化 |
---|---|---|
數據延遲 | 更低 | 更高 |
查詢延遲 | COW: 與parquet表相同。MOR: 更高 (合併基礎/列式文件和行存增量文件) | 與COW快照查詢有相同列式查詢性能 |
下面動畫簡單演示了插入/更新如何存儲在COW和MOR表中的步驟,以及沿着時間軸的查詢結果。其中X軸表示每一個查詢類型的時間軸和查詢結果。
注意,做爲寫操做的一部分,表的commit被徹底合併到表中。對於更新,包含該記錄的文件將使用全部已更改記錄的新值從新寫入。對於插入,優先會將記錄寫入到每一個分區路徑中最小文件,直到它達到配置的最大大小。其餘剩餘的記錄都將寫入新的文件id組中,會保證再次知足大小要求。
MOR和COW在攝取數據方面經歷了相同步驟。更新將寫入屬於最新文件版本的最新日誌(delta)文件,而不進行合併。對於插入,Hudi支持2種模式:
-
寫入log文件 - 當Hudi表可索引日誌文件(例如HBase索引和即將到來的記錄級別索引)。 -
寫入parquet文件 - 當Hudi表不能索引日誌文件(例如布隆索引)。
增量日誌文件後面經過時間軸中的壓縮(compaction)操做與基礎parquet文件合併。這種表類型是最通用、高度高級的,爲寫入提供很大靈活性(指定不一樣的壓縮策略、處理突發性寫入流量等)和查詢提供靈活性(例如權衡數據新鮮度和查詢性能)。
3. Presto
3.1 早期Presto集成方案
Hudi設計於2016年中後期。那時咱們就着手與Hadoop生態系統中的查詢引擎集成。爲了在Presto中實現這一點,正如社區建議的那樣,咱們引入了一個自定義註解@UseFileSplitsFromInputFormat
。任何註冊的Hive表(若是有此註解)都將經過調用相應的inputformat的getSplits()
方法(而不是Presto Hive原生切片加載邏輯)來獲取切片。經過Presto查詢的Hudi表,只需簡單調用HoodieParquetInputFormat.getSplits()
. 集成很是簡單隻,需將相應的Hudi jar包放到<presto_install>/plugin/hive-hadoop2/
目錄下。它支持查詢COW Hudi表,並讀取MOR Hudi表的優化查詢(只從壓縮的基本parquet文件中獲取數據)。在Uber,這種簡單的集成已經支持天天超過100000次的Presto查詢,這些查詢來自使用Hudi管理的HDFS中的100PB的數據(原始數據和模型表)。
3.2 移除InputFormat.getSplits()
調用inputformat.getSplits()
是個簡單的集成,可是可能會致使對NameNode的大量RPC調用,之前的集成方法有幾個缺點。
-
從Hudi返回的InputSplits不夠。Presto須要知道每一個InputSplit返回的文件狀態和塊位置。所以,對於每次切片乘以加載的分區數,這將增長2個額外的NameNode RPC調用。有時,NameNode承受很大的壓力,會觀察到背壓。 -
此外對於Presto Split計算中加載的每一個分區(每一個 loadPartition()
調用),HoodieParquetInputFormat.getSplits()
將被調用。這致使了冗餘的Hudi表元數據Listing,其實能夠被屬於從查詢掃描的表的全部分區複用。
咱們開始從新思考Presto-Hudi的整合方案。在Uber,咱們經過在Hudi上添加一個編譯時依賴項來改變這個實現,並在BackgroundHiveSplitLoader
構造函數中實例化HoodieTableMetadata
一次。而後咱們利用Hudi Api過濾分區文件,而不是調用HoodieParquetInputFormat.getSplits()
,這大大減小了該路徑中NameNode調用次數。
爲了推廣這種方法並使其可用於Presto-Hudi社區,咱們在Presto的DirectoryLister
接口中添加了一個新的API,它將接受PathFilter
對象。對於Hudi表,咱們提供了這個PathFilter對象HoodieROTablePathFilter
,它將負責過濾爲查詢Hudi表而預先列出的文件,並得到與Uber內部解決方案相同的結果。
這一變化是從0.233版本的Presto開始提供,依賴Hudi版本爲0.5.1-incubating。因爲Hudi如今是一個編譯時依賴項,所以再也不須要在plugin目錄中提供Hudi jar文件。
3.3 Presto支持查詢Hudi MOR表
咱們看到社區有愈來愈多人對使用Presto支持Hudi MOR表的快照查詢感興趣。以前Presto只支持查詢Hudi表讀優化查詢(純列式數據)。隨着該PR https://github.com/prestodb/presto/pull/14795被合入,如今Presto(0.240及後面版本)已經支持查詢MOR表的快照查詢,這將經過在讀取時合併基本文件(parquet數據)和日誌文件(avro數據)使更新鮮的數據可用於查詢。
在Hive中,這能夠經過引入一個單獨的InputFormat
類來實現,該類提供了處理切片的方法,並引入了一個新的RecordReader
類,該類能夠掃描切片以獲取記錄。對於使用Hive查詢MOR Hudi表,在Hudi中已經有相似類可用:
-
InputFormat
-org.apache.hudi.hadoop.realtime.HoodieParquetRealtimeInputFormat
-
InputSplit
-org.apache.hudi.hadoop.realtime.HoodieRealtimeFileSplit
-
RecordReader
-org.apache.hudi.hadoop.realtime.HoodieRealtimeRecordReader
在Presto中支持這一點須要理解Presto如何從Hive表中獲取記錄,並在該層中進行必要的修改。由於Presto使用其原生的ParquetPageSource
而不是InputFormat的記錄讀取器,Presto將只顯示基本Parquet文件,而不顯示來自Hudi日誌文件的實時更新,後者是avro數據(本質上與普通的讀優化Hudi查詢相同)。
爲了讓Hudi實時查詢正常工做,咱們肯定並進行了如下必要更改:
-
向可序列化HiveSplit添加額外的元數據字段以存儲Hudi切片信息。Presto-Hive將其拆分轉換爲可序列化的HiveSplit以進行傳遞。由於它須要標準的切片,因此它將丟失從FileSplit擴展的複雜切片中包含的任何額外信息的上下文。咱們的第一個想法是簡單地添加整個切片做爲
HiveSplit
的一個額外的字段。但這並不起做用,由於複雜的切片不可序列化,並且還會複製基本切片數據。相反咱們添加了一個
CustomSplitConverter
接口。它接受一個自定義切片並返回一個易於序列化的String->String Map,其中包含來自自定義切片的額外數據。爲了實現這點,咱們還將此Map做爲一個附加字段添加到Presto的HiveSplit中。咱們建立了HudiRealtimeSplitConverter
來實現用於Hudi實時查詢的CustomSplitConverter
接口。 -
從HiveSplit的額外元數據從新建立Hudi切片。如今咱們已經掌握了HiveSplit中包含的自定義切片的完整信息,咱們須要在讀取切片以前識別並從新建立
HoodieRealtimeFileSplit
。CustomSplitConverter
接口還有另外一個方法,它接受普通的FileSplit和額外的split信息映射,並返回實際複雜的FileSplit,在本例中是HudiRealtimeFileSplit
。 -
使用
HoodieParquetRealtimeInputFormat
中的HoodieRealtimeRecordReader
讀取從新建立的HoodieRealtimeFileSplit
。Presto須要使用新的記錄讀取器來正確處理HudiRealtimeFileSplit
中的額外信息。爲此,咱們引入了與第一個註釋相似的另外一個註解@UseRecordReaderFromInputFormat
。這指示Presto使用Hive記錄光標(使用InputFormat
的記錄讀取器)而不是PageSource
。Hive記錄光標能夠理解從新建立的自定義切片,並基於自定義切片設置其餘信息/配置。
有了這些變動,Presto用戶即可查詢Hudi MOR表中更新鮮的數據了。
4. 下一步計劃
下面是一些頗有意思的工做(RFCs),可能也須要在Presto中支持。
RFC-12: Bootstrapping Hudi tables efficiently
ApacheHudi維護每一個記錄的元數據,使咱們可以提供記錄級別的更新、惟一的鍵語義和相似數據庫的更改流。然而這意味着,要利用Hudi的upsert和增量處理能力,用戶須要重寫整個數據集,使其成爲Hudi表。這個RFC提供了一種機制來高效地遷移他們的數據集,而不須要重寫整個數據集,同時還提供了Hudi的所有功能。
這將經過在新的引導Hudi表中引用外部數據文件(來自源表)的機制來實現。因爲數據可能駐留在外部位置(引導數據)或Hudi表的basepath(最近的數據)下,FileSplits將須要在這些位置上存儲更多的元數據。這項工做還將利用並創建在咱們當前添加的Presto MOR查詢支持之上。
支持Hudi表增量和時間點時間旅行查詢
增量查詢容許咱們從源Hudi表中提取變動日誌。時間點查詢容許在時間T1和T2之間獲取Hudi表的狀態。這些已經在Hive和Spark中獲得支持。咱們也在考慮在Presto中支持這個特性。
在Hive中,經過在JobConf
中設置一些配置來支持增量查詢,例如-query mode設置爲INCREMENTAL
、啓動提交時間和要使用的最大提交數。在Spark中有一個特定的實現來支持增量查詢—IncrementalRelation
。爲了在Presto中支持這一點,咱們須要一種識別增量查詢的方法。若是Presto不向hadoop Configuration對象傳遞會話配置,那麼最初的想法是在metastore中將同一個表註冊爲增量表。而後使用查詢謂詞獲取其餘詳細信息,如開始提交時間、最大提交時間等。
RFC-15: 查詢計劃和Listing優化
Hudi write client和Hudi查詢須要對文件系統執行listStatus
操做以得到文件系統的當前視圖。在Uber,HDFS基礎設施爲Listing作了大量優化,但對於包含數千個分區的大型數據集以及每一個分區在雲/對象存儲上有數千個文件的大型數據集來講,這多是一個昂貴的操做。上面的RFC工做旨在消除Listing操做,提供更好的查詢性能和更快的查找,只需將Hudi的時間軸元數據逐漸壓縮到表狀態的快照中。
該方案旨在解決:
-
存儲和維護最新文件的元數據 -
維護表中全部列的統計信息,以幫助在掃描以前有效地修剪文件,這能夠在引擎的查詢規劃階段使用。
爲此,Presto也須要一些變動。咱們正在積極探索在查詢規劃階段利用這些元數據的方法。這將是對Presto-Hudi集成的重要補充,並將進一步下降查詢延遲。
記錄級別索引
Upsert是Hudi表上一種流行的寫操做,它依賴於索引將傳入記錄標記爲Upsert。HoodieIndex
在分區或非分區數據集中提供記錄id到文件id的映射,實現有BloomFilters/Key ranges(用於臨時數據)和Apache HBase(用於隨機更新)支持。許多用戶發現Apache HBase(或任何相似的key-value-store-backed索引)很昂貴,而且增長了運維開銷。該工做試圖提出一種新的索引格式,用於記錄級別的索引,這是在Hudi中實現的。Hudi將存儲和維護記錄級索引(有HFile、RocksDB等可插拔存儲實現支持)。這將被writer(攝取)和reader(攝取/查詢)使用,並將顯著提升upsert性能,而不是基於join的方法,或者是用於支持隨機更新工做負載的布隆索引。這是查詢引擎在列出文件以前修剪文件時能夠利用這些信息的另外一個領域。咱們也在考慮一種在查詢時利用Presto中的元數據的方法。
5. 總結
像Presto這樣的查詢引擎是用戶瞭解Hudi優點的入口。隨着不斷增加的社區和活躍的開發路線圖,Hudi中有許多有趣的工做,因爲Hudi在上面的工做上投入了大量精力,所以只須要與Presto這樣的系統進行深度集成。爲此,咱們期待着與Presto社區合做。咱們歡迎您的建議反饋,並鼓勵您做出貢獻 ,與咱們聯繫。
推薦閱讀
Apache Hudi 異步Compaction部署方式彙總
本文分享自微信公衆號 - ApacheHudi(ApacheHudi)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。