sparkCore源碼解析之Job

job

1. 概念

站在不一樣的角度看jobgit

  1. transaction: Job是由一組RDD上轉換和動做組成。github

  2. stage: Job是由ResultStage和多個ShuffleMapState組成web

  3. init:由action操做觸發提交執行的一個函數 action操做會觸發調用sc.runJob方法,api

Job是一組rdd的轉換以及最後動做的操做集合,它是Spark裏面計算最大最虛的概念,甚至在spark的任務頁面中都沒法看到job這個單位。 可是無論怎麼樣,在spark用戶的角度,job是咱們計算目標的單位,每次在一個rdd上作一個動做操做時,都會觸發一個job,完成計算並返回咱們想要的數據。 Job是由一組RDD上轉換和動做組成,這組RDD之間的轉換關係表現爲一個有向無環圖(DAG),每一個RDD的生成依賴於前面1個或多個RDD。 在Spark中,兩個RDD之間的依賴關係是Spark的核心。站在RDD的角度,二者依賴表現爲點對點依賴, 可是在Spark中,RDD存在分區(partition)的概念,兩個RDD之間的轉換會被細化爲兩個RDD分區之間的轉換。 Stage的劃分是對一個Job裏面一系列RDD轉換和動做進行劃分。 首先job是因動做而產生,所以每一個job確定都有一個ResultStage,不然job就不會啓動。 其次,若是Job內部RDD之間存在寬依賴,Spark會針對它產生一箇中間Stage,即爲ShuffleStage,嚴格來講應該是ShuffleMapStage,這個stage是針對父RDD而產生的, 至關於在父RDD上作一個父rdd.map().collect()的操做。ShuffleMapStage生成的map輸入,對於子RDD,若是檢測到所本身所「寬依賴」的stage完成計算,就能夠啓動一個shuffleFectch, 從而將父RDD輸出的數據拉取過程,進行後續的計算。 所以一個Job由一個ResultStage和多個ShuffleMapStage組成數組

2. job處理流程

img https://github.com/ColZer/DigAndBuried/blob/master/spark/shuffle-study.md閉包

2.1. job生成過程

img

2.1.1. job重載函數

調用SparkContext裏面的函數重載,將分區數量,須要計算的分區下標等參數設置好 以rdd.count爲例:異步

rdd.count
// 獲取分區數
sc.runJob(this, Utils.getIteratorSize _).sum
// 設置須要計算的分區
runJob(rdd, func, 0 until rdd.partitions.length)
// 設置須要在每一個partition上執行的函數
runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)

2.1.2. 設置回調函數

定義一個接收計算結果的對象數組並將其返回 構造一個Array,並構造一個函數對象"(index, res) => results(index) = res"繼續傳遞給runJob函數,而後等待runJob函數運行結束,將results返回; 對這裏的解釋至關在runJob添加一個回調函數,將runJob的運行結果保存到Array到, 回調函數,index表示mapindex, res爲單個map的運行結果ide

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int]): Array[U] = {
	// 定義返回的結果集
    val results = new Array[U](partitions.size)
	// 定義resulthandler
    runJob[T, U](rdd, func, partitions, (index, res) => results(index) = res)
	// 返回計算結果
    results
  }

2.1.3. 獲取須要執行的excutor

將須要執行excutor的地址和回調函數等傳給DAG調度器,由DAG調度器進行具體的submitJob操做。函數

def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
	// 獲取須要發送的excutor地址
    val callSite = getCallSite
	// 閉包封裝,防止序列化錯誤
    val cleanedFunc = clean(func)
	// 提交給dag調度器,
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    // docheckpoint
    rdd.doCheckpoint()
  }

注意:dagScheduler.runJob是堵塞的操做,即直到Spark完成Job的運行以前,rdd.doCheckpoint()是不會執行的 上異步的runJob回調用下面這個方法,裏面設置了JobWaiter,用來等待job執行完畢。oop

def runJob{
...
// job提交後會返回一個jobwaiter對象
val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
 waiter.completionFuture.value.get match {
​      case scala.util.Success(_) =>
​		...
}

2.1.4. 將Job放入隊列

給JOB分配一個ID,並將其放入隊列,返回一個阻塞器,等待當前job執行完畢。將結果數據傳送給handler function

def submitJob{
// 生成JOB的ID
val jobId = nextJobId.getAndIncrement()
// 生成阻塞器
val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
// post方法的實現:eventQueue.put(event),其實是將此job提交到了一個LinkedBlockingDeque
eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
}
waiter

2.2. job監聽

img

2.2.1. 監聽觸發

在提交job時,咱們將job放到了一個LinkedBlockingDeque隊列,而後由EventLoop 負責接收處理請求,觸發job的提交,產生一個finalStage. EventLoop是在jobScheduler中啓動的時候在JobGenerator中啓動的 當從隊列中拉去job時,開建立ResultStage:

class EventLoop
override def run(): Unit = {
​      try {
​        while (!stopped.get) {
​		 // 拉去job
​          val event = eventQueue.take()
​          try {
​			// 觸發建立stage
​            onReceive(event)
​	...
}
def doOnReceive{
​	case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
​      dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
}

2.2.2. 初始化job和stage

建立job:根據JobId,finalStage,excutor地址,job狀態監聽的JobListener,task的屬性properties等生成job,並把job放入Map中記錄。

// class DAGScheduler
private[scheduler] def handleJobSubmitted() {
// 以不一樣形式的hashMap存放job
 jobIdToStageIds = new HashMap[Int, HashSet[Int]]
stageIdToStage = new HashMap[Int, Stage]  
jobIdToActiveJob = new HashMap[Int, ActiveJob] 
// 初始化finalStage
var finalStage: ResultStage = 	createResultStage(finalRDD, func, partitions, jobId, callSite)
   // 初始化job
​    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
​    clearCacheLocs()
​    jobIdToActiveJob(jobId) = job
​    activeJobs += job
​    finalStage.setActiveJob(job)
​    val stageIds = jobIdToStageIds(jobId).toArray
​    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
​    **listenerBus.post(**
​      **SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))**
   // 提交finalStage,計算時會判斷其以前是否存在shuffleStage,若是存在會優先計算shuffleStage,最後再計算finalStage
​    **submitStage(finalStage)**
  }

2.2.3. 提交stage

參見: MapOutputTrackerMaster stage的狀態分爲三類:計算失敗,計算完成和未計算完成,迭代的去計算完成父stage後,就能夠到下一步,將stage轉換到具體的task進行執行。

class DAGScheduler
private[scheduler] def handleJobSubmitted() {
 var finalStage: ResultStage = 	createResultStage(finalRDD, func, partitions, jobId, callSite)
​    ...
​    submitStage(finalStage)
  }
// 迭代的去判斷父stage是否所有計算完成
private def submitStage(stage: Stage) {
if(jobId.isDefined){
val missing = getMissingParentStages(stage).sortBy(_.id)
if (missing.isEmpty) {
   // 父stage已經計算完成,能夠開始當前計算
​	submitMissingTasks(stage, jobId.get)
​    } else {
​	//  父stage的map操做未完成,繼續進行迭代
​          for (parent <- missing) {
​            submitStage(parent)
​          }
​          waitingStages += stage
​     }
}
}
// 獲取未計算完成的stage
private def getMissingParentStages(stage: Stage): List[Stage] = {
...
for (dep <- rdd.dependencies) {
   dep match {
​      case shufDep: ShuffleDependency=>
// 判斷當前stage是否計算完成
​	 val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
​         if (!mapStage.isAvailable) {
​                  missing += mapStage
​          }           
​       case narrowDep: NarrowDependency[_] =>
​             waitingForVisit.push(narrowDep.rdd)
​      }
...
}

2.3. stage轉task

img 首先利於上面說到的Stage知識獲取所須要進行計算的task的分片;由於該Stage有些分片可能已經計算完成了;而後將Task運行依賴的RDD,Func,shuffleDep 進行序列化,經過broadcast發佈出去; 而後建立Task對象,提交給taskScheduler調度器進行運行

2.3.1. 過濾須要執行的分片

參見: 獲取task分片 對Stage進行遍歷全部須要運行的Task分片; 緣由:存在部分task失敗之類的狀況,或者task運行結果所在的BlockManager被刪除了,就須要針對特定分片進行從新計算;即所謂的恢復和重算機制;

class DAGScheduler{
def submitMissingTasks(stage, jobId){
​	val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
​	val properties = jobIdToActiveJob(jobId).properties
​	runningStages += stage
}

2.3.2. 序列化和廣播

對Stage的運行依賴進行序列化並broadcast給excutors(若是不序列化在數據傳輸過程當中可能出錯) 對ShuffleStage和FinalStage所序列化的內容有所不一樣:對於ShuffleStage序列化的是RDD和shuffleDep;而對FinalStage序列化的是RDD和Func 對於FinalStage咱們知道,每一個Task運行過程當中,須要知道RDD和運行的函數,好比咱們這裏討論的Count實現的Func;而對於ShuffleStage,ShuffleDependency記錄了父RDD,排序方式,聚合器等,reduce端須要獲取這些參數進行初始化和計算。

class DAGScheduler{
def submitMissingTasks(stage, jobId){
​	...
​      // consistent view of both variables.
RDDCheckpointData.synchronized {
​        taskBinaryBytes = stage match {
​          case stage: ShuffleMapStage =>
​            JavaUtils.bufferToArray(
​              closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef))
​          case stage: ResultStage =>           JavaUtils.bufferToArray(closureSerializer.serialize((stage.rdd, stage.func): AnyRef))
​        }
​        partitions = stage.rdd.partitions
​      }
​      taskBinary = sc.broadcast(taskBinaryBytes)

2.3.3. 構造task對象

img 針對每一個須要計算的分片構造一個Task對象, 對於ResultTask就是在分片上調用咱們的Func,而ShuffleMapTask按照ShuffleDep進行 MapOut

class DAGScheduler{
def submitMissingTasks(stage, jobId){
​	...
 val tasks: Seq[Task[_]] = try {
​      val serializedTaskMetrics = closureSerializer.serialize(stage.latestInfo.taskMetrics).array()
​      stage match {
​		// 一個stage會產生多個task任務
​        case stage: ShuffleMapStage =>
​          partitionsToCompute.map { id =>
​            new ShuffleMapTask()
​          }
​        case stage: ResultStage =>
​          partitionsToCompute.map { id =>
​            new ResultTask()
​          }
​      }

2.3.3.1. ShuffleMapTask

2.3.3.2. ResultTask

2.3.4. taskScheduler調度task

調用taskScheduler將task提交給Spark進行調度

class DAGScheduler{
def submitMissingTasks(stage, jobId){
	...
if (tasks.size > 0) {
   // 將taskSet發送給 taskScheduler
	taskScheduler.submitTasks(new TaskSet(
        tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
      stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
    } else {
     markStageAsFinished(stage, None)
    }

2.4. 獲取運行結果

img DAGScheduler接收到DAGSchedulerEvent後判斷其類型是TaskCompletion,不一樣的stage的實現方式不同,shuffle的實現更復雜一點

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case completion: CompletionEvent =>
dagScheduler.handleTaskCompletion(completion)
}
private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
event.reason match {
  case Success =>
​      task match {
​      case rt: ResultTask[_, _] =>
​			// 調用jobWaiter的taskSucced通知結果
​			job.listener.taskSucceeded
​	 case smt: ShuffleMapTask =>
​			// 調用outputTracker
​		mapOutputTracker.registerMapOutput
}

2.4.1. ResultStage

當計算完畢後,JobWaiter同步調用resultHandler處理task返回的結果。

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
event.reason match {
  case Success =>
      task match {
      case rt: ResultTask[_, _] =>
			// 調用jobWaiter的taskSucced通知結果
			job.listener.taskSucceeded(
			rt.outputId, event.result)
	 case smt: ShuffleMapTask =>
}
// jobWaiter是JobListner的子類
class JobWaiter extends JobListener{
  override def taskSucceeded(index: Int, result: Any): Unit = {
    synchronized {
      resultHandler(index, result.asInstanceOf[T])
    }
    if (finishedTasks.incrementAndGet() == totalTasks) {
      jobPromise.success(())
    }
  }
}

2.4.2. ShuffleMapStage

參見: MapStatus的註冊和獲取 將運行結果(mapStatus)傳送給outputTrancker

private[scheduler] def handleTaskCompletion(event: CompletionEvent) {
event.reason match {
  case Success =>
​      task match {
​      case rt: ResultTask[_, _] =>
​	 case smt: ShuffleMapTask =>
​			// 
​		mapOutputTracker.registerMapOutput(
​                shuffleStage.shuffleDep.shuffleId, 				smt.partitionId, status)
}

2.5. doCheckPoint

job執行完畢後執行

3. stage

img

  1. 一組含有相同計算函數的任務集合,這些任務組合成了一個完整的job
  2. stage分爲兩種:FinalStage和shuffleStage
  3. stage中包含了jobId,對於FIFO規則,jobId越小的優先級越高
  4. 爲了保證容錯性,一個stage能夠被重複執行,因此在web UI上有可能看見多個stage的信息,取最新更新時間的便可
  5. 組成:
private[scheduler] abstract class Stage(
    val id: Int, // stageId
    val rdd: RDD[_],// RDD that this stage runs on
    val numTasks: Int,// task數量
    val parents: List[Stage],// 父stage
    val firstJobId: Int,//當前stage上JobId
    val callSite: CallSite// 生成RDD存放位置
)  extends Logging {

3.1. ShuffleMapStage

class ShuffleMapStage(
    val shuffleDep: ShuffleDependency[_, _, _],
    mapOutputTrackerMaster: MapOutputTrackerMaster)
  extends Stage(id, rdd, numTasks, parents, firstJobId, callSite){
// 判斷當前stage是否可用
  def isAvailable: Boolean = numAvailableOutputs == numPartitions
}

3.1.1. ShuffleMapTask

每一個運行在Executor上的Task, 經過SparkEnv獲取shuffleManager對象, 而後調用getWriter來當前MapID=partitionId的一組Writer. 而後將rdd的迭代器傳遞給writer.write函數, 由每一個Writer的實現去實現具體的write操做;

class ShuffleMapTask extends Task(
def runTask(context: TaskContext): MapStatus = {
​	// 反序列化接收到的數據
​    val (rdd, dep) = closureSerializer.deserialize(
​      ByteBuffer.wrap(taskBinary.value))
var writer: ShuffleWriter[Any, Any] = null
val manager = SparkEnv.get.shuffleManager
// 調用ShuffleManager的getWriter方法獲取一組writer
 writer = manager.getWriter[Any, Any](dep.shuffleHandle, partitionId, context)
// 遍歷RDD進行write
 writer.write(rdd.iterator(partition, context).asInstanceOf[Iterator[_ <: Product2[Any, Any]]])
writer.stop(success = true).get
}
}

上面代碼中,在調用rdd的iterator()方法時,會根據RDD實現類的compute方法指定的處理邏輯對數據進行處理,固然,若是該Partition對應的數據已經處理過並存儲在MemoryStore或DiskStore,直接經過BlockManager獲取到對應的Block數據,而無需每次須要時從新計算。而後,write()方法會將已經處理過的Partition數據輸出到磁盤文件。 在Spark Shuffle過程當中,每一個ShuffleMapTask會經過配置的ShuffleManager實現類對應的ShuffleManager對象(其實是在SparkEnv中建立),根據已經註冊的ShuffleHandle,獲取到對應的ShuffleWriter對象,而後經過ShuffleWriter對象將Partition數據寫入內存或文件。

3.1.2. 獲取task分片

參見: 過濾須要執行的分片 返回須要計算的partition信息

class ShuffleMapStage{
def findMissingPartitions(): Seq[Int] = {
​    mapOutputTrackerMaster
​      .findMissingPartitions(shuffleDep.shuffleId)
​      .getOrElse(0 until numPartitions)
  }
}

3.2. ResultStage

3.2.1. ResultTask

參見: reduce端獲取 ResultTask不須要進行寫操做。直接將計算結果返回。

class ResultTask extends Task {
def runTask(context: TaskContext): U = {
	// 對RDD和函數進行反序列化
	val (rdd, func) = ser.deserialize(
      ByteBuffer.wrap(taskBinary.value)
	// 調用函數進行計算
	func(context, rdd.iterator(partition, context))
}
}
// RDD的iterator函數,
class RDD{
def iterator(split: Partition, context: TaskContext): Iterator[T] = {
    if (storageLevel != StorageLevel.NONE) {
      getOrCompute(split, context)
    } else {
      computeOrReadCheckpoint(split, context)
    }
  }
}

3.2.2. 獲取task分片

返回須要計算的partition信息,不須要通過tracker,在提交Job的時候會將其保存在ResultStage

class DAGScheduler{
def handleJobSubmitted(){
// 定義resultStage
finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
// 將job傳遞給resultStage
finalStage.setActiveJob(job)
}
}
class ResultStage{
// 過濾掉已經完成的
findMissingPartitions(): Seq[Int] = {
    val job = activeJob.get
    (0 until job.numPartitions).filter(id => !job.finished(id))
  }
}

sparkCore源碼解析系列:

  1. sparkCore源碼解析之block
  2. sparkCore源碼解析之partition
  3. sparkCore源碼解析之Job
  4. sparkCore源碼解析之shuffle
  5. sparkCore源碼解析之完整腦圖地址
相關文章
相關標籤/搜索