worker啓動通常包含兩大部分:DriverRunner和ExcetorRunner。web
worker啓動driver的幾個基本原理,最核心的是。worker內部會啓動一個線程,這個線程能夠理解爲driverRunner。而後DriverRunner會去負責啓動driver進程,並在以後對driver進程進行管理。app
worker的啓動步驟:ui
1- master要求worker啓動driver和Excetor、launchDriver‘launchExcetor。this
2- worker在內部launchDriver啓動了一個線程DriverRunner;建立driver的工做目錄; 封裝啓動driver的命令,用processBuilder啓動Driver。spa
3- worker在內部經過LaunchExector啓動一個ExectorRunner;建立Excetor的工做目錄,封裝啓動Excetor的命令,用ProcessBuilder啓動Excetor。線程
4- Excetor找到對應的driver,去反向註冊本身。code
LaunchDriver源碼以下:
case LaunchDriver(driverId, driverDesc) => logInfo(s"Asked to launch driver $driverId") val driver = new DriverRunner( conf, driverId, workDir, sparkHome, driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)), self, workerUri, securityMgr) drivers(driverId) = driver driver.start() coresUsed += driverDesc.cores memoryUsed += driverDesc.mem case KillDriver(driverId) => logInfo(s"Asked to kill driver $driverId") drivers.get(driverId) match { case Some(runner) => runner.kill() case None => logError(s"Asked to kill unknown driver $driverId") } case driverStateChanged @ DriverStateChanged(driverId, state, exception) => handleDriverStateChanged(driverStateChanged) case ReregisterWithMaster => reregisterWithMaster() case ApplicationFinished(id) => finishedApps += id maybeCleanupApplication(id) }
LaunchExecutor的源碼以下:
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) => if (masterUrl != activeMasterUrl) { logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.") } else { try { logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name)) // Create the executor's working directory val executorDir = new File(workDir, appId + "/" + execId) if (!executorDir.mkdirs()) { throw new IOException("Failed to create directory " + executorDir) } // Create local dirs for the executor. These are passed to the executor via the // SPARK_EXECUTOR_DIRS environment variable, and deleted by the Worker when the // application finishes. val appLocalDirs = appDirectories.getOrElse(appId, { val localRootDirs = Utils.getOrCreateLocalRootDirs(conf) val dirs = localRootDirs.flatMap { dir => try { val appDir = Utils.createDirectory(dir, namePrefix = "executor") Utils.chmod700(appDir) Some(appDir.getAbsolutePath()) } catch { case e: IOException => logWarning(s"${e.getMessage}. Ignoring this directory.") None } }.toSeq if (dirs.isEmpty) { throw new IOException("No subfolder can be created in " + s"${localRootDirs.mkString(",")}.") } dirs }) appDirectories(appId) = appLocalDirs val manager = new ExecutorRunner( appId, execId, appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)), cores_, memory_, self, workerId, host, webUi.boundPort, publicAddress, sparkHome, executorDir, workerUri, conf, appLocalDirs, ExecutorState.RUNNING) executors(appId + "/" + execId) = manager manager.start() coresUsed += cores_ memoryUsed += memory_ sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None)) } catch { case e: Exception => logError(s"Failed to launch executor $appId/$execId for ${appDesc.name}.", e) if (executors.contains(appId + "/" + execId)) { executors(appId + "/" + execId).kill() executors -= appId + "/" + execId } sendToMaster(ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(e.toString), None)) } }
拓展:中華石杉--spark從入門到精通-第49講orm