

0. Submitting a job with spark-submit

  0.1 How the launch script works

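  Reading the source of the spark-submit script shows that the command ultimately runs the ./bin/spark-class script, which launches the class org.apache.spark.launcher.Main. In spark-submit mode, that class uses SparkSubmitCommandBuilder (SparkSubmitCommandBuilder.java) to build the command, which finally invokes org.apache.spark.deploy.SparkSubmit (SparkSubmit.scala).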

 * Main gateway of launching a Spark application.
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
object SparkSubmit {

  0.2 Job resolution: running the main class specified at submission


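    a. CLIENT mode (the driver runs locally on the submitting machine): the application's main class runs directly on the node that submitted the job.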

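    b. CLUSTER mode: the cluster schedules the job, and the application's main class runs on a node assigned by the cluster. (The allocation details are skipped here.)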

1. At long last: SparkContext

 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 * Only one SparkContext may be active per JVM.  You must `stop()` the active SparkContext before
 * creating a new one.  This limitation may eventually be removed; see SPARK-2243 for more details.
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
class SparkContext(config: SparkConf) extends Logging {

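  In Spark, SparkContext is responsible for communicating with the cluster, requesting resources, and assigning and monitoring tasks. You can think of one SparkContext instance as corresponding to one Spark Driver Program (one Spark application); it exists for the entire lifetime of the job.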


// Excerpt from SparkSession.Builder.getOrCreate
def getOrCreate(): SparkSession = synchronized {
    val sparkContext = userSuppliedContext.getOrElse {
      // set app name if not given
      val randomAppName = java.util.UUID.randomUUID().toString
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }
      if (!sparkConf.contains("")) {
      val sc = SparkContext.getOrCreate(sparkConf)

// Excerpt from StreamingContext
private[streaming] val sc: SparkContext = {
    if (_sc != null) {
    } else if (isCheckpointPresent) {
    } else {
      throw new SparkException("Cannot create StreamingContext without a SparkContext")
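
The SparkSession excerpt above obtains its context through SparkContext.getOrCreate, which fits the rule that only one SparkContext may be active per JVM. The following is a minimal toy model of that get-or-create pattern, not Spark's implementation; ContextRegistry and ToyContext are hypothetical names introduced only for illustration.

```scala
// Toy model of "one active context per JVM" (hypothetical names, not Spark code).
object ContextRegistry {
  final class ToyContext(val appName: String)

  // The single active context, if any.
  private var active: Option[ToyContext] = None

  // Return the active context, creating it only when none exists yet.
  def getOrCreate(appName: String): ToyContext = synchronized {
    active.getOrElse {
      val ctx = new ToyContext(appName)
      active = Some(ctx)
      ctx
    }
  }

  // Clear the active context so that a new one can be created afterwards.
  def stop(): Unit = synchronized { active = None }
}
```

In this sketch a second call to getOrCreate returns the existing instance and ignores the new name, which mirrors why the active context must be stopped before a different one can be created.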



     1. Communicates with the Master and registers the current application (RegisterWithMaster);



1.1 SparkContext instantiation

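  1. When SparkContext is instantiated, every member that is not inside a method is initialized. The call to createTaskScheduler sits in the class body of SparkContext, so it runs during instantiation;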

  2. createTaskScheduler returns a (SchedulerBackend, TaskScheduler) pair that depends on the cluster type. Taking Standalone as the example, it returns (StandaloneSchedulerBackend, TaskSchedulerImpl);

// Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    _schedulerBackend = sched
    _taskScheduler = ts
    _dagScheduler = new DAGScheduler(this)

    // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
    // constructor

  3. _taskScheduler.start() is executed; it calls the start method of the SchedulerBackend (here, StandaloneSchedulerBackend);

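  4. The start method of StandaloneSchedulerBackend packages the application information and calls the start method of StandaloneAppClient (this only registers the application; it does not submit any tasks);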

// Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend", args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress ="")
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
  client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)

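  5. The start method of StandaloneAppClient (the client that registers the application with the Master; it is not the Driver) creates a ClientEndpoint object. The endpoint's onStart method is then invoked, and onStart calls registerWithMaster to register with the Master;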
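
Step 2 above, where the master URL decides which (SchedulerBackend, TaskScheduler) pair is created, can be sketched roughly as follows. This is a simplified illustration that returns class names as strings and covers only local and Standalone masters; SchedulerSelectionSketch and its regular expressions are assumptions made for the example, not the code of createTaskScheduler.

```scala
// Simplified sketch of choosing a scheduler pair from the master URL.
// Returns class names as strings; it does not construct real schedulers.
object SchedulerSelectionSketch {
  private val SparkUrl = """spark://(.*)""".r
  private val LocalN = """local\[([0-9]+|\*)\]""".r

  def select(master: String): (String, String) = master match {
    case "local" | LocalN(_) =>
      ("LocalSchedulerBackend", "TaskSchedulerImpl")
    case SparkUrl(_) =>
      // Standalone cluster, the case this article follows.
      ("StandaloneSchedulerBackend", "TaskSchedulerImpl")
    case other =>
      throw new IllegalArgumentException(s"Master URL not covered by this sketch: $other")
  }
}
```

For example, SchedulerSelectionSketch.select("spark://master:7077") yields the Standalone pair named in step 2.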

2. The Master registers the application


  1. Register the application information (app);

  2. Send the success message driver.send(RegisteredApplication(, self)) back to the client that requested registration, StandaloneAppClient (at this point the application is registered).


case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " +
        val app = createApplication(description, driver)
        logInfo("Registered app " + + " with ID " +
        driver.send(RegisteredApplication(, self))
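
The branch structure of the handler above (a standby Master stays silent, an active Master registers the application and replies) can be modelled with a small toy. Everything here is hypothetical and for illustration only: ToyMaster, the simplified message types, and in particular the app ID format are assumptions, not what Spark's Master does.

```scala
// Toy model of the RegisterApplication handling (hypothetical types and IDs).
object MasterRegistrationSketch {
  sealed trait RecoveryState
  case object Alive extends RecoveryState
  case object Standby extends RecoveryState

  final case class RegisterApplication(name: String)
  final case class RegisteredApplication(appId: String)

  final class ToyMaster(val state: RecoveryState) {
    private var nextId = 0
    private var apps = Vector.empty[String]

    // None models "ignore, don't send response"; Some(...) models the reply.
    def handle(msg: RegisterApplication): Option[RegisteredApplication] =
      if (state == Standby) None
      else {
        val appId = f"app-$nextId%04d" // made-up ID format for the sketch
        nextId += 1
        apps :+= appId
        Some(RegisteredApplication(appId))
      }

    def registeredApps: Vector[String] = apps
  }
}
```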


   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
  private def schedule(): Unit = {
    if (state != RecoveryState.ALIVE) {
    // Drivers take strict precedence over executors
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
    for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
      // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
      // start from the last worker that was assigned a driver, and continue onwards until we have
      // explored all alive workers.
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)          // launch the Driver
          waitingDrivers -= driver
          launched = true
        curPos = (curPos + 1) % numWorkersAlive
    startExecutorsOnWorkers()                // start Executors on the Workers
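
The round-robin driver placement in schedule() can be tried out in isolation with the sketch below. It follows the loop in the excerpt (a shared curPos cursor, at most one pass over the alive workers per driver) but makes some assumptions for the sake of a deterministic example: the Random.shuffle step is left out, Worker and Driver are simplified stand-in types, and resources are deducted directly instead of calling launchDriver.

```scala
// Stand-alone sketch of round-robin driver placement (simplified stand-in types).
object DriverPlacementSketch {
  final case class Worker(id: String, var memoryFree: Int, var coresFree: Int)
  final case class Driver(id: String, mem: Int, cores: Int)

  // Returns driverId -> workerId for every driver that found a fitting worker.
  def place(workers: IndexedSeq[Worker], waiting: Seq[Driver]): Map[String, String] = {
    val numWorkersAlive = workers.size
    var curPos = 0
    var placed = Map.empty[String, String]
    for (driver <- waiting) {
      var launched = false
      var numWorkersVisited = 0
      // Continue from the position after the last visited worker,
      // and give up after one full pass over the workers.
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = workers(curPos)
        numWorkersVisited += 1
        if (worker.memoryFree >= driver.mem && worker.coresFree >= driver.cores) {
          worker.memoryFree -= driver.mem   // stands in for launchDriver
          worker.coresFree -= driver.cores
          placed += (driver.id -> worker.id)
          launched = true
        }
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
    placed
  }
}
```

A driver that fits on no worker is simply left out of the result, which corresponds to it staying in waitingDrivers until the next call to schedule().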

3. Launching the Driver on a Worker

3.1 Driver launch process


case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
      val driver = new DriverRunner



private[worker] def prepareAndRunDriver(): Int = {
    val driverDir = createWorkingDirectory()
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other

    // TODO: If we add ability to submit multiple jars they should also be added here
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
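
The placeholder substitution used in prepareAndRunDriver can be illustrated on its own. The sketch below reuses the two placeholders from the excerpt; DriverCommandSketch, the resolve helper, and the sample argument values are assumptions added for the example.

```scala
// Sketch of replacing {{WORKER_URL}} and {{USER_JAR}} in driver arguments.
object DriverCommandSketch {
  // Builds the substitution function for one concrete worker and jar.
  def substitutor(workerUrl: String, localJarFilename: String): String => String = {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}"   => localJarFilename
    case other            => other // ordinary arguments pass through unchanged
  }

  // Applies the substitution to every argument of a command line.
  def resolve(args: Seq[String], workerUrl: String, localJarFilename: String): Seq[String] =
    args.map(substitutor(workerUrl, localJarFilename))
}
```

Only arguments that are exactly equal to a placeholder are replaced; a placeholder embedded in a longer string is left as it is, as in the excerpt's match expression.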

3.2 Some questions about the Driver

  a. The Driver refers to the submitted application, that is, its main class, as the following definition in DriverDescription shows.

  override def toString: String = s"DriverDescription (${command.mainClass})"

  b. The Driver is loaded and run on a Worker. The Master decides which Worker, via worker.endpoint.send(LaunchDriver(, driver.desc)).

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + + " on worker " +
    driver.worker = Some(worker)
    worker.endpoint.send(LaunchDriver(, driver.desc))
    driver.state = DriverState.RUNNING

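  c. More precisely, the Driver runs as a separate process on the Worker. The Worker instantiates a DriverRunner object, which launches that process and manages its execution, including restarting it on failure.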

 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
private[deploy] class DriverRunner(

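  d. From the Master's application registration and its schedule method, we can see that one application corresponds to one Driver and can have multiple Executors spread across Workers.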

4. Launching Executors on Workers

      The Master's schedule method calls startExecutorsOnWorkers, which ultimately sends a LaunchExecutor message to the Worker. The Worker then instantiates an ExecutorRunner object via val manager = new ExecutorRunner to run tasks.


   * Download and run the executor described in our ApplicationDescription
  private def fetchAndRunExecutor() {
    try {
      // Launch the process
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),

** appDesc.command is the command passed in by StandaloneSchedulerBackend. In other words, when a Worker node starts an ExecutorRunner, the ExecutorRunner launches the CoarseGrainedExecutorBackend process.

val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",


override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))


override def receive: PartialFunction[Any, Unit] = {
    case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)

  * Each Worker can launch multiple Executors, and each Executor is a separate process.

* The figure below shows the flow of a user submitting a Spark job.

