Apache Spark中,對Block的查詢、存儲管理,是經過惟一的Block ID來進行區分的。html
同一個Spark Application,以及多個運行的Application之間,對應的Block都具備惟一的ID後端
須要在worker和driver間共享數據時,就須要對這個數據進行惟一的標識,經常使用的須要傳輸的block信息有如下幾類 RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId緩存
RDDBlockId : "rdd_" + rddId + "_" + splitIndex ShuffleBlockId : "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId ShuffleDataBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data" ShuffleIndexBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index" TaskResultBlockId:"taskresult_" + taskId StreamBlockId:"input-" + streamId + "-" + uniqueId ...
DiskStore是經過DiskBlockManager進行管理存儲到磁盤上的Block數據文件的,在同一個節點上的多個Executor共享相同的磁盤文件路徑,相同的Block數據文件也就會被同一個節點上的多個Executor所共享。而對應MemoryStore,由於每一個Executor對應獨立的JVM實例,從而具備獨立的Storage/Execution內存管理,因此使用MemoryStore不能共享同一個Block數據,可是同一個節點上的多個Executor之間的MemoryStore之間拷貝數據,比跨網絡傳輸要高效的多安全
數據在內存中存儲的形式網絡
MEMORY_ONLY MEMORY_ONLY_2 MEMORY_ONLY_SER MEMORY_ONLY_SER_2 MEMORY_AND_DISK MEMORY_AND_DISK_2 MEMORY_AND_DISK_SER MEMORY_AND_DISK_SER_2 OFF_HEAP
數據羅盤的幾種形式:分佈式
DISK_ONLY DISK_ONLY_2 MEMORY_AND_DISK MEMORY_AND_DISK_2 MEMORY_AND_DISK_SER MEMORY_AND_DISK_SER_2 OFF_HEAP
DiskStore即基於文件來存儲Block. 基於Disk來存儲,首先必需要解決一個問題就是磁盤文件的管理:磁盤目錄結構的組成,目錄的清理等,在Spark對磁盤文件的管理是經過 DiskBlockManager來進行管理的ide
DiskBlockManager管理了每一個Block數據存儲位置的信息,包括從Block ID到磁盤上文件的映射關係。DiskBlockManager主要有以下幾個功能:ui
堆外存儲不支持序列化和副本this
Spark中實現的OffHeap是基於Tachyon:分佈式內存文件系統來實現的spa
在Spark Application提交之後,最終會在Worker上啓動獨立的Executor JVM,Task就運行在Executor裏面。在一個Executor JVM內部,內存管理模型就是管理excutor運行所須要的內存
http://shiyanjun.cn/archives/1585.html
1.5以前版本使用 缺點:
統一內存分配管理模型:
abstract class MemoryManager( conf: SparkConf, numCores: Int, onHeapStorageMemory: Long, onHeapExecutionMemory: Long){ // storage堆內內存 @GuardedBy("this") protected val onHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.ON_HEAP) // storage堆外內存 @GuardedBy("this") protected val offHeapStorageMemoryPool = new StorageMemoryPool(this, MemoryMode.OFF_HEAP) // execution堆內內存 @GuardedBy("this") protected val onHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.ON_HEAP) // excution堆外內存 @GuardedBy("this") protected val offHeapExecutionMemoryPool = new ExecutionMemoryPool(this, MemoryMode.OFF_HEAP) } // 默認最大堆內存 val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0) // 默認storage和excution的內存大小各佔50% offHeapStorageMemory = (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong
在統一內存管理模型中,storage和excution內存大小能夠動態調整,在必定程度上減小了OOM發生機率
默認內存劃分:
預留內存reservedMemory=300M 管理內存maxHeapMemory = (systemMemory - reservedMemory) * 0.6 storageMemory=excutionMemory=maxHeapMemory*0.5
非堆內存默認值0,可經過spark.memory.offHeap.size參數調整,其中storage和excution的內存佔比也均爲50%
Storage內存,用來緩存Task數據、在Spark集羣中傳輸(Propagation)內部數據
Execution內存,用於知足Shuffle、Join、Sort、Aggregation計算過程當中對內存的需求
// 爲blockId申請numBytes字節大小的內存 override def acquireStorageMemory ()synchronized { val (executionPool, storagePool, maxMemory) = memoryMode match { // 根據memoryMode值,返回對應的StorageMemoryPool與ExecutionMemoryPool case MemoryMode.ON_HEAP => case MemoryMode.OFF_HEAP => } if (numBytes > maxMemory) { // 申請的內存大於剩餘內存總理則申請失敗 s"memory limit ($maxMemory bytes)") return false } if (numBytes > storagePool.memoryFree) { // 若是Storage內存塊中沒有足夠可用內存給blockId使用,則計算當前Storage內存區缺乏多少內存,而後從Execution內存區中借用 val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes) // Execution內存區減掉借用內存量 executionPool.decrementPoolSize(memoryBorrowedFromExecution) // Storage內存區增長借用內存量 storagePool.incrementPoolSize(memoryBorrowedFromExecution) } // 若是Storage內存區能夠爲blockId分配內存,直接成功分配;不然,若是從Execution內存區中借用的內存可以知足blockId,則分配成功,不能知足則分配失敗。 storagePool.acquireMemory(blockId, numBytes) }
釋放Storage內存比較簡單,只須要更新Storage內存計量變量便可
def releaseMemory(size: Long): Unit = lock.synchronized { if (size > _memoryUsed) { // 須要釋放內存大於已使用內存,則直接清零 _memoryUsed = 0 } else { // 從已使用內存中減去釋放內存大小 _memoryUsed -= size } }
excution內存的獲取和釋放都是線程安全的,並且分配給每一個task的內存大小是均等的,每當有task運行完畢後,都會觸發內存的回收操做。
若是從storage申請內存大小比storage剩餘內存大,則申請線程會阻塞,並對storage內存發起縮小操做。直到storage釋放足夠內存。
Execution內存區內存分配的基本原則: 若是有N個活躍(Active)的Task在運行,ExecutionMemoryPool須要保證每一個Task在將中間結果數據Spill到磁盤以前,至少可以申請到當前Execution內存區對應的Pool中1/2N大小的內存量,至可能是1/N大小的內存。
這裏N是動態變化的,由於可能有新的Task被啓動,也有可能Task運行完成釋放資源,因此ExecutionMemoryPool會持續跟蹤ExecutionMemoryPool內部Task集合memoryForTask的變化,並不斷地從新計算分配給每一個Task的這兩個內存量的值:1/2N和1/N。
// 同步的釋放內存 def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized { val curMem = memoryForTask.getOrElse(taskAttemptId, 0L) // 計算釋放內存大小 var memoryToFree = if (curMem < numBytes) { // 沒有足夠內存須要釋放,則釋放掉當前task全部使用內存 curMem } else { numBytes } if (memoryForTask.contains(taskAttemptId)) { // Task執行完成,從內部維護的memoryForTask中移除 memoryForTask(taskAttemptId) -= memoryToFree if (memoryForTask(taskAttemptId) <= 0) { memoryForTask.remove(taskAttemptId) } } // 通知調用acquireMemory()方法申請內存的Task內存已經釋放 lock.notifyAll() }
BlockManagerMaster管理BlockManager. BlockManager在每一個Dirver和Executor上都有,用來管理Block數據,包括數據的獲取和保存等
談到Spark中的Block數據存儲,咱們很容易可以想到BlockManager,他負責管理在每一個Dirver和Executor上的Block數據,多是本地或者遠程的。具體操做包括查詢Block、將Block保存在指定的存儲中,如內存、磁盤、堆外(Off-heap)。而BlockManager依賴的後端,對Block數據進行內存、磁盤存儲訪問,都是基於前面講到的MemoryStore、DiskStore。 在Spark集羣中,當提交一個Application執行時,該Application對應的Driver以及全部的Executor上,都存在一個BlockManager、BlockManagerMaster,而BlockManagerMaster是負責管理各個BlockManager之間通訊,這個BlockManager管理集羣
每一個Executor上都有一個BlockManager實例,負責管理用戶提交的該Application計算過程當中產生的Block。
頗有可能當前Executor上存儲在RDD對應Partition的通過處理後獲得的Block數據,也有可能當前Executor上沒有,可是其餘Executor上已經處理過並緩存了Block數據,因此對應着本地獲取、遠程獲取兩種可能
關於一個Application運行過程當中Block的管理,主要是基於該Application所關聯的一個Driver和多個Executor構建了一個Block管理集羣:Driver上的(BlockManagerMaster, BlockManagerMasterEndpoint)是集羣的Master角色,全部Executor上的(BlockManagerMaster, RpcEndpointRef)做爲集羣的Slave角色。當Executor上的Task運行時,會查詢對應的RDD的某個Partition對應的Block數據是否處理過,這個過程當中會觸發多個BlockManager之間的通訊交互
BlockManager在進行put操做後,經過blockInfoManager來控制當前put等操做是否完成以及是否成功。
對於BlockManager中的存儲的每一個Block,不必定是對應的數據都PUT成功了,不必定能夠當即提供對外的讀取,由於PUT是一個過程,有成功仍是有失敗的狀態. ,拿ShuffleBlock來講,在shuffleMapTask須要Put一個Block到BlockManager中,在Put完成以前,該Block將處於Pending狀態,等待Put完成了不表明Block就能夠被讀取, 由於Block還可能Put"fail"了.
所以BlockManager經過BlockInfo來維護每一個Block狀態,在BlockManager的代碼中就是經過一個TimeStampedHashMap來維護BlockID和BlockInfo之間的map.
private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] 注: 2.2中此處是經過線程安全的hashMap和一個計數器實現的
BlockInfoManager經過同步機制防止多個task處理同一個block數據塊
用戶提交一個Spark Application程序,若是程序對應的DAG圖相對複雜,其中不少Task計算的結果Block數據都有可能被重複使用,這種狀況下如何去控制某個Executor上的Task線程去讀寫Block數據呢?其實,BlockInfoManager就是用來控制Block數據讀寫操做,而且跟蹤Task讀寫了哪些Block數據的映射關係,這樣若是兩個Task都想去處理同一個RDD的同一個Partition數據,若是沒有鎖來控制,極可能兩個Task都會計算並寫同一個Block數據,從而形成混亂
class BlockInfoManager{ val infos = new mutable.HashMap[BlockId, BlockInfo] // 存放被鎖定任務列表 val writeLocksByTask = new mutable.HashMap[ TaskAttemptId, mutable.Set[BlockId]] val readLocksByTask = new mutable.HashMap[TaskAttemptId, ConcurrentHashMultiset[BlockId]] def lockForReading(){ infos.get(blockId) match { case Some(info) => // 沒有寫任務 if (info.writerTask == BlockInfo.NO_WRITER) { // 讀task數量加一 info.readerCount += 1 // 放入讀多鎖定隊列 readLocksByTask( currentTaskAttemptId). add(blockId) } } def lockForWriting(){ case Some(info) => if (info.writerTask == BlockInfo.NO_WRITER && info.readerCount == 0) { info.writerTask = currentTaskAttemptId writeLocksByTask.addBinding( currentTaskAttemptId, blockId) }
sparkCore源碼解析系列: