Spark源碼分析 – SparkContext

Spark源碼分析之-scheduler模塊
這位寫的很是好, 讓我對Spark的源碼分析, 變的輕鬆了許多
這裏本身再梳理一遍node

先看一個簡單的spark操做,app

val sc = new SparkContext(……)
val textFile = sc.textFile("README.md") textFile.filter(line => line.contains("Spark")).count()

 

1. SparkContext

這是Spark的入口, 任何須要使用Spark的地方都須要先建立SparkContextoop

在SparkContext中, 最主要的初始化工做就是start TaskScheduler和DAGScheduler, 這兩個就是Spark的核心所在源碼分析

Spark的設計很是的乾淨, 把整個DAG抽象層從實際的task執行中剝離了出來
DAGScheduler, 負責解析spark命令, 生成stage, 造成DAG, 最終劃分紅tasks, 提交給TaskScheduler, 他只完成靜態分析
TaskScheduler, 專門負責task執行, 他只負責資源管理, task分配, 執行狀況的報告
這樣的好處, 就是Spark能夠經過提供不一樣的TaskScheduler簡單的支持各類資源調度和執行平臺, 如今Spark支持, local, standalone, mesos, Yarn...this

class SparkContext(
    val master: String,
    val appName: String,
    val sparkHome: String = null,
    val jars: Seq[String] = Nil,
    val environment: Map[String, String] = Map(),
    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
    // This is typically generated from InputFormatInfo.computePreferredLocations .. host, set of data-local splits on host
    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] = scala.collection.immutable.Map())
  extends Logging {

  // Create and start the scheduler
  private var taskScheduler: TaskScheduler = {
  //.......
  }
  taskScheduler.start()

  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()
}

 

2. sc.textFile

而後固然要載入被處理的數據, 最經常使用的textFile, 其實就是生成HadoopRDD, 做爲起始的RDDspa

  /**
   * Read a text file from HDFS, a local file system (available on all nodes), or any
   * Hadoop-supported file system URI, and return it as an RDD of Strings.
   */
  def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
    hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
      .map(pair => pair._2.toString)
  }
  /** Get an RDD for a Hadoop file with an arbitrary InputFormat */
  def hadoopFile[K, V](
      path: String,
      inputFormatClass: Class[_ <: InputFormat[K, V]],
      keyClass: Class[K],
      valueClass: Class[V],
      minSplits: Int = defaultMinSplits
      ) : RDD[(K, V)] = {
    val conf = new JobConf(hadoopConfiguration)
    FileInputFormat.setInputPaths(conf, path)
    new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
  }

 

3. Transform and Action

這裏調用的filter transform很簡單, 能夠參考前面的blog
關鍵調用count action, action的不一樣在於, 會調用runjob
因此在調用action以前, job都是沒有被真正執行的scala

  def count(): Long = {// 只有在action中才會真正調用runJob, 因此transform都是lazy的
    sc.runJob(this, (iter: Iterator[T]) => { // count調用的是簡化版的runJob, 只傳入rdd和func, 其餘的會用默認值補全
      var result = 0L
      while (iter.hasNext) {
        result += 1L
        iter.next()
      }
      result
    }).sum
  }

 

4. sc.runJob

關鍵在於調用了dagScheduler.runJob設計

  /**
   * Run a function on a given set of partitions in an RDD and pass the results to the given
   * handler function. This is the main entry point for all actions in Spark. The allowLocal
   * flag specifies whether the scheduler can run the computation on the driver(建立SparkContext的進程) rather than
   * shipping it out to the cluster, for short actions like first().
   */
  def runJob[T, U: ClassManifest](
      rdd: RDD[T], //只須要傳入Final RDD, 前面的能夠根據dependency推出
      func: (TaskContext, Iterator[T]) => U, //action的邏輯,好比count邏輯
      partitions: Seq[Int],  //partition的個數
      allowLocal: Boolean, //對於一些簡單的action,是否容許在local執行
      resultHandler: (Int, U) => Unit) { //會在JobWaiter的taskSucceeded中用於處理task result
    val callSite = Utils.formatSparkCallSite
    logInfo("Starting job: " + callSite)
    val start = System.nanoTime
    val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
      localProperties.get)
    logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
    rdd.doCheckpoint()
    result
  }
相關文章
相關標籤/搜索