一個近期由Hudi PMC & Uber Senior Engineering Manager Nishith Agarwal分享的Talk數據庫
關於Nishith Agarwal更詳細的介紹,主要從事數據方面的工做,包括攝取標準化,數據湖原語等。網絡
什麼是數據湖?數據湖是一個集中式的存儲,容許以任意規模存儲結構化和非結構化數據。你能夠存儲原始數據,而不須要先轉化爲結構化的數據,基於數據湖之上能夠運行多種類型的分析,如dashboard、大數據處理的可視化、實時分析、機器學習等。架構
接着看看對於構建PB級數據湖有哪些關鍵的要求併發
第一個要求:增量攝取(CDC)機器學習
企業中高價值的數據每每存儲在OLTP中,例以下圖中,users表包含用戶ID,國家/地區,修改時間和其餘詳細信息,但OLTP系統並未針對大批量分析進行優化,所以可能須要引入數據湖。同時一些企業採用備份在線數據庫的方式,並將其存儲到數據湖中的方法來攝取數據,但這種方式沒法擴展,同時它給上游數據庫增長了沉重的負擔,也致使數據重寫的浪費,所以須要一種增量攝取數據的方法。異步
第二個要求:Log Event去重工具
考慮分析大規模時間序列數據的場景,這些事件被寫入數據管道,而且數量很是大,可達數十億,每秒可達百萬的量。但流中可能有重複項,多是因爲至少一次(atleast-once)保證,數據管道或客戶端失敗重試處理等發送了重複的事件,若是不對日誌流進行重複處理,則對這些數據集進行的分析會有正確性問題。下圖是一個示例日誌事件流,其中事件ID爲惟一鍵,帶有事件時間和其餘有效負載。oop
第三個要求:存儲管理(自動管理DFS上文件)性能
咱們已經瞭解瞭如何攝取數據,那麼如何管理數據的存儲以擴展整個生態系統呢?其中小文件是個大問題,它們會致使查詢引擎的開銷並增長文件系統元數據的壓力。而若是寫入較大的文件,則可能致使攝取延遲增長。一種常見的策略是先攝取小文件,而後再進行合併,這種方法沒有標準,而且在某些狀況下是非原子行爲,會致使一致性問題。不管如何,當咱們寫小文件而且在合併這些文件以前,查詢性能都會受到影響。學習
第四個要求:事務寫(ACID能力)
傳統數據湖在數據寫入時的事務性方面作得不太好,但隨着愈來愈多的業務關鍵處理流程移至數據湖,狀況也在發生變化,咱們須要一種機制來原子地發佈一批數據,即僅保存有效數據,部分失敗必須回滾而不會損壞已有數據集。同時查詢的結果必須是可重複的,查詢端看不到任何部分提取的數據,任何提交的數據都必須可靠地寫入。Hudi提供了強大的ACID能力。
第五個要求:更快地派生/ETL數據(增量處理)
僅僅能快速攝取數據還不夠,咱們還須要具備計算派生數據的能力,沒有這個能力,數據工程師一般會繞過原始表來構建其派生/ETL並最終破壞整個體系結構。下面示例中,咱們看到原始付款表(貨幣未標準化)和發生貨幣轉換的派生表。
擴展此類數據管道時頗有挑戰,如僅對變動進行計算,或者基於窗口的Join的挑戰。對基礎數據集進行大規模從新處理不太可能,這會浪費計算資源。須要在數據湖上進行抽象以支持對上游表中已更改的行(數據)進行智能計算。
第六個要求:法律合規/數據刪除(更新&刪除)
近年來隨着新的數據保護法規生效,對數據保留有了嚴格的規定,須要刪除原始記錄,修復數據的正確性等,當須要在PB級數據湖中高效執行合規性時很是困難,如同大海撈針通常,須要高效的刪除,如進行索引,對掃描進行優化,將刪除記錄有效地傳播到下游表的機制。
要求回顧(彙總)
有沒有能知足上面全部需求的系統呢?接下來咱們引入Apache Hudi,HUDI表明Hadoop Upserts Deletes and Incrementals。從高層次講,HUDI容許消費數據庫和kafa事件中的變動事件,也能夠增量消費其餘HUDI數據集中的變動事件,並將其提取到存儲在Hadoop兼容,如HDFS和雲存儲中。在讀取方面,它提供3種不一樣的視圖:增量視圖,快照視圖和實時視圖。
HUDI支持2種存儲格式:「寫時複製」和「讀時合併」。
首先來看看寫時複製。以下圖所示,HUDI管理了數據集,並嘗試將一批數據寫入數據湖,HUDI維護稱爲「提交時間軸(commit timeline)」的內容,以跟蹤HUDI管理的數據集上發生的操做/更改,它在提交時間軸上標記了一個「inflight」文件,表示操做已開始,HUDI會寫2個parquet文件,而後將「inflight」文件標記爲已完成,這從原子上使該新數據寫入HUDI管理的數據集中,並可用於查詢。正如咱們提到的,RO視圖優化查詢性能,並提供parquet的基本原始列存性能,無需增長任何額外成本。
如今假設須要更新另外一批數據,HUDI在提交時間軸上標記了一個「inflight」文件,並開始合併這些更新並重寫Parquet File1。此時,因爲提交仍在進行中,所以用戶看不到正在寫入任何這些更新(這就是咱們稱爲「快照隔離」)。最終以原子方式發佈提交後,就能夠查詢版本爲C2的新合併的parquet文件。
COW已經在Uber投入運行多年,大多數數據集都位於COW存儲類型上。
儘管COW服務於咱們的大多數用例,但仍有一些因素值得咱們關注。以Uber的行程表爲例,能夠想象這多是一個很大的表,它在旅程的整個生命週期中獲取大量更新。每隔30分鐘,咱們就會得到一組新旅行以及對舊旅行的一些更新,在Hive上的旅行數據是按天劃分分區的,所以新旅行最終會在最新分區中寫入新文件,而某些更新會在舊分區中寫入文件。使用COW,咱們只能重寫那些更新所涉及的文件,而且可以高效地更新。因爲COW最終會重寫某些文件,所以能夠像合併和重寫該數據同樣快。在該用例中一般大於15分鐘。再來看另一種狀況,因爲某些業務用例(例如GDPR),必須更新大量歷史行程,這些更新涉及過去幾個月數據,從而致使很高的寫入延遲,並一遍又一遍地重寫大量數據,寫放大也會致使大量的IO。若爲工做負載分配的資源不足,可能就會嚴重損害攝取延遲。
在真實場景中,會將ETL連接在一塊兒來構建數據管道,問題會變得更加複雜。
對問題進行總結以下:在COW中,太多的更新(尤爲是雜亂的跨分區/文件)會嚴重影響提取延遲(因爲做業運行時間較長且沒法追遇上入流量),同時還會引發巨大的寫放大,從而影響HDFS(相同文件的48個版本+過多的IO)。合併更新和重寫parquet文件會限制咱們的數據的新鮮度,由於完成此類工做須要時間 = (重寫parquet文件所花費的時間*parquet文件的數量)/(並行性)。
在COW中,咱們實際上並無太大的parquet文件,由於即便只有一行更新也可能要重寫整個文件,由於Hudi會選擇寫入小於預期大小的文件。
MergeOnRead將全部這些更新分組到一個文件中,而後在稍後的時刻建立一個新版本。對於重更新的表,重寫大文件會致使開銷變大。
如何解決上述寫放大問題呢?除了將更新合併並重寫parquet文件以外,咱們將更新寫入增量文件中,這能夠幫助咱們下降攝取延遲並得到更好的新鮮度。
將更新寫入增量文件將須要在讀取端作額外的工做以便可以讀取增量文件中記錄,這意味着咱們須要構建更智能,更智能的讀取端。
首先來看看寫時複製。以下圖所示,HUDI管理了數據集,並嘗試將一批數據寫入數據湖,HUDI維護稱爲「提交時間軸(commit timeline)」的內容,以跟蹤HUDI管理的數據集上發生的操做/更改,它在提交時間軸上標記了一個「inflight」文件,表示操做已開始,HUDI會寫2個parquet文件,而後將「inflight」文件標記爲已完成,這從原子上使該新數據寫入HUDI管理的數據集中,並可用於查詢。正如咱們提到的,RO視圖優化查詢性能,並提供parquet的基本原始列存性能,無需增長任何額外成本。
如今須要進行第二次更新,與合併和重寫新的parquet文件(如在COW中同樣)不一樣,這些更新被寫到與基礎parquet文件對應的增量文件中。RO視圖繼續查詢parquet文件(過期的數據),而RealTime View(Snapshot query)會合並了parquet中的數據和增量文件中的更新,以提供最新數據的視圖。能夠看到,MOR是在查詢執行時間與較低攝取延遲之間的一個權衡。
那麼,爲何咱們要異步運行壓縮?咱們實現了MERGE_ON_READ來提升數據攝取速度,咱們但願儘快攝取較新的數據。而合併更新和建立列式文件是Hudi數據攝取的主要耗時部分。
所以咱們引入了異步Compaction步驟,該步驟能夠與數據攝取同時運行,減小數據攝取延遲。
Hudi將事務引入到了大規模數據處理中,實際上,咱們是最先這樣作的系統之一,最近,它已經過其餘項目的相似方法得到了社區承認。
Hudi支持多行多分區的原子性提交,Hudi維護一個特殊的文件夾.hoodie,在該文件夾中記錄以單調遞增的時間戳表示的操做,Hudi使用此文件夾以原子方式公開已提交的操做;發生的部分故障會透明地回滾,而且不會影響讀者和後面的寫入;Hudi使用MVCC模型將讀取與併發攝取和壓縮隔離開來;Hudi提交協議和DFS存儲保證了數據的持久寫入。
下面介紹Hudi在Uber的使用狀況
Hudi管理了超過150PB數據湖,超過10000張表,天天攝入5000億條記錄。
接着看看Hudi如何替代分析架構。利用Hudi的upsert原語,能夠在攝取到數據湖中時實現<5分鐘的新鮮度,而且能繼續得到列式數據的原始性能(parquet格式),同時使用Hudi還能夠得到實時視圖,以5-10分鐘的延遲提供dashboard,此外HUDI支持的增量視圖有助於長尾效應對數據集的突變。
爲方便用戶能快速使用Hudi,Hudi提供了一些開箱即用的工具,如HoodieDeltaStreamer,在Uber內部,HoodieDeltaStreamer用來對全球網絡進行近實時分析,可用來消費DFS/Kafka中的數據。
除了DeltaStreamer,Hudi還集成了Spark Datasource,也提供了開箱即用的能力,基於Spark,能夠快速構建ETL管道,同時也可無縫使用Hudi + PySpark。
接着介紹更高級的原語和特性。
如何從損壞的數據中恢復?例如線上因爲bug致使寫入了不正確的數據,或者上游系統將某一列的值標記爲null,Hudi也能夠很好的處理上述場景,能夠將表恢復到最近的一次正確時間,如Hudi提供的savepoint就能夠將不一樣的commit保存起來,用於後續恢復,注意MoR表暫時不支持savepoint;Hudi還提供了文件的版本號,便可以保存多個版本的文件,這對於CoW和MoR表都適用,可是會佔用一些存儲空間。
Hudi還提供便於增量ETL的高級特性,經過Spark/Spark即可以輕鬆增量拉取Hudi表的變動。
除了增量拉取,Hudi也提供了時間旅行特性,一樣經過Spark/Hive即可以輕鬆查詢指定版本的數據,其中對於Hive查詢中指定hoodie.table_name.consume.end.timestamp
也立刻會獲得支持。
下面看看對於線上的Hudi Spark做業如何調優。
下面列舉了幾個調優手段,設置Kryo序列化器,使用Shuffle Service,利用開源的profiler來進行內存調優,固然Hudi也提供了Hudi生產環境的調優配置,可參考【調優 | Apache Hudi應用調優指南】
下面介紹社區正在進行的工做,敬請期待。
即將發佈的0.6.0版本,將企業中存量的parquet表高效導入Hudi中,與傳統經過Spark讀取Parquet表而後再寫入Hudi方案相比,佔用的資源和耗時都將大幅下降。以及對於查詢計劃的O(1)時間複雜度的處理,新增列索引及統一元數據管理以消除對DFS的文件list操做。
還有一些值得關注的特性,好比支持行級別的索引,該功能將極大下降upsert的延遲;異步數據clustering以優化存儲和查詢性能;支持Presto對MoR表的快照查詢;Hudi集成Flink,經過Flink可將數據寫入Hudi數據湖。
整個分享就介紹到這裏,歡迎觀看。