Spark 源碼分析(四): Application 的註冊

在前面一篇文章中分析到了 SparkContext 中的 TaskScheduler 建立及啓動。java

在 StandaloneSchedulerBackend start 代碼裏除了建立了一個 DriverEndpoint 用於 standalone 模式下用來和 Executor 通訊以外還會建立一個 AppClient。apache

這個 AppClient 會向 Master 註冊 Application,而後 Master 會經過 Application 的信息爲它分配 Worker。數組

建立這個 AppClient 對象以前會去獲取一些必要的參數。app

// 拿到 Driver RpcEndpoint 的地址
		val driverUrl = RpcEndpointAddress(
      sc.conf.get("spark.driver.host"),
      sc.conf.get("spark.driver.port").toInt,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
    // 一些啓動參數
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    // executor 額外的 java 選項
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    // executor 額外的環境變量
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    // executor 額外的依賴
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
      
   	val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    // 將上面的那些信息封裝成一個 command 對象
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    // 獲取 application UI 的地址
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    // 獲取 executor 配置的 core 數量
    val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
複製代碼

而後用上面的這些參數結合 SparkContext 中的一些數據封裝一個 ApplicationDescription 對象,這對象裏封裝了一些信息,能夠看看。異步

private[spark] case class ApplicationDescription( name: String,// 名字 maxCores: Option[Int],// 最多使用的 core 數量 memoryPerExecutorMB: Int,// 每一個 Executor 分配的內存 command: Command,// 啓動命令 appUiUrl: String,// application 的 UI Url 地址 eventLogDir: Option[URI] = None,// event 日誌目錄 // short name of compression codec used when writing event logs, if any (e.g. lzf) eventLogCodec: Option[String] = None,
    coresPerExecutor: Option[Int] = None,
    // number of executors this application wants to start with,
    // only used if dynamic allocation is enabled
    initialExecutorLimit: Option[Int] = None,
    user: String = System.getProperty("user.name", "<unknown>")) {

  override def toString: String = "ApplicationDescription(" + name + ")"
}
複製代碼

封裝好 ApplicationDescription 對象以後,根據這個對象建立一個 StandaloneAppClient 對象,而後調用 StandaloneAppClient 對象的 start 方法。ide

// 封裝成一個 AppclientDescription 對象 
val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
  // 建立 StandaloneAppClient 對象
    client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  // 調用 StandaloneAppClient 的 start 方法
    client.start()
    launcherBackend.setState(SparkAppHandle.State.SUBMITTED)
  // 等待註冊狀態的更新
    waitForRegistration()
    launcherBackend.setState(SparkAppHandle.State.RUNNING)
複製代碼

StandaloneAppClient 的 start 方法會去建立並註冊一個 ClientEndpoint 用來向 master 註冊 Application。ui

def start() {
    // Just launch an rpcEndpoint; it will call back into the listener.
    endpoint.set(rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv)))
  }
複製代碼

ClientEndPoint 是一個 RpcEndpoint,在初始化的時候會去調用其 onstart 方法。this

override def onStart(): Unit = {
      try {
        // 向 master 註冊
        registerWithMaster(1)
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          stop()
      }
    }
複製代碼

registerWithMaster 方法實際上會去調用 tryRegisterAllMasters 方法,向全部的 Master 去註冊。url

在 Spark 中,Master 可能會是高可靠的 (HA),這種狀況會有可能有多個 Master,不過只有一個 Master 處於 alive 狀態,其它處於 standby 狀態。spa

/** * 向全部的 master 進行一步註冊,將會一直調用 tryRegisterAllMasters 方法進行註冊,知道超出註冊時間 * 當成功鏈接到 master ,全部調度的工做和 Futures 都會被取消 */
private def registerWithMaster(nthRetry: Int) {
  		// 實際上調用了 tryRegisterAllMasters ,想全部 master 進行註冊
      registerMasterFutures.set(tryRegisterAllMasters())
      registrationRetryTimer.set(registrationRetryThread.schedule(new Runnable {
        override def run(): Unit = {
          if (registered.get) {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerMasterThreadPool.shutdownNow()
          } else if (nthRetry >= REGISTRATION_RETRIES) {
            markDead("All masters are unresponsive! Giving up.")
          } else {
            registerMasterFutures.get.foreach(_.cancel(true))
            registerWithMaster(nthRetry + 1)
          }
        }
      }, REGISTRATION_TIMEOUT_SECONDS, TimeUnit.SECONDS))
    }
複製代碼

tryRegisterAllMasters 方法的調用,向全部 master 註冊。

// 異步向全部 master 註冊,返回一個 [Future] 的數組用來之後取消
private def tryRegisterAllMasters(): Array[JFuture[_]] = {
      for (masterAddress <- masterRpcAddresses) yield {
        registerMasterThreadPool.submit(new Runnable {
          override def run(): Unit = try {
            if (registered.get) {
              return
            }
            logInfo("Connecting to master " + masterAddress.toSparkURL + "...")
            val masterRef = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
            // 向 master 發送註冊消息
            masterRef.send(RegisterApplication(appDescription, self))
          } catch {
            case ie: InterruptedException => // Cancelled
            case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
          }
        })
      }
    }
複製代碼

向 master 發送註冊消息後,master 收到消息後註冊完 application 以後會回覆一條消息。

case RegisterApplication(description, driver) =>
      // TODO Prevent repeated registrations from some driver
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        val app = createApplication(description, driver)
        // 將 master 中內存中等待調度的 app 隊列更新,
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        persistenceEngine.addApplication(app)
        // 向 driver 回覆一條註冊 Application 的處理結果的消息
        driver.send(RegisteredApplication(app.id, self))
        // 調度資源
        schedule()
      }
複製代碼

master 調用 shedule 方法,這個方法作兩件事,一個是給等待調度的 driver 分配資源,一個是給等待調度的 application 分配資源去啓動 executor。

在給等待調度的 application 分配資源的時候最後會走到 launchExecutor 方法,這個方法會經過給符合要求的 worker 發送啓動 executor 的消息去啓動 executor。

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // 給 worker 發送 LaunchExecutor 消息
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }
複製代碼

在 worker 收到啓動 Executor 的消息後會去根據消息去啓動對應的 Executor。

至此,Application 的註冊就完成了。

相關文章
相關標籤/搜索