Spark 源碼分析(三): SparkContext 初始化之 TaskScheduler 建立與啓動

前面已經分析到了 driver 進程成功在某臺 worker 上啓動了,下面就開始執行咱們寫的那些代碼了。以一個 wordcount 程序爲例,代碼以下:java

val conf = new SparkConf()
      .setAppName("WordCount")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lines = sc.textFile("./file/localfile")
    val words = lines.flatMap(line => line.split(" "))
    val wordPairs = words.map(word => (word, 1))
    val wordCounts = wordPairs.reduceByKey(_ + _)
    wordCounts.foreach(wordCount => println(wordCount._1 + " " + wordCount._2))
複製代碼

首先會去初始化咱們的 SparkContext 對象,在初始化 SparkContext 對象前會先建立一個 SparkConf 對象用來配置各類參數。SparkContext 對象的初始化代碼在 org.apache.spark.SparkContext 的 374-594 行(spark 2.1.1 的源碼中),這塊代碼大概作了這些事情:算法

1,建立 Spark 的執行環境 SparkEnv;apache

2,建立並初始化 SparkUI;後端

3,Hadoop 相關配置和 Executor 環境變量的設置;緩存

4,建立心跳接收器,用來和 Executor 作通訊;ide

5,建立和啓動 TaskScheduler;函數

6,建立和啓動 DAGScheduler;oop

7,初始化 BlockManager;ui

8,啓動測量系統 MetricsSystem;this

9,建立和啓動 ExecutorAllocationManager;

10,建立和啓動 ContextCleaner;

11,Spark Environment Update;

12,向系統的測量系統註冊 DAGSchedulerSource,BlockManagerSource,executorAllocationManagerSource;

13,標記當前 SparkContext 爲激活狀態(這個代碼在 SparkContext 類的最後,2237 行);

以上就是 SparkContext 初始化過程,咱們最主要的是分析 TaskScheduler 和 DAGScheduler 的建立和啓動過程,這篇主要來看 TaskScheduler。

建立和啓動 TaskScheduler 的代碼從 501 行開始,代碼以下:

// 建立 TaskScheduler,DAGScheduler
		val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)
    _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

    // 開啓 taskScheduler
    _taskScheduler.start()
複製代碼

上面代碼能夠看出經過調用 SparkContext.createTaskScheduler 方法去建立 TaskScheduler。

createTaskScheduler 方法會根據傳入的 master 參數進行模式匹配,咱們使用的是 spark 的 standalone cluster 模式,對應的 master 確定是這種的:「spark://xxxx」。

因此會匹配到 SPARK_REGEX(sparkUrl) 這裏,這裏的 SPARK_REGEX 的匹配規則是 val SPARK_REGEX = """spark://(.*)""".r

匹配成功後,會去建立 TaskSchedulerImpl 對象、StandaloneSchedulerBackend 對象,而後而後將 StandaloneSchedulerBackend 對象帶入到 TaskSchedulerImpl 的 initialize 方法中進行初始化操做。最後返回 StandaloneSchedulerBackend 和 TaskSchedulerImpl 對象。

這裏的 TaskSchedulerImpl 是 TaskScheduler trait 的實現類,做用是:從 DAGScheduler 接收不一樣的 stage 任務,並向集羣提交這些任務。

以上分析對應的代碼以下:

case SPARK_REGEX(sparkUrl) =>
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
        scheduler.initialize(backend)
        (backend, scheduler)
複製代碼

下面主要看 scheduler.initialize(backend) 這裏,執行 TaskSchedulerImpl 的 initialize 方法。這個方法要傳入一個 SchedulerBackend 對象。

initialize 的過程當中首先會將 StandaloneSchedulerBackend 對象持有到該 TaskSchedulerImpl 對象裏。

而後去建立調度池 rootPool,這裏面緩存了調度隊列等相關信息。

而後再去根據 schedulingMode 去進行模式匹配使用哪一種調度算法,schedulingMode 初始化時候默認值是 FIFO,因此這裏默認的調度算法會匹配到 FIFO 模式。

模式匹配成功後建立 FIFOSchedulableBuilder,用來操做 rootPool 中的調度隊列。

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // 建立調度池
    rootPool = new Pool("", schedulingMode, 0, 0)
    // 模式匹配調度算法
    // schedulingMode 有個默認值爲 FIFO,具體能夠點到對應的代碼看
    schedulableBuilder = {
      schedulingMode match {
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
        case _ =>
          throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
      }
    }
    schedulableBuilder.buildPools()
  }
複製代碼

初始化完成後就會返回 (backend, scheduler)

val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
複製代碼

而後爲 SparkContext 對象中對應的 _schedulerBackend 和 _taskScheduler 對象賦值,這裏的 _schedulerBackend 是 StandaloneSchedulerBackend,_taskScheduler 是 TaskSchedulerImpl

_schedulerBackend = sched
    _taskScheduler = ts
複製代碼

接下來就是建立 DAGScheduler。

DAGScheduler 主要做用是:分析用戶提交的應用,根據 RDD 之間的依賴關係建立 DAG 圖,而後根據 DAG 圖劃分紅多個 stage,爲每一個 stage 分一組 task 去處理一批數據。而後將這些 task 交給 TaskScheduler,TaskScheduler 會經過 ClusterManager 在集羣中找到符合要求的 Worker 上的 Executor 去啓動這些 task。

具體源碼能夠看後面的文章。

_dagScheduler = new DAGScheduler(this)
複製代碼

new DAGScheduler(this) 這個構造函數裏 this 就是當前的 SparkContext 對象,後續代碼裏會經過 SparkContext.taskScheduler 方法拿到裏面的 _taskScheduler 對象,而後會調用 TaskScheduler.setDAGScheduler 方法設置好 DAGScheduler 的引用。這裏設置好以後就會開啓 TaskScheduler。

_taskScheduler.start()
複製代碼

_taskScheduler.start() 方法中回去調用 backend.start 方法,在這裏就是 StandaloneSchedulerBackend 中的 start 方法。

override def start() {
    // 這裏會去調用 StandaloneSchedulerBackend 的 start 方法
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      speculationScheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
          checkSpeculatableTasks()
        }
      }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
    }
  }
複製代碼

StandaloneSchedulerBackend 是一個 SchedulerBackend trait 的實現類,是 TaskScheduler 調度後端接口,不一樣的集羣部署模式會有不一樣的實現。

TaskScheduler 給 task 分配資源的時候其實是經過 SchedulerBackend 去完成的。StandaloneSchedulerBackend 是用於 standalone cluster 模式下的 SchedulerBackend。做用於 driver 內,用於和 Executor 通訊,Task 的資源分配。

StandaloneSchedulerBackend 的 start 方法會去調用其父類的 start 方法,也就是 CoarseGrainedSchedulerBackend 的 start 方法。

這個方法內會去建立一個 DriverEndPointRef。

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }

    // 根據配置參數屬性去建立 DriverEndPointRef
    driverEndpoint = createDriverEndpointRef(properties)
  }
複製代碼

createDriverEndpointRef 這個方法內部會先想 rpcEnv 上註冊一個建立好的 DriverEndpoint。

protected def createDriverEndpointRef( properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
  	// 向 rpcEnv 註冊 createDriverEndpoint 這個方法返回的 DriverEndpoint
    rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
  }
複製代碼

createDriverEndpoint 這個方法是用來建立 DriverEndpoint 的,這個 DriverEndpoint 就是用來 提交 task 到 Executor,並接收 Executor 的返回結果的。

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  // 建立 DriverEndpoint 
  new DriverEndpoint(rpcEnv, properties)
  }
複製代碼

至此,TaskScheduler 建立和啓動已經完成。

相關文章
相關標籤/搜索