Spark2.x精通:Job觸發流程源碼深度剖析(一)

1、概述    java


    以前幾篇文章對Spark集羣的Master、Worker啓動流程進行了源碼剖析,後面直接從客戶端角度出發,講解了spark-submit任務提交過程及driver的啓動;集羣啓動、任務提交、SparkContext初始化等前期準備工做完成以後,後面就是咱們的主函數的代碼Job如何觸發的,本篇文章仍是結合源碼進行剖析。
sql

   

    軟件版本:
apache

        spark2.2.0
api


2、Job觸發流程源碼剖析數組


1. 咱們先上一段最簡單的代碼,讀取本地文件進行WordCount,並打印統計結果,代碼以下:
緩存

package com.hadoop.ljs.spark220.study;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaPairRDD;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.api.java.JavaSparkContext;import org.apache.spark.api.java.function.FlatMapFunction;import org.apache.spark.api.java.function.Function2;import org.apache.spark.api.java.function.PairFunction;import org.apache.spark.api.java.function.VoidFunction;import org.apache.spark.sql.SparkSession;import scala.Tuple2;import java.util.Arrays;import java.util.Iterator;/** * @author: Created By lujisen * @company ChinaUnicom Software JiNan * @date: 2020-03-12 08:26 * @version: v1.0 * @description: com.hadoop.ljs.spark220.study */public class Example1 {    public static void main(String[] args) throws Exception{        /*spark環境初始化*/        SparkConf sparkConf = new SparkConf().setMaster("local[*]").setAppName("Example1");        SparkSession sc = SparkSession.builder().config(sparkConf).getOrCreate();        JavaSparkContext jsc = new JavaSparkContext(sc.sparkContext());        /*讀取本地文件*/        JavaRDD<String> sourceRDD = jsc.textFile("D:\\kafkaSSL\\kafka_client_jaas.conf");        /*轉換多維爲一維數組*/        JavaRDD<String> words = sourceRDD.flatMap(new FlatMapFunction<String, String>() {            @Override            public Iterator<String> call(String s)  {                return Arrays.asList(s.split(" ")).iterator();            }        });        /*轉換成(hello,1)格式*/        JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {            @Override            public Tuple2<String, Integer> call(String s) {                return new Tuple2<String, Integer>(s, 1);            }        });        /*根據key進行聚合*/        JavaPairRDD<String, Integer> wordCount = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {            @Override            public Integer call(Integer v1, Integer v2)  {                return v1+v2;            }        });        /*打印結果*/        wordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {            @Override            public void call(Tuple2<String, Integer> result){                System.out.println("word:  "+result._1+" count: "+result._2);            }        });
   }}

    咱們一行行的進行分析,首先看讀取本地文件textFile()函數:
ide

  /*這裏直接調用的SparkContext的textFile函數*/  def textFile(path: String): JavaRDD[String] = sc.textFile(path)


2. 直接看sc.textFile()函數:函數

  def textFile(      path: String,      minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {    assertNotStopped()    /*這裏調用了hadoopFile函數,傳入三個,寫過Mapreuce的時候都知道 第二個參數就是Map的輸入格式化類型,參數3是行號 4是一行的內容*/    /*hadoopFile()函數,返回了一個HadoopRDD*/    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],    minPartitions).map(pair => pair._2.toString).setName(path)  }

看hadoopFile()函數
oop

 def hadoopFile[K, V](      path: String,      inputFormatClass: Class[_ <: InputFormat[K, V]],      keyClass: Class[K],      valueClass: Class[V],      minPartitions: Int = defaultMinPartitions): RDD[(K, V)] = withScope {    assertNotStopped()    // This is a hack to enforce loading hdfs-site.xml.    // See SPARK-11227 for details.    FileSystem.getLocal(hadoopConfiguration)    //這裏把hadoopConfiguration配置作了一個廣播變量    val confBroadcast = broadcast(new SerializableConfiguration(hadoopConfiguration))    /* 傳入一個jobConf對輸入數據進行格式化*/    val setInputPathsFunc = (jobConf: JobConf) => FileInputFormat.setInputPaths(jobConf, path)    /* 返回一個HadoopRDD實例,這裏Hadoop配置文件是以廣播變量的方式傳進去的*/    /*廣播變量 每一個Worker保存一份,被多個Executor共享*/    /*HadoopRDD繼承自RDD*/    new HadoopRDD(      this,      confBroadcast,      Some(setInputPathsFunc),      inputFormatClass,      keyClass,      valueClass,      minPartitions).setName(path)  }

    上面直接對HadopRDD作了一個map轉換,這裏Hadoop繼承自RDD,調用的是RDD裏面的map()函數,咱們直接看看map函數代碼:
post

  /* 最後實際上是返回了一個MapPartitionsRDD,裏面是(key,value),key是行號,value是內容*/  def map[U: ClassTag](f: T => U): RDD[U] = withScope {    val cleanF = sc.clean(f)    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))  }

 上面對返回的RDD是一個<key,value>鍵值對,而後.map(pair => pair._2.toString對其進行了轉換,其實就是去掉了那個key行號,剩下的是一個vlaue數組,裏面是每行的內容,至此textFile這一行剖析完畢。

   

3.主函數的第30-42行都是對RDD進行了一系列的轉換,其實都是調用RDD.scala中的內容對MapPartitionsRDD進行的轉換,有興趣你能夠跟進去看一下,比較簡單:

def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {    val cleanF = sc.clean(f)    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))  }
  /* mapToPair函數裏面實際上是調用的rdd.map函數,剛纔上面已經說過了*/  def mapToPair[K2, V2](f: PairFunction[T, K2, V2]): JavaPairRDD[K2, V2] = {    def cm: ClassTag[(K2, V2)] = implicitly[ClassTag[(K2, V2)]]    new JavaPairRDD(rdd.map[(K2, V2)](f)(cm))(fakeClassTag[K2], fakeClassTag[V2])  }


4.最後調用reduceBykey進行了聚合,這裏就比較重要了,咱們以前講過一個spark任務裏面會有多個job,job的劃分依據是action,有幾個action就有幾個job,而每一個job的劃分依據是shuffle,只要發生了shuffle就會有新的stage生成,reduceBykey是個action操做,RDD中沒有這個函數,是經過裏面的隱式轉換調用了PairRDDFunctions.scala中的reduceBykey()函數,裏面的轉換先不用管,由於涉及到shuffle操做,會有新的stage的生成,這裏先略過:

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)  }


5. 最後主函數調用了wordCount.foreach()進行告終果打印,這是一個action操做,有幾個action就會提交幾個job,直接去看代碼:

  def foreach(f: T => Unit): Unit = withScope {    val cleanF = sc.clean(f)    /*這裏是執行了runJob,跟其餘操做不同,這裏會提交一個job*/    sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))  }

    跟進代碼,裏面調用了SparkContext.scala中的函數:

  def runJob[T, U: ClassTag](      rdd: RDD[T],      func: Iterator[T] => U,      partitions: Seq[Int]): Array[U] = {      //這裏clean函數其實直接輸出    val cleanedFunc = clean(func)    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)  }

    跟進了好幾層,最後看runJob幹了啥:

def runJob[T, U: ClassTag](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      resultHandler: (Int, U) => Unit): Unit = {    if (stopped.get()) {      throw new IllegalStateException("SparkContext has been shutdown")    }    val callSite = getCallSite    val cleanedFunc = clean(func)    logInfo("Starting job: " + callSite.shortForm)    if (conf.getBoolean("spark.logLineage", false)) {      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)    }    //SparkContext初始化的dagScheduler調用runJob函數比較任務,這樣就跟以前SparkContext源碼剖析內容聯繫在一塊兒了    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)    progressBar.foreach(_.finishAll())    rdd.doCheckpoint()  }


6.上面調用了DAGScheduler中的runJob函數,這個DAGScheduler是咱們在SparkContext初始化的時候執行的初始化,DAGSCheduler主要工做:建立Job,推斷出每個Job的stage劃分(DAG),跟蹤RDD,實體化stage的輸出,調度job,將stage以taskSet的形式提交給TaskScheduler的實現類,在集羣上運運行,其中,TaskSet是一組能夠當即運行的獨立task,基於集羣上已存在的數據,直接看下代碼:


def runJob[T, U](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      resultHandler: (Int, U) => Unit,      properties: Properties): Unit = {    val start = System.nanoTime    /* 這裏就一行比較重要,這裏調用submitJob進行提交 */    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)    // 下面這些就是任務結果的一些判斷了     waiter.completionFuture.value.get match {      case scala.util.Success(_) =>        logInfo("Job %d finished: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))      case scala.util.Failure(exception) =>        logInfo("Job %d failed: %s, took %f s".format          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.        val callerStackTrace = Thread.currentThread().getStackTrace.tail        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)        throw exception    }  }

    下面就是調用了submitJob進行任務的提交,代碼以下:

def submitJob[T, U](      rdd: RDD[T],      func: (TaskContext, Iterator[T]) => U,      partitions: Seq[Int],      callSite: CallSite,      resultHandler: (Int, U) => Unit,      properties: Properties): JobWaiter[U] = {    // 這裏確認咱們提交的Partition存在    val maxPartitions = rdd.partitions.length    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>      throw new IllegalArgumentException(        "Attempting to access a non-existent partition: " + p + ". " +          "Total number of partitions: " + maxPartitions)    }
   val jobId = nextJobId.getAndIncrement()    if (partitions.size == 0) {      // Return immediately if the job is running 0 tasks      return new JobWaiter[U](this, jobId, 0, resultHandler)    }
   assert(partitions.size > 0)    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)    //這裏會觸發DAGSchedulerEventProcessLoop的JobSubmitted,他裏面onReceive()函數    //接收消息進行處理,這裏調用的是JobSubmitted,觸發dagScheduler.handleJobSubmitted    //函數進行處理    eventProcessLoop.post(JobSubmitted(      jobId, rdd, func2, partitions.toArray, callSite, waiter,      SerializationUtils.clone(properties)))    waiter  }


下面就是調用handleJobSubmitted()函數進行處理,它是DAGSchduler的job調度核心入口,代碼以下:

 private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      callSite: CallSite,      listener: JobListener,      properties: Properties) {    //     var finalStage: ResultStage = null    try {      //使用觸發job的最後一個rdd,建立stage      //當hdfs上的文件被刪除的時候  stage可能建立失敗      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)    } catch {      case e: Exception =>        logWarning("Creating new stage failed due to exception - job: " + jobId, e)        listener.jobFailed(e)        return    }    //經過finalStage創建立一個job,    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)    clearCacheLocs()    logInfo("Got job %s (%s) with %d output partitions".format(      job.jobId, callSite.shortForm, partitions.length))    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")    logInfo("Parents of final stage: " + finalStage.parents)    logInfo("Missing parents: " + getMissingParentStages(finalStage))
   val jobSubmissionTime = clock.getTimeMillis()    //將job加入到activeJob緩存中    jobIdToActiveJob(jobId) = job    activeJobs += job    finalStage.setActiveJob(job)    val stageIds = jobIdToStageIds(jobId).toArray    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))    listenerBus.post(      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))    //提交finalStage,可是finalStage確定不會首先執行,它要先執行它的依賴stage    submitStage(finalStage)  }


7.最後調用了submitStage進行了finalStage的提交,finalStage確定不會首先執行,它要先執行它的依賴stage,這裏面就涉及到了stage的換分了,代碼以下:

/** Submits stage, but first recursively submits any missing parents. */  private def submitStage(stage: Stage) {    val jobId = activeJobForStage(stage)    if (jobId.isDefined) {      logDebug("submitStage(" + stage + ")")      if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {        //獲取stage對應的父stage,返回List[Stage]按id排序        val missing = getMissingParentStages(stage).sortBy(_.id)        logDebug("missing: " + missing)        // 若是父stage爲空,則調用submitMissingTasks 提交stage,        if (missing.isEmpty) {          logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")          submitMissingTasks(stage, jobId.get)        } else {          for (parent <- missing) {           // 若是父stage不爲空,則調用submitStage 提交父stage            submitStage(parent)          }          //並將stage放入等待的隊列中,先去執行父stage          waitingStages += stage        }      }    } else {      abortStage(stage, "No active job for stage " + stage.id, None)    }  }

   咱們看下getMissingParentStages()函數,如何進行stage劃分的,代碼以下:

 //大致劃分流程:遍歷rdd的全部的依賴,若是是ShufDep,則經過getShuffleMapStage獲取stage, // 並加入到missing隊列中。若是是窄依賴的話,將放入waitingForVisit的棧中。 private def getMissingParentStages(stage: Stage): List[Stage] = {    val missing = new HashSet[Stage]    val visited = new HashSet[RDD[_]]    // We are manually maintaining a stack here to prevent StackOverflowError    // caused by recursively visiting    val waitingForVisit = new Stack[RDD[_]]    def visit(rdd: RDD[_]) {      if (!visited(rdd)) {        visited += rdd        val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)        if (rddHasUncachedPartitions) {          for (dep <- rdd.dependencies) {            dep match {            //若是shufDep也就是咱們說的寬依賴              case shufDep: ShuffleDependency[_, _, _] =>              //寬依賴,則建立一個shuffleStage,即finalStage以前的stage是shuffle stage                val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)                if (!mapStage.isAvailable) {                 //加入到missing隊列,返回                  missing += mapStage                }                //若是narrowDep也就是咱們說的窄依賴              case narrowDep: NarrowDependency[_] =>              //加入等待隊列中                waitingForVisit.push(narrowDep.rdd)            }          }        }      }    }    waitingForVisit.push(stage.rdd)    while (waitingForVisit.nonEmpty) {     // 若是是窄依賴,將rdd放入棧中      visit(waitingForVisit.pop())    }    missing.toList  }

    submitStage()函數中若是父stage爲空則,調用submitMissingTasks()函數進行提交,這個函數主要作了一下幾件事:

    

    a.首先獲取stage中沒有計算的partition;

    b.經過 taskIdToLocations(id) 方法進行tasks運行最佳位置的肯定;

    c.調用taskScheduler的submitTasks進行任務的提交

相關文章
相關標籤/搜索