sparkCore源碼解析之block

block

1. 標記

Apache Spark中,對Block的查詢、存儲管理,是經過惟一的Block ID來進行區分的。html

同一個Spark Application,以及多個運行的Application之間,對應的Block都具備惟一的ID後端

1.1. 種類

須要在worker和driver間共享數據時,就須要對這個數據進行惟一的標識,經常使用的須要傳輸的block信息有如下幾類 RDDBlockId、ShuffleBlockId、ShuffleDataBlockId、ShuffleIndexBlockId、BroadcastBlockId、TaskResultBlockId、TempLocalBlockId、TempShuffleBlockId緩存

1.2. 生成規則

RDDBlockId : "rdd_" + rddId + "_" + splitIndex
ShuffleBlockId : "shuffle_" + shuffleId + "_" + mapId + "_" + reduceId
ShuffleDataBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".data"
ShuffleIndexBlockId:"shuffle_" + shuffleId + "_" + mapId + "_" + reduceId + ".index"
TaskResultBlockId:"taskresult_" + taskId
StreamBlockId:"input-" + streamId + "-" + uniqueId
...

2. 存儲

img

DiskStore是經過DiskBlockManager進行管理存儲到磁盤上的Block數據文件的,在同一個節點上的多個Executor共享相同的磁盤文件路徑,相同的Block數據文件也就會被同一個節點上的多個Executor所共享。而對應MemoryStore,由於每一個Executor對應獨立的JVM實例,從而具備獨立的Storage/Execution內存管理,因此使用MemoryStore不能共享同一個Block數據,可是同一個節點上的多個Executor之間的MemoryStore之間拷貝數據,比跨網絡傳輸要高效的多安全

2.1. MemoryStore

數據在內存中存儲的形式網絡

  1. 以序列化格式
  2. 以反序列化的形式 ​ 2.1 Block數據記錄可以徹底放到內存中 ​ 2.2 Block數據記錄只能部分放到內存中:申請Unroll內存(預佔內存)
  3. 以序列化二進制格式保存Block數據
MEMORY_ONLY
MEMORY_ONLY_2
MEMORY_ONLY_SER
MEMORY_ONLY_SER_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP

2.2. DiskStore

數據羅盤的幾種形式:分佈式

  1. 經過文件流寫Block數據
  2. 將二進制Block數據寫入文件
DISK_ONLY
DISK_ONLY_2
MEMORY_AND_DISK
MEMORY_AND_DISK_2
MEMORY_AND_DISK_SER
MEMORY_AND_DISK_SER_2
OFF_HEAP

2.2.1. DiskBlockManager

DiskStore即基於文件來存儲Block. 基於Disk來存儲,首先必需要解決一個問題就是磁盤文件的管理:磁盤目錄結構的組成,目錄的清理等,在Spark對磁盤文件的管理是經過 DiskBlockManager來進行管理的ide

DiskBlockManager管理了每一個Block數據存儲位置的信息,包括從Block ID到磁盤上文件的映射關係。DiskBlockManager主要有以下幾個功能:ui

  1. 負責建立一個本地節點上的指定磁盤目錄,用來存儲Block數據到指定文件中
  2. 若是Block數據想要落盤,須要經過調用getFile方法來分配一個惟一的文件路徑
  3. 若是想要查詢一個Block是否在磁盤上,經過調用containsBlock方法來查詢
  4. 查詢當前節點上管理的所有Block文件 經過調用createTempLocalBlock方法,生成一個惟一Block ID,並建立一個惟一的臨時文件,用來存儲中間結果數據
  5. 經過調用createTempShuffleBlock方法,生成一個惟一Block ID,並建立一個惟一的臨時文件,用來存儲Shuffle過程的中間結果數據

2.3. offHeap

堆外存儲不支持序列化和副本this

Spark中實現的OffHeap是基於Tachyon:分佈式內存文件系統來實現的spa

3. 內存管理模型

img

在Spark Application提交之後,最終會在Worker上啓動獨立的Executor JVM,Task就運行在Executor裏面。在一個Executor JVM內部,內存管理模型就是管理excutor運行所須要的內存

http://shiyanjun.cn/archives/1585.html

3.1. StaticMemoryManager

1.5以前版本使用 缺點:

  1. 沒有一個合理的默認值可以適應不一樣計算場景下的Workload
  2. 內存調優困難,須要對Spark內部原理很是熟悉才能作好
  3. 對不須要Cache的Application的計算場景,只能使用不多一部份內存

3.2. UnifiedMemoryManager

img

統一內存分配管理模型:

  1. 能夠動態的分配excution和storage的內存大小
  2. 不只能夠分配堆內內存,也能夠分配堆外內存
  3. 堆外內存和分配比例均可以經過參數配置
  4. 內存的分配和回收是經過MemoryPool控制
abstract class MemoryManager(
​    conf: SparkConf,
​    numCores: Int,
​    onHeapStorageMemory: Long,
​    onHeapExecutionMemory: Long){
​	
 // storage堆內內存
  @GuardedBy("this")
  protected val onHeapStorageMemoryPool = 		new StorageMemoryPool(this, 		MemoryMode.ON_HEAP)
 // storage堆外內存
  @GuardedBy("this")
  protected val offHeapStorageMemoryPool = 		new StorageMemoryPool(this, 		MemoryMode.OFF_HEAP)
// execution堆內內存
  @GuardedBy("this")
  protected val onHeapExecutionMemoryPool = 		new ExecutionMemoryPool(this, 		MemoryMode.ON_HEAP)
// excution堆外內存
  @GuardedBy("this")
  protected val offHeapExecutionMemoryPool = 		new ExecutionMemoryPool(this, 		MemoryMode.OFF_HEAP)
}
// 默認最大堆內存
val maxOffHeapMemory = conf.getSizeAsBytes("spark.memory.offHeap.size", 0)
 
// 默認storage和excution的內存大小各佔50%
offHeapStorageMemory =
​    (maxOffHeapMemory * conf.getDouble("spark.memory.storageFraction", 0.5)).toLong

3.2.1. 內存劃分

img

在統一內存管理模型中,storage和excution內存大小能夠動態調整,在必定程度上減小了OOM發生機率

默認內存劃分:

預留內存reservedMemory=300M 管理內存maxHeapMemory = (systemMemory - reservedMemory) * 0.6 storageMemory=excutionMemory=maxHeapMemory*0.5

非堆內存默認值0,可經過spark.memory.offHeap.size參數調整,其中storage和excution的內存佔比也均爲50%

3.2.1.1. Storage內存區

Storage內存,用來緩存Task數據、在Spark集羣中傳輸(Propagation)內部數據

3.2.1.2. Execution內存區

Execution內存,用於知足Shuffle、Join、Sort、Aggregation計算過程當中對內存的需求

3.2.1.3. 預留內存

3.2.1.4. 非堆內存

3.2.2. 內存調控

img

3.2.2.1. Storage內存

3.2.2.1.1. 申請
  1. 判斷申請內存類型:堆內仍是堆外
  2. 若是申請內存大於剩餘內存總量則申請失敗
  3. 若是申請內存大小在storage內存範圍內則直接分配
  4. 若是申請內存大於storage剩餘內存則借用excution內存
// 爲blockId申請numBytes字節大小的內存
override def acquireStorageMemory ()synchronized { 
  val (executionPool, storagePool, maxMemory) = memoryMode match { 
// 根據memoryMode值,返回對應的StorageMemoryPool與ExecutionMemoryPool
​    case MemoryMode.ON_HEAP =>
​    case MemoryMode.OFF_HEAP => 
  }
  if (numBytes > maxMemory) {
 // 申請的內存大於剩餘內存總理則申請失敗
​      s"memory limit ($maxMemory bytes)")
​    return false
  }
  if (numBytes > storagePool.memoryFree) { 
// 若是Storage內存塊中沒有足夠可用內存給blockId使用,則計算當前Storage內存區缺乏多少內存,而後從Execution內存區中借用
​    val memoryBorrowedFromExecution = Math.min(executionPool.memoryFree, numBytes)
// Execution內存區減掉借用內存量
executionPool.decrementPoolSize(memoryBorrowedFromExecution) 
// Storage內存區增長借用內存量    storagePool.incrementPoolSize(memoryBorrowedFromExecution) 
  }
// 若是Storage內存區能夠爲blockId分配內存,直接成功分配;不然,若是從Execution內存區中借用的內存可以知足blockId,則分配成功,不能知足則分配失敗。
  storagePool.acquireMemory(blockId, numBytes) 
}
3.2.2.1.2. 釋放

釋放Storage內存比較簡單,只須要更新Storage內存計量變量便可

def releaseMemory(size: Long): Unit = lock.synchronized {
  if (size > _memoryUsed) {
​    // 須要釋放內存大於已使用內存,則直接清零
​    _memoryUsed = 0
  } else {
​	// 從已使用內存中減去釋放內存大小
​    _memoryUsed -= size
  }
}

3.2.2.2. Excution內存

excution內存的獲取和釋放都是線程安全的,並且分配給每一個task的內存大小是均等的,每當有task運行完畢後,都會觸發內存的回收操做。

3.2.2.2.1. 申請

若是從storage申請內存大小比storage剩餘內存大,則申請線程會阻塞,並對storage內存發起縮小操做。直到storage釋放足夠內存。

Execution內存區內存分配的基本原則: 若是有N個活躍(Active)的Task在運行,ExecutionMemoryPool須要保證每一個Task在將中間結果數據Spill到磁盤以前,至少可以申請到當前Execution內存區對應的Pool中1/2N大小的內存量,至可能是1/N大小的內存。

這裏N是動態變化的,由於可能有新的Task被啓動,也有可能Task運行完成釋放資源,因此ExecutionMemoryPool會持續跟蹤ExecutionMemoryPool內部Task集合memoryForTask的變化,並不斷地從新計算分配給每一個Task的這兩個內存量的值:1/2N和1/N。

3.2.2.2.2. 釋放
// 同步的釋放內存
def releaseMemory(numBytes: Long, taskAttemptId: Long): Unit = lock.synchronized {
  val curMem = memoryForTask.getOrElse(taskAttemptId, 0L)
// 計算釋放內存大小
 var memoryToFree = if (curMem < numBytes) { 
​	// 沒有足夠內存須要釋放,則釋放掉當前task全部使用內存
​    curMem
  } else {
​    numBytes
  }
  if (memoryForTask.contains(taskAttemptId)) { // Task執行完成,從內部維護的memoryForTask中移除
​    memoryForTask(taskAttemptId) -= memoryToFree
​    if (memoryForTask(taskAttemptId) <= 0) {
​      memoryForTask.remove(taskAttemptId)
​    }
  }
 // 通知調用acquireMemory()方法申請內存的Task內存已經釋放
  lock.notifyAll()
}

4. BlockManager

BlockManagerMaster管理BlockManager. BlockManager在每一個Dirver和Executor上都有,用來管理Block數據,包括數據的獲取和保存等

談到Spark中的Block數據存儲,咱們很容易可以想到BlockManager,他負責管理在每一個Dirver和Executor上的Block數據,多是本地或者遠程的。具體操做包括查詢Block、將Block保存在指定的存儲中,如內存、磁盤、堆外(Off-heap)。而BlockManager依賴的後端,對Block數據進行內存、磁盤存儲訪問,都是基於前面講到的MemoryStore、DiskStore。 在Spark集羣中,當提交一個Application執行時,該Application對應的Driver以及全部的Executor上,都存在一個BlockManager、BlockManagerMaster,而BlockManagerMaster是負責管理各個BlockManager之間通訊,這個BlockManager管理集羣

4.1. 讀數據

每一個Executor上都有一個BlockManager實例,負責管理用戶提交的該Application計算過程當中產生的Block。

頗有可能當前Executor上存儲在RDD對應Partition的通過處理後獲得的Block數據,也有可能當前Executor上沒有,可是其餘Executor上已經處理過並緩存了Block數據,因此對應着本地獲取、遠程獲取兩種可能

4.2. BlockManager集羣

關於一個Application運行過程當中Block的管理,主要是基於該Application所關聯的一個Driver和多個Executor構建了一個Block管理集羣:Driver上的(BlockManagerMaster, BlockManagerMasterEndpoint)是集羣的Master角色,全部Executor上的(BlockManagerMaster, RpcEndpointRef)做爲集羣的Slave角色。當Executor上的Task運行時,會查詢對應的RDD的某個Partition對應的Block數據是否處理過,這個過程當中會觸發多個BlockManager之間的通訊交互

4.3. 狀態管理

BlockManager在進行put操做後,經過blockInfoManager來控制當前put等操做是否完成以及是否成功。

對於BlockManager中的存儲的每一個Block,不必定是對應的數據都PUT成功了,不必定能夠當即提供對外的讀取,由於PUT是一個過程,有成功仍是有失敗的狀態. ,拿ShuffleBlock來講,在shuffleMapTask須要Put一個Block到BlockManager中,在Put完成以前,該Block將處於Pending狀態,等待Put完成了不表明Block就能夠被讀取, 由於Block還可能Put"fail"了.

所以BlockManager經過BlockInfo來維護每一個Block狀態,在BlockManager的代碼中就是經過一個TimeStampedHashMap來維護BlockID和BlockInfo之間的map.

private val blockInfo = new TimeStampedHashMap[BlockId, BlockInfo] 注: 2.2中此處是經過線程安全的hashMap和一個計數器實現的

5. 讀寫控制

BlockInfoManager經過同步機制防止多個task處理同一個block數據塊

用戶提交一個Spark Application程序,若是程序對應的DAG圖相對複雜,其中不少Task計算的結果Block數據都有可能被重複使用,這種狀況下如何去控制某個Executor上的Task線程去讀寫Block數據呢?其實,BlockInfoManager就是用來控制Block數據讀寫操做,而且跟蹤Task讀寫了哪些Block數據的映射關係,這樣若是兩個Task都想去處理同一個RDD的同一個Partition數據,若是沒有鎖來控制,極可能兩個Task都會計算並寫同一個Block數據,從而形成混亂

class BlockInfoManager{
	val infos = 
		new mutable.HashMap[BlockId, BlockInfo]
	// 存放被鎖定任務列表
	val writeLocksByTask =
    	new mutable.HashMap[
			TaskAttemptId, mutable.Set[BlockId]]
	 val readLocksByTask =
    	new mutable.HashMap[TaskAttemptId, 		ConcurrentHashMultiset[BlockId]]
 
	def lockForReading(){
		infos.get(blockId) match {
			case Some(info) =>
				// 沒有寫任務
          		if (info.writerTask == 					BlockInfo.NO_WRITER) {
					// 讀task數量加一
            		info.readerCount += 1
					//	放入讀多鎖定隊列
 					readLocksByTask(
					currentTaskAttemptId).
					add(blockId)
	}
}
def lockForWriting(){
		case Some(info) =>
          if (info.writerTask == 				BlockInfo.NO_WRITER && 				info.readerCount == 0) {
            info.writerTask = currentTaskAttemptId
 			writeLocksByTask.addBinding(
			currentTaskAttemptId, blockId)
}

sparkCore源碼解析系列:

  1. sparkCore源碼解析之block
  2. sparkCore源碼解析之partition
  3. sparkCore源碼解析之Job
  4. sparkCore源碼解析之shuffle
  5. sparkCore源碼解析之完整腦圖地址
相關文章
相關標籤/搜索