Spark 源碼分析(五): Executor 啓動

上一篇已經將 Application 註冊到了 master 上了,在 master 收到註冊消息後會進行一系列操做,最後調用 schedule 方法。java

這個 schedule 方法會去作兩件事,一件事是給等待調度的 driver 分配資源,另外一件事是給等待調度的 application 去分配資源啓動 Executor。web

給 application 分配資源啓動 Executor 的代碼最終會調用一個方法:launchExecutor(是 Master 中的代碼)。app

在 lauchExecutor 方法中會先向 worker 發送 lauchExecutor 消息,而後會向 driver 發送 executor 已經啓動的消息。ide

private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    // 向 worker 發送 LaunchExecutor 消息
    worker.endpoint.send(LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
     // 向 driver 發送 ExecutorAdded 的消息
    exec.application.driver.send(
      ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
  }
複製代碼

下面看 worker 端收到 launchExecutor 消息後是怎麼處理的。fetch

一樣的在 receive 的模式匹配中找到該消息的匹配,能夠看到作了這些事情:ui

1,先判斷髮消息的 master 是不是 alive 狀態,若是是纔會繼續執行;this

2,建立 executor 的工做目錄和本地臨時目錄;url

3,將 master 發來的消息封裝爲 ExecutorRunner 對象,而後調用其 start 方法啓動線程;spa

4,向 master 發送消息,告訴當前 executor 的狀態;線程

// 模式匹配,是 LaunchExecutor 消息
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  		// 若是發送消息的 master 不是 active 的則不執行 
  		if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))

          // 建立 executor 的工做目錄
          val executorDir = new File(workDir, appId + "/" + execId)
          if (!executorDir.mkdirs()) {
            throw new IOException("Failed to create directory " + executorDir)
          }

          // 經過 SPARK_EXECUTOR_DIRS 環境變量配置建立 Executor 的本地目錄
          // Worker 會在當前 application 執行結束後刪除這個目錄
          val appLocalDirs = appDirectories.getOrElse(appId,
            Utils.getOrCreateLocalRootDirs(conf).map { dir =>
              val appDir = Utils.createDirectory(dir, namePrefix = "executor")
              Utils.chmod700(appDir)
              appDir.getAbsolutePath()
            }.toSeq)
          appDirectories(appId) = appLocalDirs
          // 將接收到的 application 中 Executor 的相關信息封裝爲一個 ExecutorRunner 對象
          val manager = new ExecutorRunner(
            appId,
            execId,
            appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
            cores_,
            memory_,
            self,
            workerId,
            host,
            webUi.boundPort,
            publicAddress,
            sparkHome,
            executorDir,
            workerUri,
            conf,
            appLocalDirs, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          // 啓動這個線程
          manager.start()
          // 更新 worker 的資源利用狀況
          coresUsed += cores_
          memoryUsed += memory_
          // 給 master 回覆消息
          sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
        } catch {
          case e: Exception =>
            logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
              Some(e.toString), None))
        }
      }
複製代碼

這裏主要看 ExecutorRunner 調用 start 以後作了什麼。

其實是建立了一個線程,線程運行時會去執行 fetchAndRunExecutor 這個方法。

private[worker] def start() {
  	// 建立線程
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }
    }
  	// 啓動線程
    workerThread.start()
    // Shutdown hook that kills actors on shutdown.
    shutdownHook = ShutdownHookManager.addShutdownHook { () =>
      // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
      // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
      if (state == ExecutorState.RUNNING) {
        state = ExecutorState.FAILED
      }
      killProcess(Some("Worker shutting down")) }
  }
複製代碼

fetchAndRunExecutor 這個方法將接收到的信息作以下一些事情:

1,建立 ProcessBuilder,準備執行本地命令;

2,爲 ProcessBuilder 建立執行目錄,設置環境變量;

3,啓動 ProcessBuilder,生成 Executor 進程,這個進程的名稱通常爲:CoarseGrainedExecutorBackend;

4,重定向輸出流和錯誤文件流;

5,等待獲取 executor 的退出碼,而後發送給 worker;

private def fetchAndRunExecutor() {
    try {
      // 建立 ProcessBuilder
      val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
        memory, sparkHome.getAbsolutePath, substituteVariables)
      // 封裝指令
      val command = builder.command()
      val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
      logInfo(s"Launch command: $formattedCommand")

      builder.directory(executorDir)
      builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

      // Add webUI log urls
      val baseUrl =
        if (conf.getBoolean("spark.ui.reverseProxy", false)) {
          s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
        } else {
          s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
        }
      builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
      builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

      // 啓動進程
      process = builder.start()
      val header = "Spark Executor Command: %s\n%s\n\n".format(
        formattedCommand, "=" * 40)

      // 重定向標準輸出流
      // Redirect its stdout and stderr to files
      val stdout = new File(executorDir, "stdout")
      stdoutAppender = FileAppender(process.getInputStream, stdout, conf)

      // 重定向錯誤輸出流
      val stderr = new File(executorDir, "stderr")
      Files.write(header, stderr, StandardCharsets.UTF_8)
      stderrAppender = FileAppender(process.getErrorStream, stderr, conf)

      // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
      // or with nonzero exit code
      // 等待退出碼
      val exitCode = process.waitFor()
      state = ExecutorState.EXITED
      val message = "Command exited with code " + exitCode
      // 將退出碼發送給 worker
      worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
    } catch {
      case interrupted: InterruptedException =>
        logInfo("Runner thread for executor " + fullId + " interrupted")
        state = ExecutorState.KILLED killProcess(None) case e: Exception =>
        logError("Error running executor", e)
        state = ExecutorState.FAILED killProcess(Some(e.toString)) } } 複製代碼

至此,Executor 是啓動完成了。

相關文章
相關標籤/搜索