調度系統,是貫穿整個Spark應用的主心骨,從調度系統開始入手瞭解Spark Core,比較容易理清頭緒。html
Spark的資源調度採用的是常見的兩層調度,底層資源的管理和分配是第一層調度,交給YARN、Mesos或者Spark的Standalone集羣處理,Application從第一層調度拿到資源後,還要進行內部的任務和資源調度,將任務和資源進行匹配,這是第二層調度,本文講的就是這第二層調度。算法
Spark的調度體系涉及的任務包括3個粒度,分別是Job、Stage、Task。
Job表明用戶提交的一系列操做的整體,一個具體的計算任務,有明確的輸入輸出,一個Job由多個Stage組成;
一個Stage表明Job計算流程的一個組成部分,一個階段,包含多個Task;
一個Task表明對一個分區的數據進行計算的具體任務。shell
層級關係:Job > Stage > Taskapache
在Spark Core 解析:RDD 彈性分佈式數據集中,已經解釋了RDD之間的依賴,以及如何組成RDD血緣圖。segmentfault
因此本文主要目的就是解釋清楚:Scheduler將RDD血緣圖轉變成Stage DAG,而後生成Task,最後提交給Executor去執行的過程。後端
Job的不一樣分區的計算一般能夠並行,可是有些計算須要將數據進行從新分區,這個過程稱做shuffle(混洗)。Shuffle的過程是無法徹底並行的,這時候就會出現task之間的等待,task的數量也可能發生變化,因此Spark中以shuffle爲邊界,對task進行劃分,劃分出來的每段稱爲Stage。app
Stage表明一組能夠並行的執行相同計算的task,每一個任務必須有相同的分區規則,這樣一個stage中是沒有shuffle的。異步
在一個Spark App中,stage有一個全局惟一ID,stage id是自增的。async
Stage分爲兩種:分佈式
stage建立流程:
一旦發現某個RDD的dependency是ShuffleDependency,就建立一個ShuffleMapStage。
val rg=sc.parallelize(List((1,10),(2,20))) rg.reduceByKey(_+_).collect
這裏reduceByKey操做引發了一次shuffle,因此job被切分紅了2個stage。
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))) val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) rddA.join(rddB).collect
join操做致使rddA和rddB都進行了一次shuffle,因此有3個stage。
import org.apache.spark.HashPartitioner val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))).partitionBy(new HashPartitioner(3)) val rddB=sc.parallelize(List((1,"A"),(2,"B"),(3,"C"))) rddA.join(rddB).collect
WHAT ?
由於rddA已經定義了Partitioner,這裏join操做會保留rddA的分區方式,因此對rddA的依賴是OneToOneDepenency,而對於rddB則是ShuffleDependency。
val rddA=sc.parallelize(List((1,"a"),(2,"b"),(3,"c"))) rddA join rddA collect
一個RDD被兩個stage使用了。
綜上,stage的劃分必定是依據shuffle即ShuffleDependency,跟算子和RDD變量的定義沒有很強的關係,example2和3中的join操做rddA.join(rddB).collect
看起來如出一轍,但實際產生的stage劃分卻差異很大。
與stage對應,task也分爲兩種:
一個stage有多少個partition就會建立多少個task,好比一個ShuffleMapStage有10個partition,那麼就會建立10個ShuffleMapTask。
一個Stage中的全部task組成一個TaskSet。
graph TB R(RDD.action)-->S(SparkContext.runJob)-- RDD -->D(DAGScheduler.runJob) -- TaskSet -->T(TaskScheduler.submitTasks)-- TaskDescription -->E(Executor.launchTask)
RDD在action操做中經過SparkContext.runJob方法觸發Job執行流程,該方法將調用DagScheduler.runJob方法,將RDD傳入DagScheduler。而後,DAGScheduler建立TaskSet提交給TaskScheduler,TaskScheduler再將TaskSet封裝成TaskDescription發送給Executor,最後Executor會將TaskDescription提交給線程池來運行。
Stage級別的調度是DagScheduler負責的,也是Spark調度體系的核心。
sequenceDiagram participant M as main thread participant L as eventProcessLoop participant E as event thread M-->>L: post event E-->>L: handle event
DagScheduler內部維護了一個事件消息總線eventProcessLoop(類型爲DAGSchedulerEventProcessLoop),其實就是一個用來存儲DAGSchedulerEvent類型數據的隊列。
當DagScheduler的一些方法被調用的時候(如submitJob方法),並不會在主線程中處理該任務,而是post一個event(如JobSubmitted)到eventProcessLoop。eventProcessLoop中有一個守護線程,會不斷的依次從隊列中取出event,而後調用對應的handle(如handleJobSubmitted)方法來執行具體的任務。
DagScheduler.runJob方法會調用submitJob方法,向eventProcessLoop發送一個JobSubmitted類型的消息,其中包含了RDD等信息。當eventProcessLoop接收到JobSubmitted類型的消息,會調用DagScheduler.handleJobSubmitted方法來處理消息。
sequenceDiagram participant M as main thread(runJob) participant L as eventProcessLoop participant E as event thread(handleJobSubmitted) M-->>L: post JobSubmitted event E-->>L: handle JobSubmitted event
2.create stage
建立一個ShuffleMapStage的過程同理會須要建立它的parent stage,也是若干ShuffleMapStage。如此遞歸下去,直到建立完全部的ShuffleMapStage,最後才完成ResultStage的建立。最後建立出來的這些Stage(若干ShuffleMapStage加一個ResultStage),經過parent屬性串起來,就像這樣
graph TD A[ResultStage]-- parent -->B[ShuffleMapStage 1] A-- parent -->C[ShuffleMapStage 2] B-- parent -->D[ShuffleMapStage 3]
這就生成了所謂的DAG圖,可是這個圖的指向跟執行順序是反過來的,若是按執行順序來畫DAG圖,就是常見的形式了:
graph TD D[ShuffleMapStage 3]-->C[ShuffleMapStage 2] C[ShuffleMapStage 2]-->A[ResultStage] B[ShuffleMapStage 1]-->A[ResultStage]
DagScheduler.handleJobSubmitted方法建立好ResultStage後會提交這個stage(submitStage方法),在提交一個stage的時候,會要先提交它的parent stage,也是經過遞歸的形式,直到一個stage的全部parent stage都被提交了,它本身才能被提交,若是一個stage的parent尚未完成,則會把這個stage加入waitingStages。也就是說,DAG圖中前面的stage會被先提交。當一個stage的parent都準備好了,也就是執行完了,它纔會進入submitMissingTasks的環節。
Task是在DagScheduler(不是TaskScheduler)的submitMissingTasks方法中建立的,包括ShuffleMapTask和ResultTask,與Stage對應。歸屬於同一個stage的這批Task組成一個TaskSet集合,最後提交給TaskScheduler的就是這個TaskSet集合。
Task的調度工做是由TaskScheduler與SchedulerBackend緊密合做,共同完成的。
TaskScheduler是task級別的調度器,主要做用是管理task的調度和提交,是Spark底層的調度器。
SchedulerBackend是TaskScheduler的後端服務,有獨立的線程,全部的Executor都會註冊到SchedulerBackend,主要做用是進行資源分配、將task分配給executor等。
第一個線程是DAGScheduler的事件處理線程,在其中,Task先通過DAGScheduler(藍色箭頭表示)封裝成TaskSet,再由TaskScheduler(綠色箭頭)封裝成TaskSetManager,並加入調度隊列中。
SchedulerBackend在收到ReviveOffers消息時,會從線程池取一個線程進行makeOffers操做,WorkerOffer建立後傳遞給TaskScheduler進行分配。
圖中第二個線程就是SchedulerBackend的一個事件分發線程,從Pool中取出最優先的TaskSetManager,而後將WorkerOffer與其中的Task進行配對,生成TaskDescription,發送給WorkerOffer指定的Executor去執行。
Stage,TaskSet,TaskSetManager是一一對應的,數量相等,都是隻存在driver上的。
Parition,Task,TaskDescription是一一對應,數量相同,Task和TaskDescription是會被髮到executor上的。
與DAGScheduler不一樣的是TaskScheduler有調度池,有兩種調度實體,Pool和TaskSetManager。
與YARN的調度隊列相似,採用了層級隊列的方式,Pool是TaskSetManager的容器,起到將TaskSetManager分組的做用。
Schedulable是調度實體的基類,有兩個子類Pool和TaskSetManager。
要理解調度規則,必須知道下面幾個屬性:
Pool和TaskSetManager對於這些屬性的取值有所不一樣,從而致使了他們的調度行爲也不同。
properties | Pool | TaskSetManager |
---|---|---|
weight | config | 1 |
minShare | config | 0 |
priority | 0 | jobId |
stageId | -1 | stageId |
name | config | TaskSet_{taskSet.id} |
runningTasks | Pool所含TaskSetManager的runningTasks和 | TaskSetManager運行中task數 |
TaskScheduler有個屬性schedulingMode,值取決於配置項spark.scheduler.mode
,默認爲FIFO。這個屬性會致使TaskScheduler使用不一樣的SchedulableBuilder,即FIFOSchedulableBuilder和FairSchedulableBuilder。
TaskScheduler在初始化的時候,就會建立root pool,根調度池,是全部pool的祖先。
它的屬性取值爲:
name: "" (空字符串) schedulingMode: 同TaskScheduler的schedulingMode屬性 weight: 0 minShare: 0
注意root pool的調度模式肯定了。
接下來會執行schedulableBuilder.buildPools()
方法,
如果FairSchedulableBuilder
這時default pool它的屬性取值是固定的:
name: "default" schedulingMode: FIFO weight: 1 minShare: 0
當TaskScheduler提交task的時候,會先建立TaskSetManager,而後經過schedulableBuilder添加到pool中。
如果FairSchedulableBuilder
spark.scheduler.pool
配置獲取pool name,沒有定義則用'default';通過上面兩部分,最終獲得的調度池結構以下:
spark.scheduler.mode=FIFO
spark.scheduler.mode=FAIR
Fair Scheduler Pool的劃分依賴於配置文件,默認的配置文件爲'fairscheduler.xml',也能夠經過配置項"spark.scheduler.allocation.file"指定配置文件。
煮個栗子,文件內容以下:
<?xml version="1.0"?> <allocations> <pool name="prod"> <schedulingMode>FAIR</schedulingMode> <weight>1</weight> <minShare>2</minShare> </pool> <pool name="test"> <schedulingMode>FIFO</schedulingMode> <weight>2</weight> <minShare>3</minShare> </pool> </allocations>
這裏配置了兩個pool,prod和test,而且配置了相關屬性,這兩個pool都會添加到root pool中。
以SchedulingAlgorithm爲基類,內置實現的調度算法有兩種FIFOSchedulingAlgorithm和FairSchedulingAlgorithm,其邏輯以下:
FIFO: 先進先出,優先級比較算法以下,
FAIR:公平調度,優先級比較算法以下,
TaskSetManager之間的比較,其實就是先比較jobId再比較stageId,誰小誰優先,意味着就是誰先提交誰優先。
Pool之間的比較,不存在!FIFO的pool隊列中是不會有pool的。
TaskSetManager之間的比較,由於minShare=0,weight=1,FAIR算法變成了:
Pool之間的比較,就是標準的FAIR算法。
當root pool爲FAIR模式,先取最優先的pool,再從pool中,按pool的調度模式取優先的TaskSetManager。
啓用FAIR模式:
fairscheduler.xml
文件--conf spark.scheduler.mode=FAIR
spark-shell --master yarn --deploy-mode client --conf spark.scheode=FAIR
啓動後若是直接運行Job會自動提交到default pool,那麼如何提交Job到指定pool?
SparkContext.setLocalProperty("spark.scheduler.pool","poolName")
若是每次只運行一個Job,開啓FAIR模式的意義不大,那麼如何同時運行多個Job?
要異步提交Job,須要用到RDD的async action,目前有以下幾個:
countAsync collectAsync takeAsync foreachAsync foreachPartitionAsync
舉個例子:
sc.setLocalProperty("spark.scheduler.pool","test") b.foreachAsync(_=>Thread.sleep(100)) sc.setLocalProperty("spark.scheduler.pool","production") b.foreachAsync(_=>Thread.sleep(100))
這樣就會有兩個任務在不一樣的pool同時運行:
場景1:Spark SQL thrift server
做用:讓離線任務和交互式查詢任務分配到不一樣的pool,給交互式查詢任務更高的優先級,這樣長時間運行的離線任務就不會一直佔用全部資源,阻塞交互式查詢任務。
場景2:Streaming job與Batch job同時運行
做用:好比用Streaming接數據寫入HDFS,可能產生不少小文件,能夠在低優先級的pool定時運行batch job合併小文件。
另外能夠參考Spark Summit 2017的分享:Continuous Application with FAIR Scheduler
轉載請註明原文地址:
https://liam-blog.ml/2019/11/07/spark-core-scheduler/