上一章講了 RDD 的轉換,可是沒講做業的運行,它和 Driver Program 的關係是啥,和 RDD 的關係是啥?html
官方給的例子裏面,一執行 collect 方法就能出結果,那咱們就從 collect 開始看吧,進入 RDD,找到 collect 方法。node
def collect(): Array[T] = {
val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
Array.concat(results: _*)
}
複製代碼
它進行了兩個操做:apache
一、調用 SparkContext 的 runJob 方法,把自身的引用傳入去,再傳了一個匿名函數(把 Iterator 轉換成 Array 數組)數組
二、把 result 結果合併成一個 Array,注意 results 是一個 Array[Array[T]] 類型,因此第二句的那個寫法纔會那麼奇怪。這個操做是很重的一個操做,若是結果很大的話,這個操做是會報 OOM 的,由於它是把結果保存在 Driver 程序的內存當中的 result 數組裏面。bash
咱們點進去 runJob 這個方法吧。數據結構
val callSite = getCallSite
val cleanedFunc = clean(func)
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal, resultHandler, localProperties.get)
rdd.doCheckpoint()
複製代碼
追蹤下去,咱們會發現通過多個不一樣的 runJob 同名函數調用以後,執行 job 做業靠的是 dagScheduler,最後把結果經過 resultHandler 保存返回。app
好的,咱們繼續看 DAGScheduler 的 runJob 方法,提交做業,而後等待結果,成功什麼都不作,失敗拋出錯誤,咱們接着看 submitJob 方法。dom
val jobId = nextJobId.getAndIncrement()
val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
// 記錄做業成功與失敗的數據結構,一個做業的Task數量是和分片的數量一致的,Task成功以後調用resultHandler保存結果。
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
eventProcessActor ! JobSubmitted(jobId, rdd, func2, partitions.toArray, allowLocal, callSite, waiter, properties)
複製代碼
走到這裏,感受有點兒繞了,爲何到了這裏,還不直接運行呢,還要給 eventProcessActor 發送一個 JobSubmitted 請求呢,new 一個線程和這個區別有多大?jvm
無論了,搜索一下 eventProcessActor 吧,結果發現它是一個 DAGSchedulerEventProcessActor,它的定義也在 DAGScheduler 這個類裏面。它的 receive 方法裏面定義了 12 種事件的處理方法,這裏咱們只須要看ide
JobSubmitted 的就行,它也是調用了自身的 handleJobSubmitted 方法。可是這裏很奇怪,沒辦法打斷點調試,可是它的結果卻是能返回的,所以咱們得用另一種方式,打開 test 工程,找到 scheduler 目錄下的 DAGSchedulerSuite 這個類,咱們本身寫一個 test 方法,首先咱們要在 import 那裏加上 import org.apache.spark.SparkContext._ ,而後加上這一段測試代碼。
test("run shuffle") {
val rdd1 = sc.parallelize(1 to 100, 4)
val rdd2 = rdd1.filter(_ % 2 == 0).map(_ + 1)
val rdd3 = rdd2.map(_ - 1).filter(_ < 50).map(i => (i, i))
val rdd4 = rdd3.reduceByKey(_ + _)
submit(rdd4, Array(0,1,2,3))
complete(taskSets(0), Seq(
(Success, makeMapStatus("hostA", 1)),
(Success, makeMapStatus("hostB", 1))))
complete(taskSets(1), Seq((Success, 42)))
complete(taskSets(2), Seq(
(Success, makeMapStatus("hostA", 2)),
(Success, makeMapStatus("hostB", 2))))
complete(taskSets(3), Seq((Success, 68)))
}
複製代碼
這個例子的重點仍是 shuffle 那塊,另外也包括了 map 的多個轉換,你們能夠按照這個例子去測試下。
咱們接着看 handleJobSubmitted 吧。
var finalStage: Stage = null
try {
finalStage = newStage(finalRDD, partitions.size, None, jobId, Some(callSite))
} catch {
// 錯誤處理,告訴監聽器做業失敗,返回....
}
if (finalStage != null) {
val job = new ActiveJob(jobId, finalStage, func, partitions, callSite, listener, properties)
clearCacheLocs()
if (allowLocal && finalStage.parents.size == 0 && partitions.length == 1) {
// 很短、沒有父stage的本地操做,好比 first() or take() 的操做本地執行.
listenerBus.post(SparkListenerJobStart(job.jobId, Array[Int](), properties))
runLocally(job)
} else {
// collect等操做走的是這個過程,更新相關的關係映射,用監聽器監聽,而後提交做業
jobIdToActiveJob(jobId) = job
activeJobs += job
resultStageToJob(finalStage) = job
listenerBus.post(SparkListenerJobStart(job.jobId, jobIdToStageIds(jobId).toArray, properties))
// 提交stage
submitStage(finalStage)
}
}
// 提交stage
submitWaitingStages()
複製代碼
從上面這個方法來看,咱們應該重點關注 newStage 方法、submitStage 方法和 submitWaitingStages 方法。
咱們先看 newStage,它獲得的結果叫作 finalStage,挺奇怪的哈,爲啥?先看吧
val id = nextStageId.getAndIncrement()
val stage = new Stage(id, rdd, numTasks, shuffleDep, getParentStages(rdd, jobId), jobId, callSite)
stageIdToStage(id) = stage
updateJobIdStageIdMaps(jobId, stage)
stageToInfos(stage) = StageInfo.fromStage(stage)
stage
複製代碼
能夠看出來 Stage 也沒有太多的東西可言,它就是把 rdd 給傳了進去,tasks 的數量,shuffleDep 是空,parentStage。
那它的 parentStage 是啥呢?
private def getParentStages(rdd: RDD[_], jobId: Int): List[Stage] = {
val parents = new HashSet[Stage]
val visited = new HashSet[RDD[_]]
def visit(r: RDD[_]) {
if (!visited(r)) {
visited += r
// 在visit函數裏面,只有存在ShuffleDependency的,parent才經過getShuffleMapStage計算出來
for (dep <- r.dependencies) {
dep match {
case shufDep: ShuffleDependency[_,_] =>
parents += getShuffleMapStage(shufDep, jobId)
case _ =>
visit(dep.rdd)
}
}
}
}
visit(rdd)
parents.toList
}
複製代碼
它是經過不停的遍歷它以前的 rdd,若是碰到有依賴是 ShuffleDependency 類型的,就經過 getShuffleMapStage 方法計算出來它的 Stage 來。
那咱們就開始看 submitStage 方法吧。
private def submitStage(stage: Stage) {
//...
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
if (missing == Nil) {
// 沒有父stage,執行這stage的tasks
submitMissingTasks(stage, jobId.get)
runningStages += stage
} else {
   // 提交父stage的task,這裏是個遞歸,真正的提交在上面的註釋的地方
for (parent <- missing) {
submitStage(parent)
}
// 暫時不能提交的stage,先添加到等待隊列
waitingStages += stage
}
}
}
複製代碼
這個提交 stage 的過程是一個遞歸的過程,它是先要把父 stage 先提交,而後把本身添加到等待隊列中,直到沒有父 stage 以後,就提交該 stage 中的任務。等待隊列在最後的 submitWaitingStages 方法中提交。
這裏我引用一下上一章當中我所畫的那個圖來表示這個過程哈。
從 getParentStages 方法能夠看出來,RDD 當中存在 ShuffleDependency 的 Stage 纔會有父 Stage, 也就是圖中的虛線的位置!
因此咱們只須要記住凡是涉及到 shuffle 的做業都會至少有兩個 Stage,即 shuffle 前和 shuffle 後。
那咱們接着看 submitMissingTasks 方法,下面是主體代碼。
private def submitMissingTasks(stage: Stage, jobId: Int) {
val myPending = pendingTasks.getOrElseUpdate(stage, new HashSet)
myPending.clear()
var tasks = ArrayBuffer[Task[_]]()
if (stage.isShuffleMap) {
// 這是shuffle stage的狀況
for (p <- 0 until stage.numPartitions if stage.outputLocs(p) == Nil) {
val locs = getPreferredLocs(stage.rdd, p)
tasks += new ShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
} else {
// 這是final stage的狀況
val job = resultStageToJob(stage)
for (id <- 0 until job.numPartitions if !job.finished(id)) {
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks += new ResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
if (tasks.size > 0) {
myPending ++= tasks
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.jobId, properties))
stageToInfos(stage).submissionTime = Some(System.currentTimeMillis())
} else {
runningStages -= stage
}
}
複製代碼
Task 也是有兩類的,一種是 ShuffleMapTask,一種是 ResultTask,咱們須要注意這兩種 Task 的 runTask 方法。最後 Task 是經過 taskScheduler.submitTasks 來提交的。
咱們找到 TaskSchedulerImpl 裏面看這個方法。
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasksthis.synchronized {
val manager = new TaskSetManager(this, taskSet, maxTaskFailures)
activeTaskSets(taskSet.id) = manager
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
hasReceivedTask = true
}
backend.reviveOffers()
}
複製代碼
調度器有兩種模式,FIFO 和 FAIR,默認是 FIFO, 能夠經過 spark.scheduler.mode 來設置,schedulableBuilder 也有相應的兩種 FIFOSchedulableBuilder 和 FairSchedulableBuilder。
那 backend 是啥? 聽說是爲了給 TaskSchedulerImpl 提供插件式的調度服務的。
它是怎麼實例化出來的,這裏咱們須要追溯回到 SparkContext 的 createTaskScheduler 方法,下面我直接把經常使用的 3 中類型的 TaskScheduler 給列出來了。
mode | Scheduler | Backend |
---|---|---|
cluster | TaskSchedulerImpl | SparkDeploySchedulerBackend |
yarn-cluster | YarnClusterScheduler | CoarseGrainedSchedulerBackend |
yarn-client | YarnClientClusterScheduler | YarnClientSchedulerBackend |
好,咱們回到以前的代碼上,schedulableBuilder.addTaskSetManager 比較簡單,把做業集添加到調度器的隊列當中。
咱們接着看 backend 的 reviveOffers,裏面只有一句話 driverActor ! ReviveOffers。真是頭暈,搞那麼多 Actor,只是爲了接收消息。
照舊吧,找到它的 receive 方法,找到 ReviveOffers 這個 case,發現它調用了 makeOffers 方法,咱們繼續追殺!
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
複製代碼
從 executorHost 中隨機抽出一些來給調度器,而後調度器返回 TaskDescription,executorHost 怎麼來的,待會兒再說,咱們接着看 resourceOffers 方法。
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
SparkEnv.set(sc.env)
// 遍歷worker提供的資源,更新executor相關的映射
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
}
}
// 從worker當中隨機選出一些來,防止任務都堆在一個機器上
val shuffledOffers = Random.shuffle(offers)
// worker的task列表
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
// 隨機遍歷抽出來的worker,經過TaskSetManager的resourceOffer,把本地性最高的Task分給Worker
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- TaskLocality.values) {
do {
launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
// 把本地性最高的Task分給Worker
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetId(tid) = taskSet.taskSet.id
taskIdToExecutorId(tid) = execId
activeExecutorIds += execId
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert (availableCpus(i) >= 0)
launchedTask = true
}
}
}
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
複製代碼
resourceOffers 主要作了 3 件事:
咱們繼續看 TaskSetManager 的 resourceOffer,看看它是怎麼找到和 host 再起的 Task,而且包裝成 TaskDescription。
經過查看代碼,我發現以前我解釋的和它具體實現的差異比較大,它所謂的本地性是根據當前的等待時間來肯定的任務本地性的級別。
它的本地性主要是包括四類:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
private def getAllowedLocalityLevel(curTime: Long): TaskLocality.TaskLocality = {
while (curTime - lastLaunchTime >= localityWaits(currentLocalityIndex) &&
currentLocalityIndex < myLocalityLevels.length - 1)
{
// 成立條件是當前時間-上次發佈任務的時間 > 當前本地性級別的,條件成立就跳到下一個級別
lastLaunchTime += localityWaits(currentLocalityIndex)
currentLocalityIndex += 1
}
myLocalityLevels(currentLocalityIndex)
}
複製代碼
等待時間是能夠經過參數去設置的,具體的本身查下面的代碼。
private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
val defaultWait = conf.get("spark.locality.wait", "3000")
level match {
case TaskLocality.PROCESS_LOCAL =>
conf.get("spark.locality.wait.process", defaultWait).toLong
case TaskLocality.NODE_LOCAL =>
conf.get("spark.locality.wait.node", defaultWait).toLong
case TaskLocality.RACK_LOCAL =>
conf.get("spark.locality.wait.rack", defaultWait).toLong
case TaskLocality.ANY =>
0L
}
}
複製代碼
下面繼續看 TaskSetManager 的 resourceOffer 的方法,經過 findTask 來從 Task 集合裏面找到相應的 Task。
findTask(execId, host, allowedLocality) match {
case Some((index, taskLocality)) => {
val task = tasks(index)
val serializedTask = Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
  val timeTaken = clock.getTime() - startTime
  addRunningTask(taskId)
  val taskName = "task %s:%d".format(taskSet.id, index)
  sched.dagScheduler.taskStarted(task, info)
  return Some(new TaskDescription(taskId, execId, taskName, index, serializedTask))
}
複製代碼
它的 findTask 方法以下:
private def findTask(execId: String, host: String, locality: TaskLocality.Value)
: Option[(Int, TaskLocality.Value)] =
{
   // 同一個Executor,經過execId來查找相應的等待的task
for (index <- findTaskFromList(execId, getPendingTasksForExecutor(execId))) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
   // 經過主機名找到相應的Task,不過比以前的多了一步判斷
if (TaskLocality.isAllowed(locality, TaskLocality.NODE_LOCAL)) {
for (index <- findTaskFromList(execId, getPendingTasksForHost(host))) {
return Some((index, TaskLocality.NODE_LOCAL))
}
}
  // 經過Rack的名稱查找Task
if (TaskLocality.isAllowed(locality, TaskLocality.RACK_LOCAL)) {
for {
rack <- sched.getRackForHost(host)
index <- findTaskFromList(execId, getPendingTasksForRack(rack))
} {
return Some((index, TaskLocality.RACK_LOCAL))
}
}
   // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行
for (index <- findTaskFromList(execId, pendingTasksWithNoPrefs)) {
return Some((index, TaskLocality.PROCESS_LOCAL))
}
  // 查找那些preferredLocations爲空的,不指定在哪裏執行的Task來執行
if (TaskLocality.isAllowed(locality, TaskLocality.ANY)) {
for (index <- findTaskFromList(execId, allPendingTasks)) {
return Some((index, TaskLocality.ANY))
}
}
// 最後沒辦法了,拖的時間太長了,只能啓動推測執行了
findSpeculativeTask(execId, host, locality)
}
複製代碼
從這個方面能夠看得出來,Spark 對運行時間仍是很注重的,等待的時間越長,它就可能越飢不擇食,從 PROCESS_LOCAL 一直讓步到 ANY,最後的最後,推測執行都用到了。
找到任務以後,它就調用 dagScheduler.taskStarted 方法,通知 dagScheduler 任務開始了,taskStarted 方法就不詳細講了,它觸發 dagScheduler 的 BeginEvent 事件,裏面只作了 2 件事:
一、檢查 Task 序列化的大小,超過 100K 就警告。
二、提交等待的 Stage。
好,咱們繼續回到發佈 Task 上面來,中間過程講完了,咱們應該是要回到 CoarseGrainedSchedulerBackend 的launchTasks 方法了。
def makeOffers() {
launchTasks(scheduler.resourceOffers(executorHost.toArray.map {case (id, host) => new WorkerOffer(id, host, freeCores(id))}))
}
複製代碼
它的方法體是:
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
freeCores(task.executorId) -= scheduler.CPUS_PER_TASK
executorActor(task.executorId) ! LaunchTask(task)
}
}
複製代碼
經過 executorId 找到相應的 executorActor,而後發送 LaunchTask 過去,一個 Task 佔用一個 Cpu。
那這個 executorActor 是怎麼來的呢?找唄,最後發現它是在 receive 方法裏面接受到 RegisterExecutor 消息的時候註冊的。經過搜索,咱們找到 CoarseGrainedExecutorBackend 這個類,在它的 preStart 方法裏面赫然找到了 driver ! RegisterExecutor(executorId, hostPort, cores) 帶的這三個參數都是在初始化的時候傳入的,那是誰實例化的它呢,再逆向搜索找到 SparkDeploySchedulerBackend!以前的 backend 一直都是它,咱們看 reviveOffers 是在它的父類 CoarseGrainedSchedulerBackend 裏面。
關係清楚了,在這個 backend 的 start 方法裏面啓動了一個 AppClient,AppClient 的其中一個參數 ApplicationDescription 就是封裝的運行 CoarseGrainedExecutorBackend 的命令。AppClient 內部啓動了一個 ClientActor,這個 ClientActor 啓動以後,會嘗試向 Master 發送一個指令 actor ! RegisterApplication(appDescription) 註冊一個 Application。
別廢話了,Ctrl +Shift + N 吧,定位到 Master 吧。
case RegisterApplication(description) => {
val app = createApplication(description, sender)
registerApplication(app)
persistenceEngine.addApplication(app)
sender ! RegisteredApplication(app.id, masterUrl)
schedule()
}
複製代碼
它作了 5 件事:
一、createApplication 爲這個 app 構建一個描述 App 數據結構的 ApplicationInfo。
二、註冊該 Application,更新相應的映射關係,添加到等待隊列裏面。
三、用 persistenceEngine 持久化 Application 信息,默認是不保存的,另外還有兩種方式,保存在文件或者 Zookeeper 當中。
四、經過發送方註冊成功。
五、開始做業調度。
關於調度的問題,在第一章《spark-submit 提交做業過程》已經介紹過了,建議回去再看看,搞清楚 Application 和 Executor 之間的關係。
Application 一旦得到資源,Master 會發送 launchExecutor 指令給 Worker 去啓動 Executor。
進到 Worker 裏面搜索 LaunchExecutor。
  val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_, self, workerId, host,
appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome), workDir, akkaUrl, ExecutorState.RUNNING)
 executors(appId + "/" + execId) = manager
  manager.start()
coresUsed += cores_
memoryUsed += memory_
masterLock.synchronized {
master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
}
複製代碼
原來 ExecutorRunner 還不是傳說中的 Executor,它內部是執行了 appDesc 內部的那個命令,啓動了 CoarseGrainedExecutorBackend,它纔是咱們的真命天子 Executor。
啓動以後 ExecutorRunner 報告 ExecutorStateChanged 事件給 Master。
Master 幹了兩件事:
一、轉發給 Driver,這個 Driver 是以前註冊 Application 的那個 AppClient
二、若是是 Executor 運行結束,從相應的映射關係裏面刪除
上面又花了那麼多時間講 Task 的運行環境 ExecutorRunner 是怎麼註冊,那咱們仍是回到咱們的主題,Task 的發佈。
發佈任務是發送 LaunchTask 指令給 CoarseGrainedExecutorBackend,接受到指令以後,讓它內部的 executor 來發布這個任務。
這裏咱們看一下 Executor 的 launchTask。
def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {
val tr = new TaskRunner(context, taskId, serializedTask)
runningTasks.put(taskId, tr)
threadPool.execute(tr)
}
複製代碼
TaskRunner 是這裏的重頭戲啊!看它的 run 方法吧。
override def run() {
// 準備工做若干...那天咱們放學回家通過一片玉米地,以上省略一百字
try {
// 反序列化Task
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
   // 命令爲嘗試運行,和hadoop的mapreduce做業是一致的
attemptedTask = Some(task)
logDebug("Task " + taskId + "'s epoch is " + task.epoch)
env.mapOutputTracker.updateEpoch(task.epoch)
// 運行Task, 具體能夠去看以前讓你們關注的ResultTask和ShuffleMapTask
taskStart = System.currentTimeMillis()
val value = task.run(taskId.toInt)
val taskFinish = System.currentTimeMillis()
     // 對結果進行序列化
val resultSer = SparkEnv.get.serializer.newInstance()
val beforeSerialization = System.currentTimeMillis()
val valueBytes = resultSer.serialize(value)
val afterSerialization = System.currentTimeMillis()
     // 更新任務的相關監控信息,會反映到監控頁面上的
for (m <- task.metrics) {
m.hostname = Utils.localHostName()
m.executorDeserializeTime = taskStart - startTime
m.executorRunTime = taskFinish - taskStart
m.jvmGCTime = gcTime - startGCTime
m.resultSerializationTime = afterSerialization - beforeSerialization
}
val accumUpdates = Accumulators.values
     // 對結果進行再包裝,包裝完再進行序列化
val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.getOrElse(null))
val serializedDirectResult = ser.serialize(directResult)
// 若是中間結果的大小超過了spark.akka.frameSize(默認是10M)的大小,就要提高序列化級別了,超過內存的部分要保存到硬盤的
val serializedResult = {
if (serializedDirectResult.limit >= akkaFrameSize - 1024) {
val blockId = TaskResultBlockId(taskId)
env.blockManager.putBytes(blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
ser.serialize(new IndirectTaskResult[Any](blockId))
} else {
serializedDirectResult
}
}
     // 返回結果
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
} catch {
// 這部分是錯誤處理,被我省略掉了,主要內容是通關相關負責人處理後事
} finally {
// 清理爲ResultTask註冊的shuffle內存,最後把task從正在運行的列表當中刪除
val shuffleMemoryMap = env.shuffleMemoryMap
shuffleMemoryMap.synchronized {
shuffleMemoryMap.remove(Thread.currentThread().getId)
}
runningTasks.remove(taskId)
}
}
}
複製代碼
以上代碼被我這些了,可是建議你們看看註釋吧。
最後結果是經過 statusUpdate 返回的。
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
driver ! StatusUpdate(executorId, taskId, state, data)
}
複製代碼
這回這個 Driver 又不是剛纔那個 AppClient,而是它的家長 SparkDeploySchedulerBackend,是在 SparkDeploySchedulerBackend 的父類 CoarseGrainedSchedulerBackend 接受了這個 StatusUpdate 消息。
這關係真他娘夠亂的。。
繼續,Task 裏面走的是 TaskSchedulerImpl 這個方法。
scheduler.statusUpdate(taskId, state, data.value)
複製代碼
到這裏,一個 Task 就運行結束了,後面就再也不擴展了,做業運行這塊是 Spark 的核心,再擴展基本就能寫出來一本書了,限於文章篇幅,這裏就再也不深究了。
以上的過程應該是和下面的圖一致的。
看完這篇文章,估計你們會雲裏霧裏的,在下一章《做業生命週期》會把剛纔描述的整個過程從新梳理出來,便於你們記憶,敬請期待!