spark能夠運行在standalone,yarn,mesos等多種模式下,當前咱們用的最廣泛的是yarn模式,在yarn模式下又分爲client和cluster。本文接下來將分析yarn cluster下任務提交的過程。也就是回答,在yarn cluster模式下,任務是怎麼提交的問題。在yarn cluster模式下,spark任務提交涉及四個角色(client, application, driver以及executor)之間的交互。接下來,將詳細分析這四個角色在任務提交過程當中都作了那些事。apache
Step 1:咱們知道:在咱們寫完任務準備向集羣提交spark任務時,通常是調用bin下的spark-submit腳本進行任務的提交。在完成一些環境變量和參數的準備後,最終調用spark代碼庫中的SparkSubmit類。api
Step 2:在SparkSubmit的main函數中,經過submit,runMain而後經過YarnClusterApplication啓動org.apache.spark.deploy.yarn.Client.app
Step 3:在Client中,經過main,run,而後在submitApplication中,利用yarnClient向ResourceManager提交新應用以啓動ApplicationMaster,其中在yarn cluster模式下啓動ApplicationMaster的類是函數
org.apache.spark.deploy.yarn.ApplicationMaster。 至此,client完成全部的工做。
Step1:yarn分配container運行ApplicationMaster。經過main,run,runDriver,調用startUserApplication,新建線程,運行在spark-submit --class參數指定的應用類用戶代碼。ui
Step2:ApplicationMaster等待driver完成sparkContext的初始化後,獲取driver的一個ref。調用registerAM函數,利用YarnRMClient向yarn申請資源運行executor。一旦獲取到container資源,在yarnAllocator中,url
launcherPool線程池會將container,driver等相關信息封裝成ExecutorRunnable對象,經過ExecutorRunnable啓動新的container以運行executor。在次過程當中,指定啓動executor的類是
org.apache.spark.executor.CoarseGrainedExecutorBackend。
在ApplicationMaster的步驟1中,會新建線程運行用戶端代碼,而且在完成sparkcontext的初始化,其中包括dagScheduler完成job stage的切分,每一個stage的任務轉成化一系列的task,封裝成taskset。交由taskScheduler去調用。因爲這個過程比較複雜,並且很是的重要,準備稍後會單獨對這個部分進行詳細講解。spa
在ApplicationMaster的步驟2中提到,新的container將會運行executor。在executor啓動之後,會向driver發送RegisterExecutor消息告訴driver註冊當前運行的executor。在driver端的CoarseGrainedSchedulerBackend中,能夠看到對該消息的處理過程。在driver段感知到該消息後,driver將向executor發送RegisteredExecutor消息。executor和driver更多的細節,在稍後spark任務計算解析中,會將進行更詳細的描述。線程
至此,client在完成使命後退出。其餘三個部分也已啓動起來。接下來將以spark example中的sparkPi例子來看看日常咱們寫的spark任務是怎麼計算的。調試
首先把sparkPi中的代碼貼出來:對象
在前文中 ApplicationMaster流程中第一步提到:yarn分配container運行ApplicationMaster。經過main,run,runDriver,調用startUserApplication,新建線程,運行在spark-submit --class參數指定的應用類用戶代碼。也就是說,在這一步將運行用戶寫入的代碼。
1,在SparkSession...getOrCreate函數中主要作的事情是完成sparkContext的初始化,這其中主要包括DAGScheduler,TaskSchedule的初始化等。(注:在調試過程當中使用的standalone模式,而且加入extraJavaOption主要是爲了便於調試executor的代碼)。
2,上述代碼的核心是sparkContext.parallelize(....).map(....).reduce。在parallelize函數中將新建ParallelCollectionRDD。在map中將新建MapPartitionsRDD。最後reduce是一個action(一個action對應一個Job),觸發實際的計算。
3,在reduce函數中,經過調用sc.runJob->dagScheduler.runJob→submitJob提交JobSubmitted事件到DAGScheduler本身。而後調用handleJobSubmitted來處理Job提交。在handleJobSubmitted函數中,將建立ResultStage,而後根據shuffle將Job劃分爲不一樣的stage。在本例中,因爲沒有shuffle,將只有一個stage。最終經過submitMissingTasks將stage中的task封裝成taskset,交由taskschuduler(taskScheduler.submitTasks)進行task級別的調度。
4,在TaskSchdulerImpl的submitTasks中,能夠看到taskset會被進一步封裝成TasksetManager,加入到schedulableBuilder中(默認使用FIFO隊列進行調度)。而後driver向本身發送ReviveOffers消息。driver接收到該信息後,若是發現有空閒的executor,將該Task序列後,發送LaunchTask消息給executor。讓executor去執行。
5,executor處理LaunchTask消息的代碼以下:
launchTask會將task信息TaskRunner,啓用線程池運行。
6,在TaskRunner的run方法中,將運行
val res = task.run(
taskAttemptId = taskId,
attemptNumber = taskDescription.attemptNumber,
metricsSystem = env.metricsSystem)
threwException = false
而後調用runTask進行運行,有兩種類型的Task(ShuffleMapTask,ResultTask),本例中將運行ResultTask中的runTask方法,而後在該方法中,調用用戶傳入的函數代碼。
7,在TaskRunner的run方法中,在完成計算後,將調用execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult),該函數將向driver發送信息,告訴改Task已完成。
8,在driver端,若是任務正常結束,將調用taskResultGetter.enqueueSuccessfulTask。在該函數中,接着調用handleSuccessfulTask,最終DAGScheduler將向本身發送CompletionEvent事件,而後使用handleTaskCompletion來處理。若是任務正常結束,將經過
job.listener.taskSucceeded通知JobWaiter,JobWaiter完成任務結果的合併。在全部的JobWaiter中的Task都完成後,任務退出。