上篇文章 spark 源碼分析之十八 -- Spark存儲體系剖析 重點剖析了 Spark的存儲體系。從本篇文章開始,剖析Spark做業的調度和計算體系。html
在說DAG以前,先簡單說一下RDD。node
文檔說明以下:apache
RDD全稱Resilient Distributed Dataset,即分佈式彈性數據集。它是Spark的基本抽象,表明不可變的可分區的可並行計算的數據集。app
RDD的特色:框架
1. 包含了一系列的分區異步
2. 在每個split上執行函數計算分佈式
3. 依賴於其餘的RDDide
4. 對於key-value對的有partitioner函數
5. 每個計算有優先計算位置oop
更多內容能夠去看Spark的論文:http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf
RDD的操做
RDD支持兩種類型的操做:
注意:reduce 是一個action,而 reduceByKey 則是一個transform,由於它返回的是一個分佈式數據集,並無把數據返回給driver節點。
官方提供了RDD的action函數,以下:
注意:這只是常見的函數,並無列舉全部的action函數。
那麼action函數有哪些特色呢?
根據上面介紹的,即action會返回一個值給driver節點。即它們的函數返回值是一個具體的非RDD類型的值或Unit,而不是RDD類型的值。
官方提供了Transform 函數,以下:
上文提到,transformation接收一個存在的數據集,並將計算結果做爲新的RDD返回。也是就說,它的返回結果是RDD。
其實,理解了action和transformation的特色,看函數的定義就知道是action仍是transformation。
官方文檔裏,聊完RDD的操做,緊接着就聊了一下shuffle,咱們按照這樣的順序來作一下說明。
官方給出的shuffle的解釋以下:
注意:shuffle是特定操做纔會發生的事情,這跟action和transformation劃分沒有關係。
官方給出了一些常見的例子。
Operations which can cause a shuffle include repartition operations like repartition
and coalesce
, ByKey operations (except for counting) like groupByKey
and reduceByKey
, and join operations like cogroup
and join
.
那麼shuffle跟什麼有關係呢?
shuffle跟依賴有關係。在 spark 源碼分析之一 -- RDD的四種依賴關係 中,說到 RDD 分爲寬依賴和窄依賴,其中窄依賴有三種,一對一依賴、Range依賴、Prune 依賴。寬依賴只有一種,那就是 shuffle 依賴。
即RDD跟父RDD的依賴關係是寬依賴,那麼就是父RDD在生成新的子RDD的過程當中是存在shuffle過程的。
如圖:
這張圖也說明了一個結論,並非全部的join都是寬依賴。
咱們一般說的 RDD,在Spark中具體表現爲一個抽象類,全部的RDD子類繼承自該RDD,全稱爲 org.apache.spark.rdd.RDD,以下:
它有兩個參數,一個參數是SparkContext,另外一個是deps,即Dependency集合,Dependency是全部依賴的公共父類,即deps保存了父類的依賴關係。
其中,窄依賴的父類是 NarrowDependency, 它的構造方法裏是由父RDD這個參數的,寬依賴 ShuffleDependency ,它的構造方法裏也是有父RDD這個參數的。
獲取抽象的方法是 getDependencies 方法,以下:
這只是定義在RDD抽象父類中的默認方法,不一樣的子類會有不一樣的實現。
它在以下類中又從新實現了這個方法,以下:
是不是shuffle依賴,跟分區的數量也有必定的關係,具體能夠看下面的幾個RDD的依賴的實現:
以下圖,一個application的執行過程被劃分爲四個階段:
階段一:咱們編寫driver程序,定義RDD的action和transformation操做。這些依賴關係造成操做的DAG。
階段二:根據造成的DAG,DAGScheduler將其劃分爲不一樣的stage。
階段三:每個stage中有一個TaskSet,DAGScheduler將TaskSet交給TaskScheduler去執行,TaskScheduler將任務執行完畢以後結果返回給DAGSCheduler。
階段四:TaskScheduler將任務分發到每個Worker節點去執行,並將結果返回給TaskScheduler。
本篇文章的定位就是階段一和階段二。後面會介紹階段三和階段四。
注:圖片不知出處。
咱們先來分析一個top N案例。
需求:一個大文件裏有不少的重複整數,如今求出重複次數最多的前10個數。
代碼以下(爲了多幾個stage,特地加了幾個repartition):
scala> val sourceRdd = sc.textFile("/tmp/hive/hive/result",10).repartition(5)
sourceRdd: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at repartition at <console>:27
scala> val allTopNs = sourceRdd.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_).repartition(10).sortByKey(ascending = true, 100).map(tup => (tup._2, tup._1)).mapPartitions(
| iter => {
| iter.toList.sortBy(tup => tup._1).takeRight(100).iterator
| }
| ).collect()
// 結果略
scala> val finalTopN = scala.collection.SortedMap.empty[Int, String].++(allTopNs)
//結果略
scala> finalTopN.takeRight(10).foreach(tup => {println(tup._2 + " occurs times : " + tup._1)})
53 occurs times : 1070
147 occurs times : 1072
567 occurs times : 1073
931 occurs times : 1075
267 occurs times : 1077
768 occurs times : 1080
612 occurs times : 1081
877 occurs times : 1082
459 occurs times : 1084
514 occurs times : 1087
下面看一下生成的DAG和Stage
任務概覽
Description描述的就是每個job的最後一個方法。
stage 0 到 3的DAG圖:
stage 4 到 8的DAG圖:
每個stage的Description描述的是stage的最後一個方法。
能夠看出,RDD的依賴關係是有driver端對RDD的操做造成的。
一個Stage中DAG的是根據RDD的依賴來構建的。
咱們來看一下源碼。
Stage是一組並行任務,它們都計算須要做爲Spark做業的一部分運行的相同功能,其中全部任務具備相同的shuffle依賴。由調度程序運行的每一個DAG任務在發生shuffle的邊界處被分紅多個階段,而後DAGScheduler以拓撲順序運行這些階段。每一個Stage均可以是一個shuffle map階段,在這種狀況下,其任務的結果是爲其餘階段或結果階段輸入的,在這種狀況下,其任務在RDD上運行函數直接計算Spark action(例如count(),save()等)。對於shuffle map階段,咱們還跟蹤每一個輸出分區所在的節點。每一個stage還有一個firstJobId,用於識別首次提交stage的做業。使用FIFO調度時,這容許首先計算先前做業的階段,或者在失敗時更快地恢復。最後,因爲故障恢復,能夠在屢次嘗試中從新執行單個stage。在這種狀況下,Stage對象將跟蹤多個StageInfo對象以傳遞給listener 或Web UI。最近的一個將經過latestInfo訪問。
Stage是一個抽象類,構造方法以下:
參數介紹以下:
id – Unique stage ID
rdd – RDD that this stage runs on: for a shuffle map stage, it's the RDD we run map tasks on, while for a result stage, it's the target RDD that we ran an action on
numTasks – Total number of tasks in stage; result stages in particular may not need to compute all partitions, e.g. for first(), lookup(), and take().
parents – List of stages that this stage depends on (through shuffle dependencies).
firstJobId – ID of the first job this stage was part of, for FIFO scheduling.
callSite – Location in the user program associated with this stage: either where the target RDD was created, for a shuffle map stage, or where the action for a result stage was called
callSite其實記錄的就是stage用戶代碼的位置。
其實相對來講比較簡單。
它有兩個子類,以下:
類說明:
ResultStages apply a function on some partitions of an RDD to compute the result of an action.
The ResultStage object captures the function to execute, func, which will be applied to each partition, and the set of partition IDs, partitions.
Some stages may not run on all partitions of the RDD, for actions like first() and lookup().
ResultStage在RDD的某些分區上應用函數來計算action操做的結果。 對於諸如first()和lookup()之類的操做,某些stage可能沒法在RDD的全部分區上運行。
簡言之,ResultStage是應用action操做在action上進而得出計算結果。
源碼以下:
ShuffleMapStages are intermediate stages in the execution DAG that produce data for a shuffle.
They occur right before each shuffle operation, and might contain multiple pipelined operations before that (e.g. map and filter).
When executed, they save map output files that can later be fetched by reduce tasks.
The shuffleDep field describes the shuffle each stage is part of, and variables like outputLocs and numAvailableOutputs track how many map outputs are ready.
ShuffleMapStages can also be submitted independently as jobs with DAGScheduler.submitMapStage.
For such stages, the ActiveJobs that submitted them are tracked in mapStageJobs.
Note that there can be multiple ActiveJobs trying to compute the same shuffle map stage.
ShuffleMapStage 是中間的stage,爲shuffle生產數據。它們在shuffle以前出現。當執行完畢以後,結果數據被保存,以便reduce 任務能夠獲取到。
shuffleDep記錄了每個stage所屬的shuffle。
在上面咱們提到,每個RDD都有對父RDD的依賴關係,這樣的依賴關係造成了一個有向無環圖。即DAG。
當一個用戶在一個RDD上運行一個action時,調度會檢查RDD的血緣關係(即依賴關係)來建立一個stage中的DAG圖來執行。
以下圖:
在說stage劃分以前先,剖析一下跟DAGScheduler相關的類。
An event loop to receive events from the caller and process all events in the event thread. It will start an exclusive event thread to process all events.
Note: The event queue will grow indefinitely. So subclasses should make sure onReceive can handle events in time to avoid the potential OOM.
它定義了異步消息處理機制框架。
其內部有一個阻塞雙端隊列,用於存放消息:
外部線程調用 post 方法將事件post到堵塞隊列中:
有一個消息的消費線程:
onReceive 方法是一個抽象方法,由子類來實現。
下面來看其實現類 -- DAGSchedulerEventProcessLoop。
其接收的是DAGSchedulerEvent類型的事件。DAGSchedulerEvent 是一個sealed trait,其實現以下:
它的每個子類事件,在doOnReceive 方法中都有體現,以下:
這個類的定義已經超過2k行了。因此也不打算所有介紹,本篇文章只介紹跟stage任務的生成相關的屬性和方法。
The high-level scheduling layer that implements stage-oriented scheduling. It computes a DAG of stages for each job, keeps track of which RDDs and stage outputs are materialized, and finds a minimal schedule to run the job. It then submits stages as TaskSets to an underlying TaskScheduler implementation that runs them on the cluster. A TaskSet contains fully independent tasks that can run right away based on the data that's already on the cluster (e.g. map output files from previous stages), though it may fail if this data becomes unavailable.
Spark stages are created by breaking the RDD graph at shuffle boundaries. RDD operations with "narrow" dependencies, like map() and filter(), are pipelined together into one set of tasks in each stage, but operations with shuffle dependencies require multiple stages (one to write a set of map output files, and another to read those files after a barrier). In the end, every stage will have only shuffle dependencies on other stages, and may compute multiple operations inside it. The actual pipelining of these operations happens in the RDD.compute() functions of various RDDs
In addition to coming up with a DAG of stages, the DAGScheduler also determines the preferred locations to run each task on, based on the current cache status, and passes these to the low-level TaskScheduler. Furthermore, it handles failures due to shuffle output files being lost, in which case old stages may need to be resubmitted. Failures within a stage that are not caused by shuffle file loss are handled by the TaskScheduler, which will retry each task a small number of times before cancelling the whole stage. When looking through this code, there are several key concepts:
- Jobs (represented by ActiveJob) are the top-level work items submitted to the scheduler. For example, when the user calls an action, like count(), a job will be submitted through submitJob. Each Job may require the execution of multiple stages to build intermediate data.
- Stages (Stage) are sets of tasks that compute intermediate results in jobs, where each task computes the same function on partitions of the same RDD. Stages are separated at shuffle boundaries, which introduce a barrier (where we must wait for the previous stage to finish to fetch outputs). There are two types of stages: ResultStage, for the final stage that executes an action, and ShuffleMapStage, which writes map output files for a shuffle. Stages are often shared across multiple jobs, if these jobs reuse the same RDDs.
- Tasks are individual units of work, each sent to one machine.
- Cache tracking: the DAGScheduler figures out which RDDs are cached to avoid recomputing them and likewise remembers which shuffle map stages have already produced output files to avoid redoing the map side of a shuffle.
- Preferred locations: the DAGScheduler also computes where to run each task in a stage based on the preferred locations of its underlying RDDs, or the location of cached or shuffle data.
- Cleanup: all data structures are cleared when the running jobs that depend on them finish, to prevent memory leaks in a long-running application.
To recover from failures, the same stage might need to run multiple times, which are called "attempts". If the TaskScheduler reports that a task failed because a map output file from a previous stage was lost, the DAGScheduler resubmits that lost stage. This is detected through a CompletionEvent with FetchFailed, or an ExecutorLost event. The DAGScheduler will wait a small amount of time to see whether other nodes or tasks fail, then resubmit TaskSets for any lost stage(s) that compute the missing tasks. As part of this process, we might also have to create Stage objects for old (finished) stages where we previously cleaned up the Stage object. Since tasks from the old attempt of a stage could still be running, care must be taken to map any events received in the correct Stage object.
Here's a checklist to use when making or reviewing changes to this class:
- All data structures should be cleared when the jobs involving them end to avoid indefinite accumulation of state in long-running programs.
- When adding a new data structure, update DAGSchedulerSuite.assertDataStructuresEmpty to include the new structure. This will help to catch memory leaks.
下面直接來看stage的劃分
以collect函數爲例。
collect 函數定義以下:
其調用了SparkContext的 runJob 方法,又調用了幾回其重載方法最終調用的runJob 方法以下:
其內部調用了DAGScheduler的runJob 方法
DAGScheduler的runJob 方法以下:
思路,提交方法後返回一個JobWaiter 對象,等待任務執行完成,而後根據任務執行狀態去執行對應的成功或失敗的方法。
submitJob 以下:
最終任務被封裝進了JobSubmitted 事件消息體中,最終該事件消息被放入了eventProcessLoop 對象中,eventProcessLoop定義以下:
即事件被放入到了上面咱們提到的 DAGSchedulerEventProcessLoop 異步消息處理模型中。
DAGSchedulerEventProcessLoop 的 doOnReceive 中,發現了 JobSubmitted 事件對應的分支爲:
即會執行DAGScheduler的handleJobSubmitted方法,以下:
這個方法裏面有兩步:
本篇文章,咱們只分析第一步,第二步在下篇文章分析。
createResultStage 方法以下:
getOrCreateParentStage 方法建立或獲取該RDD的Shuffle依賴關係,而後根據shuffle依賴進而劃分stage,源碼以下:
獲取其全部父類的shuffle依賴,getShuffleDependency 方法以下,相似於樹的深度遍歷。
getOrCreateShuffleMapStage方法根據shuffle依賴建立ShuffleMapStage,以下,思路,先查看當前stage是否已經記錄在shuffleIdToMapStage變量中,若存在,表示已經建立過了,不然須要根據依賴的RDD去找其RDD的shuffle依賴,而後再建立shuffleMapStage。
shuffleIdToMapStage定義以下:
這個map中只包含正在運行的job的stage信息。
其中shuffle 依賴的惟一id 是:shuffleId,這個id 是 SpackContext 生成的全局shuffleId。
getMissingAncestorShuffleDependencies 方法以下,思路:深度遍歷依賴關係,把全部未運行的shuffle依賴都找到。
到此,全部尋找shuffle依賴關係的的邏輯都已經剖析完畢,下面看建立MapShuffleStage的方法,
思路:生成ShuffleMapStage,並更新 stageIdToStage變量,更新shuffleIdToMapStage變量,若是 MapOutputTrackerMaster 中沒有註冊過該shuffle,須要註冊,最後返回ShuffleMapStage對象。
updateJobIdStageIdMaps方法以下,思路該ResultStage依賴的全部ShuffleMapStage的jobId設定爲指定的jobId,即跟ResultStage一致的jobId:
至此,stage的劃分邏輯剖析完畢。
本篇文章對照官方文檔,說明了RDD的主要操做,action和transformation,進一步引出了RDD的依賴關係,最後剖析了DAGScheduler根據shuffle依賴劃分stage的邏輯。
注:文章中圖片來源於 Spark 論文,論文地址:http://people.csail.mit.edu/matei/papers/2012/nsdi_spark.pdf