站在不一樣的角度看jobgit
transaction: Job是由一組RDD上轉換和動做組成。github
stage: Job是由ResultStage和多個ShuffleMapState組成web
init:由action操做觸發提交執行的一個函數 action操做會觸發調用sc.runJob方法,api
Job是一組rdd的轉換以及最後動做的操做集合,它是Spark裏面計算最大最虛的概念,甚至在spark的任務頁面中都沒法看到job這個單位。 可是無論怎麼樣,在spark用戶的角度,job是咱們計算目標的單位,每次在一個rdd上作一個動做操做時,都會觸發一個job,完成計算並返回咱們想要的數據。 Job是由一組RDD上轉換和動做組成,這組RDD之間的轉換關係表現爲一個有向無環圖(DAG),每一個RDD的生成依賴於前面1個或多個RDD。 在Spark中,兩個RDD之間的依賴關係是Spark的核心。站在RDD的角度,二者依賴表現爲點對點依賴, 可是在Spark中,RDD存在分區(partition)的概念,兩個RDD之間的轉換會被細化爲兩個RDD分區之間的轉換。 Stage的劃分是對一個Job裏面一系列RDD轉換和動做進行劃分。 首先job是因動做而產生,所以每一個job確定都有一個ResultStage,不然job就不會啓動。 其次,若是Job內部RDD之間存在寬依賴,Spark會針對它產生一箇中間Stage,即爲ShuffleStage,嚴格來講應該是ShuffleMapStage,這個stage是針對父RDD而產生的, 至關於在父RDD上作一個父rdd.map().collect()的操做。ShuffleMapStage生成的map輸入,對於子RDD,若是檢測到所本身所「寬依賴」的stage完成計算,就能夠啓動一個shuffleFectch, 從而將父RDD輸出的數據拉取過程,進行後續的計算。 所以一個Job由一個ResultStage和多個ShuffleMapStage組成。數組
https://github.com/ColZer/DigAndBuried/blob/master/spark/shuffle-study.md閉包
調用SparkContext裏面的函數重載,將分區數量,須要計算的分區下標等參數設置好 以rdd.count爲例:異步
rdd.count // 獲取分區數 sc.runJob(this, Utils.getIteratorSize _).sum // 設置須要計算的分區 runJob(rdd, func, 0 until rdd.partitions.length) // 設置須要在每一個partition上執行的函數 runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
定義一個接收計算結果的對象數組並將其返回 構造一個Array,並構造一個函數對象"(index, res) => results(index) = res"繼續傳遞給runJob函數,而後等待runJob函數運行結束,將results返回; 對這裏的解釋至關在runJob添加一個回調函數,將runJob的運行結果保存到Array到, 回調函數,index表示mapindex, res爲單個map的運行結果ide
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int]): Array[U] = { // 定義返回的結果集 val results = new Array[U](partitions.size) // 定義resulthandler runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res) // 返回計算結果 results }
將須要執行excutor的地址和回調函數等傳給DAG調度器,由DAG調度器進行具體的submitJob操做。函數
def runJob[T, U: ClassTag]( rdd: RDD[T], func: (TaskContext, Iterator[T]) => U, partitions: Seq[Int], resultHandler: (Int, U) => Unit): Unit = { // 獲取須要發送的excutor地址 val callSite = getCallSite // 閉包封裝,防止序列化錯誤 val cleanedFunc = clean(func) // 提交給dag調度器, dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get) // docheckpoint rdd.doCheckpoint() }
注意:dagScheduler.runJob是堵塞的操做,即直到Spark完成Job的運行以前,rdd.doCheckpoint()是不會執行的 上異步的runJob回調用下面這個方法,裏面設置了JobWaiter,用來等待job執行完畢。oop
def runJob{ ... // job提交後會返回一個jobwaiter對象 val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties) ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf) waiter.completionFuture.value.get match { case scala.util.Success(_) => ... }
給JOB分配一個ID,並將其放入隊列,返回一個阻塞器,等待當前job執行完畢。將結果數據傳送給handler function
def submitJob{ // 生成JOB的ID val jobId = nextJobId.getAndIncrement() // 生成阻塞器 val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler) // post方法的實現:eventQueue.put(event),其實是將此job提交到了一個LinkedBlockingDeque eventProcessLoop.post(JobSubmitted( jobId, rdd, func2, partitions.toArray, callSite, waiter, SerializationUtils.clone(properties))) } waiter
在提交job時,咱們將job放到了一個LinkedBlockingDeque隊列,而後由EventLoop 負責接收處理請求,觸發job的提交,產生一個finalStage. EventLoop是在jobScheduler中啓動的時候在JobGenerator中啓動的 當從隊列中拉去job時,開建立ResultStage:
class EventLoop override def run(): Unit = { try { while (!stopped.get) { // 拉去job val event = eventQueue.take() try { // 觸發建立stage onReceive(event) ... } def doOnReceive{ case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) => dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) }
建立job:根據JobId,finalStage,excutor地址,job狀態監聽的JobListener,task的屬性properties等生成job,並把job放入Map中記錄。
// class DAGScheduler private[scheduler] def handleJobSubmitted() { // 以不一樣形式的hashMap存放job jobIdToStageIds = new HashMap[Int, HashSet[Int]] stageIdToStage = new HashMap[Int, Stage] jobIdToActiveJob = new HashMap[Int, ActiveJob] // 初始化finalStage var finalStage: ResultStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 初始化job val job = new ActiveJob(jobId, finalStage, callSite, listener, properties) clearCacheLocs() jobIdToActiveJob(jobId) = job activeJobs += job finalStage.setActiveJob(job) val stageIds = jobIdToStageIds(jobId).toArray val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo)) **listenerBus.post(** **SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))** // 提交finalStage,計算時會判斷其以前是否存在shuffleStage,若是存在會優先計算shuffleStage,最後再計算finalStage **submitStage(finalStage)** }
參見: MapOutputTrackerMaster stage的狀態分爲三類:計算失敗,計算完成和未計算完成,迭代的去計算完成父stage後,就能夠到下一步,將stage轉換到具體的task進行執行。
class DAGScheduler private[scheduler] def handleJobSubmitted() { var finalStage: ResultStage = createResultStage(finalRDD, func, partitions, jobId, callSite) ... submitStage(finalStage) } // 迭代的去判斷父stage是否所有計算完成 private def submitStage(stage: Stage) { if(jobId.isDefined){ val missing = getMissingParentStages(stage).sortBy(_.id) if (missing.isEmpty) { // 父stage已經計算完成,能夠開始當前計算 submitMissingTasks(stage, jobId.get) } else { // 父stage的map操做未完成,繼續進行迭代 for (parent <- missing) { submitStage(parent) } waitingStages += stage } } } // 獲取未計算完成的stage private def getMissingParentStages(stage: Stage): List[Stage] = { ... for (dep <- rdd.dependencies) { dep match { case shufDep: ShuffleDependency=> // 判斷當前stage是否計算完成 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId) if (!mapStage.isAvailable) { missing += mapStage } case narrowDep: NarrowDependency[_] => waitingForVisit.push(narrowDep.rdd) } ... }
首先利於上面說到的Stage知識獲取所須要進行計算的task的分片;由於該Stage有些分片可能已經計算完成了;而後將Task運行依賴的RDD,Func,shuffleDep 進行序列化,經過broadcast發佈出去; 而後建立Task對象,提交給taskScheduler調度器進行運行
參見: 獲取task分片 對Stage進行遍歷全部須要運行的Task分片; 緣由:存在部分task失敗之類的狀況,或者task運行結果所在的BlockManager被刪除了,就須要針對特定分片進行從新計算;即所謂的恢復和重算機制;
class DAGScheduler{ def submitMissingTasks(stage, jobId){ val partitionsToCompute: Seq[Int] = stage.findMissingPartitions() val properties = jobIdToActiveJob(jobId).properties runningStages += stage }
對Stage的運行依賴進行序列化並broadcast給excutors(若是不序列化在數據傳輸過程當中可能出錯) 對ShuffleStage和FinalStage所序列化的內容有所不一樣:對於ShuffleStage序列化的是RDD和shuffleDep;而對FinalStage序列化的是RDD和Func 對於FinalStage咱們知道,每一個Task運行過程當中,須要知道RDD和運行的函數,好比咱們這裏討論的Count實現的Func;而對於ShuffleStage,ShuffleDependency記錄了父RDD,排序方式,聚合器等,reduce端須要獲取這些參數進行初始化和計算。
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... // consistent view of both variables. RDDCheckpointData.synchronized { taskBinaryBytes = stage match { case stage: ShuffleMapStage => JavaUtils.bufferToArray( closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef)) case stage: ResultStage => JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef)) } partitions = stage.rdd.partitions } taskBinary = sc.broadcast(taskBinaryBytes)
針對每一個須要計算的分片構造一個Task對象, 對於ResultTask就是在分片上調用咱們的Func,而ShuffleMapTask按照ShuffleDep進行 MapOut
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... val tasks: Seq[Task[_]] = try { val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array() stage match { // 一個stage會產生多個task任務 case stage: ShuffleMapStage => partitionsToCompute.map { id => new ShuffleMapTask() } case stage: ResultStage => partitionsToCompute.map { id => new ResultTask() } }
調用taskScheduler將task提交給Spark進行調度
class DAGScheduler{ def submitMissingTasks(stage, jobId){ ... if (tasks.size > 0) { // 將taskSet發送給 taskScheduler taskScheduler.submitTasks(new TaskSet( tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties)) stage.latestInfo.submissionTime = Some(clock.getTimeMillis()) } else { markStageAsFinished(stage, None) }
DAGScheduler接收到DAGSchedulerEvent後判斷其類型是TaskCompletion,不一樣的stage的實現方式不同,shuffle的實現更復雜一點
private def doOnReceive(event: DAGSchedulerEvent): Unit = event match { case completion: CompletionEvent => dagScheduler.handleTaskCompletion(completion) } private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => // 調用jobWaiter的taskSucced通知結果 job.listener.taskSucceeded case smt: ShuffleMapTask => // 調用outputTracker mapOutputTracker.registerMapOutput }
當計算完畢後,JobWaiter同步調用resultHandler處理task返回的結果。
private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => // 調用jobWaiter的taskSucced通知結果 job.listener.taskSucceeded( rt.outputId, event.result) case smt: ShuffleMapTask => } // jobWaiter是JobListner的子類 class JobWaiter extends JobListener{ override def taskSucceeded(index: Int, result: Any): Unit = { synchronized { resultHandler(index, result.asInstanceOf[T]) } if (finishedTasks.incrementAndGet() == totalTasks) { jobPromise.success(()) } } }
參見: MapStatus的註冊和獲取 將運行結果(mapStatus)傳送給outputTrancker
private[scheduler] def handleTaskCompletion(event: CompletionEvent) { event.reason match { case Success => task match { case rt: ResultTask[_, _] => case smt: ShuffleMapTask => // mapOutputTracker.registerMapOutput( shuffleStage.shuffleDep.shuffleId, smt.partitionId, status) }
job執行完畢後執行
private[scheduler] abstract class Stage( val id: Int, // stageId val rdd: RDD[_],// RDD that this stage runs on val numTasks: Int,// task數量 val parents: List[Stage],// 父stage val firstJobId: Int,//當前stage上JobId val callSite: CallSite// 生成RDD存放位置 ) extends Logging {
class ShuffleMapStage( val shuffleDep: ShuffleDependency[_, _, _], mapOutputTrackerMaster: MapOutputTrackerMaster) extends Stage(id, rdd, numTasks, parents, firstJobId, callSite){ // 判斷當前stage是否可用 def isAvailable: Boolean = numAvailableOutputs == numPartitions }
每一個運行在Executor上的Task, 經過SparkEnv獲取shuffleManager對象, 而後調用getWriter來當前MapID=partitionId的一組Writer. 而後將rdd的迭代器傳遞給writer.write函數, 由每一個Writer的實現去實現具體的write操做;
class ShuffleMapTask extends Task( def runTask(context: TaskContext): MapStatus = { // 反序列化接收到的數據 val (rdd, dep) = closureSerializer.deserialize( ByteBuffer.wrap(taskBinary.value)) var writer: ShuffleWriter[Any, Any] = null val manager = SparkEnv.get.shuffleManager // 調用ShuffleManager的getWriter方法獲取一組writer writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context) // 遍歷RDD進行write writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]]) writer.stop(success = true).get } }
上面代碼中,在調用rdd的iterator()方法時,會根據RDD實現類的compute方法指定的處理邏輯對數據進行處理,固然,若是該Partition對應的數據已經處理過並存儲在MemoryStore或DiskStore,直接經過BlockManager獲取到對應的Block數據,而無需每次須要時從新計算。而後,write()方法會將已經處理過的Partition數據輸出到磁盤文件。 在Spark Shuffle過程當中,每一個ShuffleMapTask會經過配置的ShuffleManager實現類對應的ShuffleManager對象(其實是在SparkEnv中建立),根據已經註冊的ShuffleHandle,獲取到對應的ShuffleWriter對象,而後經過ShuffleWriter對象將Partition數據寫入內存或文件。
參見: 過濾須要執行的分片 返回須要計算的partition信息
class ShuffleMapStage{ def findMissingPartitions(): Seq[Int] = { mapOutputTrackerMaster .findMissingPartitions(shuffleDep.shuffleId) .getOrElse(0 until numPartitions) } }
參見: reduce端獲取 ResultTask不須要進行寫操做。直接將計算結果返回。
class ResultTask extends Task { def runTask(context: TaskContext): U = { // 對RDD和函數進行反序列化 val (rdd, func) = ser.deserialize( ByteBuffer.wrap(taskBinary.value) // 調用函數進行計算 func(context, rdd.iterator(partition, context)) } } // RDD的iterator函數, class RDD{ def iterator(split: Partition, context: TaskContext): Iterator[T] = { if (storageLevel != StorageLevel.NONE) { getOrCompute(split, context) } else { computeOrReadCheckpoint(split, context) } } }
返回須要計算的partition信息,不須要通過tracker,在提交Job的時候會將其保存在ResultStage
class DAGScheduler{ def handleJobSubmitted(){ // 定義resultStage finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite) // 將job傳遞給resultStage finalStage.setActiveJob(job) } } class ResultStage{ // 過濾掉已經完成的 findMissingPartitions(): Seq[Int] = { val job = activeJob.get (0 until job.numPartitions).filter(id => !job.finished(id)) } }
sparkCore源碼解析系列: