This blog series distills and shares cases drawn from real business environments, together with Spark source-code analysis and hands-on commercial guidance; please keep following the series. Copyright notice: this Spark source-code reading and commercial-practice series belongs to the author (秦凱新); reposting is prohibited, learning from it is welcome.
Level-one scheduling: the Cluster Manager (the ResourceManager in YARN mode, the Master in Standalone mode) is responsible for allocating resources, such as CPU cores, memory and disk space, to each Application.
Level-two scheduling: the Application further distributes those resources among its own Tasks; this is where DAG task scheduling, delay scheduling, and so on come in.
Task: tasks come in two kinds, ResultTask and ShuffleMapTask. Each Stage creates zero or more tasks according to how many of its partitions are still unfinished, and DAGScheduler finally submits the tasks of each Stage to TaskScheduler as a TaskSet.
TaskSchedulerImpl: wields the two great communication endpoints, StandaloneSchedulerBackend (StandaloneAppClient) and CoarseGrainedSchedulerBackend (DriverEndpoint), and through them drives resource reporting and resource scheduling for the whole cluster.
CoarseGrainedSchedulerBackend: maintains an executorDataMap, so that the latest set of live executors is always at hand for resource allocation.
StandaloneSchedulerBackend: holds a reference to TaskSchedulerImpl; as far as this flow is concerned, its job is to initialize and start the StandaloneAppClient and DriverEndpoint endpoints and receive their messages, while the actual work is still done by TaskSchedulerImpl.
StandaloneAppClient: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and starting that instance spins up the ClientEndpoint message loop, which registers the current application with the Master on startup. (Interface allowing applications to speak with a Spark standalone cluster manager.)
SparkContext -> createTaskScheduler -> new TaskSchedulerImpl(sc) ->
new StandaloneSchedulerBackend(scheduler, sc, masterUrls)-> scheduler.initialize(backend)
_taskScheduler.start()-> backend.start()
StandaloneSchedulerBackend->start()
-> super.start() [CoarseGrainedSchedulerBackend]->
createDriverEndpointRef(properties)-> createDriverEndpoint(properties) ->
-> new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf).start()
onStart -> registerWithMaster -> tryRegisterAllMasters -> rpcEnv.setupEndpointRef(masterAddress,
Master.ENDPOINT_NAME) -> masterRef.send(RegisterApplication(appDescription, self))
ClientEndpoint then receives the following messages from the Master:
RegisteredApplication (the Master's acknowledgement of the registration sent at startup)
ApplicationRemoved
ExecutorAdded
ExecutorUpdated
WorkerRemoved
MasterChanged
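To make the message flow concrete, here is a hypothetical, heavily simplified handler for these Master-to-ClientEndpoint messages. The case-class fields below are assumptions for illustration only; the real messages in Spark's DeployMessages carry more information:
// Hypothetical, simplified message types and handler; the real StandaloneAppClient.ClientEndpoint
// does far more bookkeeping (app id, master refs, retries, listener callbacks).
sealed trait MasterMessage
case class RegisteredApplication(appId: String) extends MasterMessage
case class ApplicationRemoved(reason: String) extends MasterMessage
case class ExecutorAdded(execId: Int, workerId: String, host: String, cores: Int) extends MasterMessage
case class ExecutorUpdated(execId: Int, state: String) extends MasterMessage
case class WorkerRemoved(workerId: String, host: String) extends MasterMessage
case class MasterChanged(newMasterUrl: String) extends MasterMessage

def receive(msg: MasterMessage): Unit = msg match {
  case RegisteredApplication(appId)    => println(s"registered with Master, appId=$appId")
  case ApplicationRemoved(reason)      => println(s"application removed: $reason")
  case ExecutorAdded(id, worker, h, c) => println(s"executor $id added on $h ($c cores) via worker $worker")
  case ExecutorUpdated(id, state)      => println(s"executor $id is now $state")
  case WorkerRemoved(workerId, host)   => println(s"worker $workerId on $host removed")
  case MasterChanged(url)              => println(s"fail-over: new Master at $url")
}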
DriverEndpoint: when StandaloneSchedulerBackend's parent class CoarseGrainedSchedulerBackend starts, it instantiates a message loop of type DriverEndpoint (this is the classic "Driver" object of a running program). StandaloneSchedulerBackend is dedicated to collecting resource information from the Workers: when an ExecutorBackend on a Worker starts, it sends a RegisterExecutor message to register with DriverEndpoint. The messages handled by DriverEndpoint include:
StatusUpdate
ReviveOffers
KillTask
KillExecutorsOnHost
RemoveExecutor
RegisterExecutor
StopDriver
StopExecutors
RemoveWorker
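A simplified sketch (not the real CoarseGrainedSchedulerBackend) of the idea behind executorDataMap: RegisterExecutor fills it, RemoveExecutor empties it, and makeOffers reads it to build resource offers. The ExecutorData fields here are a reduced assumption of the real ones:
import scala.collection.mutable

// Simplified stand-in for CoarseGrainedSchedulerBackend's bookkeeping of live executors.
case class ExecutorData(host: String, totalCores: Int, var freeCores: Int)

object MiniDriverEndpoint {
  private val executorDataMap = mutable.HashMap[String, ExecutorData]()

  // Analogue of handling a RegisterExecutor message from an ExecutorBackend.
  def registerExecutor(execId: String, host: String, cores: Int): Unit =
    executorDataMap(execId) = ExecutorData(host, cores, cores)

  // Analogue of handling RemoveExecutor when an executor dies.
  def removeExecutor(execId: String): Unit = executorDataMap.remove(execId)

  // Analogue of makeOffers(): collect the live executors that still have free cores.
  def makeOffers(): Seq[(String, String, Int)] =
    executorDataMap.toSeq.collect {
      case (id, data) if data.freeCores > 0 => (id, data.host, data.freeCores)
    }
}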
The overall pattern of the Spark scheduling system: in Standalone mode, StandaloneSchedulerBackend constructs an AppClient instance when it starts, and starting that instance launches the ClientEndpoint message loop, which registers the current application with the Master. Meanwhile, StandaloneSchedulerBackend's parent class CoarseGrainedSchedulerBackend instantiates the DriverEndpoint message loop (the classic "Driver" object of a running program) when it starts. StandaloneSchedulerBackend is responsible for collecting resource information from the Workers: when an ExecutorBackend starts, it sends a RegisterExecutor message to register with DriverEndpoint, at which point StandaloneSchedulerBackend knows exactly which compute resources the current application owns, and TaskScheduler runs Tasks on the compute resources held by StandaloneSchedulerBackend.
DAGScheduler works hand in hand with the following components:
TaskSchedulerImpl
LiveListenerBus
MapOutputTrackerMaster
BlockManagerMaster
SparkEnv
cacheLocs: caches the location information of every partition of every RDD, ultimately building a mapping from partition number to a sequence of locations. Why a sequence of locations? Because each partition may be replicated, the block of a given RDD partition can live in the BlockManager of several nodes, hence a sequence per partition.
new HashMap[Int, IndexedSeq[Seq[TaskLocation]]]
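A toy illustration of the shape of this map, assuming a 3-partition RDD (id 7) whose blocks are replicated on two executors each; the hosts and executor ids are made up, and a simple Loc case class stands in for Spark's TaskLocation:
import scala.collection.mutable.HashMap

// Made-up hosts/executor ids, just to show why each partition maps to a *sequence* of locations.
case class Loc(host: String, executorId: String)

val cacheLocs = new HashMap[Int, IndexedSeq[Seq[Loc]]]()
// rdd.id = 7, 3 partitions, each block cached on two executors (replication factor 2)
cacheLocs(7) = IndexedSeq(
  Seq(Loc("node-1", "exec-1"), Loc("node-2", "exec-3")),  // partition 0
  Seq(Loc("node-2", "exec-3"), Loc("node-3", "exec-5")),  // partition 1
  Seq(Loc("node-1", "exec-2"), Loc("node-3", "exec-5"))   // partition 2
)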
messageScheduler: its job is to retry failed stages, as in the execution-thread code below:
private val messageScheduler =
  ThreadUtils.newDaemonSingleThreadScheduledExecutor("dag-scheduler-message")

// inside handleTaskCompletion, for a FetchFailed task result:
messageScheduler.schedule(new Runnable {
  override def run(): Unit = eventProcessLoop.post(ResubmitFailedStages)
}, DAGScheduler.RESUBMIT_TIMEOUT, TimeUnit.MILLISECONDS)
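The same delayed-resubmit pattern, expressed with plain JDK classes as a standalone sketch; the string queue and the 200 ms delay are stand-ins for the eventProcessLoop and DAGScheduler.RESUBMIT_TIMEOUT, not the real objects or value:
import java.util.concurrent.{Executors, LinkedBlockingQueue, ScheduledExecutorService, ThreadFactory, TimeUnit}

// Stand-in for DAGScheduler's eventProcessLoop: resubmission is just another posted event.
val events = new LinkedBlockingQueue[String]()

// Single daemon thread, like ThreadUtils.newDaemonSingleThreadScheduledExecutor(...).
val messageScheduler: ScheduledExecutorService =
  Executors.newSingleThreadScheduledExecutor(new ThreadFactory {
    override def newThread(r: Runnable): Thread = {
      val t = new Thread(r, "dag-scheduler-message")
      t.setDaemon(true)
      t
    }
  })

// On a fetch failure, post ResubmitFailedStages back onto the event loop after a delay.
def onFetchFailed(): Unit =
  messageScheduler.schedule(new Runnable {
    override def run(): Unit = events.put("ResubmitFailedStages")
  }, 200L, TimeUnit.MILLISECONDS)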
getPreferredLocs: a heavyweight method that obtains the best preferred locations for a partition. The preference sequence of each partition ends up in cacheLocs; if it cannot be obtained from there, rdd.preferredLocations is called instead.
getPreferredLocs
-> getPreferredLocsInternal
-> getCacheLocs(rdd)(partition) -> cacheLocs(rdd.id) = locs -> return the cached entry
(when nothing is cached yet, the locations are computed and stored in memory first, then the preference sequence is returned)
-> val rddPrefs = rdd.preferredLocations(rdd.partitions(partition)).toList
The getCacheLocs method:
def getCacheLocs(rdd: RDD[_]): IndexedSeq[Seq[TaskLocation]] = cacheLocs.synchronized {
  if (!cacheLocs.contains(rdd.id)) {
    // Nothing cached yet for this RDD: compute the per-partition locations.
    val locs: IndexedSeq[Seq[TaskLocation]] = if (rdd.getStorageLevel == StorageLevel.NONE) {
      // The RDD is not persisted, so no partition has a cached location.
      IndexedSeq.fill(rdd.partitions.length)(Nil)
    } else {
      // Ask BlockManagerMaster where each RDD block (one per partition) currently lives.
      val blockIds =
        rdd.partitions.indices.map(index => RDDBlockId(rdd.id, index)).toArray[BlockId]
      blockManagerMaster.getLocations(blockIds).map { bms =>
        bms.map(bm => TaskLocation(bm.host, bm.executorId))
      }
    }
    cacheLocs(rdd.id) = locs
  }
  cacheLocs(rdd.id)
}
eventProcessLoop: the famous DAGSchedulerEventProcessLoop, the event-processing thread that handles all kinds of events, for example:
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener,
properties)
case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
case StageCancelled(stageId) =>
dagScheduler.handleStageCancellation(stageId)
case JobCancelled(jobId) =>
dagScheduler.handleJobCancellation(jobId)
case JobGroupCancelled(groupId) =>
dagScheduler.handleJobGroupCancelled(groupId)
case AllJobsCancelled =>
dagScheduler.doCancelAllJobs()
case ExecutorAdded(execId, host) =>
dagScheduler.handleExecutorAdded(execId, host)
case ExecutorLost(execId) =>
dagScheduler.handleExecutorLost(execId, fetchFailed = false)
case BeginEvent(task, taskInfo) =>
dagScheduler.handleBeginEvent(task, taskInfo)
case GettingResultEvent(taskInfo) =>
dagScheduler.handleGetTaskResult(taskInfo)
case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
case TaskSetFailed(taskSet, reason, exception) =>
dagScheduler.handleTaskSetFailed(taskSet, reason, exception)
case ResubmitFailedStages =>
dagScheduler.resubmitFailedStages()
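DAGSchedulerEventProcessLoop is built on Spark's generic EventLoop. Stripped down to a sketch (this is not the real class, just the pattern), it is a daemon thread draining a blocking queue and dispatching every event to an onReceive handler:
import java.util.concurrent.LinkedBlockingDeque

// Minimal sketch of the event-loop pattern: post() enqueues events, a dedicated
// daemon thread dequeues them and dispatches each one to onReceive (the analogue
// of doOnReceive above).
abstract class MiniEventLoop[E](name: String) {
  private val queue = new LinkedBlockingDeque[E]()
  private val thread = new Thread(name) {
    override def run(): Unit =
      while (!Thread.currentThread().isInterrupted) {
        val event = queue.take()   // blocks until an event is posted
        onReceive(event)
      }
  }
  thread.setDaemon(true)

  def start(): Unit = thread.start()
  def post(event: E): Unit = queue.put(event)
  protected def onReceive(event: E): Unit
}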
-> DAGScheduler.runJob->submitJob(rdd, func, partitions, callSite, resultHandler,
properties)
-> DAGScheduler.eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
-> DAGSchedulerEventProcessLoop -> case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) ->
-> DAGScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
-> DAGScheduler.createResultStage(finalRDD, func, partitions, jobId, callSite) (builds the stage DAG by walking the dependencies backwards; detailed below)
-> DAGScheduler.submitStage(finalStage)
-> DAGScheduler.submitMissingTasks(stage, jobId.get) (submits the earliest ready stage, Stage 0)
-> taskScheduler.submitTasks(new TaskSet(
       tasks.toArray, stage.id, stage.latestInfo.attemptNumber, jobId, properties))
-> createTaskSetManager(taskSet, maxTaskFailures) (inside TaskSchedulerImpl.submitTasks)
-> taskScheduler -> initialize -> FIFOSchedulableBuilder -> buildPools (built when TaskSchedulerImpl is initialized)
-> schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
       (inside TaskSchedulerImpl.submitTasks; the TaskSetManager goes into the pool, FIFO by default)
-> backend.reviveOffers() (CoarseGrainedSchedulerBackend)
-> driverEndpoint.send(ReviveOffers)
-> DriverEndpoint.receive -> case ReviveOffers => makeOffers() (makeOffers wraps the common task-scheduling path)
-> makeOffers (collects the live executors with their executorHost and freeCores) -> workOffers (inside CoarseGrainedSchedulerBackend)
-> TaskSchedulerImpl.resourceOffers(workOffers) (CoarseGrainedSchedulerBackend holds a reference to TaskSchedulerImpl)
-> val sortedTaskSets = TaskSchedulerImpl.rootPool.getSortedTaskSetQueue (the TaskSetManagers enter the picture)
-> TaskSchedulerImpl.resourceOfferSingleTaskSet(
       taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
-> TaskSetManager.resourceOffer(execId, host, maxLocality)
-> TaskSetManager.getAllowedLocalityLevel(curTime) (delay scheduling, covered in the next post)
-> TaskSetManager.dequeueTask(execId, host, allowedLocality)
-> sched.dagScheduler.taskStarted(task, info)
-> new TaskInfo(taskId, index, attemptNum, curTime, execId, host, taskLocality, speculative)
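To watch this whole chain fire end to end, here is a minimal driver program, a sketch assuming a local Spark 2.x build with spark-sql on the classpath (the app name and data are made up). The reduceByKey introduces a ShuffleDependency, so collect() makes DAGScheduler build a ShuffleMapStage plus a ResultStage and hand TaskSets to TaskSchedulerImpl:
import org.apache.spark.sql.SparkSession

// Minimal driver program: the action collect() triggers SparkContext.runJob,
// which walks the DAGScheduler / TaskScheduler path traced above.
object WordCountDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("scheduler-walkthrough")
      .master("local[2]")              // swap for spark://... to go through StandaloneAppClient
      .getOrCreate()
    val sc = spark.sparkContext

    val counts = sc.parallelize(Seq("a", "b", "a", "c"), numSlices = 2)
      .map(w => (w, 1))
      .reduceByKey(_ + _)              // shuffle boundary -> ShuffleMapStage
      .collect()                       // action -> DAGScheduler.runJob -> JobSubmitted event

    counts.foreach(println)
    spark.stop()
  }
}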
Reverse-driven stage creation: as the saying goes, the waves behind push on the waves ahead. Curiously, the reverse-driven stage-building algorithm in Spark 2.0 differs from the one in Spark 2.3; the discussion below follows Spark 2.3:
Once DAGScheduler calls submitStage, its mission is done; in the end submitStage simply submits the task sets in the forward direction:
-> DAGScheduler.handleJobSubmitted
-> finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
   (wraps finalRDD and func into the final Stage)
-> submitStage(finalStage) -> submitMissingTasks
But the real show is the reverse-driven algorithm inside createResultStage:
-> createResultStage
-> getOrCreateParentStages -> getShuffleDependencies (collect the parent shuffle dependencies)
-> getOrCreateShuffleMapStage -> getShuffleDependencies (loops back on itself, forming a recursion)
-> createShuffleMapStage
The code below obtains the parent stages: if they already exist they are returned directly, otherwise they are created from the shuffle dependencies of the final RDD. Note that getOrCreateParentStages keeps being called recursively, which is ultimately how all the stages get built:
private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
  getShuffleDependencies(rdd).map { shuffleDep =>
    getOrCreateShuffleMapStage(shuffleDep, firstJobId)
  }.toList
}
getOrCreateShuffleMapStage: creates the stage that a shuffle dependency (and its ancestors) belongs to. If the stage already exists it is returned directly; otherwise the parent stages are created first, and only then the child stage.
private def getOrCreateShuffleMapStage(
shuffleDep: ShuffleDependency[_, _, _],
firstJobId: Int): ShuffleMapStage = {
shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
case Some(stage) =>
stage
case None =>
// Create stages for all missing ancestor shuffle dependencies.
getMissingAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>
// Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
// that were not already in shuffleIdToMapStage, it's possible that by the time we
// get to a particular dependency in the foreach loop, it's been added to
// shuffleIdToMapStage by the stage creation process for an earlier dependency. See
// SPARK-13902 for more information.
if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
createShuffleMapStage(dep, firstJobId)
}
}
// Finally, create a stage for the given shuffle dependency.
createShuffleMapStage(shuffleDep, firstJobId)
  }
}
createShuffleMapStage: the method where stages finally materialize and the required stage is returned. Note that the blocking point is the call to getOrCreateParentStages inside it, which keeps recursing all the way up to the top-most ancestor.
def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
val rdd = shuffleDep.rdd
val numTasks = rdd.partitions.length
val parents = getOrCreateParentStages(rdd, jobId)   // the blocking point: ancestor stages are created before this one
val id = nextStageId.getAndIncrement()
val stage = new ShuffleMapStage(
id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep, mapOutputTracker)
stageIdToStage(id) = stage
shuffleIdToMapStage(shuffleDep.shuffleId) = stage
updateJobIdStageIdMaps(jobId, stage)
if (!mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
}
stage
}
The overall flow, then, is: the top-most Stage is created first, and the child stages are created one by one as the recursion unwinds, much like a stack model.
Why does the flow look like this? Take the DAG G -> F A -> H -> M L -> N. Because createShuffleMapStage drives the creation backwards and its blocking point is the getOrCreateParentStages call inside it, the creation of F and A (the parent stages of the final stage) is pushed onto the bottom of the call stack first, the creation calls for the stages above F and A are stacked on top one by one, and none of them actually creates a stage yet. Only when the creation call for the top-most stage is on the stack, that is, when the recursion has reached the RDD's top-most ancestor, does execution begin: createShuffleMapStage runs its body for that ancestor, building the dependency and registering the shuffle, and the recursion then unwinds from the blocking point downwards, executing createShuffleMapStage for each level in turn.
Note the caching inside getOrCreateShuffleMapStage, namely shuffleIdToMapStage: an already-created stage is fetched directly rather than rebuilt, as happens when the shared parent stage of F and A is looked up the second time.
G -> F A -> H -> M L -> N
1 The recursion starts from the parent dependencies of F and A and builds stages upwards from there.
2 It then creates the stages above H together with their dependency relationships.
3 Next the relationship between F, A and the H stage is established; note that the H stage is taken from shuffleIdToMapStage.
  Finally the F and A stages are returned, and the dependency between the final RDD (G) and F, A is established.
4 At the end, submitStage(finalStage) is submitted.
The resulting stage-creation order is: N -> M L -> H -> F A -> G
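The same ordering can be reproduced with a toy model of the recursion; this is plain Scala, nothing Spark-specific, and the memo map merely plays the role of shuffleIdToMapStage:
import scala.collection.mutable

// Toy DAG: parents(x) are the shuffle dependencies x needs before it can be created.
val parents = Map(
  "G" -> List("F", "A"),
  "F" -> List("H"), "A" -> List("H"),
  "H" -> List("M", "L"),
  "M" -> List("N"), "L" -> List("N"),
  "N" -> Nil
)

// name -> stage id; LinkedHashMap keeps insertion order, i.e. the creation order.
val created = mutable.LinkedHashMap[String, Int]()

def getOrCreateStage(name: String): Int =
  created.get(name) match {
    case Some(id) => id                        // analogue of a shuffleIdToMapStage hit
    case None =>
      parents(name).foreach(getOrCreateStage)  // blocking point: ancestors are created first
      val id = created.size
      created(name) = id
      println(s"create stage $id for $name")
      id
  }

getOrCreateStage("G")
// prints the creation order N, M, L, H, F, A, G: parents always before their children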
This section is the result of a lot of time spent polishing; it tries to dissect Spark's task scheduling in the plainest possible language. It is now 1:22 a.m. To close, one last picture of DAGScheduler.handleTaskCompletion, the forward submission of tasks. Fly, Spark.