上一章講了RDD的轉換,可是沒講做業的運行,它和Driver Program的關係是啥,和RDD的關係是啥?html
官方給的例子裏面,一執行collect方法就能出結果,那咱們就從collect開始看吧,進入RDD,找到collect方法。node
def collect(): Array[T] = { val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray) Array.concat(results: _*) }
它進行了兩個操做:apache
一、調用SparkContext的runJob方法,把自身的引用傳入去,再傳了一個匿名函數(把Iterator轉換成Array數組)數組
二、把result結果合併成一個Array,注意results是一個Array[Array[T]]類型,因此第二句的那個寫法纔會那麼奇怪。這個操做是很重的一個操做,若是結果很大的話,這個操做是會報OOM的,由於它是把結果保存在Driver程序的內存當中的result數組裏面。數據結構
咱們點進去runJob這個方法吧。app
val callSite = getCallSite val cleanedFunc = clean(func) dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get) rdd.doCheckpoint()
追蹤下去,咱們會發現通過多個不一樣的runJob同名函數調用以後,執行job做業靠的是dagScheduler,最後把結果經過resultHandler保存返回。dom
好的,咱們繼續看DAGScheduler的runJob方法,提交做業,而後等待結果,成功什麼都不作,失敗拋出錯誤,咱們接着看submitJob方法。jvm
val jobId = nextJobId.getAndIncrement() val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// 記錄做業成功與失敗的數據結構,一個做業的Task數量是和分片的數量一致的,Task成功以後調用resultHandler保存結果。 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
走到這裏,感受有點兒繞了,爲何到了這裏,還不直接運行呢,還要給eventProcessActor發送一個JobSubmitted請求呢,new一個線程和這個區別有多大?ide
無論了,搜索一下eventProcessActor吧,結果發現它是一個DAGSchedulerEventProcessActor,它的定義也在DAGScheduler這個類裏面。它的receive方法裏面定義了12種事件的處理方法,這裏咱們只須要看函數
JobSubmitted的就行,它也是調用了自身的handleJobSubmitted方法。可是這裏很奇怪,沒辦法打斷點調試,可是它的結果卻是能返回的,所以咱們得用另一種方式,打開test工程,找到scheduler目錄下的DAGSchedulerSuite這個類,咱們本身寫一個test方法,首先咱們要在import那裏加上import org.apache.spark.SparkContext._ ,而後加上這一段測試代碼。
test("run shuffle") { val rdd1 = sc.parallelize(1 to 100, 4) val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1) val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i)) val rdd4 = rdd3.reduceByKey(_ + _) submit(rdd4, Array(0,1,2,3)) complete(taskSets(0), Seq( (Success, makeMapStatus("hostA", 1)), (Success, makeMapStatus("hostB", 1)))) complete(taskSets(1), Seq((Success, 42))) complete(taskSets(2), Seq( (Success, makeMapStatus("hostA", 2)), (Success, makeMapStatus("hostB", 2)))) complete(taskSets(3), Seq((Success, 68))) }
這個例子的重點仍是shuffle那塊,另外也包括了map的多個轉換,你們能夠按照這個例子去測試下。
咱們接着看handleJobSubmitted吧。
var finalStage: Stage = null try { finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite)) } catch { // 錯誤處理,告訴監聽器做業失敗,返回.... } if (finalStage != null) { val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties) clearCacheLocs() if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) { // 很短、沒有父stage的本地操做,好比 first() or take() 的操做本地執行. listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties)) runLocally(job) } else { // collect等操做走的是這個過程,更新相關的關係映射,用監聽器監聽,而後提交做業 jobIdToActiveJob(jobId) = job activeJobs += job resultStageToJob(finalStage) = job listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties)) // 提交stage submitStage(finalStage) } } // 提交stage submitWaitingStages()
從上面這個方法來看,咱們應該重點關注newStage方法、submitStage方法和submitWaitingStages方法。
咱們先看newStage,它獲得的結果叫作finalStage,挺奇怪的哈,爲啥?先看吧
val id = nextStageId.getAndIncrement() val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite) stageIdToStage(id) = stage updateJobIdStageIdMaps(jobId, stage) stageToInfos(stage) = StageInfo.fromStage(stage) stage
能夠看出來Stage也沒有太多的東西可言,它就是把rdd給傳了進去,tasks的數量,shuffleDep是空,parentStage。
那它的parentStage是啥呢?
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = { val parents = new HashSet[Stage] val visited = new HashSet[RDD[_]] def visit(r: RDD[_]) { if (!visited(r)) { visited += r // 在visit函數裏面,只有存在ShuffleDependency的,parent才經過getShuffleMapStage計算出來 for (dep <- r.dependencies) { dep match { case shufDep: ShuffleDependency[_,_] => parents += getShuffleMapStage(shufDep, jobId) case _ => visit(dep.rdd) } } } } visit(rdd) parents.toList }
它是經過不停的遍歷它以前的rdd,若是碰到有依賴是ShuffleDependency類型的,就經過getShuffleMapStage方法計算出來它的Stage來。
那咱們就開始看submitStage方法吧。
private def submitStage(stage: Stage) { //... val missing = getMissingParentStages(stage).sortBy(_.id) logDebug("missing: " + missing) if (missing == Nil) { // 沒有父stage,執行這stage的tasks submitMissingTasks(stage, jobId.get) runningStages += stage } else { // 提交父stage的task,這裏是個遞歸,真正的提交在上面的註釋的地方 for (parent <- missing) { submitStage(parent) } // 暫時不能提交的stage,先添加到等待隊列 waitingStages += stage } } }
這個提交stage的過程是一個遞歸的過程,它是先要把父stage先提交,而後把本身添加到等待隊列中,直到沒有父stage以後,就提交該stage中的任務。等待隊列在最後的submitWaitingStages方法中提交。
這裏我引用一下上一章當中我所畫的那個圖來表示這個過程哈。
從getParentStages方法能夠看出來,RDD當中存在ShuffleDependency的Stage纔會有父Stage, 也就是圖中的虛線的位置!
因此咱們只須要記住凡是涉及到shuffle的做業都會至少有兩個Stage,即shuffle前和shuffle後。
那咱們接着看submitMissingTasks方法,下面是主體代碼。
private def submitMissingTasks(stage: Stage, jobId: Int) { val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet) myPending.clear() var tasks = ArrayBuffer[Task[_]]() if (stage.isShuffleMap) { // 這是shuffle stage的狀況 for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) { val locs = getPreferredLocs(stage.rdd, p) tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs) } } else { // 這是final stage的狀況 val job = resultStageToJob(stage) for (id <- 0 until job.numPartitions if !job.finished(id)) { val partition = job.partitions(id) val locs = getPreferredLocs(stage.rdd, partition) tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id) } } if (tasks.size > 0) { myPending ++= tasks taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties)) stageToInfos(stage).submissionTime = Some(System.currentTimeMillis()) } else { runningStages -= stage } }
Task也是有兩類的,一種是ShuffleMapTask,一種是ResultTask,咱們須要注意這兩種Task的runTask方法。最後Task是經過taskScheduler.submitTasks來提交的。
咱們找到TaskSchedulerImpl裏面看這個方法。
override def submitTasks(taskSet: TaskSet) { val tasks = taskSet.tasksthis.synchronized { val manager = new TaskSetManager(this, taskSet, maxTaskFailures) activeTaskSets(taskSet.id) = manager schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties) hasReceivedTask = true } backend.reviveOffers() }
調度器有兩種模式,FIFO和FAIR,默認是FIFO, 能夠經過spark.scheduler.mode來設置,schedulableBuilder也有相應的兩種FIFOSchedulableBuilder和FairSchedulableBuilder。
那backend是啥?聽說是爲了給TaskSchedulerImpl提供插件式的調度服務的。
它是怎麼實例化出來的,這裏咱們須要追溯回到SparkContext的createTaskScheduler方法,下面我直接把經常使用的3中類型的TaskScheduler給列出來了。
mode Scheduler Backend
cluster TaskSchedulerImpl SparkDeploySchedulerBackend
yarn-cluster YarnClusterScheduler CoarseGrainedSchedulerBackend
yarn-client YarnClientClusterScheduler YarnClientSchedulerBackend
好,咱們回到以前的代碼上,schedulableBuilder.addTaskSetManager比較簡單,把做業集添加到調度器的隊列當中。
咱們接着看backend的reviveOffers,裏面只有一句話driverActor ! ReviveOffers。真是頭暈,搞那麼多Actor,只是爲了接收消息。。。
照舊吧,找到它的receive方法,找到ReviveOffers這個case,發現它調用了makeOffers方法,咱們繼續追殺!
def makeOffers() { launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) }
從executorHost中隨機抽出一些來給調度器,而後調度器返回TaskDescription,executorHost怎麼來的,待會兒再說,咱們接着看resourceOffers方法。
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized { SparkEnv.set(sc.env) // 遍歷worker提供的資源,更新executor相關的映射 for (o <- offers) { executorIdToHost(o.executorId) = o.host if (!executorsByHost.contains(o.host)) { executorsByHost(o.host) = new HashSet[String]() executorAdded(o.executorId, o.host) } } // 從worker當中隨機選出一些來,防止任務都堆在一個機器上 val shuffledOffers = Random.shuffle(offers) // worker的task列表 val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores)) val availableCpus = shuffledOffers.map(o => o.cores).toArray val sortedTaskSets = rootPool.getSortedTaskSetQueue // 隨機遍歷抽出來的worker,經過TaskSetManager的resourceOffer,把本地性最高的Task分給Worker var launchedTask = false for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) { do { launchedTask = false for (i <- 0 until shuffledOffers.size) { val execId = shuffledOffers(i).executorId val host = shuffledOffers(i).host if (availableCpus(i) >= CPUS_PER_TASK) { // 把本地性最高的Task分給Worker for (task <- taskSet.resourceOffer(execId, host, maxLocality)) { tasks(i) += task val tid = task.taskId taskIdToTaskSetId(tid) = taskSet.taskSet.id taskIdToExecutorId(tid) = execId activeExecutorIds += execId executorsByHost(host) += execId availableCpus(i) -= CPUS_PER_TASK assert (availableCpus(i) >= 0) launchedTask = true } } } } while (launchedTask) } if (tasks.size > 0) { hasLaunchedTask = true } return tasks }
resourceOffers主要作了3件事:
一、從Workers裏面隨機抽出一些來執行任務。
二、經過TaskSetManager找出和Worker在一塊兒的Task,最後編譯打包成TaskDescription返回。
三、將Worker-->Array[TaskDescription]的映射關係返回。
咱們繼續看TaskSetManager的resourceOffer,看看它是怎麼找到和host再起的Task,而且包裝成TaskDescription。
經過查看代碼,我發現以前我解釋的和它具體實現的差異比較大,它所謂的本地性是根據當前的等待時間來肯定的任務本地性的級別。
它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = { while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) && currentLocalityIndex < myLocalityLevels.length - 1) { // 成立條件是當前時間-上次發佈任務的時間 > 當前本地性級別的,條件成立就跳到下一個級別 lastLaunchTime += localityWaits(currentLocalityIndex) currentLocalityIndex += 1 } myLocalityLevels(currentLocalityIndex) }
等待時間是能夠經過參數去設置的,具體的本身查下面的代碼。
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = { val defaultWait = conf.get("spark.locality.wait", "3000") level match { case TaskLocality.PROCESS_LOCAL => conf.get("spark.locality.wait.process", defaultWait).toLong case TaskLocality.NODE_LOCAL => conf.get("spark.locality.wait.node", defaultWait).toLong case TaskLocality.RACK_LOCAL => conf.get("spark.locality.wait.rack", defaultWait).toLong case TaskLocality.ANY => 0L } }
下面繼續看TaskSetManager的resourceOffer的方法,經過findTask來從Task集合裏面找到相應的Task。
findTask(execId, host, allowedLocality) match { case Some((index, taskLocality)) => {
val task = tasks(index) val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser) val timeTaken = clock.getTime() - startTime addRunningTask(taskId) val taskName = "task %s:%d".format(taskSet.id, index) sched.dagScheduler.taskStarted(task, info) return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask)) }
它的findTask方法以下:
private def findTask(execId: String, host: String, locality: TaskLocality.Value) : Option[(Int, TaskLocality.Value)] = { // 同一個Executor,經過execId來查找相應的等待的task for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) { return Some((index, TaskLocality.PROCESS_LOCAL)) } // 經過主機名找到相應的Task,不過比以前的多了一步判斷 if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) { for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) { return Some((index, TaskLocality.NODE_LOCAL)) } } // 經過Rack的名稱查找Task if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) { for { rack <- sched.getRackForHost(host) index <- findTaskFromList(execId, getPendingTasksForRack(rack)) } { return Some((index, TaskLocality.RACK_LOCAL)) } } // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行 for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) { return Some((index, TaskLocality.PROCESS_LOCAL)) } // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行 if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) { for (index <- findTaskFromList(execId, allPendingTasks)) { return Some((index, TaskLocality.ANY)) } } // 最後沒辦法了,拖的時間太長了,只能啓動推測執行了 findSpeculativeTask(execId, host, locality) }
從這個方面能夠看得出來,Spark對運行時間仍是很注重的,等待的時間越長,它就可能越飢不擇食,從PROCESS_LOCAL一直讓步到ANY,最後的最後,推測執行都用到了。
找到任務以後,它就調用dagScheduler.taskStarted方法,通知dagScheduler任務開始了,taskStarted方法就不詳細講了,它觸發dagScheduler的BeginEvent事件,裏面只作了2件事:
一、檢查Task序列化的大小,超過100K就警告。
二、提交等待的Stage。
好,咱們繼續回到發佈Task上面來,中間過程講完了,咱們應該是要回到CoarseGrainedSchedulerBackend的launchTasks方法了。
def makeOffers() { launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))})) }
它的方法體是:
def launchTasks(tasks: Seq[Seq[TaskDescription]]) { for (task <- tasks.flatten) { freeCores(task.executorId) -= scheduler.CPUS_PER_TASK executorActor(task.executorId) ! LaunchTask(task) } }
經過executorId找到相應的executorActor,而後發送LaunchTask過去,一個Task佔用一個Cpu。
那這個executorActor是怎麼來的呢?找唄,最後發現它是在receive方法裏面接受到RegisterExecutor消息的時候註冊的。經過搜索,咱們找到CoarseGrainedExecutorBackend這個類,在它的preStart方法裏面赫然找到了driver ! RegisterExecutor(executorId, hostPort, cores) 帶的這三個參數都是在初始化的時候傳入的,那是誰實例化的它呢,再逆向搜索找到SparkDeploySchedulerBackend!以前的backend一直都是它,咱們看reviveOffers是在它的父類CoarseGrainedSchedulerBackend裏面。
關係清楚了,在這個backend的start方法裏面啓動了一個AppClient,AppClient的其中一個參數ApplicationDescription就是封裝的運行CoarseGrainedExecutorBackend的命令。AppClient內部啓動了一個ClientActor,這個ClientActor啓動以後,會嘗試向Master發送一個指令actor ! RegisterApplication(appDescription) 註冊一個Application。
別廢話了,Ctrl +Shift + N吧,定位到Master吧。
case RegisterApplication(description) => { val app = createApplication(description, sender) registerApplication(app) persistenceEngine.addApplication(app) sender ! RegisteredApplication(app.id, masterUrl) schedule() }
它作了5件事:
一、createApplication爲這個app構建一個描述App數據結構的ApplicationInfo。
二、註冊該Application,更新相應的映射關係,添加到等待隊列裏面。
三、用persistenceEngine持久化Application信息,默認是不保存的,另外還有兩種方式,保存在文件或者Zookeeper當中。
四、經過發送方註冊成功。
五、開始做業調度。
關於調度的問題,在第一章《spark-submit提交做業過程》已經介紹過了,建議回去再看看,搞清楚Application和Executor之間的關係。
Application一旦得到資源,Master會發送launchExecutor指令給Worker去啓動Executor。
進到Worker裏面搜索LaunchExecutor。
val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host, appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ masterLock.synchronized { master ! ExecutorStateChanged(appId, execId, manager.state, None, None) }
原來ExecutorRunner還不是傳說中的Executor,它內部是執行了appDesc內部的那個命令,啓動了CoarseGrainedExecutorBackend,它纔是咱們的真命天子Executor。
啓動以後ExecutorRunner報告ExecutorStateChanged事件給Master。
Master幹了兩件事:
一、轉發給Driver,這個Driver是以前註冊Application的那個AppClient
二、若是是Executor運行結束,從相應的映射關係裏面刪除
上面又花了那麼多時間講Task的運行環境ExecutorRunner是怎麼註冊,那咱們仍是回到咱們的主題,Task的發佈。
發佈任務是發送LaunchTask指令給CoarseGrainedExecutorBackend,接受到指令以後,讓它內部的executor來發布這個任務。
這裏咱們看一下Executor的launchTask。
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) { val tr = new TaskRunner(context, taskId, serializedTask) runningTasks.put(taskId, tr) threadPool.execute(tr) }
TaskRunner是這裏的重頭戲啊!看它的run方法吧。
override def run() { // 準備工做若干...那天咱們放學回家通過一片玉米地,以上省略一百字 try { // 反序列化Task SparkEnv.set(env) Accumulators.clear() val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask) updateDependencies(taskFiles, taskJars) task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader) // 命令爲嘗試運行,和hadoop的mapreduce做業是一致的 attemptedTask = Some(task) logDebug("Task " + taskId + "'s epoch is " + task.epoch) env.mapOutputTracker.updateEpoch(task.epoch) // 運行Task, 具體能夠去看以前讓你們關注的ResultTask和ShuffleMapTask taskStart = System.currentTimeMillis() val value = task.run(taskId.toInt) val taskFinish = System.currentTimeMillis() // 對結果進行序列化 val resultSer = SparkEnv.get.serializer.newInstance() val beforeSerialization = System.currentTimeMillis() val valueBytes = resultSer.serialize(value) val afterSerialization = System.currentTimeMillis() // 更新任務的相關監控信息,會反映到監控頁面上的 for (m <- task.metrics) { m.hostname = Utils.localHostName() m.executorDeserializeTime = taskStart - startTime m.executorRunTime = taskFinish - taskStart m.jvmGCTime = gcTime - startGCTime m.resultSerializationTime = afterSerialization - beforeSerialization } val accumUpdates = Accumulators.values // 對結果進行再包裝,包裝完再進行序列化 val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null)) val serializedDirectResult = ser.serialize(directResult) // 若是中間結果的大小超過了spark.akka.frameSize(默認是10M)的大小,就要提高序列化級別了,超過內存的部分要保存到硬盤的 val serializedResult = { if (serializedDirectResult.limit >= akkaFrameSize - 1024) { val blockId = TaskResultBlockId(taskId) env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER) ser.serialize(new IndirectTaskResult[Any](blockId)) } else { serializedDirectResult } } // 返回結果 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult) } catch { // 這部分是錯誤處理,被我省略掉了,主要內容是通關相關負責人處理後事 } finally { // 清理爲ResultTask註冊的shuffle內存,最後把task從正在運行的列表當中刪除 val shuffleMemoryMap = env.shuffleMemoryMap shuffleMemoryMap.synchronized { shuffleMemoryMap.remove(Thread.currentThread().getId) } runningTasks.remove(taskId) } } }
以上代碼被我這些了,可是建議你們看看註釋吧。
最後結果是經過statusUpdate返回的。
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { driver ! StatusUpdate(executorId, taskId, state, data) }
這回這個Driver又不是剛纔那個AppClient,而是它的家長SparkDeploySchedulerBackend,是在SparkDeploySchedulerBackend的父類CoarseGrainedSchedulerBackend接受了這個StatusUpdate消息。
這關係真他娘夠亂的。。
繼續,Task裏面走的是TaskSchedulerImpl這個方法。
scheduler.statusUpdate(taskId, state, data.value)
到這裏,一個Task就運行結束了,後面就再也不擴展了,做業運行這塊是Spark的核心,再擴展基本就能寫出來一本書了,限於文章篇幅,這裏就再也不深究了。
以上的過程應該是和下面的圖一致的。
看完這篇文章,估計你們會雲裏霧裏的,在下一章《做業生命週期》會把剛纔描述的整個過程從新梳理出來,便於你們記憶,敬請期待!
岑玉海
轉載請註明出處,謝謝!