淺析 Spark Shuffle 內存使用

在使用 Spark 進行計算時,咱們常常會碰到做業 (Job) Out Of Memory(OOM) 的狀況,並且很大一部分狀況是發生在 Shuffle 階段。那麼在 Spark Shuffle 中具體是哪些地方會使用比較多的內存而有可能致使 OOM 呢? 爲此,本文將圍繞以上問題梳理 Spark 內存管理和 Shuffle 過程當中與內存使用相關的知識;而後,簡要分析下在 Spark Shuffle 中有可能致使 OOM 的緣由。算法

1、Spark 內存管理和消費模型

在分析 Spark Shuffle 內存使用以前。咱們首先了解下如下問題:當一個 Spark 子任務 (Task) 被分配到 Executor 上運行時,Spark 管理內存以及消費內存的大致模型是什麼樣呢?(注:因爲 OOM 主要發生在 Executor 端,因此接下來的討論主要針對 Executor 端的內存管理和使用)。
數組

1,在 Spark 中,使用抽象類 MemoryConsumer 來表示須要使用內存的消費者。在這個類中定義了分配,釋放以及 Spill 內存數據到磁盤的一些方法或者接口。具體的消費者能夠繼承 MemoryConsumer 從而實現具體的行爲。 所以,在 Spark Task 執行過程當中,會有各類類型不一樣,數量不一的具體消費者。如在 Spark Shuffle 中使用的 ExternalAppendOnlyMap, ExternalSorter 等等(具體後面會分析)。
2,MemoryConsumer 會將申請,釋放相關內存的工做交由 TaskMemoryManager 來執行。當一個 Spark Task 被分配到 Executor 上運行時,會建立一個 TaskMemoryManager。在 TaskMemoryManager 執行分配內存以前,須要首先向 MemoryManager 進行申請,而後由 TaskMemoryManager 藉助 MemoryAllocator 執行實際的內存分配。
3,Executor 中的 MemoryManager 會統一管理內存的使用。因爲每一個 TaskMemoryManager 在執行實際的內存分配以前,會首先向 MemoryManager 提出申請。所以 MemoryManager 會對當前進程使用內存的狀況有着全局的瞭解。
MemoryManager,TaskMemoryManager 和 MemoryConsumer 以前的對應關係,以下圖。整體上,一個 MemoryManager 對應着至少一個 TaskMemoryManager (具體由 executor-core 參數指定),而一個 TaskMemoryManager 對應着多個 MemoryConsumer (具體由任務而定)。 緩存

瞭解了以上內存消費的總體過程之後,有兩個問題須要注意下:
1,當有多個 Task 同時在 Executor 上執行時, 將會有多個 TaskMemoryManager 共享 MemoryManager 管理的內存。那麼 MemoryManager 是怎麼分配的呢?答案是每一個任務能夠分配到的內存範圍是 [1 / (2 * n), 1 / n],其中 n 是正在運行的 Task 個數。所以,多個併發運行的 Task 會使得每一個 Task 能夠得到的內存變小。
2,前面提到,在 MemoryConsumer 中有 Spill 方法,當 MemoryConsumer 申請不到足夠的內存時,能夠 Spill 當前內存到磁盤,從而避免無節制的使用內存。可是,對於堆內內存的申請和釋放實際是由 JVM 來管理的。所以,在統計堆內內存具體使用量時,考慮性能等各方面緣由,Spark 目前採用的是抽樣統計的方式來計算 MemoryConsumer 已經使用的內存,從而形成堆內內存的實際使用量不是特別準確。從而有可能由於不能及時 Spill 而致使 OOM。

2、Spark Shuffle 過程

總體上 Spark Shuffle 具體過程以下圖,主要分爲兩個階段:Shuffle Write 和 Shuffle Read。

Write 階段大致經歷排序(最低要求是須要按照分區進行排序),可能的聚合 (combine) 和歸併(有多個文件 spill 磁盤的狀況 ),最終每一個寫 Task 會產生數據和索引兩個文件。其中,數據文件會按照分區進行存儲,即相同分區的數據在文件中是連續的,而索引文件記錄了每一個分區在文件中的起始和結束位置。
而對於 Shuffle Read, 首先可能須要經過網絡從各個 Write 任務節點獲取給定分區的數據,即數據文件中某一段連續的區域,而後通過排序,歸併等過程,最終造成計算結果。網絡

對於 Shuffle Write,Spark 當前有三種實現,具體分別爲 BypassMergeSortShuffleWriter, UnsafeShuffleWriter 和 SortShuffleWriter (具體使用哪個實現有一個判斷條件,此處不表)。而 Shuffle Read 只有一種實現。

2.1 Shuffle Write 階段分析

2.1.1 BypassMergeSortShuffleWriter 分析

對於 BypassMergeSortShuffleWriter 的實現,大致實現過程是首先爲每一個分區建立一個臨時分區文件,數據寫入對應的分區文件,最終全部的分區文件合併成一個數據文件,而且產生一個索引文件。因爲這個過程不作排序,combine(若是須要 combine 不會使用這個實現)等操做,所以對於 BypassMergeSortShuffleWriter,整體來講是不怎麼耗費內存的。
併發

2.1.2 SortShuffleWriter 分析

SortShuffleWriter 是最通常的實現,也是平常使用最頻繁的。SortShuffleWriter 主要委託 ExternalSorter 作數據插入,排序,歸併 (Merge),聚合 (Combine) 以及最終寫數據和索引文件的工做。ExternalSorter 實現了以前提到的 MemoryConsumer 接口。下面分析一下各個過程使用內存的狀況:
1,對於數據寫入,根據是否須要作 Combine,數據會被插入到 PartitionedAppendOnlyMap 這個 Map 或者 PartitionedPairBuffer 這個數組中。每隔一段時間,當向 MemoryManager 申請不到足夠的內存時,或者數據量超過 spark.shuffle.spill.numElementsForceSpillThreshold 這個閾值時 (默認是 Long 的最大值,不起做用),就會進行 Spill 內存數據到文件。假設能夠源源不斷的申請到內存,那麼 Write 階段的全部數據將一直保存在內存中,因而可知,PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 是比較吃內存的。
2,不管是 PartitionedAppendOnlyMap 仍是 PartitionedPairBuffer, 使用的排序算法是 TimSort。在使用該算法是正常狀況下使用的臨時額外空間是很小,可是最壞狀況下是 n / 2,其中 n 表示待排序的數組長度(具體見 TimSort 實現)。
3,當插入數據由於申請不到足夠的內存將會 Spill 數據到磁盤,在將最終排序結果寫入到數據文件以前,須要將內存中的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 和已經 spill 到磁盤的 SpillFiles 進行合併。Merge 的大致過程以下圖。性能

從上圖可見,大致差很少就是歸併排序的過程,因而可知這個過程是沒有太多額外的內存消耗。歸併過程當中的聚合計算大致也是差很少的過程,惟一須要注意的是鍵值碰撞的狀況,即當前輸入的各個有序隊列的鍵值的哈希值相同,可是實際的鍵值不等的狀況。這種狀況下,須要額外的空間保存全部鍵值不一樣,但哈希值相同值的中間結果。可是整體上來講,發生這種狀況的機率並非特別大。

4,寫數據文件的過程涉及到不一樣數據流之間的轉化,而在流的寫入過程當中,通常都有緩存,主要由參數 spark.shuffle.file.buffer 和 spark.shuffle.spill.batchSize 控制,整體上這部分開銷也不大。
以上分析了 SortShuffleWriter write 階段的主要過程,從中能夠看出主要的內存消耗在寫入 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 這個階段。

2.1.3 UnsafeShuffleWriter

UnsafeShuffleWriter 是對 SortShuffleWriter 的優化,大致上也和 SortShuffleWriter 差很少,在此再也不贅述。從內存使用角度看,主要差別在如下兩點:
一方面,在 SortShuffleWriter 的 PartitionedAppendOnlyMap 或者 PartitionedPairBuffer 中,存儲的是鍵值或者值的具體類型,也就是 Java 對象,是反序列化事後的數據。而在 UnsafeShuffleWriter 的 ShuffleExternalSorter 中數據是序列化之後存儲到實際的 Page 中,並且在寫入數據過程當中會額外寫入長度信息。整體而言,序列化之後數據大小是遠遠小於序列化以前的數據。

另外一方面,UnsafeShuffleWriter 中須要額外的存儲記錄(LongArray),它保存着分區信息和實際指向序列化後數據的指針(通過編碼的Page num 以及 Offset)。相對於 SortShuffleWriter, UnsafeShuffleWriter 中這部分存儲的開銷是額外的。
優化

2.2 Shuffle Read 階段分析

Spark Shuffle Read 主要經歷從獲取數據,序列化流,添加指標統計,可能的聚合 (Aggregation) 計算以及排序等過程。大致流程以下圖。 編碼

以上計算主要都是迭代進行。在以上步驟中,比較複雜的操做是從遠程獲取數據,聚合和排序操做。接下來,依次分析這三個步驟內存的使用狀況。

1,數據獲取分爲遠程獲取和本地獲取。本地獲取將直接從本地的 BlockManager 取數據, 而對於遠程數據,須要走網絡。在遠程獲取過程當中,有相關參數能夠控制從遠程併發獲取數據的大小,正在獲取數據的請求數,以及單次數據塊請求是否放到內存等參數。具體參數包括 spark.reducer.maxSizeInFlight (默認 48M),spark.reducer.maxReqsInFlight, spark.reducer.maxBlocksInFlightPerAddress 和 spark.maxRemoteBlockSizeFetchToMem。考慮到數據傾斜的場景,若是 Map 階段有一個 Block 數據特別的大,默認狀況因爲 spark.maxRemoteBlockSizeFetchToMem 沒有作限制,因此在這個階段須要將須要獲取的整個 Block 數據放到 Reduce 端的內存中,這個時候是很是的耗內存的。能夠設置 spark.maxRemoteBlockSizeFetchToMem 值,若是超過該閾值,能夠落盤,避免這種狀況的 OOM。 另外,在獲取到數據之後,默認狀況下會對獲取的數據進行校驗(參數 spark.shuffle.detectCorrupt 控制),這個過程也增長了必定的內存消耗。
2,對於須要聚合和排序的狀況,這個過程是藉助 ExternalAppendOnlyMap 來實現的。整個插入,Spill 以及 Merge 的過程和 Write 階段差很少。整體上,這塊也是比較消耗內存的,可是由於有 Spill 操做,當內存不足時,能夠將內存數據刷到磁盤,從而釋放內存空間。

3、Spark Shuffle OOM 可能性分析

圍繞內存使用,前面比較詳細的分析了 Spark 內存管理以及在 Shuffle 過程可能使用較多內存的地方。接下來總結的要點以下:

1,首先須要注意 Executor 端的任務併發度,多個同時運行的 Task 會共享 Executor 端的內存,使得單個 Task 可以使用的內存減小。
2,不管是在 Map 仍是在 Reduce 端,插入數據到內存,排序,歸併都是比較都是比較佔用內存的。由於有 Spill,理論上不會由於數據傾斜形成 OOM。 可是,因爲對堆內對象的分配和釋放是由 JVM 管理的,而 Spark 是經過採樣獲取已經使用的內存狀況,有可能由於採樣不許確而不能及時 Spill,致使OOM。
3,在 Reduce 獲取數據時,因爲數據傾斜,有可能形成單個 Block 的數據很是的大,默認狀況下是須要有足夠的內存來保存單個 Block 的數據。所以,此時極有可能由於數據傾斜形成 OOM。 能夠設置 spark.maxRemoteBlockSizeFetchToMem 參數,設置這個參數之後,超過必定的閾值,會自動將數據 Spill 到磁盤,此時即可以免由於數據傾斜形成 OOM 的狀況。在咱們的生產環境中也驗證了這點,在設置這個參數到合理的閾值後,生產環境任務 OOM 的狀況大大減小了。
4,在 Reduce 獲取數據後,默認狀況會對數據流進行解壓校驗(參數 spark.shuffle.detectCorrupt)。正如在代碼註釋中提到,因爲這部分沒有 Spill 到磁盤操做,也有很大的可性能會致使 OOM。在咱們的生產環境中也有碰到由於檢驗致使 OOM 的狀況。spa

4、小結

本文主要圍繞內存使用這個點,對 Spark shuffle 的過程作了一個比較詳細的梳理,而且分析了可能形成 OOM 的一些狀況以及咱們在生產環境碰到的一些問題。本文主要基於做者對 Spark 源碼的理解以及實際生產過程當中遇到 OOM 案例總結而成,限於經驗等各方面緣由,不免有所疏漏或者有失偏頗。若有問題,歡迎聯繫一塊兒討論。指針

相關文章
相關標籤/搜索