Spark Source Code Analysis: the scheduler Module
That write-up is very good; it made working through the Spark source code much easier for me.
Here I go through it once more in my own words.
First, look at a simple Spark program:
val sc = new SparkContext(……)
val textFile = sc.textFile("README.md")
textFile.filter(line => line.contains("Spark")).count()
This is the entry point of Spark: anywhere Spark is used, a SparkContext must be created first.
Inside SparkContext, the most important initialization work is starting the TaskScheduler and the DAGScheduler; these two are the heart of Spark.
Spark's design here is very clean: the entire DAG abstraction layer is separated from the actual task execution.
The DAGScheduler is responsible for interpreting the Spark operations, generating stages, forming the DAG, and finally splitting it into tasks that are submitted to the TaskScheduler; it only does the static analysis.
The TaskScheduler is dedicated to task execution; it is only responsible for resource management, task assignment, and reporting of execution status.
The benefit is that Spark can easily support all kinds of resource scheduling and execution platforms simply by providing different TaskScheduler implementations; currently Spark supports local, standalone, Mesos, YARN...
class SparkContext(
    val master: String,
    val appName: String,
    val sparkHome: String = null,
    val jars: Seq[String] = Nil,
    val environment: Map[String, String] = Map(),
    // This is used only by yarn for now, but should be relevant to other cluster types (mesos, etc) too.
    // This is typically generated from InputFormatInfo.computePreferredLocations
    // .. host, set of data-local splits on host
    val preferredNodeLocationData: scala.collection.Map[String, scala.collection.Set[SplitInfo]] =
      scala.collection.immutable.Map())
  extends Logging {

  // Create and start the scheduler
  private var taskScheduler: TaskScheduler = {
    //.......
  }
  taskScheduler.start()

  @volatile private var dagScheduler = new DAGScheduler(taskScheduler)
  dagScheduler.start()
}
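The elided block (//.......) is where the concrete TaskScheduler gets chosen; in this version of the code base, SparkContext essentially pattern-matches the master URL to decide which scheduler to build. Below is a minimal, self-contained sketch of that idea only; SketchLocalScheduler and SketchClusterScheduler are placeholder names, not Spark's real classes.

// Minimal, self-contained sketch (not Spark's real classes) of the idea:
// pick a TaskScheduler implementation by pattern-matching the master URL.
trait SketchScheduler { def start(): Unit }

class SketchLocalScheduler(threads: Int) extends SketchScheduler {
  def start(): Unit = println("local scheduler with " + threads + " thread(s)")
}

class SketchClusterScheduler(masterUrl: String) extends SketchScheduler {
  def start(): Unit = println("cluster scheduler talking to " + masterUrl)
}

object SchedulerSelection {
  // Regexes in the spirit of how the master string is parsed.
  private val LocalN   = """local\[([0-9]+)\]""".r
  private val SparkUrl = """(spark://.*)""".r
  private val MesosUrl = """(mesos://.*)""".r

  def createTaskScheduler(master: String): SketchScheduler = master match {
    case "local"         => new SketchLocalScheduler(1)
    case LocalN(threads) => new SketchLocalScheduler(threads.toInt)
    case SparkUrl(url)   => new SketchClusterScheduler(url)  // standalone mode
    case MesosUrl(url)   => new SketchClusterScheduler(url)  // Mesos mode
    case other           => sys.error("Unrecognized master URL: " + other)
  }
}

For example, SchedulerSelection.createTaskScheduler("local[4]").start() would build the 4-thread local variant; the real code does the same kind of dispatch and then starts whatever it picked, just as taskScheduler.start() is called above.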
Then, of course, the data to be processed has to be loaded. The most commonly used entry, textFile, simply creates a HadoopRDD as the starting RDD.
/**
* Read a text file from HDFS, a local file system (available on all nodes), or any
* Hadoop-supported file system URI, and return it as an RDD of Strings.
*/
def textFile(path: String, minSplits: Int = defaultMinSplits): RDD[String] = {
hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text], minSplits)
.map(pair => pair._2.toString)
}
/** Get an RDD for a Hadoop file with an arbitrary InputFormat */
def hadoopFile[K, V](
    path: String,
    inputFormatClass: Class[_ <: InputFormat[K, V]],
    keyClass: Class[K],
    valueClass: Class[V],
    minSplits: Int = defaultMinSplits
  ): RDD[(K, V)] = {
  val conf = new JobConf(hadoopConfiguration)
  FileInputFormat.setInputPaths(conf, path)
  new HadoopRDD(this, conf, inputFormatClass, keyClass, valueClass, minSplits)
}
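A quick usage note tying the two methods together: textFile is nothing more than hadoopFile with TextInputFormat, followed by a map that keeps the line text (the Text value) and drops the byte-offset key (the LongWritable). Assuming an already-constructed SparkContext named sc, the two forms below should be equivalent; minSplits is only a hint passed down to the Hadoop InputFormat.

import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapred.TextInputFormat

// The convenient form, with an explicit split hint.
val lines = sc.textFile("README.md", minSplits = 4)

// Roughly what textFile expands to, spelled out via hadoopFile.
val samePipeline = sc
  .hadoopFile("README.md", classOf[TextInputFormat], classOf[LongWritable], classOf[Text], 4)
  .map(pair => pair._2.toString)  // keep the line, drop the byte offset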
The filter transformation called here is simple; see the earlier blog posts.
The key call is the count action. What makes an action different is that it calls runJob.
So until an action is invoked, the job is never actually executed.
def count(): Long = {
  // runJob is only really called inside actions, which is why transformations are lazy
  sc.runJob(this, (iter: Iterator[T]) => {
    // count calls the simplified runJob, passing only the rdd and the func;
    // everything else is filled in with defaults
    var result = 0L
    while (iter.hasNext) {
      result += 1L
      iter.next()
    }
    result
  }).sum
}
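To make the "transformations are lazy, actions trigger execution" point concrete, here is a tiny standalone toy in plain Scala (not Spark code): filter only records the predicate, and no work happens until count walks the iterator, mirroring how the real filter just builds a new RDD while count calls runJob.

// Standalone toy, not Spark code: transformations are recorded lazily,
// and only the action (count) actually evaluates the pipeline.
class ToyDataset[T](compute: () => Iterator[T]) {
  // "Transformation": returns a new dataset, does no work yet.
  def filter(p: T => Boolean): ToyDataset[T] =
    new ToyDataset(() => compute().filter(p))

  // "Action": actually runs the pipeline, like count calling sc.runJob.
  def count(): Long = {
    var result = 0L
    val iter = compute()
    while (iter.hasNext) { result += 1L; iter.next() }
    result
  }
}

object LazyDemo extends App {
  val lines    = new ToyDataset(() => Iterator("Spark is fast", "hello", "Spark again"))
  val filtered = lines.filter(_.contains("Spark"))  // nothing computed yet
  println(filtered.count())                         // prints 2; the work happens here
}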
The key point is that it calls dagScheduler.runJob.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark. The allowLocal
 * flag specifies whether the scheduler can run the computation on the driver (the process
 * that created the SparkContext) rather than shipping it out to the cluster, for short
 * actions like first().
 */
def runJob[T, U: ClassManifest](
    rdd: RDD[T],                            // only the final RDD is needed; earlier ones can be derived from its dependencies
    func: (TaskContext, Iterator[T]) => U,  // the action's logic, e.g. the counting logic of count
    partitions: Seq[Int],                   // the partitions (by index) to run on
    allowLocal: Boolean,                    // for simple actions, whether local execution is allowed
    resultHandler: (Int, U) => Unit) {      // used in JobWaiter.taskSucceeded to handle each task's result
  val callSite = Utils.formatSparkCallSite
  logInfo("Starting job: " + callSite)
  val start = System.nanoTime
  val result = dagScheduler.runJob(rdd, func, partitions, callSite, allowLocal, resultHandler,
    localProperties.get)
  logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()
  result
}
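One detail worth spelling out is resultHandler: it is how each task's per-partition result flows back to the caller. The convenience overload that count uses presumably allocates one array slot per partition and passes a handler that writes each result by partition index, which is why count can just .sum the returned array. The following is a hedged, self-contained reconstruction of that pattern, not a copy of Spark's actual overload.

import scala.reflect.ClassTag

object ResultHandlerSketch {
  // Hedged sketch: collect one result per partition through a handler callback,
  // the way the resultHandler above gets invoked from JobWaiter.taskSucceeded.
  def collectResults[U: ClassTag](
      partitions: Seq[Int],
      runWithHandler: ((Int, U) => Unit) => Unit): Array[U] = {
    val results = new Array[U](partitions.size)
    runWithHandler((index, value) => results(index) = value)
    results
  }

  def main(args: Array[String]): Unit = {
    // Fake "scheduler" that just invokes the handler locally for two partitions.
    val perPartitionCounts = collectResults[Long](Seq(0, 1), handler => {
      handler(0, 3L)  // partition 0 counted 3 elements
      handler(1, 2L)  // partition 1 counted 2 elements
    })
    println(perPartitionCounts.sum)  // 5, which is what count() returns via .sum
  }
}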