The Client is created when SparkDeploySchedulerBackend is started; it represents an application talking to the Spark cluster.
The Client's logic is simple: it wraps a ClientActor and is responsible for starting and stopping that actor.
The key part of ClientActor is that in preStart it registers the application with the master, and while the application runs it receives the events the master sends back.
/**
 * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
 * and a listener for cluster events, and calls back the listener when various events occur.
 */
private[spark] class Client(
    actorSystem: ActorSystem,
    masterUrl: String,
    appDescription: ApplicationDescription,
    listener: ClientListener)
  extends Logging {

  var actor: ActorRef = null
  var appId: String = null

  class ClientActor extends Actor with Logging {
    var master: ActorRef = null
    var masterAddress: Address = null
    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times

    override def preStart() {
      try {
        master = context.actorFor(Master.toAkkaUrl(masterUrl))  // create the master ActorRef, used to talk to the master
        masterAddress = master.path.address
        master ! RegisterApplication(appDescription)             // register this application with the master
        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
        context.watch(master)  // Doesn't work with remote actors, but useful for testing
      } catch {
        case e: Exception =>
          markDisconnected()
          context.stop(self)
      }
    }

    override def receive = {  // handle the various events sent by the master
      case RegisteredApplication(appId_) =>
      case ApplicationRemoved(message) =>
      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      case ExecutorUpdated(id, state, message, exitStatus) =>
      case Terminated(actor_) if actor_ == master =>
      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
      case RemoteClientShutdown(transport, address) if address == masterAddress =>
      case StopClient =>
    }
  }

  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

  def stop() {
    if (actor != null) {
      try {
        val future = actor.ask(StopClient)(timeout)
        Await.result(future, timeout)
      } catch {
        case e: TimeoutException =>
          logInfo("Stop request to Master timed out; it may already be shut down.")
      }
      actor = null
    }
  }
}
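For context, a minimal sketch of how SparkDeploySchedulerBackend might drive this class: build an ApplicationDescription, pass a ClientListener in for callbacks, and start/stop the Client around the application's lifetime. The ClientListener callback names and the ApplicationDescription fields used here are assumptions, not taken from the excerpt above:

// Hypothetical wiring sketch; the callback names, ApplicationDescription fields and the
// maxCores/executorMemory/command/sparkHome/appUiUrl values are all assumptions.
val listener = new ClientListener {
  def connected(appId: String) { println("Connected, app ID " + appId) }
  def disconnected() { println("Disconnected from master") }
  def executorAdded(fullId: String, workerId: String, hostPort: String, cores: Int, memory: Int) {
    println("Granted executor " + fullId + " on " + hostPort)
  }
  def executorRemoved(fullId: String, message: String, exitStatus: Option[Int]) {
    println("Lost executor " + fullId + ": " + message)
  }
}
val appDesc = new ApplicationDescription("my-app", maxCores, executorMemory, command, sparkHome, appUiUrl)
val client = new Client(actorSystem, "spark://master-host:7077", appDesc, listener)
client.start()   // ClientActor.preStart sends RegisterApplication to the master
// ... application runs ...
client.stop()    // sends StopClient to the ClientActor and waits for the ask to complete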
The client submits the application to the master, and workers also register themselves with the master.
The Master is therefore the entry point of the Spark cluster: it accepts application requests from clients and allocates worker resources to each app.
The key messages it handles, RegisterWorker, RegisterApplication and ExecutorStateChanged, all end up calling schedule.
schedule is its core function. For now it schedules only on worker CPU cores and does not consider other resources; you can choose to spread an app over as many workers as possible or to pack it onto as few as possible.
Finally it sends LaunchExecutor to the worker actor, which actually starts the ExecutorBackend.
private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging {
  var nextAppNumber = 0

  val workers = new HashSet[WorkerInfo]                    // track workers
  val idToWorker = new HashMap[String, WorkerInfo]
  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
  val addressToWorker = new HashMap[Address, WorkerInfo]

  val apps = new HashSet[ApplicationInfo]                  // track apps
  val idToApp = new HashMap[String, ApplicationInfo]
  val actorToApp = new HashMap[ActorRef, ApplicationInfo]
  val addressToApp = new HashMap[Address, ApplicationInfo]
  val waitingApps = new ArrayBuffer[ApplicationInfo]       // apps that are not yet done; these are what schedule() operates on
  val completedApps = new ArrayBuffer[ApplicationInfo]
  override def receive = {
    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        host, workerPort, cores, Utils.megabytesToString(memory)))
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
        context.watch(sender)  // This doesn't work with remote actors but helps for testing
        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
        schedule()  // re-run the scheduler
      }
    }

    case RegisterApplication(description) => {
      logInfo("Registering app " + description.name)
      val app = addApplication(description, sender)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      waitingApps += app
      context.watch(sender)  // This doesn't work with remote actors but helps for testing
      sender ! RegisteredApplication(app.id)
      schedule()  // re-run the scheduler
    }

    // When an executor's state changes; only the failure cases need real handling here
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => {  // the executor is known and legitimate
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)  // send ExecutorUpdated to the driver actor
          if (ExecutorState.isFinished(state)) {
            // isFinished means KILLED, FAILED or LOST, i.e. the failure cases (the name is a bit misleading)
            val appInfo = idToApp(appId)
            // First remove the executor, freeing its cores back to coresLeft, then re-schedule
            // Remove this executor from the worker and app
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)

            // Only retry certain number of times so we don't go into an infinite loop.
            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
              schedule()  // within the retry limit, just re-schedule
            } else {
              // past the retry limit, fail the whole application
              logError("Application %s with ID %s failed %d times, removing it".format(
                appInfo.desc.name, appInfo.id, appInfo.retryCount))
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + appId + "/" + execId)
      }
    }

    case Heartbeat(workerId) => {  // refresh the worker's heartbeat timestamp
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    case Terminated(actor)
    case RemoteClientDisconnected(transport, address)
    case RemoteClientShutdown(transport, address)
    case RequestMasterState
    case CheckForWorkerTimeOut
    case RequestWebUIPort
  }
  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  def schedule() {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Spread the app over as many workers as possible
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {  // coresLeft > 0 means the app still wants more cores, i.e. more parallelism
        // Find the usable workers: alive, able to run this app (canUse), sorted by coresFree descending.
        // canUse is defined as: worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable)  // Number of cores to give on each node
        // Total cores we can hand out: the min of what the app still needs and what is free
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        // The loop below takes cores evenly from each usable worker, round-robin
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {  // never assign more than a worker has free
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable  // if one round is not enough, keep cycling through the workers
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome)  // launch the executor backend on that worker
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack the app onto as few workers as possible; the logic is simpler
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec, app.desc.sparkHome)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }
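  // canUse, as paraphrased in the comment inside schedule() above: a worker can accept an
  // executor for an app if it has enough free memory for one executor and is not already
  // running an executor for that app. (Reconstructed from that comment, not copied verbatim
  // from the source.)
  def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
    worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
  }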
  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(              // send LaunchExecutor to the worker actor
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
    exec.application.driver ! ExecutorAdded(    // send ExecutorAdded to the driver actor
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
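To make the spread-out branch of schedule() concrete, here is a self-contained toy run of the round-robin core assignment; the worker core counts and the app's demand are made-up numbers, everything else mirrors the loop above:

// Standalone illustration of the round-robin assignment in schedule() (numbers are made up).
object SpreadOutDemo extends App {
  val coresFree = Array(4, 2, 1)             // three usable workers, sorted by free cores (descending)
  val assigned  = new Array[Int](coresFree.length)
  var toAssign  = math.min(5, coresFree.sum) // the app still wants 5 cores, 7 are free, so assign 5

  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) { // never hand out more than a worker has free
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length        // keep cycling if one round is not enough
  }
  println(assigned.mkString(", "))            // prints 2, 2, 1 -- the app is spread over all three workers
}

With spreadOutApps disabled, the same request would instead take 4 cores from the first worker and 1 from the second.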
The Worker runs as an actor process. At startup it first creates its working directory and registers itself with the master.
Its main job is to handle the LaunchExecutor event, using an ExecutorRunner to run the executor backend.
private[spark] class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrl: String,
    workDirPath: String = null)
  extends Actor with Logging {
  override def preStart() {
    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
    logInfo("Spark home: " + sparkHome)
    createWorkDir()     // create the work directory under the configured sparkHome
    connectToMaster()   // register this worker with the master
  }
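  // connectToMaster is not shown in this excerpt. Judging from ClientActor.preStart above and
  // the RegisterWorker handler in the Master, it presumably looks roughly like this; treat the
  // exact fields (workerId, webUiPort, publicAddress) as assumptions.
  def connectToMaster() {
    logInfo("Connecting to master " + masterUrl)
    master = context.actorFor(Master.toAkkaUrl(masterUrl))  // look up the master actor
    master ! RegisterWorker(workerId, host, port, cores, memory, webUiPort, publicAddress)
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    context.watch(master)  // so the worker notices when the master goes away
  }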
  override def receive = {
    case RegisteredWorker(url) =>   // the master's acknowledgement that registration succeeded
      masterWebUiUrl = url
      logInfo("Successfully registered with master")
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
        master ! Heartbeat(workerId)  // set up a periodic heartbeat to the master
      }

    case RegisterWorkerFailed(message) =>
      logError("Worker registration failed: " + message)
      System.exit(1)

    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner(   // create an ExecutorRunner and start it
        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      // report ExecutorState.RUNNING to the master via ExecutorStateChanged
      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None)

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
      // ExecutorStateChanged sent back by the ExecutorRunner; forward it to the master
      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus)
      val fullId = appId + "/" + execId
      if (ExecutorState.isFinished(state)) {
        val executor = executors(fullId)
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        finishedExecutors(fullId) = executor
        executors -= fullId
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      }

    case KillExecutor(appId, execId) =>
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }

    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      masterDisconnected()

    case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
        finishedExecutors.values.toList, masterUrl, cores, memory,
        coresUsed, memoryUsed, masterWebUiUrl)
    }
  }
ExecutorRunner
It creates a thread to execute fetchAndRunExecutor,
and inside that thread it uses ProcessBuilder to launch a StandaloneExecutorBackend child process.
val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command(
  "org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
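The {{EXECUTOR_ID}}, {{HOSTNAME}} and {{CORES}} placeholders are filled in on the worker side: buildCommandSeq (shown further down) maps the arguments through substituteVariables, which presumably amounts to something like the following sketch, inferred from the placeholder names rather than copied from the source:

// Sketch of the placeholder substitution done in ExecutorRunner (an assumption based on the
// placeholder names above).
def substituteVariables(argument: String): String = argument match {
  case "{{EXECUTOR_ID}}" => execId.toString
  case "{{HOSTNAME}}"    => host
  case "{{CORES}}"       => cores.toString
  case other             => other   // anything else, e.g. the driver URL, passes through unchanged
}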
Why doesn't the Worker fork the child process directly? Presumably because fetchAndRunExecutor blocks in process.waitFor(), so running it on a dedicated thread keeps the Worker actor free to handle other messages.
/**
 * Manages the execution of one executor process.
 */
private[spark] class ExecutorRunner(
    val appId: String,
    val execId: Int,
    val appDesc: ApplicationDescription,
    val cores: Int,
    val memory: Int,
    val worker: ActorRef,
    val workerId: String,
    val host: String,
    val sparkHome: File,
    val workDir: File)
  extends Logging {
  val fullId = appId + "/" + execId
  var workerThread: Thread = null
  var process: Process = null
  var shutdownHook: Thread = null
  def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() }   // run fetchAndRunExecutor on a dedicated thread
    }
    workerThread.start()
  }
  def buildCommandSeq(): Seq[String] = {
    val command = appDesc.command
    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
    // SPARK-698: do not call the run.cmd script, as process.destroy()
    // fails to kill a process tree on Windows
    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++   // run command.mainClass with java
      command.arguments.map(substituteVariables)
  }
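  // buildJavaOpts is not shown in this post; conceptually it contributes the classpath and the
  // executor heap size derived from `memory`, roughly along these lines. This is a simplified
  // sketch under that assumption, not the actual implementation.
  def buildJavaOptsSketch(): Seq[String] = {
    val memoryOpts = Seq("-Xms" + memory + "M", "-Xmx" + memory + "M")
    Seq("-cp", getAppEnv("SPARK_CLASSPATH").getOrElse("")) ++ memoryOpts
  }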
  /**
   * Download and run the executor described in our ApplicationDescription
   */
  def fetchAndRunExecutor() {
    // Use ProcessBuilder to run the command in a child process
    try {
      // Create the executor's working directory (used as the process working dir below)
      val executorDir = new File(workDir, appId + "/" + execId)
      if (!executorDir.mkdirs()) {
        throw new java.io.IOException("Failed to create directory " + executorDir)
      }

      // Launch the process
      val command = buildCommandSeq()
      val builder = new ProcessBuilder(command: _*).directory(executorDir)
      val env = builder.environment()
      for ((key, value) <- appDesc.command.environment) {
        env.put(key, value)
      }
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
      process = builder.start()

      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
      // long-lived processes only. However, in the future, we might restart the executor a few
      // times on the same machine.
      val exitCode = process.waitFor()
      val message = "Command exited with code " + exitCode
      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), Some(exitCode))
    } catch {
      case e: Exception =>
        val message = e.getClass + ": " + e.getMessage
        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
    }
  }
}
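The thread-plus-ProcessBuilder pattern used by fetchAndRunExecutor can be tried in isolation. The snippet below is a self-contained illustration, not Spark code; the echo command simply stands in for the java command built by buildCommandSeq:

import java.io.File

// Run a child process on a dedicated thread and report its exit code when it terminates,
// mirroring the shape of ExecutorRunner.fetchAndRunExecutor (command and env var are illustrative).
object RunnerDemo extends App {
  @volatile var process: Process = null

  val runnerThread = new Thread("runner-demo") {
    override def run() {
      val builder = new ProcessBuilder("echo", "hello from the child process").directory(new File("."))
      builder.environment().put("DEMO_ENV", "1")  // extra environment, like appDesc.command.environment
      process = builder.start()
      val exitCode = process.waitFor()            // blocks, which is why it runs on its own thread
      println("child exited with code " + exitCode)
    }
  }
  runnerThread.start()
  runnerThread.join()
}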