Spark的核心是根據RDD來實現的,Spark Scheduler則爲Spark核心實現的重要一環,其做用就是任務調度。Spark的任務調度就是如何組織任務去處理RDD中每一個分區的數據,根據RDD的依賴關係構建DAG,基於DAG劃分Stage,將每一個Stage中的任務發到指定節點運行。基於Spark的任務調度原理,咱們能夠合理規劃資源利用,作到儘量用最少的資源高效地完成任務計算。html
Spark能夠部署在多種資源管理平臺,例如Yarn、Mesos等,Spark自己也實現了一個簡易的資源管理機制,稱之爲Standalone模式。因爲工做中接觸較多的是Saprk on Yarn,不作特別說明,如下所述均表示Spark-on-Yarn。Spark部署在Yarn上有兩種運行模式,分別爲yarn-client和yarn-cluster模式,它們的區別僅僅在於Spark Driver是運行在Client端仍是ApplicationMater端。以下圖所示爲Spark部署在Yarn上,以yarn-cluster模式運行的分佈式計算框架。node
其中藍色部分是Spark裏的概念,包括Client、ApplicationMaster、Driver和Executor,其中Client和ApplicationMaster主要是負責與Yarn進行交互;Driver做爲Spark應用程序的總控,負責分發任務以及監控任務運行狀態;Executor負責執行任務,並上報狀態信息給Driver,從邏輯上來看Executor是進程,運行在其中的任務是線程,因此說Spark的任務是線程級別的。經過下面的時序圖能夠更清晰地理解一個Spark應用程序從提交到運行的完整流程。算法
提交一個Spark應用程序,首先經過Client向ResourceManager請求啓動一個Application,同時檢查是否有足夠的資源知足Application的需求,若是資源條件知足,則準備ApplicationMaster的啓動上下文,交給ResourceManager,並循環監控Application狀態。apache
當提交的資源隊列中有資源時,ResourceManager會在某個NodeManager上啓動ApplicationMaster進程,ApplicationMaster會單獨啓動Driver後臺線程,當Driver啓動後,ApplicationMaster會經過本地的RPC鏈接Driver,並開始向ResourceManager申請Container資源運行Executor進程(一個Executor對應與一個Container),當ResourceManager返回Container資源,則在對應的Container上啓動Executor。緩存
Driver線程主要是初始化SparkContext對象,準備運行所需的上下文,而後一方面保持與ApplicationMaster的RPC鏈接,經過ApplicationMaster申請資源,另外一方面根據用戶業務邏輯開始調度任務,將任務下發到已有的空閒Executor上。併發
當ResourceManager向ApplicationMaster返回Container資源時,ApplicationMaster就嘗試在對應的Container上啓動Executor進程,Executor進程起來後,會向Driver註冊,註冊成功後保持與Driver的心跳,同時等待Driver分發任務,當分發的任務執行完畢後,將任務狀態上報給Driver。框架
Driver把資源申請的邏輯給抽象出來,以適配不一樣的資源管理系統,因此才間接地經過ApplicationMaster去和Yarn打交道。分佈式
從上述時序圖可知,Client只管提交Application並監控Application的狀態。對於Spark的任務調度主要是集中在兩個方面: 資源申請和任務分發,其主要是經過ApplicationMaster、Driver以及Executor之間來完成,下面詳細剖析Spark任務調度每一個細節。ide
當Driver起來後,Driver則會根據用戶程序邏輯準備任務,並根據Executor資源狀況逐步分發任務。在詳細闡述任務調度前,首先說明下Spark裏的幾個概念。一個Spark應用程序包括Job、Stage以及Task三個概念:oop
Spark的任務調度整體來講分兩路進行,一路是Stage級的調度,一路是Task級的調度,整體調度流程以下圖所示。
Spark RDD經過其Transactions操做,造成了RDD血緣關係圖,即DAG,最後經過Action的調用,觸發Job並調度執行。
DAGScheduler負責Stage級的調度,主要是將DAG切分紅若干Stages,並將每一個Stage打包成TaskSet交給TaskScheduler調度。
TaskScheduler(trait)負責爲建立它的SparkContext進行Task級的調度,將DAGScheduler給過來的TaskSet按照指定的調度策略分發到Executor上執行,併爲執行特別慢的任務啓動備份任務,當前TaskScheduler的惟一實現爲org.apache.spark.scheduler.TaskSchedulerImpl,TaskSchedulerImpl會在如下幾種場景下調用org.apache.spark.scheduler.SchedulerBackend#receiveOffers:
調度過程當中SchedulerBackend(trait)負責爲Task分配可用計算資源(即Executor),並在分配的Executor上啓動Task,完成計算的調度過程,使用receiveOffers完成上述調度過程。SchedulerBackend有多種實現,分別對接不一樣的資源管理系統(YARN、Standalone、Mesos),但都是基於SchedulerBackend的一個實現org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend加入了自身特有的邏輯。每一個SchedulerBackend都會對應一個惟一的TaskScheduler,而它們都會被SparkContext建立和持有。
有了上述感性的認識後,下面這張圖描述了Spark-On-Yarn模式下在任務調度期間,ApplicationMaster、Driver以及Executor內部模塊的交互過程。
Driver初始化SparkContext過程當中,會分別初始化DAGScheduler、TaskScheduler、SchedulerBackend以及HeartbeatReceiver,並啓動SchedulerBackend以及HeartbeatReceiver。SchedulerBackend經過ApplicationMaster申請資源,並不斷從TaskScheduler中拿到合適的Task分發到Executor執行。HeartbeatReceiver負責接收Executor的心跳信息,監控Executor的存活情況,並通知到TaskScheduler。下面着重剖析DAGScheduler負責的Stage調度以及TaskScheduler負責的Task調度。
Spark的任務調度是從DAG切割開始,主要是由DAGScheduler來完成。當遇到一個Action操做後就會觸發一個Job的計算,並交給DAGScheduler來提交,下圖是涉及到Job提交的相關方法調用流程圖。
Job由最終的RDD和Action方法封裝而成,SparkContext將Job交給DAGScheduler提交,它會根據RDD的血緣關係構成的DAG進行切分,將一個Job劃分爲若干Stages,具體劃分策略是,由最終的RDD不斷經過依賴回溯判斷父依賴是不是寬依賴,即以Shuffle爲界,劃分Stage,窄依賴的RDD之間被劃分到同一個Stage中,能夠進行pipeline式的計算,以下圖紫色流程部分。劃分的Stages分兩類,一類叫作ResultStage,爲DAG最下游的Stage,由Action方法決定,另外一類叫作ShuffleMapStage,爲下游Stage準備數據,下面看一個簡單的例子WordCount。
Job由saveAsTextFile
觸發,該Job由RDD-3和saveAsTextFile
方法組成,根據RDD之間的依賴關係從RDD-3開始回溯搜索,直到沒有依賴的RDD-0,在回溯搜索過程當中,RDD-3依賴RDD-2,而且是寬依賴,因此在RDD-2和RDD-3之間劃分Stage,RDD-3被劃到最後一個Stage,即ResultStage中,RDD-2依賴RDD-1,RDD-1依賴RDD-0,這些依賴都是窄依賴,因此將RDD-0、RDD-1和RDD-2劃分到同一個Stage,即ShuffleMapStage中,實際執行的時候,數據記錄會一鼓作氣地執行RDD-0到RDD-2的轉化。不難看出,其本質上是一個深度優先搜索算法。
一個Stage是否被提交,須要判斷它的父Stage是否執行,只有在父Stage執行完畢才能提交當前Stage,若是一個Stage沒有父Stage,那麼從該Stage開始提交。Stage提交時會將Task信息(分區信息以及方法等)序列化並被打包成TaskSet交給TaskScheduler,一個Partition對應一個Task,另外一方面監控Stage的運行狀態,只有Executor丟失或者Task因爲Fetch失敗才須要從新提交失敗的Stage以調度運行失敗的任務,其餘類型的Task失敗會在TaskScheduler的調度過程當中重試。
相對來講DAGScheduler作的事情較爲簡單,僅僅是在Stage層面上劃分DAG,提交Stage並監控相關狀態信息。TaskScheduler則相對較爲複雜,下面詳細闡述其細節。
Spark Task的調度是由TaskScheduler來完成,由前文可知,DAGScheduler將Stage打包到TaskSet交給TaskScheduler,TaskScheduler會將其封裝爲TaskSetManager加入到調度隊列中,TaskSetManager負責監控管理同一個Stage中的Tasks,TaskScheduler就是以TaskSetManager爲單元來調度任務。前面也提到,TaskScheduler初始化後會啓動SchedulerBackend,它負責跟外界打交道,接收Executor的註冊信息,並維護Executor的狀態,因此說SchedulerBackend是管「糧食」的,同時它在啓動後會按期地去「詢問」TaskScheduler有沒有任務要運行,也就是說,它會按期地「問」TaskScheduler「我有這麼多餘量,你要不要啊」,TaskScheduler在SchedulerBackend「問」它的時候,會從調度隊列中按照指定的調度策略選擇TaskSetManager去調度運行,大體方法調用流程以下圖所示。
前面講到,TaskScheduler會先把DAGScheduler給過來的TaskSet封裝成TaskSetManager扔到任務隊列裏,而後再從任務隊列裏按照必定的規則把它們取出來在SchedulerBackend給過來的Executor上運行。這個調度過程實際上仍是比較粗粒度的,是面向TaskSetManager的。
TaskScheduler是以樹的方式來管理任務隊列,樹中的節點類型爲Schedulable,葉子節點爲TaskSetManager,非葉子節點爲Pool,下圖是它們之間的繼承關係。
TaskScheduler支持兩種調度策略,一種是FIFO,也是默認的調度策略,另外一種是FAIR。在TaskScheduler初始化過程當中會實例化rootPool,表示樹的根節點,是Pool類型。若是是採用FIFO調度策略,則直接簡單地將TaskSetManager按照先來先到的方式入隊,出隊時直接拿出最早進隊的TaskSetManager,其樹結構大體以下圖所示,TaskSetManager保存在一個FIFO隊列中。
在闡述FAIR調度策略前,先貼一段使用FAIR調度策略的應用程序代碼,後面針對該代碼邏輯來詳細闡述FAIR調度的實現細節。
object MultiJobTest { // spark.scheduler.mode=FAIR def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() val rdd = spark.sparkContext.textFile(...) .map(_.split("\\s+")) .map(x => (x(0), x(1))) val jobExecutor = Executors.newFixedThreadPool(2) jobExecutor.execute(new Runnable { override def run(): Unit = { spark.sparkContext.setLocalProperty("spark.scheduler.pool", "count-pool") val cnt = rdd.groupByKey().count() println(s"Count: $cnt") } }) jobExecutor.execute(new Runnable { override def run(): Unit = { spark.sparkContext.setLocalProperty("spark.scheduler.pool", "take-pool") val data = rdd.sortByKey().take(10) println(s"Data Samples: ") data.foreach { x => println(x.mkString(", ")) } } }) jobExecutor.shutdown() while (!jobExecutor.isTerminated) {} println("Done!") } }
上述應用程序中使用兩個線程分別調用了Action方法,即有兩個Job會併發提交,可是無論怎樣,這兩個Job被切分紅若干TaskSet後終究會被交到TaskScheduler這裏統一管理,其調度樹大體以下圖所示。
在出隊時,則會對全部TaskSetManager排序,具體排序過程是從根節點rootPool
開始,遞歸地去排序子節點,最後合併到一個ArrayBuffer
裏,代碼邏輯以下。
def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = { var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager] val sortedSchedulableQueue = schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator) for (schedulable <- sortedSchedulableQueue) { sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue } sortedTaskSetQueue }
使用FAIR調度策略時,上面代碼中的taskSetSchedulingAlgorithm
的類型爲FairSchedulingAlgorithm
,排序過程的比較是基於Fair-share來比較的,每一個要排序的對象包含三個屬性: runningTasks
值(正在運行的Task數)、minShare
值、weight
值,比較時會綜合考量runningTasks
值,minShare
以及weight
值。若是A對象的runningTasks
大於它的minShare
,B對象的runningTasks
小於它的minShare
,那麼B排在A前面;若是A、B對象的runningTasks
都小於它們的minShare
,那麼就比較runningTasks
與minShare
的比值,誰小誰排前面;若是A、B對象的runningTasks
都大於它們的minShare
,那麼就比較runningTasks
與weight
的比值,誰小誰排前面。總體上來講就是經過minShare
和weight
這兩個參數控制比較過程,能夠作到不讓資源被某些長時間Task給一直佔了。
從調度隊列中拿到TaskSetManager後,那麼接下來的工做就是TaskSetManager按照必定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的全部Task,並負責管理調度這些Task。
從調度隊列中拿到TaskSetManager後,那麼接下來的工做就是TaskSetManager按照必定的規則一個個取出Task給TaskScheduler,TaskScheduler再交給SchedulerBackend去發到Executor上執行。前面也提到,TaskSetManager封裝了一個Stage的全部Task,並負責管理調度這些Task。
在TaskSetManager初始化過程當中,會對Tasks按照Locality級別進行分類,Task的Locality有五種,優先級由高到低順序:PROCESS_LOCAL(指定的Executor),NODE_LOCAL(指定的主機節點),NO_PREF(無所謂),RACK_LOCAL(指定的機架),ANY(知足不了Task的Locality就隨便調度)。這五種Locality級別存在包含關係,RACK_LOCAL包含NODE_LOCAL,NODE_LOCAL包含PROCESS_LOCAL,然而ANY包含其餘全部四種。初始化階段在對Task分類時,根據Task的preferredLocations判斷它屬於哪一個Locality級別,屬於PROCESS_LOCAL的Task同時也會被加入到NODE_LOCAL、RACK_LOCAL類別中,好比,一個Task的preferredLocations指定了在Executor-2上執行,那麼它屬於Executor-2對應的PROCESS_LOCAL類別,同時也把他加入到Executor-2所在的主機對應的NODE_LOCAL類別,Executor-2所在的主機的機架對應的RACK_LOCAL類別中,以及ANY類別,這樣在調度執行時,知足不了PROCESS_LOCAL,就逐步退化到NODE_LOCAL,RACK_LOCAL,ANY。
TaskSetManager在決定調度哪些Task時,是經過上面流程圖中的resourceOffer方法來實現,爲了儘量地將Task調度到它的preferredLocations上,它採用一種延遲調度算法。resourceOffer方法原型以下,參數包括要調度任務的Executor Id、主機地址以及最大可容忍的Locality級別。
def resourceOffer( execId: String, host: String, maxLocality: TaskLocality.TaskLocality) : Option[TaskDescription]
延遲調度算法的大體流程以下圖所示。
首先看是否存在execId對應的PROCESS_LOCAL類別的任務,若是存在,取出來調度,不然根據當前時間,判斷是否超過了PROCESS_LOCAL類別最大容忍的延遲,若是超過,則退化到下一個級別NODE_LOCAL,不然等待不調度。退化到下一個級別NODE_LOCAL後調度流程也相似,看是否存在host對應的NODE_LOCAL類別的任務,若是存在,取出來調度,不然根據當前時間,判斷是否超過了NODE_LOCAL類別最大容忍的延遲,若是超過,則退化到下一個級別RACK_LOCAL,不然等待不調度,以此類推…..。當不知足Locatity類別會選擇等待,直到下一輪調度重複上述流程,若是你比較激進,能夠調大每一個類別的最大容忍延遲時間,若是不知足Locatity時就會等待多個調度週期,直到知足或者超過延遲時間退化到下一個級別爲止。
除了選擇合適的Task調度運行外,還須要監控Task的執行狀態,前面也提到,與外部打交道的是SchedulerBackend,Task被提交到Executor啓動執行後,Executor會將執行狀態上報給SchedulerBackend,SchedulerBackend則告訴TaskScheduler,TaskScheduler找到該Task對應的TaskSetManager,並通知到該TaskSetManager,這樣TaskSetManager就知道Task的失敗與成功狀態,對於失敗的Task,會記錄它失敗的次數,若是失敗次數尚未超過最大重試次數,那麼就把它放回待調度的Task池子中,不然整個Application失敗。
在記錄Task失敗次數過程當中,會記錄它上一次失敗所在的Executor Id和Host,這樣下次再調度這個Task時,會使用黑名單機制,避免它被調度到上一次失敗的節點上,起到必定的容錯做用。黑名單記錄Task上一次失敗所在的Executor Id和Host,以及其對應的「黑暗」時間,「黑暗」時間是指這段時間內不要再往這個節點上調度這個Task了。
TaskScheduler在啓動SchedulerBackend後,還會啓動一個後臺線程專門負責推測任務的調度,推測任務是指對一個Task在不一樣的Executor上啓動多個實例,若是有Task實例運行成功,則會幹掉其餘Executor上運行的實例。推測調度線程會每隔固定時間檢查是否有Task須要推測執行,若是有,則會調用SchedulerBackend的reviveOffers去嘗試拿資源運行推測任務。
檢查是否有Task須要推測執行的邏輯最後會交到TaskSetManager,TaskSetManager採用基於統計的算法,檢查Task是否須要推測執行,算法流程大體以下圖所示。
TaskSetManager首先會統計成功的Task數,當成功的Task數超過75%(可經過參數spark.speculation.quantile
控制)時,再統計全部成功的Tasks的運行時間,獲得一箇中位數,用這個中位數乘以1.5(可經過參數spark.speculation.multiplier
控制)獲得運行時間門限,若是在運行的Tasks的運行時間超過這個門限,則對它啓用推測。算法邏輯較爲簡單,其實就是對那些拖慢總體進度的Tasks啓用推測,以加速整個TaskSet即Stage的運行。
在前文已經提過,ApplicationMaster和SchedulerBackend起來後,SchedulerBackend經過ApplicationMaster申請資源,ApplicationMaster就是用來專門適配YARN申請Container資源的,當申請到Container,會在相應Container上啓動Executor進程,其餘事情就交給SchedulerBackend。Spark早期版本只支持靜態資源申請,即一開始就指定用多少資源,在整個Spark應用程序運行過程當中資源都不能改變,後來支持動態Executor申請,用戶不須要指定確切的Executor數量,Spark會動態調整Executor的數量以達到資源利用的最大化。
靜態資源申請是用戶在提交Spark應用程序時,就要提早估計應用程序須要使用的資源,包括Executor數(num_executor)、每一個Executor上的core數(executor_cores)、每一個Executor的內存(executor_memory)以及Driver的內存(driver_memory)。
在估計資源使用時,應當首先了解這些資源是怎麼用的。任務的並行度由分區數(Partitions)決定,一個Stage有多少分區,就會有多少Task。每一個Task默認佔用一個Core,一個Executor上的全部core共享Executor上的內存,一次並行運行的Task數等於num_executor*executor_cores,若是分區數超過該值,則須要運行多個輪次,通常來講建議運行3~5輪較爲合適,不然考慮增長num_executor或executor_cores。因爲一個Executor的全部tasks會共享內存executor_memory,因此建議executor_cores不宜過大。executor_memory的設置則須要綜合每一個分區的數據量以及是否有緩存等邏輯。下圖描繪了一個應用程序內部資源利用狀況。
動態資源申請目前只支持到Executor,便可以不用指定num_executor,經過參數spark.dynamicAllocation.enabled來控制。因爲許多Spark應用程序一開始可能不須要那麼多Executor或者其自己就不須要太多Executor,因此沒必要一次性申請那麼多Executor,根據具體的任務數動態調整Executor的數量,儘量作到資源的不浪費。因爲動態Executor的調整會致使Executor動態的添加與刪除,若是刪除Executor,其上面的中間Shuffle結果可能會丟失,這就須要藉助第三方的ShuffleService了,若是Spark是部署在Yarn上,則能夠在Yarn上配置Spark的ShuffleService,具體操做僅需作兩點:
<property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle,spark_shuffle</value> </property> <property> <name>yarn.nodemanager.aux-services.spark_shuffle.class</name> <value>org.apache.spark.network.yarn.YarnShuffleService</value> </property> <property> <name>spark.shuffle.service.port</name> <value>7337</value> </property>
當啓用動態Executor申請時,在SparkContext初始化過程當中會實例化ExecutorAllocationManager,它是被用來專門控制動態Executor申請邏輯的,動態Executor申請是一種基於當前Task負載壓力實現動態增刪Executor的機制。一開始會按照參數spark.dynamicAllocation.initialExecutors設置的初始Executor數申請,而後根據當前積壓的Task數量,逐步增加申請的Executor數,若是當前有積壓的Task,那麼取積壓的Task數和spark.dynamicAllocation.maxExecutors中的最小值做爲Executor數上限,每次新增長申請的Executor爲2的次方,即第一次增長1,第二次增長2,第三次增長4,…。另外一方面,若是一個Executor在一段時間內都沒有Task運行,則將其回收,可是在Remove Executor時,要保證最少的Executor數,該值經過參數spark.dynamicAllocation.minExecutors來控制,若是Executor上有Cache的數據,則永遠不會被Remove,以保證中間數據不丟失。
但願後續能夠改下。。。
參考:Spark Scheduler內部原理剖析 spark DAGScheduler、TaskSchedule、Executor執行task源碼分析 Spark核心做業調度和任務調度之DAGScheduler源碼 Spark Scheduler模塊詳解-DAGScheduler實現