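The previous post covered how the Application registers with the Master. After the Master receives the registration message, it performs a series of operations and finally calls the schedule method.
schedule does two things: it allocates resources to drivers that are waiting to be scheduled, and it allocates resources to waiting applications and launches Executors for them.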
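The two phases can be illustrated with a heavily simplified sketch. It assumes hypothetical Worker, DriverReq and AppReq records that track only free cores and memory. These are not Spark's WorkerInfo/DriverInfo/ApplicationInfo. The real Master.schedule also shuffles the alive workers and supports different executor-spreading strategies, and none of that is modeled here. What the sketch does show is the ordering: waiting drivers are placed first, then executors are launched on whatever capacity remains.

```scala
import scala.collection.mutable

object ScheduleSketch {
  // Hypothetical, simplified records (not Spark's classes).
  final case class Worker(id: String, var freeCores: Int, var freeMemMb: Int)
  final case class DriverReq(id: String, cores: Int, memMb: Int)
  final case class AppReq(id: String, coresPerExecutor: Int, memPerExecutorMb: Int,
                          var coresLeft: Int) {
    require(coresPerExecutor > 0, "coresPerExecutor must be positive")
  }

  /** Returns placement decisions in order: drivers first, then executors. */
  def schedule(workers: Seq[Worker], waitingDrivers: Seq[DriverReq],
               waitingApps: Seq[AppReq]): Seq[String] = {
    val decisions = mutable.ArrayBuffer.empty[String]
    // Phase 1: put each waiting driver on the first worker with enough free resources.
    for (d <- waitingDrivers) {
      workers.find(w => w.freeCores >= d.cores && w.freeMemMb >= d.memMb).foreach { w =>
        w.freeCores -= d.cores
        w.freeMemMb -= d.memMb
        decisions += s"driver ${d.id} -> ${w.id}"
      }
    }
    // Phase 2: use the remaining capacity to launch executors for waiting applications.
    for (app <- waitingApps; w <- workers) {
      while (app.coresLeft >= app.coresPerExecutor &&
             w.freeCores >= app.coresPerExecutor &&
             w.freeMemMb >= app.memPerExecutorMb) {
        w.freeCores -= app.coresPerExecutor
        w.freeMemMb -= app.memPerExecutorMb
        app.coresLeft -= app.coresPerExecutor
        // This is the point where the real Master would call launchExecutor.
        decisions += s"executor for ${app.id} -> ${w.id}"
      }
    }
    decisions.toList
  }
}
```

With two workers (4 cores and 2 cores), one 1-core driver and an app asking for 4 cores in 2-core executors, the driver lands on the first worker. The app then gets one executor on each worker, because the first worker has only 3 cores left after hosting the driver.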
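The code that allocates resources to an application and launches its Executors eventually calls one method: launchExecutor (in Master).
In launchExecutor, the Master first sends a LaunchExecutor message to the worker, and then sends an ExecutorAdded message to the driver to announce that the executor has been launched.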
```scala
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  // Send a LaunchExecutor message to the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
  // Send an ExecutorAdded message to the driver
  exec.application.driver.send(
    ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
}
```
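To make the message ordering concrete, here is a self-contained sketch. It replaces Spark's RPC endpoints with a hypothetical RecordingEndpoint that only records what it receives. The two case classes mirror the fields the Master sends above, but they are local stand-ins, not Spark's DeployMessages, and the application description is omitted.

```scala
import scala.collection.mutable

object LaunchExecutorSketch {
  // Local stand-ins for the two messages sent by Master.launchExecutor.
  final case class LaunchExecutor(masterUrl: String, appId: String, execId: Int,
                                  cores: Int, memory: Int)
  final case class ExecutorAdded(execId: Int, workerId: String, hostPort: String,
                                 cores: Int, memory: Int)
  final case class ExecInfo(appId: String, execId: Int, cores: Int, memory: Int)

  // Hypothetical endpoint that records messages instead of sending them over RPC.
  final class RecordingEndpoint {
    val inbox: mutable.ArrayBuffer[Any] = mutable.ArrayBuffer.empty[Any]
    def send(msg: Any): Unit = inbox += msg
  }

  def launchExecutor(masterUrl: String, workerId: String, workerHostPort: String,
                     worker: RecordingEndpoint, driver: RecordingEndpoint,
                     exec: ExecInfo): Unit = {
    // 1. Ask the worker to start the executor.
    worker.send(LaunchExecutor(masterUrl, exec.appId, exec.execId, exec.cores, exec.memory))
    // 2. Tell the driver an executor was added on that worker (no reply is awaited first).
    driver.send(ExecutorAdded(exec.execId, workerId, workerHostPort, exec.cores, exec.memory))
  }
}
```

As in the original method, the driver is notified right after the LaunchExecutor message is dispatched, without waiting for any reply from the worker.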
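Next, let's look at how the worker handles the LaunchExecutor message.
As with the master, find the matching case in the worker's receive pattern match. It does the following:
1. Check whether the master that sent the message is the active master, and continue only if it is;
2. Create the executor's working directory and its local temporary directories;
3. Wrap the information sent by the master in an ExecutorRunner object, call its start method to launch a thread, and add the executor's cores and memory to the worker's resource usage;
4. Send a message to the master reporting the executor's state (RUNNING on success, FAILED if any of the previous steps throws).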
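Step 2 can be sketched on its own with plain java.nio calls. The sketch makes three assumptions. Files.createTempDirectory stands in for Spark's Utils.createDirectory. Files.setPosixFilePermissions stands in for Utils.chmod700, so it requires a POSIX file system. The root directories are passed in explicitly instead of coming from Utils.getOrCreateLocalRootDirs. The appDirectories map plays the same role as in the worker: all executors of one application on this worker share the same local dirs.

```scala
import java.io.{File, IOException}
import java.nio.file.Files
import java.nio.file.attribute.PosixFilePermissions
import scala.collection.mutable

object ExecutorDirsSketch {
  // appId -> local dirs shared by every executor of that application on this worker
  val appDirectories: mutable.Map[String, Seq[String]] = mutable.HashMap.empty

  /** Creates workDir/appId/execId; mkdirs() returns false if it fails or the dir already exists. */
  def createExecutorDir(workDir: File, appId: String, execId: Int): File = {
    val dir = new File(workDir, appId + "/" + execId)
    if (!dir.mkdirs()) throw new IOException("Failed to create directory " + dir)
    dir
  }

  /** Reuses the app's local dirs if present; otherwise creates one owner-only dir per root. */
  def localDirsFor(appId: String, roots: Seq[File]): Seq[String] =
    appDirectories.getOrElseUpdate(appId, roots.map { root =>
      val appDir = Files.createTempDirectory(root.toPath, "executor")
      // Assumes a POSIX file system; same intent as Utils.chmod700 (rwx for the owner only).
      Files.setPosixFilePermissions(appDir, PosixFilePermissions.fromString("rwx------"))
      appDir.toAbsolutePath.toString
    })
}
```

getOrElseUpdate folds the original's getOrElse followed by appDirectories(appId) = appLocalDirs into one call. The effect is the same.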
```scala
// Pattern match on the LaunchExecutor message
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  // Ignore the request if the sending master is not the active one
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      // Create the executor's working directory
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new IOException("Failed to create directory " + executorDir)
      }
      // Create local dirs for the executor. They are passed to the executor through the
      // SPARK_EXECUTOR_DIRS environment variable, and the Worker deletes them when the
      // application finishes.
      val appLocalDirs = appDirectories.getOrElse(appId,
        Utils.getOrCreateLocalRootDirs(conf).map { dir =>
          val appDir = Utils.createDirectory(dir, namePrefix = "executor")
          Utils.chmod700(appDir)
          appDir.getAbsolutePath()
        }.toSeq)
      appDirectories(appId) = appLocalDirs
      // Wrap the executor information received from the master in an ExecutorRunner
      val manager = new ExecutorRunner(
        appId,
        execId,
        appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
        cores_,
        memory_,
        self,
        workerId,
        host,
        webUi.boundPort,
        publicAddress,
        sparkHome,
        executorDir,
        workerUri,
        conf,
        appLocalDirs,
        ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      // Start the runner's thread
      manager.start()
      // Update this worker's resource usage
      coresUsed += cores_
      memoryUsed += memory_
      // Report the executor's state back to the master
      sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))
    } catch {
      case e: Exception =>
        logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e)
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED,
          Some(e.toString), None))
    }
  }
```
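The control flow of this handler can be sketched without Spark. It covers the active-master check, registering the runner, updating resource usage, and replying to the master, including the rollback when something throws. MiniWorker, Runner and StateChanged are hypothetical stand-ins, and the startRunner parameter exists only so a failure can be injected.

```scala
import scala.collection.mutable

object WorkerLaunchSketch {
  sealed trait ExecState
  case object Running extends ExecState
  case object Failed extends ExecState

  final case class StateChanged(appId: String, execId: Int, state: ExecState, msg: Option[String])

  /** Hypothetical stand-in for ExecutorRunner. */
  final class Runner(val fullId: String) {
    @volatile var killed = false
    def start(): Unit = ()            // the real runner starts a thread here
    def kill(): Unit = killed = true
  }

  final class MiniWorker(activeMasterUrl: String) {
    val executors = mutable.HashMap.empty[String, Runner]
    val toMaster = mutable.ArrayBuffer.empty[StateChanged]
    var coresUsed = 0
    var memoryUsed = 0

    def onLaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int,
                         memory: Int, startRunner: Runner => Unit = _.start()): Unit = {
      // Requests from a master that is not the active one are ignored.
      if (masterUrl == activeMasterUrl) {
        val key = appId + "/" + execId
        try {
          val runner = new Runner(key)
          executors(key) = runner
          startRunner(runner)
          // Usage is only updated once the runner has started successfully.
          coresUsed += cores
          memoryUsed += memory
          toMaster += StateChanged(appId, execId, Running, None)
        } catch {
          case e: Exception =>
            // Roll back: kill and forget the runner, then report FAILED.
            executors.remove(key).foreach(_.kill())
            toMaster += StateChanged(appId, execId, Failed, Some(e.toString))
        }
      }
    }
  }
}
```

As in the original handler, a launch that fails before the accounting step leaves coresUsed and memoryUsed unchanged.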
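Here we focus on what ExecutorRunner does once start is called.
It creates a thread that runs the fetchAndRunExecutor method. It also registers a shutdown hook: if the worker shuts down while the executor is still RUNNING, the hook marks it FAILED and kills its process.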
```scala
private[worker] def start() {
  // Create the thread that runs fetchAndRunExecutor
  workerThread = new Thread("ExecutorRunner for " + fullId) {
    override def run() { fetchAndRunExecutor() }
  }
  // Start the thread
  workerThread.start()
  // Shutdown hook that kills actors on shutdown.
  shutdownHook = ShutdownHookManager.addShutdownHook { () =>
    // It's possible that we arrive here before calling `fetchAndRunExecutor`, then `state` will
    // be `ExecutorState.RUNNING`. In this case, we should set `state` to `FAILED`.
    if (state == ExecutorState.RUNNING) {
      state = ExecutorState.FAILED
    }
    killProcess(Some("Worker shutting down"))
  }
}
```
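The thread-plus-shutdown-hook pattern can be reduced to a small sketch. It assumes a hypothetical MiniRunner whose body function stands in for fetchAndRunExecutor and returns an exit code. The JVM hook is registered with Scala's sys.addShutdownHook, not Spark's ShutdownHookManager, and it only changes the state; it does not kill a process.

```scala
object RunnerThreadSketch {
  sealed trait ExecState
  case object Running extends ExecState
  case object Exited extends ExecState
  case object Failed extends ExecState

  /** Hypothetical mini ExecutorRunner; `body` stands in for fetchAndRunExecutor. */
  final class MiniRunner(fullId: String, body: () => Int) {
    @volatile var state: ExecState = Running
    @volatile var exitCode: Option[Int] = None
    private var workerThread: Thread = null

    def start(): Unit = {
      workerThread = new Thread("ExecutorRunner for " + fullId) {
        override def run(): Unit =
          try { exitCode = Some(body()); state = Exited }
          catch { case _: Exception => state = Failed }
      }
      workerThread.start()
      // Same idea as Spark's hook: an executor still RUNNING at shutdown is marked FAILED.
      sys.addShutdownHook(onShutdown())
    }

    def onShutdown(): Unit = if (state == Running) state = Failed

    def join(): Unit = workerThread.join()
  }
}
```

Calling onShutdown on a runner whose body never ran models the case the Spark comment describes: the hook fires before fetchAndRunExecutor has changed the state, so the state is forced to FAILED.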
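fetchAndRunExecutor then does the following:
1. Create a ProcessBuilder to run a local command;
2. Set the ProcessBuilder's working directory and environment variables;
3. Start the ProcessBuilder to spawn the Executor process, which usually shows up as CoarseGrainedExecutorBackend;
4. Redirect the process's stdout and stderr to files;
5. Wait for the executor's exit code, then report it to the worker in an ExecutorStateChanged message.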
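Steps 1 and 2 can be sketched with plain java.lang.ProcessBuilder. The sketch assumes the command line is already known. In Spark it is produced by CommandUtils.buildProcessBuilder from appDesc.command, which is not reproduced here. The environment variable names, the quoting used for the launch-command log line, and the log-URL format are taken from the excerpt in this post.

```scala
import java.io.File
import scala.jdk.CollectionConverters._

object BuilderSketch {
  /** Quotes each argument, as in the "Launch command" log line. */
  def formatCommand(builder: ProcessBuilder): String =
    builder.command().asScala.mkString("\"", "\" \"", "\"")

  /** Base URL of the executor log page; the first branch corresponds to spark.ui.reverseProxy. */
  def logBaseUrl(reverseProxy: Boolean, workerId: String, publicAddress: String,
                 webUiPort: Int, appId: String, execId: Int): String =
    if (reverseProxy) s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
    else s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="

  /** Builds (but does not start) the process: working directory plus environment. */
  def prepare(command: Seq[String], executorDir: File, localDirs: Seq[String],
              baseUrl: String): ProcessBuilder = {
    val builder = new ProcessBuilder(command.asJava)
    builder.directory(executorDir)
    val env = builder.environment()
    env.put("SPARK_EXECUTOR_DIRS", localDirs.mkString(File.pathSeparator))
    env.put("SPARK_LAUNCH_WITH_SCALA", "0")
    env.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    env.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    builder
  }
}
```

The builder's environment starts as a copy of the worker's own environment, so these put calls only add or override entries for the child process.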
```scala
private def fetchAndRunExecutor() {
  try {
    // Create the ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables)
    // Get the command line from the builder and format it for logging
    val command = builder.command()
    val formattedCommand = command.asScala.mkString("\"", "\" \"", "\"")
    logInfo(s"Launch command: $formattedCommand")
    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    // In case we are running this from within the Spark Shell, avoid creating a "scala"
    // parent process for the executor command
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")
    // Add webUI log urls
    val baseUrl =
      if (conf.getBoolean("spark.ui.reverseProxy", false)) {
        s"/proxy/$workerId/logPage/?appId=$appId&executorId=$execId&logType="
      } else {
        s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
      }
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")
    // Start the executor process
    process = builder.start()
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      formattedCommand, "=" * 40)
    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf)
    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, StandardCharsets.UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf)
    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor()
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    // Report the exit code to the worker
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode)))
  } catch {
    case interrupted: InterruptedException =>
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    case e: Exception =>
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
  }
}
```
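Steps 3 to 5 (start the process, send its output to files, wait for the exit code) can be sketched without Spark. One technique is swapped in: Spark's runner copies the child's streams with its own FileAppender, while this sketch uses ProcessBuilder.Redirect instead. As in the excerpt, the header is written into stderr before the process output is appended. The usage assumes a POSIX sh is available.

```scala
import java.io.File
import java.nio.charset.StandardCharsets
import java.nio.file.Files

object RunAndWaitSketch {
  /** Starts `command` in `dir`, sends stdout/stderr to files there, and returns the exit code. */
  def runAndWait(command: Seq[String], dir: File): Int = {
    val stdout = new File(dir, "stdout")
    val stderr = new File(dir, "stderr")
    // Write the header first; the process's stderr is appended after it.
    val header = "Spark Executor Command: %s\n%s\n\n".format(command.mkString(" "), "=" * 40)
    Files.write(stderr.toPath, header.getBytes(StandardCharsets.UTF_8))

    val builder = new ProcessBuilder(command: _*)
    builder.directory(dir)
    builder.redirectOutput(ProcessBuilder.Redirect.to(stdout))       // truncates stdout
    builder.redirectError(ProcessBuilder.Redirect.appendTo(stderr))  // keeps the header
    val process = builder.start()
    // Blocks until the child exits; throws InterruptedException if this thread is interrupted.
    process.waitFor()
  }
}
```

For example, runAndWait(Seq("sh", "-c", "echo out; echo err 1>&2; exit 3"), dir) returns 3. It leaves "out" in dir/stdout and the header followed by "err" in dir/stderr. The real runner would then wrap that exit code in an ExecutorStateChanged message for the worker.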
At this point, the Executor has been launched.