Spark內存模型詳解

1 堆內和堆外內存規劃

Spark執行器(Executor)的內存管理創建在 JVM 的內存管理之上,Spark 對 JVM 的空間(OnHeap+Off-heap)進行了更爲詳細的分配,以充分利用內存。同時,Spark 引入了Off-heap 內存模式,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用(能夠理解爲是獨立於JVM託管的Heap以外利用c-style的malloc從os分配到的memory。因爲再也不由JVM託管,經過高效的內存管理,能夠避免JVM object overhead和Garbage collection的開銷)。sql

運行於Executor中的Task同時可以使用JVM(OnHeap+Off-heap)和Off-heap兩種模式的內存。shell

  • JVM OnHeap內存:大小由」--executor-memory」(即 spark.executor.memory)參數指定。Executor中運行的併發任務共享JVM堆內內存。
  • JVM OffHeap內存:大小由」spark.yarn.executor.memoryOverhead」參數指定,主要用於JVM自身,字符串, NIO Buffer等開銷。
  • Off-heap模式:默認狀況下Off-heap模式的內存並不啓用,能夠經過」spark.memory.offHeap.enabled」參數開啓,並由spark.memory.offHeap.size指定堆外內存的大小(佔用的空間劃歸JVM OffHeap內存)。

---備註:咱們如今未啓用Off-heap模式的內存,所以,只介紹JVM模式的Executor內存管理。如下出現有Off-heap均爲JVM中區別於Heap的內存。緩存

---OffHeap內存:存儲通過序列化的二進制數據,Spark 能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的 GC 掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放,並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。併發

2 Executor內存劃分

2.1 Executor可用內存總量

                                                                                     Executor內存模型性能

如上圖所示,Yarn集羣管理模式中,Spark 以Executor Container的形式在NodeManager中運行,其可以使用的內存上限由「yarn.scheduler.maximum-allocation-mb」 指定, ---咱們能夠稱其爲MonitorMemory測試

如前所述,Executor的內存由Heap內存和設定的Off-heap內存組成。優化

Heap: 由「spark.executor.memory」 指定, 如下稱爲ExecutorMemory
Off-heap: 由 「spark.yarn.executor.memoryOverhead」 指定, 如下稱爲MemoryOverheadspa

所以, 對現有Yarn集羣,存在:操作系統

ExecutorMemory + MemoryOverhead <= MonitorMemory線程

若應用提交之時,指定的 ExecutorMemory與MemoryOverhead 之和大於 MonitorMemory,則會致使Executor申請失敗;若運行過程當中,實際使用內存超過上限閾值,Executor進程會被Yarn終止掉(kill)。

2.2 Heap

"spark.executor.memory"指定的內存爲JVM最大分配的堆內存("-xmx"),Spark爲了更高效的使用這部份內存,對這部份內存進行了細分,下圖(備註:此圖源於互聯網)對基於spark2(1.6+)對堆內存分配比例進行了描述:

                                                                                                                                                                                                               Heap內存模型

 

其中:

  1. Reserved Memory 保留內存,系統默認值爲300,通常無需改動,不用關心此部份內存。 但若是Executor分配的內存小於 1.5 * 300 = 450M時,Executor將沒法執行。
  2. Storage Memory 存儲內存,用於存放廣播數據及RDD緩存數據。由上圖可知,Spark 2+中,初始狀態下,Storage及Execution Memory均約佔系統總內存的30%(1 * 0.6 * 0.5 = 0.3)。在UnifiedMemory管理中,這兩部份內存能夠相互借用,爲了方便描述,咱們使用storageRegionSize來表示「spark.storage.storageFraction」。當計算內存不足時,能夠改造storageRegionSize中未使用部分,且StorageMemory須要存儲內存時也不可被搶佔; 若實際StorageMemory使用量超過storageRegionSize,那麼當計算內存不足時,能夠改造(StorageMemory – storageRegionSize)部分,而storageRegionSize部分不可被搶佔。

2.3 Java Off-heap (Memory Overhead)

Executor 中,另外一塊內存爲由「spark.yarn.executor.memoryOverhead」指定的Java Off-heap內存,此部份內存主要是建立Java Object時的額外開銷,Native方法調用,線程棧, NIO Buffer等開銷(Driect Buffer)。此部分爲用戶代碼及Spark 不可操做的內存,不足時可經過調整參數解決, 無需過多關注。 具體須要調整的場景參見本文第4節。

3 任務內存管理(Task Memory Manager)

Executor中任務以線程的方式執行,各線程共享JVM的資源,任務之間的內存資源沒有強隔離(任務沒有專用的Heap區域)。所以,可能會出現這樣的狀況:先到達的任務可能佔用較大的內存,然後到的任務因得不到足夠的內存而掛起。

在Spark任務內存管理中,使用HashMap存儲任務與其消耗內存的映射關係。每一個任務可佔用的內存大小爲潛在可以使用計算內存的1/2n – 1/n , 當剩餘內存爲小於1/2n時,任務將被掛起,直至有其餘任務釋放執行內存,而知足內存下限1/2n,任務被喚醒,其中n爲當前Executor中活躍的任務數。

任務執行過程當中,若是須要更多的內存,則會進行申請,若是,存在空閒內存,則自動擴容成功,不然,將拋出OutOffMemroyError。

---備註:潛在可以使用計算內存爲:初始計算內存+可搶佔存儲內存

4 內存調整方案

Executor中可同時運行的任務數由Executor分配的CPU的核數N 和每一個任務須要的CPU核心數C決定。其中:

  • N = spark.executor.cores
  • C = spark.task.cpus

Executor的最大任務並行度可表示爲 ==TP = N / C==. 其中,C值與應用類型有關,大部分應用使用默認值1便可,所以,影響Executor中最大任務並行度的主要因素是N.

依據Task的內存使用特徵,前文所述的Executor內存模型能夠簡單抽象爲下圖所示模型:

                                     Executor內存簡化模型

 其中,Executor 向yarn申請的總內存可表示爲: M = M1 + M2 

4.1 錯誤類型及調整方案

4.1.1 Executor OOM類錯誤 (錯誤代碼 13七、143等)

該類錯誤通常是因爲Heap(M2)已達上限,Task須要更多的內存,而又得不到足夠的內存而致使。所以,解決方案要從增長每一個Task的內存使用量,知足任務需求 或 下降單個Task的內存消耗量,從而使現有內存能夠知足任務運行需求兩個角度出發。所以:

4.1.1.1 增長單個task的內存使用量
  • 增長最大Heap值, 即 上圖中M2 的值,使每一個Task可以使用內存增長。
  • 下降Executor的可用Core的數量 N , 使Executor中同時運行的任務數減小,在總資源不變的狀況下,使每一個Task得到的內存相對增長。
4.1.1.2 下降單個Task的內存消耗量

下降單個Task的內存消耗量可從配製方式和調整應用邏輯兩個層面進行優化:

  • 配製方式:

          減小每一個Task處理的數據量,可下降Task的內存開銷,在Spark中,每一個partition對應一個處理任務Task,所以,在數據總量必定的前提下,能夠經過增長partition數量的方式來減小每一個Task處理的數據量,從而下降Task的內存開銷。針對不一樣的Spark應用類型,存在不一樣的partition調整參數以下:

  • P = spark.default.parallism (非SQL應用)
  • P = spark.sql.shuffle.partition (SQL 應用)
  • P = mapred.reduce.tasks (HiveOnSpark)

經過增長P的值,可在必定程度上使Task現有內存知足任務運行
注: 當調整一個參數不能解決問題時,上述方案應進行協同調整

---備註:若應用shuffle階段 spill嚴重,則能夠經過調整「spark.shuffle.spill.numElementsForceSpillThreshold」的值,來限制spill使用的內存大小,好比設置(2000000),該值太大不足以解決OOM問題,若過小,則spill會太頻繁,影響集羣性能,所以,要依據負載類型進行合理伸縮(此處,可設法引入動態伸縮機制,待後續處理)。

  •  調整應用邏輯:

           Executor OOM 通常發生Shuffle階段,該階段需求計算內存較大,且應用邏輯對內存需求有較大影響,下面舉例就行說明:       

  • groupByKey 轉換爲 reduceByKey

         通常狀況下,groupByKey能實現的功能使用reduceByKey都可實現,而ReduceByKey存在Map端的合併,能夠有效減小傳輸帶寬佔用及Reduce端內存消耗。

                                                                                                   選擇合適的算子

  • data skew 預處理    

          Data Skew是指任務間處理的數據量存大較大的差別。
          如左圖所示,key 爲010的數據較多,當發生shuffle時,010所在分區存在大量數據,不只拖慢Job執行(Job的執行時間由最後完成的任務決定)。 並且致使010對應Task內存消耗過多,可能致使OOM. 而右圖,通過預處理(加鹽,此處僅爲舉例說明問題,解決方法不限於此)能夠有效減小Data 

          Skew致使 的問題

 

                                                                                                         Data Skew預處理

---注:上述舉例僅爲說明調整應用邏輯能夠在必定程序上解決OOM問題,解決方法不限於上述舉例

4.1.2 Beyond…… memory, killed by yarn

出現該問題緣由是因爲實際使用內存上限超過申請的內存上限而被Yarn終止掉了, 首先說明Yarn中Container內存監控機制:

  • Container進程的內存使用量:以Container進程爲根的進程樹中全部進程的內存使用總量。
  • Container被殺死的判斷依據:進程樹總內存(物理內存或虛擬內存)使用量超過向Yarn申請的內存上限值,則認爲該Container使用內存超量,能夠被「殺死」。

所以,對該異常的分析要從是否存在子進程兩個角度出發。

a 不存在子進程

根據Container進程殺死的條件可知,在不存在子進程時,出現killed by yarn問題是於由Executor(JVM)進程自身內存超過向Yarn申請的內存總量M 所致。因爲未出現4.1.1節所述的OOM異常,所以可斷定其爲 M1 (Overhead)不足, 依據Yarn內存使用狀況有以下兩種方案:

  • 若是,M未達到Yarn單個Container容許的上限時,可僅增長M1 ,從而增長M;若是,M達到Yarn單個Container容許的上限時,增長 M1, 下降 M2.

操做方法:在提交腳本中添加 --conf spark.yarn.executor.memoryOverhead=3072(或更大的值,好比4096等) --conf spark.executor.memory = 10g 或 更小的值,注意兩者之各要小於Container監控內存量,不然伸請資源將被yarn拒絕。

  • 減小可用的Core的數量 N, 使並行任務數減小,從而減小Overhead開銷

操做方法:在提交腳本中添加 --executor-cores=3 <比原來小的值> 或 --conf spark.executor.cores=3 <比原來小的值>

b 存在子進程

Spark 應用中Container以Executor(JVM進程)的形式存在,所以根進程爲Executor對應的進程, 而Spark 應用向Yarn申請的總資源M = M1  + M 2 , 都是以Executor(JVM)進程(非進程樹)可用資源的名義申請的。申請的資源並不是一次性全量分配給JVM使用,而是先爲JVM分配初始值,隨後內存不足時再按比率不斷進行擴容,直致達到Container監控的最大內存使用量M 。當Executor中啓動了子進程(調用shell等)時,子進程佔用的內存(記爲 S) 就被加入Container進程樹,此時就會影響Executor實際可以使用內存資源(Executor進程實際可以使用資源爲:M - S),然而啓動JVM時設置的可用最大資源爲M, 且JVM進程並不會感知Container中留給本身的使用量已被子進程佔用,所以,當JVM使用量達到 M - S,還會繼續開劈內存空間,這就會致使Executor進程樹使用的總內存量大於M 而被Yarn 殺死。

典形場景有:PySpark(Spark已作內存限制,通常不會佔用過大內存)、自定義Shell調用。其解決方案:

PySpark場景:

  • 若是,M未達到Yarn單個Container容許的上限時,可僅增長M1 ,從而增長M;若是,M達到Yarn單個Container容許的上限時,增長 M1, 下降 M2.
  • 減小可用的Core的數量 N, 使並行任務數減小,從而減小Overhead開銷

自定義Shell 場景:(OverHead不足爲假象)

  • 調整子進程可用內存量,(經過單機測試,內存控制在Container監控內存之內,且爲Spark保留內存等留有空間)。操做方法同4.1.2<1>中所述
相關文章
相關標籤/搜索