Spark 做爲一個基於內存的分佈式計算引擎,其內存管理模塊在整個系統中扮演着很是重要的角色。理解 Spark 內存管理的基本原理,有助於更好地開發 Spark 應用程序和進行性能調優。本文旨在梳理出 Spark 內存管理的脈絡,拋磚引玉,引出讀者對這個話題的深刻探討。本文中闡述的原理基於 Spark 2.1 版本,閱讀本文須要讀者有必定的 Spark 和 Java 基礎,瞭解 RDD、Shuffle、JVM 等相關概念。
在執行 Spark 的應用程序時,Spark 集羣會啓動 Driver 和 Executor 兩種 JVM 進程,前者爲主控進程,負責建立 Spark 上下文,提交 Spark 做業(Job),並將做業轉化爲計算任務(Task),在各個 Executor 進程間協調任務的調度,後者負責在工做節點上執行具體的計算任務,並將結果返回給 Driver,同時爲須要持久化的 RDD 提供存儲功能[1]。因爲 Driver 的內存管理相對來講較爲簡單,本文主要對 Executor 的內存管理進行分析,下文中的 Spark 內存均特指 Executor 的內存。
0. Spark Shuffle進化史
在MapReduce框架中,shuffle是鏈接Map和Reduce之間的橋樑,Map的輸出要用到Reduce中必須通過shuffle這個環節,shuffle的性能高低直接影響了整個程序的性能和吞吐量。Spark做爲MapReduce框架的一種實現,天然也實現了shuffle的邏輯。
Shuffle是MapReduce框架中的一個特定的phase,介於Map phase和Reduce phase之間,當Map的輸出結果要被Reduce使用時,輸出結果須要按key哈希,而且分發到每個Reducer上去,這個過程就是shuffle。因爲shuffle涉及到了磁盤的讀寫和網絡的傳輸,所以shuffle性能的高低直接影響到了整個程序的運行效率。
下面這幅圖清晰地描述了MapReduce算法的整個流程,其中shuffle phase是介於Map phase和Reduce phase之間。
概念上shuffle就是一個溝通數據鏈接的橋樑,那麼實際上shuffle(partition)這一部分是如何實現的的呢,下面咱們就以Spark爲例講一下shuffle在Spark中的實現。
先以圖爲例簡單描述一下Spark中shuffle的整一個流程:
- 首先每個Mapper會根據Reducer的數量建立出相應的bucket,bucket的數量是MM×RR,其中MM是Map的個數,RR是Reduce的個數。
- 其次Mapper產生的結果會根據設置的partition算法填充到每一個bucket中去。這裏的partition算法是能夠自定義的,固然默認的算法是根據key哈希到不一樣的bucket中去。
- 當Reducer啓動時,它會根據本身task的id和所依賴的Mapper的id從遠端或是本地的block manager中取得相應的bucket做爲Reducer的輸入進行處理。
這裏的bucket是一個抽象概念,在實現中每一個bucket能夠對應一個文件,能夠對應文件的一部分或是其餘等。
Apache Spark 的 Shuffle 過程與 Apache Hadoop 的 Shuffle 過程有着諸多相似,一些概念可直接套用,例如,Shuffle 過程當中,提供數據的一端,被稱做 Map 端,Map 端每一個生成數據的任務稱爲 Mapper,對應的,接收數據的一端,被稱做 Reduce 端,Reduce 端每一個拉取數據的任務稱爲 Reducer,Shuffle 過程本質上都是將 Map 端得到的數據使用分區器進行劃分,並將數據發送給對應的 Reducer 的過程。
1. 堆內和堆外內存規劃
做爲一個 JVM 進程,Executor 的內存管理創建在 JVM 的內存管理之上,Spark 對 JVM 的堆內(On-heap)空間進行了更爲詳細的分配,以充分利用內存。同時,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,進一步優化了內存的使用。
圖 1 . 堆內和堆外內存示意圖
1.1 堆內內存
堆內內存的大小,由 Spark 應用程序啓動時的 –executor-memory 或 spark.executor.memory 參數配置。Executor 內運行的併發任務共享 JVM 堆內內存,這些任務在緩存 RDD 數據和廣播(Broadcast)數據時佔用的內存被規劃爲存儲(Storage)內存,而這些任務在執行 Shuffle 時佔用的內存被規劃爲執行(Execution)內存,剩餘的部分不作特殊規劃,那些 Spark 內部的對象實例,或者用戶定義的 Spark 應用程序中的對象實例,均佔用剩餘的空間。不一樣的管理模式下,這三部分佔用的空間大小各不相同(下面第 2 小節會進行介紹)。
Spark 對堆內內存的管理是一種邏輯上的"規劃式"的管理,由於對象實例佔用內存的申請和釋放都由 JVM 完成,Spark 只能在申請後和釋放前記錄這些內存,咱們來看其具體流程:
申請內存:
Spark 在代碼中 new 一個對象實例
JVM 從堆內內存分配空間,建立對象並返回對象引用
Spark 保存該對象的引用,記錄該對象佔用的內存
釋放內存:
Spark 記錄該對象釋放的內存,刪除該對象的引用
等待 JVM 的垃圾回收機制釋放該對象佔用的堆內內存
咱們知道,JVM 的對象能夠以序列化的方式存儲,序列化的過程是將對象轉換爲二進制字節流,本質上能夠理解爲將非連續空間的鏈式存儲轉化爲連續空間或塊存儲,在訪問時則須要進行序列化的逆過程——反序列化,將字節流轉化爲對象,序列化的方式能夠節省存儲空間,但增長了存儲和讀取時候的計算開銷。
對於 Spark 中序列化的對象,因爲是字節流的形式,其佔用的內存大小可直接計算,而對於非序列化的對象,其佔用的內存是經過週期性地採樣近似估算而得,即並非每次新增的數據項都會計算一次佔用的內存大小,這種方法下降了時間開銷可是有可能偏差較大,致使某一時刻的實際內存有可能遠遠超出預期[2]。此外,在被 Spark 標記爲釋放的對象實例,頗有可能在實際上並無被 JVM 回收,致使實際可用的內存小於 Spark 記錄的可用內存。因此 Spark 並不能準確記錄實際可用的堆內內存,從而也就沒法徹底避免內存溢出(OOM, Out of Memory)的異常。
雖然不能精準控制堆內內存的申請和釋放,但 Spark 經過對存儲內存和執行內存各自獨立的規劃管理,能夠決定是否要在存儲內存裏緩存新的 RDD,以及是否爲新的任務分配執行內存,在必定程度上能夠提高內存的利用率,減小異常的出現。
1.2 堆外內存
爲了進一步優化內存的使用以及提升 Shuffle 時排序的效率,Spark 引入了堆外(Off-heap)內存,使之能夠直接在工做節點的系統內存中開闢空間,存儲通過序列化的二進制數據。利用 JDK Unsafe API(從 Spark 2.0 開始,在管理堆外的存儲內存時再也不基於 Tachyon,而是與堆外的執行內存同樣,基於 JDK Unsafe API 實現[3]),Spark 能夠直接操做系統堆外內存,減小了沒必要要的內存開銷,以及頻繁的 GC 掃描和回收,提高了處理性能。堆外內存能夠被精確地申請和釋放,並且序列化的數據佔用的空間能夠被精確計算,因此相比堆內內存來講下降了管理的難度,也下降了偏差。
在默認狀況下堆外內存並不啓用,可經過配置 spark.memory.offHeap.enabled 參數啓用,並由 spark.memory.offHeap.size 參數設定堆外空間的大小。除了沒有 other 空間,堆外內存與堆內內存的劃分方式相同,全部運行中的併發任務共享存儲內存和執行內存。
1.3 內存管理接口
Spark 爲存儲內存和執行內存的管理提供了統一的接口——MemoryManager,同一個 Executor 內的任務都調用這個接口的方法來申請或釋放內存:
清單 1 . 內存管理接口的主要方法
1
2
3
4
5
6
7
8
9
10
11
12
|
//申請存儲內存
def acquireStorageMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
//申請展開內存
def acquireUnrollMemory(blockId: BlockId, numBytes: Long, memoryMode: MemoryMode): Boolean
//申請執行內存
def acquireExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Long
//釋放存儲內存
def releaseStorageMemory(numBytes: Long, memoryMode: MemoryMode): Unit
//釋放執行內存
def releaseExecutionMemory(numBytes: Long, taskAttemptId: Long, memoryMode: MemoryMode): Unit
//釋放展開內存
def releaseUnrollMemory(numBytes: Long, memoryMode: MemoryMode): Unit
|
咱們看到,在調用這些方法時都須要指定其內存模式(MemoryMode),這個參數決定了是在堆內仍是堆外完成此次操做。
2 . 內存空間分配
2.1 靜態內存管理
在 Spark 最初採用的靜態內存管理機制下,存儲內存、執行內存和其餘內存的大小在 Spark 應用程序運行期間均爲固定的,但用戶能夠應用程序啓動前進行配置,堆內內存的分配如圖 2 所示:
圖 2 . 靜態內存管理圖示——堆內
能夠看到,可用的堆內內存的大小須要按照下面的方式計算:
清單 2 . 可用堆內內存空間
1
2
|
可用的存儲內存 = systemMaxMemory * spark.storage.memoryFraction * spark.storage.safetyFraction
可用的執行內存 = systemMaxMemory * spark.shuffle.memoryFraction * spark.shuffle.safetyFraction
|
其中 systemMaxMemory 取決於當前 JVM 堆內內存的大小,最後可用的執行內存或者存儲內存要在此基礎上與各自的 memoryFraction 參數和 safetyFraction 參數相乘得出。上述計算公式中的兩個 safetyFraction 參數,其意義在於在邏輯上預留出 1-safetyFraction 這麼一塊保險區域,下降因實際內存超出當前預設範圍而致使 OOM 的風險(上文提到,對於非序列化對象的內存採樣估算會產生偏差)。值得注意的是,這個預留的保險區域僅僅是一種邏輯上的規劃,在具體使用時 Spark 並無區別對待,和"其它內存"同樣交給了 JVM 去管理。
堆外的空間分配較爲簡單,只有存儲內存和執行內存,如圖 3 所示。可用的執行內存和存儲內存佔用的空間大小直接由參數 spark.memory.storageFraction 決定,因爲堆外內存佔用的空間能夠被精確計算,因此無需再設定保險區域。
圖 3 . 靜態內存管理圖示——堆外
靜態內存管理機制實現起來較爲簡單,但若是用戶不熟悉 Spark 的存儲機制,或沒有根據具體的數據規模和計算任務或作相應的配置,很容易形成"一半海水,一半火焰"的局面,即存儲內存和執行內存中的一方剩餘大量的空間,而另外一方卻早早被佔滿,不得不淘汰或移出舊的內容以存儲新的內容。因爲新的內存管理機制的出現,這種方式目前已經不多有開發者使用,出於兼容舊版本的應用程序的目的,Spark 仍然保留了它的實現。
2.2 統一內存管理
Spark 1.6 以後引入的統一內存管理機制,與靜態內存管理的區別在於存儲內存和執行內存共享同一塊空間,能夠動態佔用對方的空閒區域,如圖 4 和圖 5 所示
圖 4 . 統一內存管理圖示——堆內
圖 5 . 統一內存管理圖示——堆外
其中最重要的優化在於動態佔用機制,其規則以下:
設定基本的存儲內存和執行內存區域(spark.storage.storageFraction 參數),該設定肯定了雙方各自擁有的空間的範圍
雙方的空間都不足時,則存儲到硬盤;若己方空間不足而對方空餘時,可借用對方的空間;(存儲空間不足是指不足以放下一個完整的 Block)
執行內存的空間被對方佔用後,可以讓對方將佔用的部分轉存到硬盤,而後"歸還"借用的空間
存儲內存的空間被對方佔用後,沒法讓對方"歸還",由於須要考慮 Shuffle 過程當中的不少因素,實現起來較爲複雜[4]
圖 6 . 動態佔用機制圖示
憑藉統一內存管理機制,Spark 在必定程度上提升了堆內和堆外內存資源的利用率,下降了開發者維護 Spark 內存的難度,但並不意味着開發者能夠高枕無憂。譬如,因此若是存儲內存的空間太大或者說緩存的數據過多,反而會致使頻繁的全量垃圾回收,下降任務執行時的性能,由於緩存的 RDD 數據一般都是長期駐留內存的 [5] 。因此要想充分發揮 Spark 的性能,須要開發者進一步瞭解存儲內存和執行內存各自的管理方式和實現原理。
3. 存儲內存管理
3.1 RDD 的持久化機制
彈性分佈式數據集(RDD)做爲 Spark 最根本的數據抽象,是隻讀的分區記錄(Partition)的集合,只能基於在穩定物理存儲中的數據集上建立,或者在其餘已有的 RDD 上執行轉換(Transformation)操做產生一個新的 RDD。轉換後的 RDD 與原始的 RDD 之間產生的依賴關係,構成了血統(Lineage)。憑藉血統,Spark 保證了每個 RDD 均可以被從新恢復。但 RDD 的全部轉換都是惰性的,即只有當一個返回結果給 Driver 的行動(Action)發生時,Spark 纔會建立任務讀取 RDD,而後真正觸發轉換的執行。
Task 在啓動之初讀取一個分區時,會先判斷這個分區是否已經被持久化,若是沒有則須要檢查 Checkpoint 或按照血統從新計算。因此若是一個 RDD 上要執行屢次行動,能夠在第一次行動中使用 persist 或 cache 方法,在內存或磁盤中持久化或緩存這個 RDD,從而在後面的行動時提高計算速度。事實上,cache 方法是使用默認的 MEMORY_ONLY 的存儲級別將 RDD 持久化到內存,故緩存是一種特殊的持久化。 堆內和堆外存儲內存的設計,即可以對緩存 RDD 時使用的內存作統一的規劃和管理 (存儲內存的其餘應用場景,如緩存 broadcast 數據,暫時不在本文的討論範圍以內)。
RDD 的持久化由 Spark 的 Storage 模塊 [7] 負責,實現了 RDD 與物理存儲的解耦合。Storage 模塊負責管理 Spark 在計算過程當中產生的數據,將那些在內存或磁盤、在本地或遠程存取數據的功能封裝了起來。在具體實現時 Driver 端和 Executor 端的 Storage 模塊構成了主從式的架構,即 Driver 端的 BlockManager 爲 Master,Executor 端的 BlockManager 爲 Slave。Storage 模塊在邏輯上以 Block 爲基本存儲單位,RDD 的每一個 Partition 通過處理後惟一對應一個 Block(BlockId 的格式爲 rdd_RDD-ID_PARTITION-ID )。Master 負責整個 Spark 應用程序的 Block 的元數據信息的管理和維護,而 Slave 須要將 Block 的更新等狀態上報到 Master,同時接收 Master 的命令,例如新增或刪除一個 RDD。
圖 7 . Storage 模塊示意圖
在對 RDD 持久化時,Spark 規定了 MEMORY_ONLY、MEMORY_AND_DISK 等 7 種不一樣的
存儲級別 ,而存儲級別是如下 5 個變量的組合:
清單 3 . 存儲級別
1
2
3
4
5
6
7
|
class StorageLevel private(
private var _useDisk: Boolean, //磁盤
private var _useMemory: Boolean, //這裏實際上是指堆內內存
private var _useOffHeap: Boolean, //堆外內存
private var _deserialized: Boolean, //是否爲非序列化
private var _replication: Int = 1 //副本個數
)
|
經過對數據結構的分析,能夠看出存儲級別從三個維度定義了 RDD 的 Partition(同時也就是 Block)的存儲方式:
存儲位置:磁盤/堆內內存/堆外內存。如 MEMORY_AND_DISK 是同時在磁盤和堆內內存上存儲,實現了冗餘備份。OFF_HEAP 則是隻在堆外內存存儲,目前選擇堆外內存時不能同時存儲到其餘位置。
存儲形式:Block 緩存到存儲內存後,是否爲非序列化的形式。如 MEMORY_ONLY 是非序列化方式存儲,OFF_HEAP 是序列化方式存儲。
副本數量:大於 1 時須要遠程冗餘備份到其餘節點。如 DISK_ONLY_2 須要遠程備份 1 個副本。
3.2 RDD 緩存的過程
RDD 在緩存到存儲內存以前,Partition 中的數據通常以迭代器(
Iterator)的數據結構來訪問,這是 Scala 語言中一種遍歷數據集合的方法。經過 Iterator 能夠獲取分區中每一條序列化或者非序列化的數據項(Record),這些 Record 的對象實例在邏輯上佔用了 JVM 堆內內存的 other 部分的空間,同一 Partition 的不一樣 Record 的空間並不連續。
RDD 在緩存到存儲內存以後,Partition 被轉換成 Block,Record 在堆內或堆外存儲內存中佔用一塊連續的空間。將Partition由不連續的存儲空間轉換爲連續存儲空間的過程,Spark稱之爲"展開"(Unroll)。Block 有序列化和非序列化兩種存儲格式,具體以哪一種方式取決於該 RDD 的存儲級別。非序列化的 Block 以一種 DeserializedMemoryEntry 的數據結構定義,用一個數組存儲全部的對象實例,序列化的 Block 則以 SerializedMemoryEntry的數據結構定義,用字節緩衝區(ByteBuffer)來存儲二進制數據。每一個 Executor 的 Storage 模塊用一個鏈式 Map 結構(LinkedHashMap)來管理堆內和堆外存儲內存中全部的 Block 對象的實例[6],對這個 LinkedHashMap 新增和刪除間接記錄了內存的申請和釋放。
由於不能保證存儲空間能夠一次容納 Iterator 中的全部數據,當前的計算任務在 Unroll 時要向 MemoryManager 申請足夠的 Unroll 空間來臨時佔位,空間不足則 Unroll 失敗,空間足夠時能夠繼續進行。對於序列化的 Partition,其所需的 Unroll 空間能夠直接累加計算,一次申請。而非序列化的 Partition 則要在遍歷 Record 的過程當中依次申請,即每讀取一條 Record,採樣估算其所需的 Unroll 空間並進行申請,空間不足時能夠中斷,釋放已佔用的 Unroll 空間。若是最終 Unroll 成功,當前 Partition 所佔用的 Unroll 空間被轉換爲正常的緩存 RDD 的存儲空間,以下圖 8 所示。
圖 8. Spark Unroll 示意圖
在圖 3 和圖 5 中能夠看到,在靜態內存管理時,Spark 在存儲內存中專門劃分了一塊 Unroll 空間,其大小是固定的,統一內存管理時則沒有對 Unroll 空間進行特別區分,當存儲空間不足時會根據動態佔用機制進行處理。
3.3 淘汰和落盤
因爲同一個 Executor 的全部的計算任務共享有限的存儲內存空間,當有新的 Block 須要緩存可是剩餘空間不足且沒法動態佔用時,就要對 LinkedHashMap 中的舊 Block 進行淘汰(Eviction),而被淘汰的 Block 若是其存儲級別中同時包含存儲到磁盤的要求,則要對其進行落盤(Drop),不然直接刪除該 Block。
被淘汰的舊 Block 要與新 Block 的 MemoryMode 相同,即同屬於堆外或堆內內存
新舊 Block 不能屬於同一個 RDD,避免循環淘汰
舊 Block 所屬 RDD 不能處於被讀狀態,避免引起一致性問題
遍歷 LinkedHashMap 中 Block,按照最近最少使用(LRU)的順序淘汰,直到知足新 Block 所需的空間。其中 LRU 是 LinkedHashMap 的特性。
落盤的流程則比較簡單,若是其存儲級別符合_useDisk 爲 true 的條件,再根據其_deserialized 判斷是不是非序列化的形式,如果則對其進行序列化,最後將數據存儲到磁盤,在 Storage 模塊中更新其信息。
4. 執行內存管理
4.1 多任務間內存分配
Executor 內運行的任務一樣共享執行內存,Spark 用一個 HashMap 結構保存了任務到內存耗費的映射。每一個任務可佔用的執行內存大小的範圍爲 1/2N ~ 1/N,其中 N 爲當前 Executor 內正在運行的任務的個數。每一個任務在啓動之時,要向 MemoryManager 請求申請最少爲 1/2N 的執行內存,若是不能被知足要求則該任務被阻塞,直到有其餘任務釋放了足夠的執行內存,該任務才能夠被喚醒。
4.2 Shuffle 的內存佔用
執行內存主要用來存儲任務在執行 Shuffle 時佔用的內存,Shuffle 是按照必定規則對 RDD 數據從新分區的過程,咱們來看 Shuffle 的 Write 和 Read 兩階段對執行內存的使用:
Shuffle Write
若在 map 端選擇普通的排序方式,會採用 ExternalSorter 進行外排,在內存中存儲數據時主要佔用堆內執行空間。
若在 map 端選擇 Tungsten 的排序方式,則採用 ShuffleExternalSorter 直接對以序列化形式存儲的數據排序,在內存中存儲數據時能夠佔用堆外或堆內執行空間,取決於用戶是否開啓了堆外內存以及堆外執行內存是否足夠。
Shuffle Read
在對 reduce 端的數據進行聚合時,要將數據交給 Aggregator 處理,在內存中存儲數據時佔用堆內執行空間。
若是須要進行最終結果排序,則要將再次將數據交給 ExternalSorter 處理,佔用堆內執行空間。
在 ExternalSorter 和 Aggregator 中,Spark 會使用一種叫 AppendOnlyMap 的哈希表在堆內執行內存中存儲數據,但在 Shuffle 過程當中全部數據並不能都保存到該哈希表中,當這個哈希表佔用的內存會進行週期性地採樣估算,當其大到必定程度,沒法再從 MemoryManager 申請到新的執行內存時,Spark 就會將其所有內容存儲到磁盤文件中,這個過程被稱爲溢存(Spill),溢存到磁盤的文件最後會被歸併(Merge)。
Shuffle Write 階段中用到的 Tungsten 是 Databricks 公司提出的對 Spark 優化內存和 CPU 使用的計劃[9],解決了一些 JVM 在性能上的限制和弊端。Spark 會根據 Shuffle 的狀況來自動選擇是否採用 Tungsten 排序。Tungsten 採用的頁式內存管理機制創建在 MemoryManager 之上,即 Tungsten 對執行內存的使用進行了一步的抽象,這樣在 Shuffle 過程當中無需關心數據具體存儲在堆內仍是堆外。每一個內存頁用一個 MemoryBlock 來定義,並用 Object obj 和 long offset 這兩個變量統一標識一個內存頁在系統內存中的地址。堆內的 MemoryBlock 是以 long 型數組的形式分配的內存,其 obj 的值爲是這個數組的對象引用,offset 是 long 型數組的在 JVM 中的初始偏移地址,二者配合使用能夠定位這個數組在堆內的絕對地址;堆外的 MemoryBlock 是直接申請到的內存塊,其 obj 爲 null,offset 是這個內存塊在系統內存中的 64 位絕對地址。Spark 用 MemoryBlock 巧妙地將堆內和堆外內存頁統一抽象封裝,並用頁表(pageTable)管理每一個 Task 申請到的內存頁。
Tungsten 頁式管理下的全部內存用 64 位的邏輯地址表示,由頁號和頁內偏移量組成:
頁號:佔 13 位,惟一標識一個內存頁,Spark 在申請內存頁以前要先申請空閒頁號。
頁內偏移量:佔 51 位,是在使用內存頁存儲數據時,數據在頁內的偏移地址。
有了統一的尋址方式,Spark 能夠用 64 位邏輯地址的指針定位到堆內或堆外的內存,整個 Shuffle Write 排序的過程只須要對指針進行排序,而且無需反序列化,整個過程很是高效,對於內存訪問效率和 CPU 使用效率帶來了明顯的提高[10]。
Spark 的存儲內存和執行內存有着大相徑庭的管理方式:對於存儲內存來講,Spark 用一個 LinkedHashMap 來集中管理全部的 Block,Block 由須要緩存的 RDD 的 Partition 轉化而成;而對於執行內存,Spark 用 AppendOnlyMap 來存儲 Shuffle 過程當中的數據,在 Tungsten 排序中甚至抽象成爲頁式內存管理,開闢了全新的 JVM 內存管理機制。
結束語
Spark 的內存管理是一套複雜的機制,且 Spark 的版本更新比較快,筆者水平有限,不免有敘述不清、錯誤的地方,若讀者有好的建議和更深的理解,還望不吝賜教。