60TB 數據量的做業從 Hive 遷移到 Spark 在 Facebook 的實踐

Facebook 常用分析來進行數據驅動的決策。在過去的幾年裏,用戶和產品都獲得了增加,使得咱們分析引擎中單個查詢的數據量達到了數十TB。咱們的一些批處理分析都是基於 Hive 平臺(Apache Hive 是 Facebook 在2009年貢獻給社區的)和 Corona( Facebook 內部的 MapReduce 實現)進行的。Facebook 還針對包括 Hive 在內的多個內部數據存儲,繼續增長了其 Presto 的 ANSI-SQL 查詢的覆蓋範圍。Facebook 內部還支持其餘類型的分析,如圖計算、機器學習(Apache Giraph)和流處理(如 Puma、Swift 和 Stylus)。數組

儘管 Facebook 提供的服務涵蓋了分析領域的普遍領域,但咱們仍在不斷地與開源社區互動,以分享咱們的經驗,並向他人學習。Apache Spark 於2009年由加州大學伯克利分校(UC-Berkeley)的 Matei Zaharia 創辦,並於2013年貢獻給 Apache。它是目前增加最快的數據處理平臺之一,由於它可以支持流處理、批處理、命令式(RDD)、聲明式(SQL)、圖計算和機器學習用例,全部這些都在相同的 API 和底層計算引擎中。Spark 能夠有效地利用大量內存,跨整個管道(pipelines)優化代碼,並跨任務(tasks)重用 jvm 以得到更好的性能。Facebook 認爲 Spark 已經成熟到能夠在許多批處理用例中與 Hive 進行比較的地步。在本文的後面部分,將介紹 Facebook 使用 Spark 替代 Hive 的經驗和教訓。緩存

用例:爲實體排序(entity ranking)作特性準備

實時實體排名在 Facebook 有着多種使用場景。對於一些在線服務平臺,原始的特性值是使用 Hive 離線生成的,並將生成的數據加載到這些實時關聯查詢系統中。這些 Hive 做業是數年前開發的,佔用了大量的計算資源,而且難以維護,由於這些做業被拆分紅數百個 Hive 小做業。爲了使得業務可以使用到新的特徵數據,而且讓系統變得可維護,咱們開始着手將這些做業遷移到 Spark 中。性能優化

之前的 Hive 做業實現

基於 Hive 的做業由三個邏輯階段組成,每一個階段對應數百個由 entity_id 分割的較小 Hive 做業,由於爲每一個階段運行較大的 Hive 做業不太可靠,而且受到每一個做業的最大任務數限制。具體以下:app

以上三個邏輯階段能夠歸納以下:框架

  • 過濾掉非生產須要的特性和噪音;
  • 對每一個(entity_id、target_id)對進行聚合;
  • 將表分爲 N 個分片,並對每一個切分經過自定義 UDF 生成一個用於在線查詢的自定義索引文件。

基於 Hive 構建索引的做業大約須要運行三天。管理起來也頗有挑戰性,由於這條管道包含數百個分片做業,所以很難進行監控。沒有簡單的方法來衡量做業的總體進度或計算 ETA。考慮到現有 Hive 做業的上述侷限性,咱們決定嘗試使用 Spark 來構建一個更快、更易於管理的做業。機器學習

Spark 實現

若是使用 Spark 所有替換上面的做業可能會很慢,而且頗有挑戰性,須要大量的資源。因此咱們首先將焦點投入在 Hive 做業中資源最密集的部分:第二階段。咱們從50GB的壓縮輸入樣本開始,而後逐步擴展到 300 GB、1 TB 和20 TB。在每次增長大小時,咱們都解決了性能和穩定性問題,可是嘗試 20 TB 時咱們發現了最大改進的地方。jvm

在運行 20 TB 的輸入時,咱們發現因爲任務太多,生成了太多的輸出文件(每一個文件的大小大約爲100 MB)。在做業運行的10個小時中,有3個小時用於將文件從 staging 目錄移動到 HDFS 中的最終目錄。最初,咱們考慮了兩個方案:要麼改進 HDFS 中的批量重命名以支持咱們的用例;要麼配置 Spark 以生成更少的輸出文件(這一階段有大量的任務——70,000個)。通過認真思考,咱們獲得了第三種方案。因爲咱們在做業的第二步中生成的 tmp_table2 表是臨時的,而且只用於存儲做業的中間輸出。最後,咱們把上面 Hive 實現的三個階段的做業用一個 Spark 做業表示,該做業讀取 60 TB 的壓縮數據並執行 90 TB的 shuffle 和排序,最後的 Spark job 以下:工具

咱們如何擴展 Spark 來完成這項工做?

固然,在如此大的數據量上運行單個 Spark 做業在第一次嘗試甚至第十次嘗試時都不會起做用。據咱們所知,這是生產環境中 shuffle 數據量最大的 Spark 做業(Databricks 的 PB 級排序是在合成數據上進行的)。咱們對 Spark 內核和應用程序進行了大量的改進和優化,才使這項工做得以運行。這項工做的好處在於,其中許多改進都適用於 Spark 的其餘大型工做負載,而且咱們可以將全部工做從新貢獻給開源 Apache Spark 項目 - 有關更多詳細信息,請參見下面相關的 JIRA。下面咱們將重點介紹將一個實體排名做業部署到生產環境的主要改進。性能

可靠性修復(Reliability fixes)

處理節點頻繁重啓

爲了可靠地執行長時間運行的做業,咱們但願系統可以容錯並從故障中恢復(主要是因爲正常維護或軟件錯誤致使的機器從新啓動)。雖然 Spark 最初的設計能夠容忍機器重動,但咱們仍是發現了各類各樣的 bug/問題,咱們須要在系統正式投入生產以前解決這些問題。學習

  • 使得 PipedRDD 容忍節點重啓(SPARK-13793):PipedRDD 以前在處理節點重啓設計不夠健壯,當它獲取數據失敗時,這個做業就會失敗。咱們從新設計了 PipedRDD,使得它可以友好的處理這種異常,而且從這種類型的異常中恢復。
  • 最大的獲取失敗次數可配置( SPARK-13369 ):對於長期運行的做業而言,因爲計算機重動而致使獲取失敗的可能性大大增長。 在 Spark 中每一個階段容許的最大獲取失敗次數是寫死的,所以,當達到最大失敗次數時,做業一般會失敗。咱們作了一個更改,使其變得可配置,並將這個參數的值從 4 增長到 20,使得做業對於 fetch 失敗更加健壯。
  • Less disruptive cluster restart:長時間運行的做業應該可以在集羣重啓後繼續運行,這樣咱們就不會浪費到目前爲止完成的全部處理。Spark 的可重啓 shuffle service 讓咱們在節點重啓後保留 shuffle 文件。最重要的是,咱們在 Spark driver 中實現了可以暫停任務調度的功能,這樣做業就不會由於集羣重啓而致使任務失敗。

其餘可靠性修復

  • Unresponsive driver(SPARK-13279):Spark driver 添加任務會進行一項時間複雜度爲 O(N2) 的操做,這可能會致使其被卡住,最終致使做業被 killed。咱們刪除這個沒必要要的 O(N2) 操做來解決這個問題。
  • Excessive driver speculation:咱們發現,Spark driver 在管理大量任務時,會花費了大量時間進行推測(speculation)。在短時間內,在運行這個做業時咱們禁止了 speculation。咱們目前正在對 Spark Driver 進行修改,以減小 speculation 的時間。
  • TimSort issue due to integer overflow for large buffer( SPARK-13850 ):咱們發現 Spark 的 unsafe 內存操做有一個 bug,這會致使 TimSort 中的內存出現問題。不過 Databricks 的工做人員已經修復了這個問題,使咱們可以在大型內存緩衝區上進行操做。
  • Tune the shuffle service to handle large number of connections:在 shuffle 階段,咱們看到許多 executors 在試圖鏈接 shuffle service 時超時。經過增長 Netty 服務線程(spark.shuffle.io.serverThreads)和 backlog (spark.shuffle.io.backLog)的數量解決了這個問題。
  • Fix Spark executor OOM( SPARK-13958 ):一開始在每一個節點上運行四個以上的 reduce 任務是頗有挑戰性的。Spark executors 的內存不足,由於 sorter 中存在一個 bug,該 bug 會致使指針數組無限增加。咱們經過在指針數組沒有更多可用內存時強制將數據溢寫到磁盤來修復這個問題。所以,如今咱們能夠在一個節點上運行 24個任務而不會致使內存不足。

性能提高

在實現了上述可靠性改進以後,咱們可以可靠地運行 Spark 做業。此時,咱們將工做重心轉移到與性能相關的問題上,以最大限度地利用 Spark。咱們使用Spark 的指標和 profilers 來發現一些性能瓶頸。

咱們用來發現性能瓶頸的工具

  • Spark UI Metrics:Spark UI 能夠很好地洞察特定階段的時間花在哪裏。每一個任務的執行時間被劃分爲子階段,以便更容易地找到做業中的瓶頸。
  • Jstack:Spark UI 中還提供 executor 進程的 jstack 功能,這個能夠幫助咱們找到代碼中的熱點問題。
  • Spark Linux Perf/Flame Graph support:儘管上面的兩個工具很是方便,但它們並無提供同時運行在數百臺機器上做業的 CPU 概要的聚合視圖。在每一個做業的基礎上,咱們增長了對性能分析的支持,而且能夠定製採樣的持續時間/頻率。

性能優化

  • Fix memory leak in the sorter(SPARK-14363)性能提高 30%:咱們發現當任務釋放全部內存頁,但指針數組沒有被釋放。結果,大量內存未被使用,致使頻繁溢出和 executor OOMs。如今,咱們修復了這個問題,這個功能使得 CPU 性能提升了30%;
  • Snappy optimization ( SPARK-14277 )性能提高 10%:對於每一行的讀/寫,都會調用 JNI 方法(Snappy.ArrayCopy)。咱們發現了這個問題,而且將這個調用修改爲非 JNI 的System.ArrayCopy調用,修改完以後 CPU 性能提升了10%;
  • Reduce shuffle write latency(SPARK-5581)性能提高近 50%:在 map 端,當將 shuffle 數據寫入磁盤時,map 任務的每一個分區打開和關閉相同的文件。咱們修復了這個問題,以免沒必要要的打開/關閉,修改完以後 CPU 性能提升近 50%;
  • Fix duplicate task run issue due to fetch failure (SPARK-14649):當獲取失敗(fetch failure)發生時,Spark driver 會從新提交已經運行的任務,這會致使性能低下。咱們經過避免從新運行正在運行的任務修復了這個問題,而且咱們發現當發生獲取操做失敗時,做業也更加穩定。
  • Configurable buffer size for PipedRDD(SPARK-14542)性能提高近 10%:在使用 PipedRDD 時,咱們發現用於將數據從排序器(sorter)傳輸到管道處理的默認緩衝區大小過小,咱們的做業花費了超過 10% 的時間來複制數據。咱們使這個緩衝區大小變得可配置,以免這個瓶頸。
  • Cache index files for shuffle fetch speed-up(SPARK-15074):咱們發現,shuffle service 常常成爲瓶頸,reduce 端花費 10% 到 15% 的時間來等待獲取 map 端的數據。經過更深刻的研究這個問題,咱們發現 shuffle service 爲每次 shuffle fetch 都須要打開/關閉 shuffle index 文件。咱們經過緩存索引信息,這樣咱們就能夠避免重複打開/關閉文件,這一變化減小了50%的 shuffle fetch 時間;
  • Reduce update frequency of shuffle bytes written metrics(SPARK-15569)性能提高近 20%:使用 Spark Linux Perf 集成,咱們發現大約 20% 的 CPU 時間花在探測和更新隨機字節寫的指標上。
  • Configurable initial buffer size for Sorter( SPARK-15958 )性能提高近 5%:Sorter 的默認初始緩衝區大小過小(4 KB),對於大的工做負載來講這個值過小了,所以咱們浪費了大量的時間來複制內容。咱們將這個緩衝區大小變得可配置(備註:spark.shuffle.sort.initialBufferSize), 當將這個參數設置爲 64 MB 時,能夠避免大量的數據複製,使得性能提高近 5%;
  • Configuring number of tasks:因爲咱們輸入的數據大小爲 60 T,每一個 HDFS 塊大小爲 256 M,所以咱們要生成超過250,000個任務。儘管咱們可以運行具備如此多任務的 Spark 做業,但咱們發現,當任務數量太高時,性能會顯著降低。咱們引入了一個配置參數,使 map 輸入大小可配置,咱們經過將輸入的 split 大小設置爲 2 GB ,使得 task 的數據減小了八倍。

在全部這些可靠性和性能改進以後,咱們的實體排名系統變成了一個更快、更易於管理的管道,而且咱們提供了在 Spark 中運行其餘相似做業的能力。

使用 Spark 和 Hive 運行上面實體排名程序性能比較

咱們使用如下性能指標來比較 Spark 和 Hive 運行性能。

CPU time:這是從操做系統的角度來看 CPU 使用狀況。例如,若是您的做業在32核機器上僅運行一個進程,使用全部 CPU 的50%持續10秒,那麼您的 CPU 時間將是 32 0.5 10 = 160 CPU 秒。

CPU reservation time:從資源管理框架的角度來看,這是 CPU 預留(CPU reservation)。例如,若是咱們將32核機器預留10秒來運行這個做業,那麼 CPU 預留時間是 32 * 10 = 320 CPU秒。CPU 時間與 CPU 預留時間的比率反映了咱們集羣預留 CPU 資源的狀況。準確地說,當運行相同的工做負載時,與 CPU 時間相比,預留時間能夠更好地比較執行引擎。例如,若是一個進程須要1個 CPU 秒來運行,可是必須保留100個 CPU 秒,那麼根據這個指標,它的效率低於須要10個 CPU 秒但只預留10個 CPU 秒來作相同數量的工做的進程。咱們還計算了內存預留時間,但這裏沒有列出來,由於這些數字與 CPU 預留時間相似,並且使用 Spark 和 Hive 運行這個程序時都沒有在內存中緩存數據。Spark 有能力在內存中緩存數據,但因爲集羣內存的限制,咱們並無使用這個功能。

Latency:做業從開始到結束運行時間。

結論和將來工做

Facebook 使用高性能和可擴展的分析引擎來幫助產品開發。Apache Spark 提供了將各類分析用例統一到單個 API ,而且提供了高效的計算引擎。咱們將分解成數百個 Hive 做業管道替換爲一個 Spark 做業,經過一系列的性能和可靠性改進,咱們可以使用 Spark 來處理生產中的實體數據排序的用例。在這個特殊的用例中,咱們展現了 Spark 能夠可靠地 shuffle 並排序 90 TB 以上的中間數據,並在一個做業中運行 250,000個 tasks。與舊的基於 Hive 計算引擎管道相比,基於 Spark 的管道產生了顯著的性能改進(4.5-6倍 CPU性能提高、節省了 3-4 倍資源的使用,並下降了大約5倍的延遲),而且已經在生產環境中運行了幾個月。


本文做者:過往記憶大數據

閱讀原文

本文爲阿里雲內容,未經容許不得轉載。

相關文章
相關標籤/搜索