60TB 數據量的作業從 Hive 遷移到 Spark 在 Facebook 的實踐

Facebook 經常使用分析來進行數據驅動的決策。在過去的幾年裏,用戶和產品都得到了增長,使得我們分析引擎中單個查詢的數據量達到了數十TB。我們的一些批處理分析都是基於 Hive 平臺(Apache Hive 是 Facebook 在2009年貢獻給社區的)和 Corona( Facebook 內部的 MapReduce 實現)進行的。Facebook 還針對包括 Hive 在內的多個內部數據存儲,繼續增加了其 Presto 的 ANSI-SQL 查詢的覆蓋範圍。Facebook 內部還支持其他類型的分析,如圖計算、機器學習(Apache Giraph)和流處理(如 Puma、Swift 和 Stylus)。

儘管 Facebook 提供的服務涵蓋了分析領域的廣泛領域,但我們仍在不斷地與開源社區互動,以分享我們的經驗,並向他人學習。Apache Spark 於2009年由加州大學伯克利分校(UC-Berkeley)的 Matei Zaharia 創辦,並於2013年貢獻給 Apache。它是目前增長最快的數據處理平臺之一,因爲它能夠支持流處理、批處理、命令式(RDD)、聲明式(SQL)、圖計算和機器學習用例,所有這些都在相同的 API 和底層計算引擎中。Spark 可以有效地利用大量內存,跨整個管道(pipelines)優化代碼,並跨任務(tasks)重用 jvm 以獲得更好的性能。Facebook 認爲 Spark 已經成熟到可以在許多批處理用例中與 Hive 進行比較的地步。在本文的後面部分,將介紹 Facebook 使用 Spark 替代 Hive 的經驗和教訓。

用例:爲實體排序(entity ranking)做特性準備

實時實體排名在 Facebook 有着多種使用場景。對於一些在線服務平臺,原始的特性值是使用 Hive 離線生成的,並將生成的數據加載到這些實時關聯查詢系統中。這些 Hive 作業是數年前開發的,佔用了大量的計算資源,並且難以維護,因爲這些作業被拆分成數百個 Hive 小作業。爲了使得業務能夠使用到新的特徵數據,並且讓系統變得可維護,我們開始着手將這些作業遷移到 Spark 中。


以前的 Hive 作業實現

基於 Hive 的作業由三個邏輯階段組成,每個階段對應數百個由 entity_id 分割的較小 Hive 作業,因爲爲每個階段運行較大的 Hive 作業不太可靠,並且受到每個作業的最大任務數限制。具體如下:


以上三個邏輯階段可以概括如下:

  • 過濾掉非生產需要的特性和噪音;

  • 對每個(entity_id、target_id)對進行聚合;

  • 將表分爲 N 個分片,並對每個切分通過自定義 UDF 生成一個用於在線查詢的自定義索引文件。

基於 Hive 構建索引的作業大約需要運行三天。管理起來也很有挑戰性,因爲這條管道包含數百個分片作業,因此很難進行監控。沒有簡單的方法來衡量作業的整體進度或計算 ETA。考慮到現有 Hive 作業的上述侷限性,我們決定嘗試使用 Spark 來構建一個更快、更易於管理的作業。

Spark 實現

如果使用 Spark 全部替換上面的作業可能會很慢,並且很有挑戰性,需要大量的資源。所以我們首先將焦點投入在 Hive 作業中資源最密集的部分:第二階段。我們從50GB的壓縮輸入樣本開始,然後逐步擴展到 300 GB、1 TB 和20 TB。在每次增加大小時,我們都解決了性能和穩定性問題,但是嘗試 20 TB 時我們發現了最大改進的地方。

在運行 20 TB 的輸入時,我們發現由於任務太多,生成了太多的輸出文件(每個文件的大小大約爲100 MB)。在作業運行的10個小時中,有3個小時用於將文件從 staging 目錄移動到 HDFS 中的最終目錄。最初,我們考慮了兩個方案:要麼改進 HDFS 中的批量重命名以支持我們的用例;要麼配置 Spark 以生成更少的輸出文件(這一階段有大量的任務——70,000個)。經過認真思考,我們得到了第三種方案。由於我們在作業的第二步中生成的 tmp_table2 表是臨時的,並且只用於存儲作業的中間輸出。最後,我們把上面 Hive 實現的三個階段的作業用一個 Spark 作業表示,該作業讀取 60 TB 的壓縮數據並執行 90 TB的 shuffle 和排序,最後的 Spark job 如下:




我們如何擴展 Spark 來完成這項工作?

當然,在如此大的數據量上運行單個 Spark 作業在第一次嘗試甚至第十次嘗試時都不會起作用。據我們所知,這是生產環境中 shuffle 數據量最大的 Spark 作業(Databricks 的 PB 級排序是在合成數據上進行的)。我們對 Spark 內核和應用程序進行了大量的改進和優化,才使這項工作得以運行。這項工作的好處在於,其中許多改進都適用於 Spark 的其他大型工作負載,並且我們能夠將所有工作重新貢獻給開源 Apache Spark 項目 - 有關更多詳細信息,請參見下面相關的 JIRA。下面我們將重點介紹將一個實體排名作業部署到生產環境的主要改進。

可靠性修復(Reliability fixes)

處理節點頻繁重啓

爲了可靠地執行長時間運行的作業,我們希望系統能夠容錯並從故障中恢復(主要是由於正常維護或軟件錯誤導致的機器重新啓動)。雖然 Spark 最初的設計可以容忍機器重動,但我們還是發現了各種各樣的 bug/問題,我們需要在系統正式投入生產之前解決這些問題。

  • 使得 PipedRDD 容忍節點重啓(SPARK-13793):PipedRDD 之前在處理節點重啓設計不夠健壯,當它獲取數據失敗時,這個作業就會失敗。我們重新設計了 PipedRDD,使得它能夠友好的處理這種異常,並且從這種類型的異常中恢復。

  • 最大的獲取失敗次數可配置( SPARK-13369 ):對於長期運行的作業而言,由於計算機重動而導致獲取失敗的可能性大大增加。 在 Spark 中每個階段允許的最大獲取失敗次數是寫死的,因此,當達到最大失敗次數時,作業通常會失敗。我們做了一個更改,使其變得可配置,並將這個參數的值從 4 增加到 20,使得作業對於 fetch 失敗更加健壯。

  • Less disruptive cluster restart:長時間運行的作業應該能夠在集羣重啓後繼續運行,這樣我們就不會浪費到目前爲止完成的所有處理。Spark 的可重啓 shuffle service 讓我們在節點重啓後保留 shuffle 文件。最重要的是,我們在 Spark driver 中實現了能夠暫停任務調度的功能,這樣作業就不會因爲集羣重啓而導致任務失敗。

其他可靠性修復

  • Unresponsive driver(SPARK-13279):Spark driver 添加任務會進行一項時間複雜度爲 O(N2) 的操作,這可能會導致其被卡住,最終導致作業被 killed。我們刪除這個不必要的 O(N2) 操作來解決這個問題。

  • Excessive driver speculation:我們發現,Spark driver 在管理大量任務時,會花費了大量時間進行推測(speculation)。在短期內,在運行這個作業時我們禁止了 speculation。我們目前正在對 Spark Driver 進行修改,以減少 speculation 的時間。

  • TimSort issue due to integer overflow for large buffer( SPARK-13850 ):我們發現 Spark 的 unsafe 內存操作有一個 bug,這會導致 TimSort 中的內存出現問題。不過 Databricks 的工作人員已經修復了這個問題,使我們能夠在大型內存緩衝區上進行操作。

  • Tune the shuffle service to handle large number of connections:在 shuffle 階段,我們看到許多 executors 在試圖連接 shuffle service 時超時。通過增加 Netty 服務線程(spark.shuffle.io.serverThreads)和 backlog (spark.shuffle.io.backLog)的數量解決了這個問題。

  • Fix Spark executor OOM( SPARK-13958 ):一開始在每個節點上運行四個以上的 reduce 任務是很有挑戰性的。Spark executors 的內存不足,因爲 sorter 中存在一個 bug,該 bug 會導致指針數組無限增長。我們通過在指針數組沒有更多可用內存時強制將數據溢寫到磁盤來修復這個問題。因此,現在我們可以在一個節點上運行 24個任務而不會導致內存不足。


性能提升

在實現了上述可靠性改進之後,我們能夠可靠地運行 Spark 作業。此時,我們將工作重心轉移到與性能相關的問題上,以最大限度地利用 Spark。我們使用Spark 的指標和 profilers 來發現一些性能瓶頸。

我們用來發現性能瓶頸的工具

  • Spark UI Metrics:Spark UI 可以很好地洞察特定階段的時間花在哪裏。每個任務的執行時間被劃分爲子階段,以便更容易地找到作業中的瓶頸。

  • Jstack:Spark UI 中還提供 executor 進程的 jstack 功能,這個可以幫助我們找到代碼中的熱點問題。

  • Spark Linux Perf/Flame Graph support:儘管上面的兩個工具非常方便,但它們並沒有提供同時運行在數百臺機器上作業的 CPU 概要的聚合視圖。在每個作業的基礎上,我們增加了對性能分析的支持,並且可以定製採樣的持續時間/頻率。

性能優化

  • Fix memory leak in the sorter(SPARK-14363)性能提升 30%:我們發現當任務釋放所有內存頁,但指針數組沒有被釋放。結果,大量內存未被使用,導致頻繁溢出和 executor OOMs。現在,我們修復了這個問題,這個功能使得 CPU 性能提高了30%;

  • Snappy optimization ( SPARK-14277 )性能提升 10%:對於每一行的讀/寫,都會調用 JNI 方法(Snappy.ArrayCopy)。我們發現了這個問題,並且將這個調用修改成非 JNI 的System.ArrayCopy調用,修改完之後 CPU 性能提高了10%;

  • Reduce shuffle write latency(SPARK-5581)性能提升近 50%:在 map 端,當將 shuffle 數據寫入磁盤時,map 任務的每個分區打開和關閉相同的文件。我們修復了這個問題,以避免不必要的打開/關閉,修改完之後 CPU 性能提高近 50%;

  • Fix duplicate task run issue due to fetch failure (SPARK-14649):當獲取失敗(fetch failure)發生時,Spark driver 會重新提交已經運行的任務,這會導致性能低下。我們通過避免重新運行正在運行的任務修復了這個問題,並且我們發現當發生獲取操作失敗時,作業也更加穩定。

  • Configurable buffer size for PipedRDD(SPARK-14542)性能提升近 10%:在使用 PipedRDD 時,我們發現用於將數據從排序器(sorter)傳輸到管道處理的默認緩衝區大小太小,我們的作業花費了超過 10% 的時間來複制數據。我們使這個緩衝區大小變得可配置,以避免這個瓶頸。

  • Cache index files for shuffle fetch speed-up(SPARK-15074):我們發現,shuffle service 經常成爲瓶頸,reduce 端花費 10% 到 15% 的時間來等待獲取 map 端的數據。通過更深入的研究這個問題,我們發現 shuffle service 爲每次 shuffle fetch 都需要打開/關閉 shuffle index 文件。我們通過緩存索引信息,這樣我們就可以避免重複打開/關閉文件,這一變化減少了50%的 shuffle fetch 時間;

  • Reduce update frequency of shuffle bytes written metrics(SPARK-15569)性能提升近 20%:使用 Spark Linux Perf 集成,我們發現大約 20% 的 CPU 時間花在探測和更新隨機字節寫的指標上。

  • Configurable initial buffer size for Sorter( SPARK-15958 )性能提升近 5%:Sorter 的默認初始緩衝區大小太小(4 KB),對於大的工作負載來說這個值太小了,因此我們浪費了大量的時間來複制內容。我們將這個緩衝區大小變得可配置(過往記憶大數據備註:spark.shuffle.sort.initialBufferSize), 當將這個參數設置爲 64 MB 時,可以避免大量的數據複製,使得性能提升近 5%;

  • Configuring number of tasks:由於我們輸入的數據大小爲 60 T,每個 HDFS 塊大小爲 256 M,因此我們要生成超過250,000個任務。儘管我們能夠運行具有如此多任務的 Spark 作業,但我們發現,當任務數量過高時,性能會顯著下降。我們引入了一個配置參數,使 map 輸入大小可配置,我們通過將輸入的 split 大小設置爲 2 GB ,使得 task 的數據減少了八倍。

在所有這些可靠性和性能改進之後,我們的實體排名系統變成了一個更快、更易於管理的管道,並且我們提供了在 Spark 中運行其他類似作業的能力。

使用 Spark 和 Hive 運行上面實體排名程序性能比較

我們使用以下性能指標來比較 Spark 和 Hive 運行性能。

CPU time:這是從操作系統的角度來看 CPU 使用情況。例如,如果您的作業在32核機器上僅運行一個進程,使用所有 CPU 的50%持續10秒,那麼您的 CPU 時間將是 32 * 0.5 * 10 = 160 CPU 秒。



CPU reservation time:從資源管理框架的角度來看,這是 CPU 預留(CPU reservation)。例如,如果我們將32核機器預留10秒來運行這個作業,那麼 CPU 預留時間是 32 * 10 = 320 CPU秒。CPU 時間與 CPU 預留時間的比率反映了我們集羣預留 CPU 資源的情況。準確地說,當運行相同的工作負載時,與 CPU 時間相比,預留時間可以更好地比較執行引擎。例如,如果一個進程需要1個 CPU 秒來運行,但是必須保留100個 CPU 秒,那麼根據這個指標,它的效率低於需要10個 CPU 秒但只預留10個 CPU 秒來做相同數量的工作的進程。我們還計算了內存預留時間,但這裏沒有列出來,因爲這些數字與 CPU 預留時間類似,而且使用 Spark 和 Hive 運行這個程序時都沒有在內存中緩存數據。Spark 有能力在內存中緩存數據,但由於集羣內存的限制,我們並沒有使用這個功能。



Latency:作業從開始到結束運行時間。




結論和未來工作

Facebook 使用高性能和可擴展的分析引擎來幫助產品開發。Apache Spark 提供了將各種分析用例統一到單個 API ,並且提供了高效的計算引擎。我們將分解成數百個 Hive 作業管道替換爲一個 Spark 作業,通過一系列的性能和可靠性改進,我們能夠使用 Spark 來處理生產中的實體數據排序的用例。在這個特殊的用例中,我們展示了 Spark 可以可靠地 shuffle 並排序 90 TB 以上的中間數據,並在一個作業中運行 250,000個 tasks。與舊的基於 Hive 計算引擎管道相比,基於 Spark 的管道產生了顯著的性能改進(4.5-6倍 CPU性能提升、節省了 3-4 倍資源的使用,並降低了大約5倍的延遲),並且已經在生產環境中運行了幾個月。


阿里巴巴開源大數據技術團隊成立Apache Spark中國技術社區,定期推送精彩案例,技術專家直播,問答區近萬人Spark技術同學在線提問答疑,只爲營造純粹的Spark氛圍,歡迎釘釘掃碼加入!

對開源大數據和感興趣的同學可以加小編微信(下圖二維碼,備註「進羣」)進入技術交流微信羣。