一、 資源分配html
經過SparkSubmit進行提交應用後,首先會建立Client將應用程序(字節碼文件.class)包裝成Driver,並將其註冊到Master。Master收到Client的註冊請求後將其加入待調度隊列waitingDrivers,並等待分配執行資源。apache
1.1 Dirver調度(分配Driver執行容器,1個)app
Master中調度程序執行時會爲Driver分配一知足其執行要求的Worker, 並通知Worker啓動將Driver。Worker接到執行Driver指令後建立DriverRunner執行Driver(應用程序mainClass,mainClass執行時其會建立Spark執行上下文環境:SparkContext。伴隨SparkContext會建立DAGScheduler和TaskScheduler分別用於Stage調度和任務調度,並會觸發RDD的Action算子提交job)。spa
1.2 APP調度(分配Executor, 多個)線程
若想Job運行就須要獲得執行資源,Dirver成功執行後,會經過SparkDeployScheduler-Backend建立AppClient(包裝App信息,包含能夠建立CoarseGrainedExecutorBackend實例Command),用於向Master彙報資源需求。Master接到AppClient的彙報後,將其加入waittingApps隊列,等待調度。htm
App調度時會爲app分配知足條件的資源-----Worker(State是Alive,其上並無該Application的executor,可用內存知足要求(spark.executor.memory指定,默認512), 核知足要求(spark.cores.max, 最大可用core數,若未指定,則爲所有資源)),而後通知Woker啓動Excutor. 及向AppClient發送ExecutorAdded消息。blog
進行調度時,調度程序會根據配製SpreadOutApps = spark.deploy.spreadOut狀況決定資源分配方式,若排序
SpreadOutApps方式:將每一個app分配到儘量多的worker中執行。遞歸
1 從列表中取下一app,根據CPU狀況找出合適的woker,按核從小到大排序隊列
2 若是worker節點存在能夠分配的core 則進行預分配處理(輪循一次分一個直至知足app需求),並在分配列表(assigned = Array[Int](numUsable))中記數。
3根據assinged列表中的預分配信息,進行分配Executor(真實分配)
4 啓動Executor並設置app.state = ApplicationState.RUNNING
非SpreadOutApps方式: 將每一個app分配到儘量少的worker中執行。
1 從可用的worker列表中取下一work. (worker <- workers if worker.coresFree > 0)
2 遍歷waitingApps 找到知足app運行條件的app,進行分配
3啓動Executor(launchExecutor(w,e))並設置app.state = ApplicationState.RUNNING
其中:launchExcutor(worker, exec) 具體內容以下:
向executor分配給worker
通知worker啓動executor
由分配過程可知, 分配的Excutor個數與CPU核心數有關。當指定完Worker節點後,會在Worker節點建立ExecutorRunner,並啓動,執行App中的Command 去建立並啓動CoarseGrainedExecutorBackend。CoarseGrainedExecutorBackend啓動後,會首先經過傳入的driverUrl這個參數向在CoarseGrainedSchedulerBackend::DriverActor(用於與Master通訊,及調度任務)發送RegisterExecutor(executorId, hostPort, cores),DriverActor會建立executorData(executor信息)加入executorDataMap供後續task使用,並回復RegisteredExecutor,此時CoarseGrainedExecutorBackend會建立一個org.apache.spark.executor.Executor。至此,Executor建立完畢。Executor是直接用於task執行, 是集羣中的直接勞動者。
至此,資源分配結束。當分配完資源後,就能夠爲依本地性爲任務分配具體的執行資源。
二、Stage劃分
當執行mainClass時,執行到RDD的action算子時,會觸發執行做業(sc.runJob),最終經過調用DAGScheduler的runJob方法根據RDD信息及action算子要作的操做建立ResultStage(FinalStage)及ActiveJob。
若ResultStage建立成功的話,根據配製信息及RDD特徵可分爲本地執行,集羣執行。
若「spark.localExecution.enable」指定容許本地運行(默認爲:false,不容許),具RDD的action算了容許本地運行allowLocal=true,且RDD只有一個partition的話能夠直接以本地線程執行job,無需劃分stage。不然要將job分紅多個Stage提交到集羣去執行(經過提交ResultStage進行)。
由於ResultStage提交時,首先會去判斷其是否存在缺失的ParentStage(也就是說是否存在未完成的父Stage)。如有,則其須要等待其父Stage執行完成,才能進行提交執行。
判斷是否存在Stage的標準是看是否存在ShuffeDependency(Stage的分界線)。提交ResultStage時會根據其finalRDD 的依賴遞歸的尋找其DAG圖中是否存在ShuffeDependency, 若存在,則建立ShuffleMapStage作爲finalStage的父Stage以此相似。但至此,只能說存在父Stage並不能說存在缺失的父Stage. 判斷缺失的標準是看其結果成功的輸出信息(status)個數與其處理的分區個數是否相同,如若相同,則說明父Stage已經執行完成, 不存在missing;不然,說明還未完成,存在missing. 由於將ShuffleMapStage劃分紅maptask時,每一個Partition對應一個maptask, 每一個task會獲得一個status輸出結果信息,並在執行結束時將輸出結果上報mapOutputTracker,並更新shuffleStage狀態(將status增長進行其outputLocs列表,並將numAvailableOutputs加1),若numAvailableOutputs 與 Stage所要處理的partitions一致,說明全部的task都已經執行完成,即Stage執行完成;不然,說明還有task未完成,即Stage未完成。
由上述分析可知,存在依賴關係的兩個Stage,若是父Stage未執行完成,子Stage不能提交,也就是不能轉變爲Taskset加入任務調度隊列。所以其前後順序是嚴格控制的。咱們知道只有存在ShuffleDependency時,纔會劃分Stage,這也就是說兩個Stage之間是要作Shuffle操做的。根據上述分析可知Shuffle時ShuffleWrite作不完,ShuffleRead不能進行.
3. Task調度
當Stage不存在缺失的ParentStage時,會將其轉換爲TaskSet並提交。轉換時依Stage類型進行轉換:將ResultStage轉換成ResultTask, ShuffleMapStage轉換成ShuffleMapTask. Task個數由Stage中finalRDD 的分區數決定。
當轉換成的TaskSet提交以後,將其經過taskScheduler包裝成TaskSetManager並添加至調度隊列中(Pool),等待調度。在包裝成TaskSetManager時,根據task的preferredLocatitions將任務分類存放在pendingTasksForExecutor, pendingTaskForHost, pendingTasksForRack, pendingTaskWithNoPrefs及allPendingTasks中, 前三個列表是是包含關係(本地性愈來愈低),範圍起來越大,例如:在pendingTasksForExecutor也在pendingTaskForHost,pendingTasksForRack中, 分類的目的是在調度時,依次由本地性高à低的查找task。
在進行Task調度時,首先根據調度策略將可調度全部taskset進行排序,而後對排好序的taskset待調度列表中的taskset,按序進行分配Executor。再分配Executor時,而後逐個爲Executor列表中可用的Executor在這次選擇的taskset中按本地性由高到低查找適配任務。此處任務調度爲延遲調度,即若本次調度時間距上一任務結束時間小於當前本地性配製時間則等待,若過了配製時間,本地性要求逐漸下降,再去查找適配的task。當選定某一task後後將其加入runningtask列表,當其執行完成時會加入success列表,下次調度時就會過濾過存在這兩個列表中的任務,避免重複調度。
當一個任務執行結束時,會將其從runningtask中移除,並加入success,並會適放其佔用的執行資源,供後序task使用, 將判斷其執行成功的task數與此taskset任務總數相等時,意爲taskset中全部任務執行結束,也就是taskset結束。此時會將taskset移除出可調度隊列。
重複上述過程直到taskset待調度列表爲空。即全部做業(job)執行完成。
3.1 spark調度策略
上文任務調度時提到,在調度任務時,首前後依據調度策略對任務按優先級進行排序。下面就調度策略就行介紹。
Spark現有的調度策略有FIFO 及 Fair兩種。採用何種調度策略由「spark.scheduler.mode」參數指定,默認爲FIFO類型。
下小節進行分析……
……………………
文章出處:http://www.cnblogs.com/barrenlake/p/4550800.html
……………………