前面幾篇文章已經說清楚了從 spark 任務提交到 driver 啓動,而後執行 main 方法,初始化 SparkContext 對象。java
在初始化 SparkContext 對象的過程當中建立了兩個重要組件:算法
一個是 TaskScheduler(其實是他的實現類 TaskSchedulerImpl 對象),這個對象內部會持有一個 SchedulerBackend 對象,SchedulerBackend 內部會又會持有一個 DriverEndpoint 對象(實際上就是一個 RpcEndpoint)。這樣 TaskScheduler 就能夠經過 SchedulerBackend 和集羣資源管理器或者 Executor 對應 worker 節點進行通訊作一些事情。好比向 master 節點去註冊 application,master 在註冊 application 的過程當中會分配 worker 去啓動 Executor,當 Executor 啓動後又會和 TaskScheduler 進行註冊。數據結構
另外一個是 DAGScheduler,關於這個對象的建立過程前面沒有詳細講,主要是由於 DAGScheduler 是在 SparkContext 初始化結束後,執行到 RDD 的 Action 操做的時候纔會開始工做,下面就從 RDD 的 action 操做提及,看看 DAGScheduler 是怎麼工做的。app
仍是以 wordcount 程序爲例:ide
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./file/localfile")
val words = lines.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey(_ + _)
wordCounts.foreach(wordCount => println(wordCount._1 + " " + wordCount._2))
複製代碼
當代碼執行到 wordCounts.foreach 時候會調用到 RDD 的 foreach 方法,RDD 的 foreach 方法會去調用 SparkContext 的 runjob 方法。函數
SparkContext 中會有多個 runjob 方法,最後都會走到一個 runjob 那裏去,這個 runjob 方法最終會調用 DAGScheduler 的 runJob 的方法,具體能夠先看下這個 SparkContext 的 runjob 方法。oop
def runJob[T, U: ClassTag](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
resultHandler: (Int, U) => Unit): Unit = {
if (stopped.get()) {
throw new IllegalStateException("SparkContext has been shutdown")
}
val callSite = getCallSite
val cleanedFunc = clean(func)
logInfo("Starting job: " + callSite.shortForm)
if (conf.getBoolean("spark.logLineage", false)) {
logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
}
// 去調用 DAGScheduler 的 runjob 方法
dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
progressBar.foreach(_.finishAll())
rdd.doCheckpoint()
}
複製代碼
最主要的仍是 DAGScheduler 中的 runjob 方法。post
這個 runjob 方法內部實際上調用了 submitJob 方法,用於提交 job。該方法返回一個 JobWaiter,用於等待 DAGScheduler 任務的完成。spa
def runJob[T, U](
rdd: RDD[T],
func: (TaskContext, Iterator[T]) => U,
partitions: Seq[Int],
callSite: CallSite,
resultHandler: (Int, U) => Unit,
properties: Properties): Unit = {
val start = System.nanoTime
// 調用 submitJob 方法
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
複製代碼
submitJob 方法是調用 eventProcessLoop 的 post 方法將 JobSubmitted 事件添加到 DAGScheduler 的事件隊列中去。線程
eventProcessLoop.post(JobSubmitted(
jobId, rdd, func2, partitions.toArray, callSite, waiter,
SerializationUtils.clone(properties)))
複製代碼
這裏的 eventProcessLoop 是 DAGSchedulerEventProcessLoop 對象,在 DAGScheduler 的初始化代碼中能夠看到。DAGSchedulerEventProcessLoop 實際上內部有一個線程,用來處理事件隊列。
事件隊列的處理最後會走到 DAGSchedulerEventProcessLoop 的 onReceive 的回調方法裏面去。
/** * The main event loop of the DAG scheduler. */
override def onReceive(event: DAGSchedulerEvent): Unit = {
val timerContext = timer.time()
try {
// 調用 doOnReceive 方法
doOnReceive(event)
} finally {
timerContext.stop()
}
}
複製代碼
後面會去調用 doOnReceive 方法,根據 event 進行模式匹配,匹配到 JobSubmitted 的 event 後其實是去調用 DAGScheduler 的 handleJobSubmitted 這個方法。
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
// 模式匹配
case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
// 調用 handleJobSubmitted 方法
dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
複製代碼
下面來看 handleJobSubmitted 這個方法作了哪些操做:
1,使用觸發 job 的最後一個 rdd,建立 finalStage;
注: Stage 是一個抽象類,一共有兩個實現,一個是 ResultStage,是用 action 中的函數計算結果的 stage;另外一個是 ShuffleMapStage,是爲 shuffle 準備數據的 stage。
2,構造一個 Job 對象,將上面建立的 finalStage 封裝進去,這個 Job 的最後一個 stage 也就是這個 finalStage;
3,將 Job 的相關信息保存到內存的數據結構中;
4,調用 submitStage 方法提交 finalStage。
private[scheduler] def handleJobSubmitted(jobId: Int, finalRDD: RDD[_], func: (TaskContext, Iterator[_]) => _,
partitions: Array[Int],
callSite: CallSite,
listener: JobListener,
properties: Properties) {
var finalStage: ResultStage = null
try {
// 使用觸發 job 的最後一個 RDD 建立一個 ResultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
} catch {
case e: Exception =>
logWarning("Creating new stage failed due to exception - job: " + jobId, e)
listener.jobFailed(e)
return
}
// 使用前面建立好的 ResultStage 去建立一個 job
// 這個 job 的最後一個 stage 就是 finalStage
val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
clearCacheLocs()
logInfo("Got job %s (%s) with %d output partitions".format(
job.jobId, callSite.shortForm, partitions.length))
logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
logInfo("Parents of final stage: " + finalStage.parents)
logInfo("Missing parents: " + getMissingParentStages(finalStage))
// 將 job 的相關信息存儲到內存中
val jobSubmissionTime = clock.getTimeMillis()
jobIdToActiveJob(jobId) = job
activeJobs += job
finalStage.setActiveJob(job)
val stageIds = jobIdToStageIds(jobId).toArray
val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
listenerBus.post(
SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
// 提交 finalStage
submitStage(finalStage)
}
複製代碼
下面就會走進 submitStage 方法,這個方法是用來提交 stage 的,具體作了這些操做:
1,首先會驗證 stage 對應的 job id 進行校驗,存在纔會繼續執行;
2,在提交這個 stage 以前會判斷當前 stage 的狀態。
若是是 running、waiting、failed 的話就不作任何操做。
若是不是這三個狀態則會根據當前 stage 去往前推前面的 stage,若是能找到前面的 stage 則繼續遞歸調用 submitStage 方法,直到當前 stage 找不到前面的 stage 爲止,這時候的 stage 就至關於當前 job 的第一個 stage,而後回去調用 submitMissingTasks 方法去分配 task。
private def submitStage(stage: Stage) {
val jobId = activeJobForStage(stage)
// 看看當前的 job 是否存在
if (jobId.isDefined) {
logDebug("submitStage(" + stage + ")")
// 判斷當前 stage 的狀態
if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
// 根據當前的 stage 去推倒前面的 stage
val missing = getMissingParentStages(stage).sortBy(_.id)
logDebug("missing: " + missing)
// 若是前面已經沒有 stage 了,那麼久將當前 stage 去執行 submitMissingTasks 方法
// 若是前面還有 stage 的話那麼遞歸調用 submitStage
if (missing.isEmpty) {
logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
submitMissingTasks(stage, jobId.get)
} else {
for (parent <- missing) {
submitStage(parent)
}
// 將當前 stage 加入等待隊列
waitingStages += stage
}
}
} else {
// abortStage 終止提交當前 stage
abortStage(stage, "No active job for stage " + stage.id, None)
}
}
複製代碼
上面最重要的一個地方就是使用當前 stage 向前推,找到前面的 stage,也是 stage 的劃分算法。下面就看看 getMissingParentStages 這個劃分算法作了哪些操做:
1,建立 missing 和 visited 兩個 HashSet,分別用來存儲根據當前 stage 向前找到的全部 stage 數據和已經調用過 visit 方法的 RDD;
2,建立一個存放 RDD 的棧,而後將傳進來的 stage 中的 rdd 也就是 finalStage 中的那個 job 觸發的最後一個 RDD 放入棧中;
3,而後將棧中的 RDD 拿出來調用 visit 方法,這個 visit 方法內部會根據當前 RDD 的依賴鏈逐個遍歷全部 RDD,而且會根據相鄰兩個 RDD 的依賴關係來決定下面的操做:
若是是寬依賴,即 ShuffleDependency ,那麼會調用 getOrCreateShuffleMapStage 建立一個新的 stage,默認每一個 job 的最後一個 stage 是 ResultStage,剩餘的 job 中的其它 stage 均爲 ShuffleMapStage。而後會將建立的這個 stage 加入前面建立的 missing 的 HashSet 中;
若是是窄依賴,即 NarrowDependency,那麼會將該 RDD 加入到前面建立的 RDD 棧中,繼續遍歷調用 visit 方法。
直到全部的 RDD 都遍歷結束後返回前面建立的 missing 的集合。
private def getMissingParentStages(stage: Stage): List[Stage] = {
// 存放下面找到的全部 stage
val missing = new HashSet[Stage]
// 存放已經遍歷過的 rdd
val visited = new HashSet[RDD[_]]
// We are manually maintaining a stack here to prevent StackOverflowError
// caused by recursively visiting
// 建立一個維護 RDD 的棧
val waitingForVisit = new Stack[RDD[_]]
// visit 方法
def visit(rdd: RDD[_]) {
// 判斷當前 rdd 是否 visit 過
if (!visited(rdd)) {
visited += rdd
val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
if (rddHasUncachedPartitions) {
// 遍歷當前 RDD 的依賴鏈
for (dep <- rdd.dependencies) {
dep match {
// 若是是寬依賴
case shufDep: ShuffleDependency[_, _, _] =>
// 建立 ShuffleMapStage
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
// 加入 missing 集合
missing += mapStage
}
// 若是是窄依賴
case narrowDep: NarrowDependency[_] =>
// 加入等待 visit 的集合中,準備下一次遍歷
waitingForVisit.push(narrowDep.rdd)
}
}
}
}
}
// 將傳入的 stage 中的 rdd 拿出來壓入 waitingForVisit 的棧中
waitingForVisit.push(stage.rdd)
// 遍歷棧裏的全部 RDD
while (waitingForVisit.nonEmpty) {
// 調用 visit 方法
visit(waitingForVisit.pop())
}
// 返回 missing 這個 stage 集合
missing.toList
}
複製代碼
至此,全部的 stage 都已經劃分結束了。能夠看出每一個 Spark Application 執行代碼的時候,每當碰到一個 Action 操做就會劃分出一個 Job,而後每一個 Job 裏會根據寬窄依賴去劃分出多個 stage。