Original post: http://jerryshao.me/architecture/2013/10/08/spark-storage-module-analysis/
I have been busy with miscellaneous things lately and had no time to tidy up my blog, so the Spark source code analysis was left half finished. Having previously covered the deploy and scheduler modules, this post looks at another major module in Spark - the storage module.
When writing Spark programs we constantly deal with RDDs (Resilient Distributed Datasets), building our applications out of the transformation and action interfaces that RDDs provide. The RDD raises the level of abstraction and cleanly separates interface from implementation, so users need not care about the underlying details. But an RDD only gives us the "form": where is the data we operate on actually kept, and how is it read and written? What does its "body" look like? That is what the storage module implements and manages, and it is what we dissect next.
The storage module is organized into two layers: a communication layer, through which master and slaves exchange control and status messages, and a storage layer, which actually stores blocks in memory or on disk and, if required, replicates them to remote nodes.
For other modules to interact with the storage module, the storage module exposes a single entry class, BlockManager; any external class that wants to work with the storage module does so by calling the corresponding BlockManager interfaces.
First, let's look at the UML class diagram of the communication layer:
Next, let's look at the different roles these classes play on the master and the slaves:
The BlockManager is created differently on the master and on the slaves:
Master (client driver): the BlockManagerMaster holds the actor of BlockManagerMasterActor and refs to all BlockManagerSlaveActors.
Slave (executor): on a slave, the BlockManagerMaster holds a ref to the BlockManagerMasterActor and the actor of its own BlockManagerSlaveActor.
Communication takes place between the actor and the refs of BlockManagerMasterActor, and likewise between the actor and the ref of BlockManagerSlaveActor.
actor and ref: actor and ref are two kinds of Akka actor references, created by actorOf and actorFor respectively. An actor is like the server side of a network service: it holds all the state, receives requests from clients, executes them and returns the results. A ref is like the client side: it obtains results by sending requests to the server.
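To make the actor/ref distinction concrete, here is a minimal Akka sketch (not Spark code), assuming the older Akka API that Spark used at the time, where actorFor is still available:

import akka.actor.{Actor, ActorSystem, Props}

// Server side ("actor"): created with actorOf, holds the state and answers requests.
class EchoActor extends Actor {
  def receive = {
    case msg => sender ! msg // reply to whoever asked
  }
}

object ActorRefSketch {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("spark")
    // The "actor": registered under a well-known name, much like BlockManagerMasterActor on the driver.
    system.actorOf(Props(new EchoActor), name = "BlockManagerMaster")
    // The "ref": looked up by path, much like executors look up the driver's URL; here the path is local.
    val ref = system.actorFor("akka://spark/user/BlockManagerMaster")
    ref ! "ping" // the client side sends requests through the ref
  }
}

In Spark the ref side uses a remote path of the form akka://spark@host:port/user/name, but the division of roles is the same.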
BlockManager wraps BlockManagerMaster and communicates through it. Spark creates a BlockManager on both the client driver and each executor, and all storage-module operations go through the BlockManager.
The BlockManager object is created in SparkEnv, as follows:
def registerOrLookup(name: String, newActor: => Actor): ActorRef = {
  if (isDriver) {
    logInfo("Registering " + name)
    actorSystem.actorOf(Props(newActor), name = name)
  } else {
    val driverHost: String = System.getProperty("spark.driver.host", "localhost")
    val driverPort: Int = System.getProperty("spark.driver.port", "7077").toInt
    Utils.checkHost(driverHost, "Expected hostname")
    val url = "akka://spark@%s:%s/user/%s".format(driverHost, driverPort, name)
    logInfo("Connecting to " + name + ": " + url)
    actorSystem.actorFor(url)
  }
}

val blockManagerMaster = new BlockManagerMaster(registerOrLookup(
  "BlockManagerMaster",
  new BlockManagerMasterActor(isLocal)))
val blockManager = new BlockManager(executorId, actorSystem, blockManagerMaster, serializer)
As you can see, Spark creates the BlockManagerMasterActor actor on the client driver and a ref to it on each executor, and wraps them inside BlockManager.
BlockManagerMasterActor
executor to client driver
RegisterBlockManager (sent by an executor after it creates its BlockManager, to register itself with the client driver)
HeartBeat
UpdateBlockInfo (update block information)
GetPeers (request the ids of other BlockManagers)
GetLocations (get the ids of the BlockManagers holding a given block)
GetLocationsMultipleBlockIds (get the ids of the BlockManagers holding a set of blocks)
client driver to client driver
GetLocations (get the ids of the BlockManagers holding a given block)
GetLocationsMultipleBlockIds (get the ids of the BlockManagers holding a set of blocks)
RemoveExecutor (remove the stored BlockManager record of an executor that has died)
StopBlockManagerMaster (stop the BlockManagerMasterActor on the client driver)
Some messages, such as GetLocations, are sent to the actor from both the executor side and the client driver side; others, such as RegisterBlockManager, are only sent from an executor's ref to the actor on the client driver, while messages such as RemoveExecutor are only sent from the client driver's ref to the client driver's own actor. For exactly where each message is sent, received and handled, please refer to the code; I won't go through them all here.
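As a rough illustration of how the master side dispatches on such messages, here is a highly simplified sketch; the case classes and their fields below are assumptions made for illustration only, not the real Spark message definitions:

import akka.actor.{Actor, ActorRef}
import scala.collection.mutable

// Simplified stand-ins for the real messages; the field shapes are assumed.
case class RegisterBlockManager(executorId: String, maxMemSize: Long, slaveActor: ActorRef)
case class GetLocations(blockId: String)
case class RemoveExecutor(execId: String)

class MasterActorSketch extends Actor {
  // Stands in for the real blockManagerInfo / blockManagerIdByExecutor bookkeeping.
  private val slaveActors = mutable.HashMap[String, ActorRef]()
  private val blockLocations = mutable.HashMap[String, Set[String]]().withDefaultValue(Set.empty)

  def receive = {
    case RegisterBlockManager(execId, _, slaveActor) =>
      slaveActors(execId) = slaveActor // keep the slave's actor so commands can be pushed to it later
      sender ! true
    case GetLocations(blockId) =>
      sender ! blockLocations(blockId) // executor ids known to hold the block
    case RemoveExecutor(execId) =>
      slaveActors -= execId
      sender ! true
  }
}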
BlockManagerSlaveActor
client driver to executor
RemoveBlock (remove a block)
RemoveRdd (remove an RDD)
The communication layer involves the exchange and handling of many control and status messages; for those details you can read the source directly, so I won't enumerate them here. Below I only briefly describe how the BlockManager on the executor side starts up and registers itself with the client driver.
We saw earlier how the BlockManager object is created; once created, it needs to register itself with the client driver. Let's walk through that flow.
First, BlockManager calls initialize() to initialize itself:
private def initialize() {
  master.registerBlockManager(blockManagerId, maxMemory, slaveActor)
  ...
  if (!BlockManager.getDisableHeartBeatsForTesting) {
    heartBeatTask = actorSystem.scheduler.schedule(0.seconds, heartBeatFrequency.milliseconds) {
      heartBeat()
    }
  }
}
In initialize() the BlockManager first registers itself with the client driver via BlockManagerMaster, and also sets up a heartbeat timer so that heartbeat messages are sent periodically. Note that when registering itself it passes along its own slaveActor; the client driver stores the received slaveActor together with the corresponding BlockManagerInfo in a hash map, so that it can later send commands to the executor through that slaveActor.
BlockManagerMaster wraps the registration request in a RegisterBlockManager message and sends it to the BlockManagerMasterActor on the client driver, which calls register() to register the BlockManager:
private def register(id: BlockManagerId, maxMemSize: Long, slaveActor: ActorRef) {
  if (id.executorId == "<driver>" && !isLocal) {
    // Got a register message from the master node; don't register it
  } else if (!blockManagerInfo.contains(id)) {
    blockManagerIdByExecutor.get(id.executorId) match {
      case Some(manager) =>
        // A block manager of the same executor already exists.
        // This should never happen. Let's just quit.
        logError("Got two different block manager registrations on " + id.executorId)
        System.exit(1)
      case None =>
        blockManagerIdByExecutor(id.executorId) = id
    }
    blockManagerInfo(id) = new BlockManagerMasterActor.BlockManagerInfo(
      id, System.currentTimeMillis(), maxMemSize, slaveActor)
  }
}
Note that the same sequence also runs on the client driver side, except that in the final registration step nothing is done if the id is "<driver>". As you can see, a corresponding BlockManagerInfo object is created and stored in a hash map.
At the RDD level, an RDD is composed of partitions, and the transformations and actions we perform operate on those partitions. Inside the storage module, however, an RDD is viewed as a set of blocks, and RDDs are read and written in units of blocks. A partition and a block are essentially equivalent; they are just viewed from different angles. In the Spark storage module the minimal unit of storage is the block, and every operation works on blocks.
First, let's look at the UML class diagram of the storage layer:
When the BlockManager object is created it creates a MemoryStore and a DiskStore for storing and retrieving blocks, and in initialize() it creates a BlockManagerWorker that listens for remote block get/put requests and handles them accordingly.
private[storage] val memoryStore: BlockStore = new MemoryStore(this, maxMemory)
private[storage] val diskStore: DiskStore =
  new DiskStore(this, System.getProperty("spark.local.dir", System.getProperty("java.io.tmpdir")))

private def initialize() {
  ...
  BlockManagerWorker.startBlockManagerWorker(this)
  ...
}
Now let's look at how blocks are stored and retrieved in DiskStore and MemoryStore.
DiskStore can be configured with multiple folders. Under each configured folder Spark creates its own directory, named spark-local-yyyyMMddHHmmss-xxxx (where xxxx is a random number), and all blocks are stored inside these created directories. DiskStore calls createLocalDirs() to create them when the object is constructed:
private def createLocalDirs(): Array[File] = {
  logDebug("Creating local directories at root dirs '" + rootDirs + "'")
  val dateFormat = new SimpleDateFormat("yyyyMMddHHmmss")
  rootDirs.split(",").map { rootDir =>
    var foundLocalDir = false
    var localDir: File = null
    var localDirId: String = null
    var tries = 0
    val rand = new Random()
    while (!foundLocalDir && tries < MAX_DIR_CREATION_ATTEMPTS) {
      tries += 1
      try {
        localDirId = "%s-%04x".format(dateFormat.format(new Date), rand.nextInt(65536))
        localDir = new File(rootDir, "spark-local-" + localDirId)
        if (!localDir.exists) {
          foundLocalDir = localDir.mkdirs()
        }
      } catch {
        case e: Exception =>
          logWarning("Attempt " + tries + " to create local dir " + localDir + " failed", e)
      }
    }
    if (!foundLocalDir) {
      logError("Failed " + MAX_DIR_CREATION_ATTEMPTS +
        " attempts to create local dir in " + rootDir)
      System.exit(ExecutorExitCode.DISK_STORE_FAILED_TO_CREATE_DIR)
    }
    logInfo("Created local directory at " + localDir)
    localDir
  }
}
Inside DiskStore every block is stored as one file; a block is mapped to its file by hashing the block id. The mapping from block id to file path looks like this:
private def getFile(blockId: String): File = {
  logDebug("Getting file for block " + blockId)

  // Figure out which local directory it hashes to, and which subdirectory in that
  val hash = Utils.nonNegativeHash(blockId)
  val dirId = hash % localDirs.length
  val subDirId = (hash / localDirs.length) % subDirsPerLocalDir

  // Create the subdirectory if it doesn't already exist
  var subDir = subDirs(dirId)(subDirId)
  if (subDir == null) {
    subDir = subDirs(dirId).synchronized {
      val old = subDirs(dirId)(subDirId)
      if (old != null) {
        old
      } else {
        val newDir = new File(localDirs(dirId), "%02x".format(subDirId))
        newDir.mkdir()
        subDirs(dirId)(subDirId) = newDir
        newDir
      }
    }
  }

  new File(subDir, blockId)
}
The hash of the block id is computed, and from it dirId and subDirId are derived by taking the modulus. The corresponding subDir is then looked up in subDirs, creating it if it does not exist yet. Finally a file handle is created with subDir as the path and the block id as the file name, and DiskStore uses this file handle to write the block to the file, as shown below:
override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
  // So that we do not modify the input offsets !
  // duplicate does not copy buffer, so inexpensive
  val bytes = _bytes.duplicate()
  logDebug("Attempting to put block " + blockId)
  val startTime = System.currentTimeMillis
  val file = createFile(blockId)
  val channel = new RandomAccessFile(file, "rw").getChannel()
  while (bytes.remaining > 0) {
    channel.write(bytes)
  }
  channel.close()
  val finishTime = System.currentTimeMillis
  logDebug("Block %s stored as %s file on disk in %d ms".format(
    blockId, Utils.bytesToString(bytes.limit), (finishTime - startTime)))
}
Getting a block is even simpler: just find the corresponding file and read it out:
override def getBytes(blockId: String): Option[ByteBuffer] = {
  val file = getFile(blockId)
  val bytes = getFileBytes(file)
  Some(bytes)
}
So storing or retrieving a block in DiskStore boils down to mapping the block id to a file path and then reading or writing that file.
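As a standalone illustration of the dirId/subDirId arithmetic in getFile() above (not Spark code), here is a small sketch assuming spark.local.dir is configured with 4 folders and 64 sub-directories per folder; math.abs(hashCode) stands in for Utils.nonNegativeHash:

object BlockPathSketch {
  val localDirsCount = 4      // assumed number of folders configured in spark.local.dir
  val subDirsPerLocalDir = 64 // assumed number of sub-directories per folder

  // Same arithmetic as getFile(): pick a root folder, then a sub-folder within it.
  def locate(blockId: String): (Int, Int) = {
    val hash = math.abs(blockId.hashCode)
    val dirId = hash % localDirsCount
    val subDirId = (hash / localDirsCount) % subDirsPerLocalDir
    (dirId, subDirId)
  }

  def main(args: Array[String]): Unit = {
    // The block's file then lives at <localDirs(dirId)>/<"%02x".format(subDirId)>/<blockId>
    println(locate("rdd_0_3"))
  }
}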
Compared with DiskStore, which has to compute a file path from the block id hash and write the block into that file, MemoryStore manages blocks very simply: it maintains a hash map of all blocks, keyed by block id.
case class Entry(value: Any, size: Long, deserialized: Boolean)

private val entries = new LinkedHashMap[String, Entry](32, 0.75f, true)
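Note the third constructor argument (accessOrder = true): it keeps the map in least-recently-used order, which the memory-eviction logic can rely on when choosing blocks to drop. A tiny standalone demo of that behavior (plain Java collections, not Spark code):

import java.util.LinkedHashMap

object LruOrderSketch {
  def main(args: Array[String]): Unit = {
    // accessOrder = true: iteration order goes from least to most recently accessed
    val entries = new LinkedHashMap[String, String](32, 0.75f, true)
    entries.put("rdd_0_0", "a")
    entries.put("rdd_0_1", "b")
    entries.put("rdd_0_2", "c")
    entries.get("rdd_0_0") // touching a block moves it to the most-recently-used end

    val it = entries.keySet().iterator()
    while (it.hasNext) println(it.next()) // prints rdd_0_1, rdd_0_2, rdd_0_0
  }
}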
To put a block into MemoryStore there must be enough memory to hold it; if memory is insufficient, the block is written out to a file instead. The code is shown below:
override def putBytes(blockId: String, _bytes: ByteBuffer, level: StorageLevel) {
  // Work on a duplicate - since the original input might be used elsewhere.
  val bytes = _bytes.duplicate()
  bytes.rewind()
  if (level.deserialized) {
    val values = blockManager.dataDeserialize(blockId, bytes)
    val elements = new ArrayBuffer[Any]
    elements ++= values
    val sizeEstimate = SizeEstimator.estimate(elements.asInstanceOf[AnyRef])
    tryToPut(blockId, elements, sizeEstimate, true)
  } else {
    tryToPut(blockId, bytes, bytes.limit, false)
  }
}
In tryToPut(), ensureFreeSpace() is first called to check whether there is enough free memory to hold the block. If there is, the block is put into the hash map and managed there; if not, dropFromMemory() is called to write the block to a file.
private def tryToPut(blockId: String, value: Any, size: Long, deserialized: Boolean): Boolean = {
  // TODO: Its possible to optimize the locking by locking entries only when selecting blocks
  // to be dropped. Once the to-be-dropped blocks have been selected, and lock on entries has been
  // released, it must be ensured that those to-be-dropped blocks are not double counted for
  // freeing up more space for another block that needs to be put. Only then the actually dropping
  // of blocks (and writing to disk if necessary) can proceed in parallel.
  putLock.synchronized {
    if (ensureFreeSpace(blockId, size)) {
      val entry = new Entry(value, size, deserialized)
      entries.synchronized {
        entries.put(blockId, entry)
        currentMemory += size
      }
      if (deserialized) {
        logInfo("Block %s stored as values to memory (estimated size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      } else {
        logInfo("Block %s stored as bytes to memory (size %s, free %s)".format(
          blockId, Utils.bytesToString(size), Utils.bytesToString(freeMemory)))
      }
      true
    } else {
      // Tell the block manager that we couldn't put it in memory so that it can drop it to
      // disk if the block allows disk storage.
      val data = if (deserialized) {
        Left(value.asInstanceOf[ArrayBuffer[Any]])
      } else {
        Right(value.asInstanceOf[ByteBuffer].duplicate())
      }
      blockManager.dropFromMemory(blockId, data)
      false
    }
  }
}
Retrieving a block from MemoryStore is trivial: just look up the value for the block id in the hash map.
override def getValues(blockId: String): Option[Iterator[Any]] = {
  val entry = entries.synchronized {
    entries.get(blockId)
  }
  if (entry == null) {
    None
  } else if (entry.deserialized) {
    Some(entry.value.asInstanceOf[ArrayBuffer[Any]].iterator)
  } else {
    val buffer = entry.value.asInstanceOf[ByteBuffer].duplicate() // Doesn't actually copy data
    Some(blockManager.dataDeserialize(blockId, buffer))
  }
}
We have now seen how DiskStore and MemoryStore store and retrieve blocks. Do we interact with them directly, or is there a more abstract interface that hides the underlying details from us?
BlockManager provides put() and get() for exactly this purpose: with these two functions users can store and retrieve blocks without caring about the underlying implementation.
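As a hedged sketch of how these two interfaces fit together, based only on the put() and get() signatures quoted below: BlockManager is internal to Spark, so this is illustrative rather than something application code would normally compile against, and obtaining the instance through SparkEnv (with Spark 0.8-era package names) is an assumption of the sketch.

import scala.collection.mutable.ArrayBuffer
import org.apache.spark.SparkEnv
import org.apache.spark.storage.StorageLevel

object BlockManagerUsageSketch {
  def main(args: Array[String]): Unit = {
    // Assumption: running inside a Spark process where SparkEnv has already been set up.
    val blockManager = SparkEnv.get.blockManager

    val blockId = "rdd_0_0"
    val values = ArrayBuffer[Any](1, 2, 3)

    // Store the elements as a block under the requested storage level.
    blockManager.put(blockId, values, StorageLevel.MEMORY_AND_DISK, tellMaster = true)

    // Later, read the block back as an iterator if it is still available locally or remotely.
    blockManager.get(blockId) match {
      case Some(iter) => iter.foreach(println)
      case None       => println("block " + blockId + " not found")
    }
  }
}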
First, the implementation of put():
def put(blockId: String, values: ArrayBuffer[Any], level: StorageLevel,
        tellMaster: Boolean = true): Long = {
  ...
  // Remember the block's storage level so that we can correctly drop it to disk if it needs
  // to be dropped right after it got put into memory. Note, however, that other threads will
  // not be able to get() this block until we call markReady on its BlockInfo.
  val myInfo = {
    val tinfo = new BlockInfo(level, tellMaster)
    // Do atomically !
    val oldBlockOpt = blockInfo.putIfAbsent(blockId, tinfo)

    if (oldBlockOpt.isDefined) {
      if (oldBlockOpt.get.waitForReady()) {
        logWarning("Block " + blockId + " already exists on this machine; not re-adding it")
        return oldBlockOpt.get.size
      }

      // TODO: So the block info exists - but previous attempt to load it (?) failed. What do we do now ? Retry on it ?
      oldBlockOpt.get
    } else {
      tinfo
    }
  }

  val startTimeMs = System.currentTimeMillis

  // If we need to replicate the data, we'll want access to the values, but because our
  // put will read the whole iterator, there will be no values left. For the case where
  // the put serializes data, we'll remember the bytes, above; but for the case where it
  // doesn't, such as deserialized storage, let's rely on the put returning an Iterator.
  var valuesAfterPut: Iterator[Any] = null

  // Ditto for the bytes after the put
  var bytesAfterPut: ByteBuffer = null

  // Size of the block in bytes (to return to caller)
  var size = 0L

  myInfo.synchronized {
    logTrace("Put for block " + blockId + " took " + Utils.getUsedTimeMs(startTimeMs)
      + " to get into synchronized block")

    var marked = false
    try {
      if (level.useMemory) {
        // Save it just to memory first, even if it also has useDisk set to true; we will later
        // drop it to disk if the memory store can't hold it.
        val res = memoryStore.putValues(blockId, values, level, true)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case Left(newIterator) => valuesAfterPut = newIterator
        }
      } else {
        // Save directly to disk.
        // Don't get back the bytes unless we replicate them.
        val askForBytes = level.replication > 1
        val res = diskStore.putValues(blockId, values, level, askForBytes)
        size = res.size
        res.data match {
          case Right(newBytes) => bytesAfterPut = newBytes
          case _ =>
        }
      }

      // Now that the block is in either the memory or disk store, let other threads read it,
      // and tell the master about it.
      marked = true
      myInfo.markReady(size)
      if (tellMaster) {
        reportBlockStatus(blockId, myInfo)
      }
    } finally {
      // If we failed at putting the block to memory/disk, notify other possible readers
      // that it has failed, and then remove it from the block info map.
      if (!marked) {
        // Note that the remove must happen before markFailure otherwise another thread
        // could've inserted a new BlockInfo before we remove it.
        blockInfo.remove(blockId)
        myInfo.markFailure()
        logWarning("Putting block " + blockId + " failed")
      }
    }
  }
  logDebug("Put block " + blockId + " locally took " + Utils.getUsedTimeMs(startTimeMs))

  // Replicate block if required
  if (level.replication > 1) {
    val remoteStartTime = System.currentTimeMillis
    // Serialize the block if not already done
    if (bytesAfterPut == null) {
      if (valuesAfterPut == null) {
        throw new SparkException(
          "Underlying put returned neither an Iterator nor bytes! This shouldn't happen.")
      }
      bytesAfterPut = dataSerialize(blockId, valuesAfterPut)
    }
    replicate(blockId, bytesAfterPut, level)
    logDebug("Put block " + blockId + " remotely took " + Utils.getUsedTimeMs(remoteStartTime))
  }

  BlockManager.dispose(bytesAfterPut)

  return size
}
The put() operation consists of three main steps:
1. Create a BlockInfo structure to hold the block's metadata and lock it so that the block cannot be accessed yet.
2. Store the block into the memory or disk store according to its storage level, then mark the BlockInfo as ready so other threads can read the block, and report the block status to the master.
3. If the storage level requires replication, replicate the block to remote nodes.

Next, the implementation of get():
def get(blockId: String): Option[Iterator[Any]] = {
  val local = getLocal(blockId)
  if (local.isDefined) {
    logInfo("Found block %s locally".format(blockId))
    return local
  }
  val remote = getRemote(blockId)
  if (remote.isDefined) {
    logInfo("Found block %s remotely".format(blockId))
    return remote
  }
  None
}
get() first looks for the block in the local BlockManager and returns it if found; if the block is not found locally, a request is sent to fetch it from the BlockManagers on other executors. Normally Spark assigns tasks according to the distribution of blocks, so a task tends to run on a node that holds its blocks and getLocal() finds what it needs. But when resources are tight, Spark may schedule a task onto a node that does not hold the block, in which case getRemote() must be used to fetch it.
Let's look at getLocal() first:
def getLocal(blockId: String): Option[Iterator[Any]] = {
  logDebug("Getting local block " + blockId)
  val info = blockInfo.get(blockId).orNull
  if (info != null) {
    info.synchronized {

      // If another thread is writing the block, wait for it to become ready.
      if (!info.waitForReady()) {
        // If we get here, the block write failed.
        logWarning("Block " + blockId + " was marked as failure.")
        return None
      }

      val level = info.level
      logDebug("Level for block " + blockId + " is " + level)

      // Look for the block in memory
      if (level.useMemory) {
        logDebug("Getting block " + blockId + " from memory")
        memoryStore.getValues(blockId) match {
          case Some(iterator) =>
            return Some(iterator)
          case None =>
            logDebug("Block " + blockId + " not found in memory")
        }
      }

      // Look for block on disk, potentially loading it back into memory if required
      if (level.useDisk) {
        logDebug("Getting block " + blockId + " from disk")
        if (level.useMemory && level.deserialized) {
          diskStore.getValues(blockId) match {
            case Some(iterator) =>
              // Put the block back in memory before returning it
              // TODO: Consider creating a putValues that also takes in a iterator ?
              val elements = new ArrayBuffer[Any]
              elements ++= iterator
              memoryStore.putValues(blockId, elements, level, true).data match {
                case Left(iterator2) =>
                  return Some(iterator2)
                case _ =>
                  throw new Exception("Memory store did not return back an iterator")
              }
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else if (level.useMemory && !level.deserialized) {
          // Read it as a byte buffer into memory first, then return it
          diskStore.getBytes(blockId) match {
            case Some(bytes) =>
              // Put a copy of the block back in memory before returning it. Note that we can't
              // put the ByteBuffer returned by the disk store as that's a memory-mapped file.
              // The use of rewind assumes this.
              assert(0 == bytes.position())
              val copyForMemory = ByteBuffer.allocate(bytes.limit)
              copyForMemory.put(bytes)
              memoryStore.putBytes(blockId, copyForMemory, level)
              bytes.rewind()
              return Some(dataDeserialize(blockId, bytes))
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        } else {
          diskStore.getValues(blockId) match {
            case Some(iterator) =>
              return Some(iterator)
            case None =>
              throw new Exception("Block " + blockId + " not found on disk, though it should be")
          }
        }
      }
    }
  } else {
    logDebug("Block " + blockId + " not registered locally")
  }
  return None
}
getLocal() first obtains the BlockInfo for the block id and reads the block's storage level from it, and then takes different branches depending on that level: if level.useMemory is set it tries MemoryStore first and returns the block if it is found there; if level.useDisk is set the block is read from DiskStore, and when level.useMemory is also set the block is first put back into MemoryStore (as deserialized values or as a byte buffer, depending on level.deserialized) so that the next read can be served from memory; if neither store yields the block, None is returned.
Next, getRemote():
def getRemote(blockId: String): Option[Iterator[Any]] = {
  if (blockId == null) {
    throw new IllegalArgumentException("Block Id is null")
  }
  logDebug("Getting remote block " + blockId)

  // Get locations of block
  val locations = master.getLocations(blockId)

  // Get block from remote locations
  for (loc <- locations) {
    logDebug("Getting remote block " + blockId + " from " + loc)
    val data = BlockManagerWorker.syncGetBlock(
      GetBlock(blockId), ConnectionManagerId(loc.host, loc.port))
    if (data != null) {
      return Some(dataDeserialize(blockId, data))
    }
    logDebug("The value of block " + blockId + " is null")
  }
  logDebug("Block " + blockId + " not found")
  return None
}
getRemote() first obtains all known locations of the block, and then requests the block from each remote location in turn; as soon as one remote node returns the block, the function returns without sending further requests.
That concludes our brief look at the get() and put() functions of BlockManager; with these two functions, external classes can easily store and retrieve block data.
Everything inside the storage module operates on blocks, yet all computation on an RDD is based on partitions. So how are partitions mapped onto blocks?
The core function of RDD computation is iterator():
final def iterator(split: Partition, context: TaskContext): Iterator[T] = {
  if (storageLevel != StorageLevel.NONE) {
    SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
  } else {
    computeOrReadCheckpoint(split, context)
  }
}
If the RDD's storage level is not NONE, the RDD is stored in the BlockManager, so getOrCompute() of CacheManager is called to compute the RDD. This is where partitions meet blocks: a block id of the form rdd_xx_xx is constructed from the RDD id and the partition index, and the corresponding block is fetched from the BlockManager.
If the block already exists in the BlockManager it is simply retrieved, with no need to recompute it; otherwise computeOrReadCheckpoint() computes a new block, which is then stored into the BlockManager. Note that computing and storing a block is blocking: if another thread also needs this block, it must wait until the loading of the block by the first thread has finished.
def getOrCompute[T](rdd: RDD[T], split: Partition, context: TaskContext, storageLevel: StorageLevel)
    : Iterator[T] = {
  val key = "rdd_%d_%d".format(rdd.id, split.index)
  logDebug("Looking for partition " + key)
  blockManager.get(key) match {
    case Some(values) =>
      // Partition is already materialized, so just return its values
      return values.asInstanceOf[Iterator[T]]

    case None =>
      // Mark the split as loading (unless someone else marks it first)
      loading.synchronized {
        if (loading.contains(key)) {
          logInfo("Another thread is loading %s, waiting for it to finish...".format(key))
          while (loading.contains(key)) {
            try { loading.wait() } catch { case _: Throwable => }
          }
          logInfo("Finished waiting for %s".format(key))
          // See whether someone else has successfully loaded it. The main way this would fail
          // is for the RDD-level cache eviction policy if someone else has loaded the same RDD
          // partition but we didn't want to make space for it. However, that case is unlikely
          // because it's unlikely that two threads would work on the same RDD partition. One
          // downside of the current code is that threads wait serially if this does happen.
          blockManager.get(key) match {
            case Some(values) =>
              return values.asInstanceOf[Iterator[T]]
            case None =>
              logInfo("Whoever was loading %s failed; we'll try it ourselves".format(key))
              loading.add(key)
          }
        } else {
          loading.add(key)
        }
      }
      try {
        // If we got here, we have to load the split
        logInfo("Partition %s not found, computing it".format(key))
        val computedValues = rdd.computeOrReadCheckpoint(split, context)
        // Persist the result, so long as the task is not running locally
        if (context.runningLocally) { return computedValues }
        val elements = new ArrayBuffer[Any]
        elements ++= computedValues
        blockManager.put(key, elements, storageLevel, true)
        return elements.iterator.asInstanceOf[Iterator[T]]
      } finally {
        loading.synchronized {
          loading.remove(key)
          loading.notifyAll()
        }
      }
  }
}
In this way RDD transformations and actions are tied to block data: although at the abstract level we operate on partitions, each partition is ultimately mapped to a block, so in practice all of our operations end up reading, writing and processing blocks.
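A trivial way to see that mapping is the block id naming itself, as used by getOrCompute() above: every (RDD id, partition index) pair names exactly one block.

// The same "rdd_%d_%d" naming scheme used by getOrCompute() above.
def rddBlockId(rddId: Int, partitionIndex: Int): String =
  "rdd_%d_%d".format(rddId, partitionIndex)

// e.g. partition 3 of RDD 7 is stored and fetched as the block "rdd_7_3"
println(rddBlockId(7, 3))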
This post covered the two layers of the storage module: the communication layer and the storage layer. For the communication layer I briefly described the class structure, the roles the classes play on each side, and the messages exchanged between those roles, as well as the startup and registration details. For the storage layer I walked through the code that stores and retrieves blocks in DiskStore and MemoryStore, analyzed the put() and get() interfaces of BlockManager, and finally sketched the relationship between an RDD partition and a block in the BlockManager and how blocks are stored and fetched.
This post analyzed the storage module as a whole without going deep into every implementation detail; with this overall picture in mind, digging into those details afterwards should be much easier.