Reposting is welcome; please credit the source: 徽滬一郎.

This article describes how resources (mainly CPU cores and memory) are requested and released over the whole lifetime of a Spark application under the standalone cluster deployment mode.
The standalone cluster deployment mode is built from the four components shown in the figure below: Master, Worker, Executor and Driver, each of which runs in its own JVM process.

Here they are examined from the perspective of resource management. Some of this ground was already covered in the fault-tolerance analysis of standalone cluster mode; this article focuses on how resources, once allocated, are reclaimed cleanly in the different scenarios.
In a standalone cluster the most important process is of course the Master; it must be started and running normally before the workers and the driver program.

Once the Master is up, the workers can be started. At startup each worker registers with the Master, and the registration message carries the CPU cores and memory of that worker node.

The call sequence is preStart -> registerWithMaster -> tryRegisterAllMasters.

Let us look at the code of tryRegisterAllMasters.
def tryRegisterAllMasters() {
  for (masterUrl <- masterUrls) {
    logInfo("Connecting to master " + masterUrl + "...")
    val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
    actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
  }
}
The question is: where do the memory and cores parameters required by the RegisterWorker constructor come from?

Note that the main function of Worker creates a WorkerArguments instance:
def main(argStrings: Array[String]) {
  SignalLogger.register(log)
  val args = new WorkerArguments(argStrings)
  val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
    args.memory, args.masters, args.workDir)
  actorSystem.awaitTermination()
}
memory is obtained through the function inferDefaultMemory, and cores through inferDefaultCores.
def inferDefaultCores(): Int = {
  Runtime.getRuntime.availableProcessors()
}

def inferDefaultMemory(): Int = {
  val ibmVendor = System.getProperty("java.vendor").contains("IBM")
  var totalMb = 0
  try {
    val bean = ManagementFactory.getOperatingSystemMXBean()
    if (ibmVendor) {
      val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    } else {
      val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
      val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
      totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
    }
  } catch {
    case e: Exception => {
      totalMb = 2*1024
      System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
    }
  }
  // Leave out 1 GB for the operating system, but don't return a negative memory size
  math.max(totalMb - 1024, 512)
}
If the cores and memory of each worker have already been explicitly specified in the configuration, the configured values are used instead; the relevant settings are SPARK_WORKER_CORES and SPARK_WORKER_MEMORY.
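For reference, these two variables are normally set in conf/spark-env.sh; the values below are only an example and should be adjusted to the actual machine:

# conf/spark-env.sh -- example values
export SPARK_WORKER_CORES=4
export SPARK_WORKER_MEMORY=8g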
After receiving the RegisterWorker message, the Master creates a WorkerInfo for each worker based on the information it reported.
case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) => {
  logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
    workerHost, workerPort, cores, Utils.megabytesToString(memory)))
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else if (idToWorker.contains(id)) {
    sender ! RegisterWorkerFailed("Duplicate worker ID")
  } else {
    val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
      sender, workerUiPort, publicAddress)
    if (registerWorker(worker)) {
      persistenceEngine.addWorker(worker)
      sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
      schedule()
    } else {
      val workerAddress = worker.actor.path.address
      logWarning("Worker registration failed. Attempted to re-register worker at same " +
        "address: " + workerAddress)
      sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: " +
        workerAddress)
    }
  }
}
If a driver application has already registered by the time a worker comes up, executors need to be launched for the applications that are still waiting for resources.

The newly created WorkerInfo is used in the schedule function. To keep the description short, only one of schedule's two allocation branches is listed here: the branch that fills each worker in turn, packing an application's cores onto as few workers as possible (the alternative branch spreads an application across the workers; which branch runs is governed by spark.deploy.spreadOut).
for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
  for (app <- waitingApps if app.coresLeft > 0) {
    if (canUse(app, worker)) {
      val coresToUse = math.min(worker.coresFree, app.coresLeft)
      if (coresToUse > 0) {
        val exec = app.addExecutor(worker, coresToUse)
        launchExecutor(worker, exec)
        app.state = ApplicationState.RUNNING
      }
    }
  }
}
launchExecutor is mainly responsible for two things, both visible in the code below: it asks the chosen worker to actually start the executor process (the LaunchExecutor message), and it notifies the application's driver that an executor has been added (the ExecutorAdded message). It also records the new executor on the corresponding WorkerInfo.
def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
  logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
  worker.addExecutor(exec)
  worker.actor ! LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
  exec.application.driver ! ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
}
On receiving the LaunchExecutor command, the worker does its own bookkeeping: the CPU cores and memory about to be consumed are subtracted from its available resources, and an ExecutorRunner is created to spawn the Executor. Note that the Executor runs in its own separate process. The code is as follows.
case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  if (masterUrl != activeMasterUrl) {
    logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
  } else {
    try {
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
        self, workerId, host,
        appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
        workDir, akkaUrl, conf, ExecutorState.RUNNING)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      masterLock.synchronized {
        master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
      }
    } catch {
      case e: Exception => {
        logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
        if (executors.contains(appId + "/" + execId)) {
          executors(appId + "/" + execId).kill()
          executors -= appId + "/" + execId
        }
        masterLock.synchronized {
          master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
        }
      }
    }
  }
One thing to keep in mind about the allocation process: if several driver applications are waiting, resources are handed out FIFO, first come first served.
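To make "first come first served" concrete, here is a minimal, self-contained sketch (not Spark source; all names are made up) of distributing a fixed pool of cores over a queue of waiting applications: a later application only receives cores once the earlier ones have been satisfied or the pool is exhausted.

// Illustrative sketch only: FIFO core allocation over a queue of waiting applications.
case class WaitingApp(name: String, var coresLeft: Int)

def allocateFifo(freeCores: Int, waiting: Seq[WaitingApp]): Unit = {
  var remaining = freeCores
  for (app <- waiting if remaining > 0 && app.coresLeft > 0) {
    val granted = math.min(remaining, app.coresLeft)   // earlier applications are served first
    app.coresLeft -= granted
    remaining -= granted
    println(s"${app.name} granted $granted cores, ${app.coresLeft} still wanted")
  }
}

// allocateFifo(8, Seq(WaitingApp("app-1", 6), WaitingApp("app-2", 6)))
// => app-1 gets 6 cores, app-2 only gets the remaining 2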
The resources reported by the workers are ultimately consumed by the job tasks submitted by the driver application. When the application ends (whether it exits normally or abnormally), the resources it occupied should be reclaimed, i.e. returned to the pool of allocatable resources.

The question thus becomes: how do the Master and the Executors know that a driver application has exited?

There are two different cases to handle: saying goodbye before leaving, and leaving without a word. They are discussed in turn below.
"Saying goodbye before leaving" means the driver application explicitly tells the Master and the executors that its work is done and it is about to go: the application explicitly calls SparkContext.stop.
def stop() {
  postApplicationEnd()
  ui.stop()
  // Do this only if not stopped already - best case effort.
  // prevent NPE if stopped more than once.
  val dagSchedulerCopy = dagScheduler
  dagScheduler = null
  if (dagSchedulerCopy != null) {
    metadataCleaner.cancel()
    cleaner.foreach(_.stop())
    dagSchedulerCopy.stop()
    taskScheduler = null
    // TODO: Cache.stop()?
    env.stop()
    SparkEnv.set(null)
    ShuffleMapTask.clearCache()
    ResultTask.clearCache()
    listenerBus.stop()
    eventLogger.foreach(_.stop())
    logInfo("Successfully stopped SparkContext")
  } else {
    logInfo("SparkContext already stopped")
  }
}
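Seen from the application side, this path amounts to nothing more than calling stop() when the work is done. A minimal driver might look like the sketch below (the object name and the job itself are made up for illustration):

import org.apache.spark.{SparkConf, SparkContext}

object StopExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("StopExample")
    val sc = new SparkContext(conf)
    try {
      // run some work so that executors are actually requested and used
      println(sc.parallelize(1 to 100).sum())
    } finally {
      // "say goodbye first": release the executors so the master can reclaim their resources
      sc.stop()
    }
  }
}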
A major effect of calling SparkContext.stop explicitly is that the Executors are stopped explicitly; the code that issues the StopExecutor command can be found in the stop function of CoarseGrainedSchedulerBackend.
override def stop() {
  stopExecutors()
  try {
    if (driverActor != null) {
      val future = driverActor.ask(StopDriver)(timeout)
      Await.ready(future, timeout)
    }
  } catch {
    case e: Exception =>
      throw new SparkException("Error stopping standalone scheduler's driver actor", e)
  }
}
How, then, does the Master learn that the driver application has gone? This is where Akka's communication mechanism comes in: when either party of a connection goes away, the other party receives a DisassociatedEvent, and it is while handling this message that the Master removes the driver application that has stopped.
case DisassociatedEvent(_, address, _) => {
  // The disconnected client could've been either a worker or an app; remove whichever it was
  logInfo(s"$address got disassociated, removing it.")
  addressToWorker.get(address).foreach(removeWorker)
  addressToApp.get(address).foreach(finishApplication)
  if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
}
In the "leaving without a word" case, how does an Executor find out that the application it serves has finished its mission? The same way the Master does: through the DisassociatedEvent. See the receive function of CoarseGrainedExecutorBackend.
case x: DisassociatedEvent =>
  logError(s"Driver $x disassociated! Shutting down.")
  System.exit(1)
Thanks to the heartbeat mechanism between the Master and the Workers, if a worker exits abnormally the Master notices its disappearance through the missed heartbeats and removes the resources that worker had reported.
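The gist of such a timeout check can be sketched as follows. This is an illustrative, self-contained example rather than the Master's actual source; the 60-second timeout and every name in it are assumptions:

import scala.collection.mutable

// Illustrative sketch: track each worker's last heartbeat and drop the ones that
// have been silent longer than the timeout, returning them so their reported
// cores/memory can be removed from the allocatable pool.
case class WorkerRecord(id: String, cores: Int, memoryMb: Int, var lastHeartbeat: Long)

class HeartbeatMonitor(timeoutMs: Long = 60 * 1000) {
  private val workers = mutable.Map[String, WorkerRecord]()

  def register(w: WorkerRecord): Unit = workers(w.id) = w

  def heartbeat(id: String): Unit =
    workers.get(id).foreach(_.lastHeartbeat = System.currentTimeMillis())

  // Called periodically, in the spirit of the Master's worker-timeout check.
  def timeOutDeadWorkers(): Seq[WorkerRecord] = {
    val now = System.currentTimeMillis()
    val dead = workers.values.filter(now - _.lastHeartbeat > timeoutMs).toSeq
    dead.foreach(w => workers -= w.id)
    dead
  }
}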
When an Executor exits abnormally, the monitoring thread of ExecutorRunner inside the worker notices immediately and reports it to the Master; the Master then reclaims the resources and asks the worker to start the executor again.
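As a rough illustration of that worker-side monitoring (again a sketch, not the real ExecutorRunner; the command and the callback are made up), the idea is simply a thread that blocks on the child process and reports its exit code upward:

import scala.sys.process._

// Illustrative sketch: spawn the executor as a child process, block until it exits,
// then invoke a callback so the exit can be reported to the master.
class ExecutorMonitor(command: Seq[String], onExit: Int => Unit) {
  private val monitorThread = new Thread("executor-monitor") {
    override def run(): Unit = {
      val exitCode = Process(command).!   // blocks until the child process terminates
      onExit(exitCode)                    // e.g. translate into an ExecutorStateChanged message
    }
  }
  def start(): Unit = monitorThread.start()
}

// new ExecutorMonitor(Seq("java", "-version"), code => println(s"executor exited with $code")).start()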