架構
前三章從 job 的角度介紹了用戶寫的 program 如何一步步地被分解和執行。這一章主要從架構的角度來討論 master,worker,driver 和 executor 之間怎麼協調來完成整個 job 的運行。
實在不想在文檔中貼過多的代碼,這章貼這麼多,只是爲了方面本身回頭 debug 的時候能夠迅速定位,不想看代碼的話,直接看圖和描述便可。
部署圖
從新貼一下 Overview 中給出的部署圖:
接下來分階段討論並細化這個圖。
Job 提交
下圖展現了driver program(假設在 master node 上運行)如何生成 job,並提交到 worker node 上執行。
Driver 端的邏輯若是用代碼表示:
finalRDD.action()
=> sc.runJob()
// generate job, stages and tasks
=> dagScheduler.runJob()
=> dagScheduler.submitJob()
=> dagSchedulerEventProcessActor ! JobSubmitted
=> dagSchedulerEventProcessActor.JobSubmitted()
=> dagScheduler.handleJobSubmitted()
=> finalStage = newStage()
=> mapOutputTracker.registerShuffle(shuffleId, rdd.partitions.size)
=> dagScheduler.submitStage()
=> missingStages = dagScheduler.getMissingParentStages()
=> dagScheduler.subMissingTasks(readyStage)
// add tasks to the taskScheduler
=> taskScheduler.submitTasks(new TaskSet(tasks))
=> fifoSchedulableBuilder.addTaskSetManager(taskSet)
// send tasks
=> sparkDeploySchedulerBackend.reviveOffers()
=> driverActor ! ReviveOffers
=> sparkDeploySchedulerBackend.makeOffers()
=> sparkDeploySchedulerBackend.launchTasks()
=> foreach task
CoarseGrainedExecutorBackend(executorId) ! LaunchTask(serializedTask)
代碼的文字描述: 當用戶的 program 調用
val sc = new SparkContext(sparkConf)
時,這個語句會幫助 program 啓動諸多有關 driver 通訊、job 執行的對象、線程、actor等,
該語句確立了 program 的 driver 地位。
生成 Job 邏輯執行圖
Driver program 中的 transformation() 創建 computing chain(一系列的 RDD),每一個 RDD 的 compute() 定義數據來了怎麼計算獲得該 RDD 中 partition 的結果,getDependencies() 定義 RDD 之間 partition 的數據依賴。
生成 Job 物理執行圖
每一個 action() 觸發生成一個 job,在 dagScheduler.runJob() 的時候進行 stage 劃分,在 submitStage() 的時候生成該 stage 包含的具體的 ShuffleMapTasks 或者 ResultTasks,而後將 tasks 打包成 TaskSet 交給 taskScheduler,若是 taskSet 能夠運行就將 tasks 交給 sparkDeploySchedulerBackend 去分配執行。
分配 Task
sparkDeploySchedulerBackend 接收到 taskSet 後,會經過自帶的 DriverActor 將 serialized tasks 發送到調度器指定的 worker node 上的 CoarseGrainedExecutorBackend Actor上。
Job 接收
Worker 端接收到 tasks 後,執行以下操做
coarseGrainedExecutorBackend ! LaunchTask(serializedTask)
=> executor.launchTask()
=> executor.threadPool.execute(new TaskRunner(taskId, serializedTask))
executor 將 task 包裝成 taskRunner,並從線程池中抽取出一個空閒線程運行 task。一個 CoarseGrainedExecutorBackend 進程有且僅有一個 executor 對象。
Task 運行
下圖展現了 task 被分配到 worker node 上後的執行流程及 driver 如何處理 task 的 result。
Executor 收到 serialized 的 task 後,先 deserialize 出正常的 task,而後運行 task 獲得其執行結果 directResult,這個結果要送回到 driver 那裏。可是經過 Actor 發送的數據包不易過大,
若是 result 比較大(好比 groupByKey 的 result)先把 result 存放到本地的「內存+磁盤」上,由 blockManager 來管理,只把存儲位置信息(indirectResult)發送給 driver,driver 須要實際的 result 的時候,會經過 HTTP 去 fetch。若是 result 不大(小於
spark.akka.frameSize = 10MB
),那麼直接發送給 driver。 上面的描述還有一些細節:若是 task 運行結束生成的 directResult > akka.frameSize,directResult 會被存放到由 blockManager 管理的本地「內存+磁盤」上。
BlockManager 中的 memoryStore 開闢了一個 LinkedHashMap 來存儲要存放到本地內存的數據。LinkedHashMap 存儲的數據總大小不超過
Runtime.getRuntime.maxMemory * spark.storage.memoryFraction(default 0.6)
。若是 LinkedHashMap 剩餘空間不足以存放新來的數據,就將數據交給 diskStore 存放到磁盤上,但前提是該數據的 storageLevel 中包含「磁盤」。
In TaskRunner.run()
// deserialize task, run it and then send the result to
=> coarseGrainedExecutorBackend.statusUpdate()
=> task = ser.deserialize(serializedTask)
=> value = task.run(taskId)
=> directResult = new DirectTaskResult(ser.serialize(value))
=> if( directResult.size() > akkaFrameSize() )
indirectResult = blockManager.putBytes(taskId, directResult, MEMORY+DISK+SER)
else
return directResult
=> coarseGrainedExecutorBackend.statusUpdate(result)
=> driver ! StatusUpdate(executorId, taskId, result)
ShuffleMapTask 和 ResultTask 生成的 result 不同。
ShuffleMapTask 生成的是 MapStatus,MapStatus 包含兩項內容:一是該 task 所在的 BlockManager 的 BlockManagerId(實際是 executorId + host, port, nettyPort),二是 task 輸出的每一個 FileSegment 大小。
ResultTask 生成的 result 的是 func 在 partition 上的執行結果。好比 count() 的 func 就是統計 partition 中 records 的個數。因爲 ShuffleMapTask 須要將 FileSegment 寫入磁盤,所以須要輸出流 writers,這些 writers 是由 blockManger 裏面的 shuffleBlockManager 產生和控制的。
In task.run(taskId)
// if the task is ShuffleMapTask
=> shuffleMapTask.runTask(context)
=> shuffleWriterGroup = shuffleBlockManager.forMapTask(shuffleId, partitionId, numOutputSplits)
=> shuffleWriterGroup.writers(bucketId).write(rdd.iterator(split, context))
=> return MapStatus(blockManager.blockManagerId, Array[compressedSize(fileSegment)])
//If the task is ResultTask
=> return func(context, rdd.iterator(split, context))
Driver 收到 task 的執行結果 result 後會進行一系列的操做:首先告訴 taskScheduler 這個 task 已經執行完,而後去分析 result。因爲 result 多是 indirectResult,須要先調用 blockManager.getRemoteBytes() 去 fech 實際的 result,這個過程下節會詳解。獲得實際的 result 後,須要分狀況分析,
若是是 ResultTask 的 result,那麼可使用 ResultHandler 對 result 進行 driver 端的計算(好比 count() 會對全部 ResultTask 的 result 做 sum),若是 result 是 ShuffleMapTask 的 MapStatus,那麼須要將 MapStatus(ShuffleMapTask 輸出的 FileSegment 的位置和大小信息)
存放到 mapOutputTrackerMaster 中的 mapStatuses 數據結構中以便之後 reducer shuffle 的時候查詢。若是 driver 收到的 task 是該 stage 中的最後一個 task,那麼能夠 submit 下一個 stage,若是該 stage 已是最後一個 stage,那麼告訴 dagScheduler job 已經完成。
After driver receives StatusUpdate(result)
=> taskScheduler.statusUpdate(taskId, state, result.value)
=> taskResultGetter.enqueueSuccessfulTask(taskSet, tid, result)
=> if result is IndirectResult
serializedTaskResult = blockManager.getRemoteBytes(IndirectResult.blockId)
=> scheduler.handleSuccessfulTask(taskSetManager, tid, result)
=> taskSetManager.handleSuccessfulTask(tid, taskResult)
=> dagScheduler.taskEnded(result.value, result.accumUpdates)
=> dagSchedulerEventProcessActor ! CompletionEvent(result, accumUpdates)
=> dagScheduler.handleTaskCompletion(completion)
=> Accumulators.add(event.accumUpdates)
// If the finished task is ResultTask
=> if (job.numFinished == job.numPartitions)
listenerBus.post(SparkListenerJobEnd(job.jobId, JobSucceeded))
=> job.listener.taskSucceeded(outputId, result)
=> jobWaiter.taskSucceeded(index, result)
=> resultHandler(index, result)
// if the finished task is ShuffleMapTask
=> stage.addOutputLoc(smt.partitionId, status)
=> if (all tasks in current stage have finished)
mapOutputTrackerMaster.registerMapOutputs(shuffleId, Array[MapStatus])
mapStatuses.put(shuffleId, Array[MapStatus]() ++ statuses)
=> submitStage(stage)
Shuffle read
上一節描述了 task 運行過程及 result 的處理過程,這一節描述 reducer(須要 shuffle 的 task )是如何獲取到輸入數據的。關於 reducer 如何處理輸入數據已經在上一章的 shuffle read 中解釋了。
問題:reducer 怎麼知道要去哪裏 fetch 數據?
reducer 首先要知道 parent stage 中 ShuffleMapTask 輸出的 FileSegments 在哪一個節點。
這個信息在 ShuffleMapTask 完成時已經送到了 driver 的 mapOutputTrackerMaster,並存放到了 mapStatuses: HashMap<stageid, array[mapstatus]=""> 裏面,給定 stageId,能夠獲取該 stage 中 ShuffleMapTasks 生成的 FileSegments 信息 Array[MapStatus],經過 Array(taskId) 就能夠獲得某個 task 輸出的 FileSegments 位置(blockManagerId)及每一個 FileSegment 大小。 當 reducer 須要 fetch 輸入數據的時候,會首先調用 blockStoreShuffleFetcher 去獲取輸入數據(FileSegments)的位置。blockStoreShuffleFetcher 經過調用本地的 MapOutputTrackerWorker 去完成這個任務,MapOutputTrackerWorker 使用 mapOutputTrackerMasterActorRef 來與 mapOutputTrackerMasterActor 通訊獲取 MapStatus 信息。blockStoreShuffleFetcher 對獲取到的 MapStatus 信息進行加工,提取出該 reducer 應該去哪些節點上獲取哪些 FileSegment 的信息,這個信息存放在 blocksByAddress 裏面。以後,blockStoreShuffleFetcher 將獲取 FileSegment 數據的任務交給 basicBlockFetcherIterator。
rdd.iterator()
=> rdd(e.g., ShuffledRDD/CoGroupedRDD).compute()
=> SparkEnv.get.shuffleFetcher.fetch(shuffledId, split.index, context, ser)
=> blockStoreShuffleFetcher.fetch(shuffleId, reduceId, context, serializer)
=> statuses = MapOutputTrackerWorker.getServerStatuses(shuffleId, reduceId)
=> blocksByAddress: Seq[(BlockManagerId, Seq[(BlockId, Long)])] = compute(statuses)
=> basicBlockFetcherIterator = blockManager.getMultiple(blocksByAddress, serializer)
=> itr = basicBlockFetcherIterator.flatMap(unpackBlock)
basicBlockFetcherIterator 收到獲取數據的任務後,會生成一個個 fetchRequest,
每一個 fetchRequest 包含去某個節點獲取若干個 FileSegments 的任務。圖中展現了 reducer-2 須要從三個 worker node 上獲取所需的白色 FileSegment (FS)。總的數據獲取任務由 blocksByAddress 表示,要從第一個 node 獲取 4 個,從第二個 node 獲取 3 個,從第三個 node 獲取 4 個。 爲了加快任務獲取過程,顯然要將總任務劃分爲子任務(fetchRequest),而後爲每一個任務分配一個線程去 fetch。Spark 爲每一個 reducer 啓動 5 個並行 fetch 的線程(Hadoop 也是默認啓動 5 個)。因爲 fetch 來的數據會先被放到內存做緩衝,所以一次 fetch 的數據不能太多,Spark 設定不能超過
spark.reducer.maxMbInFlight=48MB
。
注意這 48MB 的空間是由這 5 個 fetch 線程共享的,所以在劃分子任務時,儘可能使得 fetchRequest 不超過
48MB / 5 = 9.6MB
。如圖在 node 1 中,Size(FS0-2) + Size(FS1-2) < 9.6MB 可是 Size(FS0-2) + Size(FS1-2) + Size(FS2-2) > 9.6MB,所以要在 t1-r2 和 t2-r2 處斷開,因此圖中有兩個 fetchRequest 都是要去 node 1 fetch。
那麼會不會有 fetchRequest 超過 9.6MB?固然會有,若是某個 FileSegment 特別大,仍然須要一次性將這個 FileSegment fetch 過來。另外,若是 reducer 須要的某些 FileSegment 就在本節點上,那麼直接進行 local read。最後,將 fetch 來的 FileSegment 進行 deserialize,將裏面的 records 以 iterator 的形式提供給 rdd.compute(),整個 shuffle read 結束。
In basicBlockFetcherIterator:
// generate the fetch requests
=> basicBlockFetcherIterator.initialize()
=> remoteRequests = splitLocalRemoteBlocks()
=> fetchRequests ++= Utils.randomize(remoteRequests)
// fetch remote blocks
=> sendRequest(fetchRequests.dequeue()) until Size(fetchRequests) > maxBytesInFlight
=> blockManager.connectionManager.sendMessageReliably(cmId,
blockMessageArray.toBufferMessage)
=> fetchResults.put(new FetchResult(blockId, sizeMap(blockId)))
=> dataDeserialize(blockId, blockMessage.getData, serializer)
// fetch local blocks
=> getLocalBlocks()
=> fetchResults.put(new FetchResult(id, 0, () => iter))
下面再討論一些細節問題:
reducer 如何將 fetchRequest 信息發送到目標節點?目標節點如何處理 fetchRequest 信息,如何讀取 FileSegment 並回送給 reducer?
rdd.iterator() 碰到 ShuffleDependency 時會調用 BasicBlockFetcherIterator 去獲取 FileSegments。BasicBlockFetcherIterator 使用 blockManager 中的 connectionManager 將 fetchRequest 發送給其餘節點的 connectionManager。connectionManager 之間使用 NIO 模式通訊。其餘節點,好比 worker node 2 上的 connectionManager 收到消息後,會交給 blockManagerWorker 處理,blockManagerWorker 使用 blockManager 中的 diskStore 去本地磁盤上讀取 fetchRequest 要求的 FileSegments,而後仍然經過 connectionManager 將 FileSegments 發送回去。若是使用了 FileConsolidation,diskStore 還須要 shuffleBlockManager 來提供 blockId 所在的具體位置。若是 FileSegment 不超過
spark.storage.memoryMapThreshold=8KB
,那麼 diskStore 在讀取 FileSegment 的時候會直接將 FileSegment 放到內存中,不然,會使用 RandomAccessFile 中 FileChannel 的內存映射方法來讀取 FileSegment(這樣能夠將大的 FileSegment 加載到內存)。 當 BasicBlockFetcherIterator 收到其餘節點返回的 serialized FileSegments 後會將其放到 fetchResults: Queue 裏面,並進行 deserialization,因此
fetchResults: Queue 就至關於在 Shuffle details 那一章提到的 softBuffer。若是 BasicBlockFetcherIterator 所需的某些 FileSegments 就在本地,會經過 diskStore 直接從本地文件讀取,並放到 fetchResults 裏面。最後 reducer 一邊從 FileSegment 中邊讀取 records 一邊處理。
After the blockManager receives the fetch request
=> connectionManager.receiveMessage(bufferMessage)
=> handleMessage(connectionManagerId, message, connection)
// invoke blockManagerWorker to read the block (FileSegment)
=> blockManagerWorker.onBlockMessageReceive()
=> blockManagerWorker.processBlockMessage(blockMessage)
=> buffer = blockManager.getLocalBytes(blockId)
=> buffer = diskStore.getBytes(blockId)
=> fileSegment = diskManager.getBlockLocation(blockId)
=> shuffleManager.getBlockLocation()
=> if(fileSegment < minMemoryMapBytes)
buffer = ByteBuffer.allocate(fileSegment)
else
channel.map(MapMode.READ_ONLY, segment.offset, segment.length)
每一個 reducer 都持有一個 BasicBlockFetcherIterator,一個 BasicBlockFetcherIterator 理論上能夠持有 48MB 的 fetchResults。每當 fetchResults 中有一個 FileSegment 被讀取完,就會一會兒去 fetch 不少個 FileSegment,直到 48MB 被填滿。
BasicBlockFetcherIterator.next()
=> result = results.task()
=> while (!fetchRequests.isEmpty &&
(bytesInFlight == 0 || bytesInFlight + fetchRequests.front.size <= maxBytesInFlight)) {
sendRequest(fetchRequests.dequeue())
}
=> result.deserialize()