Apache Spark Source Code Walkthrough, Part 19 -- Requesting and Releasing Resources in Standalone Cluster Mode

Reposting is welcome; please credit the source. By 徽滬一郎.

Overview

This article describes how resources, chiefly CPU cores and memory, are requested and released over the lifetime of a Spark application under the standalone cluster deployment mode.

The standalone cluster deployment mode is built from four components: Master, Worker, Executor and Driver, each of which runs in its own JVM process.

From the perspective of resource management:

  • Master: manages the resources of the whole cluster, chiefly CPU cores and memory, although the Master itself does not own any of these resources
  • Worker: the actual contributor of compute resources; it must report to the Master how many CPU cores and how much memory it owns, and it launches executors under the Master's instructions
  • Executor: the workhorse that performs the actual computation; the Master decides how many cores and how much memory this process gets
  • Driver: the actual consumer of resources; the Driver submits one or more jobs, each job is split into multiple tasks, and the tasks are dispatched to the executors for execution

Some of this was already touched on in the fault-tolerance analysis of standalone cluster mode; today the focus is on how resources, once allocated, are reclaimed cleanly under different scenarios.

Resource reporting and aggregation

In a standalone cluster the central component is, of course, the Master; it must be up and running before the workers and the driver program are started.

Once the Master has started successfully, the workers can be started. When a worker starts it registers itself with the Master, and the registration message carries the worker node's CPU core count and memory size.

The call sequence is preStart -> registerWithMaster -> tryRegisterAllMasters.

Let's take a look at the code of tryRegisterAllMasters:

def tryRegisterAllMasters() {
    for (masterUrl <- masterUrls) {
      logInfo("Connecting to master " + masterUrl + "...")
      val actor = context.actorSelection(Master.toAkkaUrl(masterUrl))
      actor ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort, publicAddress)
    }
  }

The obvious question is: where do the memory and cores arguments of the RegisterWorker message come from?

Note that Worker's main function creates a WorkerArguments instance:

def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val args = new WorkerArguments(argStrings)
    val (actorSystem, _) = startSystemAndActor(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    actorSystem.awaitTermination()
  }

memory is obtained through the function inferDefaultMemory, and cores through inferDefaultCores.

def inferDefaultCores(): Int = {
    Runtime.getRuntime.availableProcessors()
  }

  def inferDefaultMemory(): Int = {
    val ibmVendor = System.getProperty("java.vendor").contains("IBM")
    var totalMb = 0
    try {
      val bean = ManagementFactory.getOperatingSystemMXBean()
      if (ibmVendor) {
        val beanClass = Class.forName("com.ibm.lang.management.OperatingSystemMXBean")
        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemory")
        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
      } else {
        val beanClass = Class.forName("com.sun.management.OperatingSystemMXBean")
        val method = beanClass.getDeclaredMethod("getTotalPhysicalMemorySize")
        totalMb = (method.invoke(bean).asInstanceOf[Long] / 1024 / 1024).toInt
      }
    } catch {
      case e: Exception => {
        totalMb = 2*1024
        System.out.println("Failed to get total physical memory. Using " + totalMb + " MB")
      }
    }
    // Leave out 1 GB for the operating system, but don't return a negative memory size
    math.max(totalMb - 1024, 512)
  }

If each worker's cores and memory have already been explicitly specified in the configuration (typically conf/spark-env.sh), the configured values are used instead; the relevant settings are SPARK_WORKER_CORES and SPARK_WORKER_MEMORY.
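The precedence logic lives in WorkerArguments: the inferred defaults are computed first and are then overridden by the environment variables when they are set. The following is a paraphrased sketch of the Spark 1.0-era code, not a verbatim excerpt:

// Paraphrased sketch: environment variables override the inferred defaults in WorkerArguments.
var cores = inferDefaultCores()
var memory = inferDefaultMemory()

if (System.getenv("SPARK_WORKER_CORES") != null) {
  cores = System.getenv("SPARK_WORKER_CORES").toInt
}
if (System.getenv("SPARK_WORKER_MEMORY") != null) {
  // memoryStringToMb turns strings such as "4g" into a megabyte count
  memory = Utils.memoryStringToMb(System.getenv("SPARK_WORKER_MEMORY"))
}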

After receiving the RegisterWorker message, the Master creates a corresponding WorkerInfo for the worker based on the information it reported.

case RegisterWorker(id, workerHost, workerPort, cores, memory, workerUiPort, publicAddress) =>
    {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        workerHost, workerPort, cores, Utils.megabytesToString(memory)))
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
          sender, workerUiPort, publicAddress)
        if (registerWorker(worker)) {
          persistenceEngine.addWorker(worker)
          sender ! RegisteredWorker(masterUrl, masterWebUiUrl)
          schedule()
        } else {
          val workerAddress = worker.actor.path.address
          logWarning("Worker registration failed. Attempted to re-register worker at same " +
            "address: " + workerAddress)
          sender ! RegisterWorkerFailed("Attempted to re-register worker at same address: "
            + workerAddress)
        }
      }
    }
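
WorkerInfo is essentially the Master-side ledger for a worker's resources. The fields that matter for the accounting discussed in this article look roughly like the following simplified sketch (not the verbatim class):

// Simplified sketch of the resource-accounting part of WorkerInfo.
private[spark] class WorkerInfo(val id: String, val cores: Int, val memory: Int) {
  var coresUsed = 0    // cores currently granted to executors
  var memoryUsed = 0   // megabytes currently granted to executors

  def coresFree: Int = cores - coresUsed
  def memoryFree: Int = memory - memoryUsed
}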

Resource allocation

If one or more driver applications have already registered by the time a worker registers, executors need to be launched for those applications that are still waiting for resources.

WorkerInfo is used inside the schedule function, whose logic can be summarized as follows:

  1. Check which of the currently alive workers still have enough free memory to satisfy the application's minimum memory requirement; those workers are treated as usable for allocation
  2. Depending on the distribution policy: if the work is to be spread evenly across the workers, one core is taken from each worker in turn until either the allocatable resources are exhausted or the driver's demand has been met
  3. If the policy is instead to use as few workers as possible, then as many free cores as possible are taken on each worker until the driver's core demand is satisfied
  4. Based on the result of step 2 or 3, the corresponding executors are added on each worker; the handling function is addExecutor

For brevity, only one of the two branches is listed here: the one that packs an application onto as few workers as possible, grabbing as many free cores as it can from each worker in turn.

for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
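
The canUse check in the loop above implements step 1 of the list: a worker qualifies for an application only if it still has enough free memory for one of that application's executors and is not already running an executor for it. Roughly, as a paraphrased sketch of the Master-side helper:

// Paraphrased sketch of Master.canUse in the Spark 1.0-era code.
def canUse(app: ApplicationInfo, worker: WorkerInfo): Boolean = {
  worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
}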

launchExecutor is mainly responsible for two things:

  1. Recording the CPU cores and memory consumed by the newly added executor; this bookkeeping happens in worker.addExecutor (sketched after the code below)
  2. Sending the LaunchExecutor command to the worker

def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
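
The worker.addExecutor call above is where the Master's ledger is updated: the executor's cores and memory are added to the worker's used totals, which shrinks coresFree and memoryFree for the next scheduling round. Roughly, as a paraphrased sketch of WorkerInfo.addExecutor:

// Paraphrased sketch: deduct the granted resources from the worker's free pool on the Master side.
def addExecutor(exec: ExecutorInfo) {
  executors(exec.fullId) = exec
  coresUsed += exec.cores
  memoryUsed += exec.memory
}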

When the worker receives the LaunchExecutor command it does its own bookkeeping as well, subtracting the cores and memory about to be consumed from its available resources, and then uses ExecutorRunner to spawn the Executor. Note that the Executor runs in a separate process. The code is as follows:

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
      if (masterUrl != activeMasterUrl) {
        logWarning("Invalid Master (" + masterUrl + ") attempted to launch executor.")
      } else {
        try {
          logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
          val manager = new ExecutorRunner(appId, execId, appDesc, cores_, memory_,
            self, workerId, host,
            appDesc.sparkHome.map(userSparkHome => new File(userSparkHome)).getOrElse(sparkHome),
            workDir, akkaUrl, conf, ExecutorState.RUNNING)
          executors(appId + "/" + execId) = manager
          manager.start()
          coresUsed += cores_
          memoryUsed += memory_
          masterLock.synchronized {
            master ! ExecutorStateChanged(appId, execId, manager.state, None, None)
          }
        } catch {
          case e: Exception => {
            logError("Failed to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
            if (executors.contains(appId + "/" + execId)) {
              executors(appId + "/" + execId).kill()
              executors -= appId + "/" + execId
            }
            masterLock.synchronized {
              master ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, None, None)
            }
          }
        }
      }

One thing to note about the allocation process: if several driver applications are waiting, resources are handed out FIFO, first come first served.

Resource reclamation

The resources reported by the workers end up being occupied by the job tasks submitted by the driver application. When the application finishes, whether it exits normally or abnormally, the resources it occupied should be reclaimed, that is, returned to the pool of allocatable resources.

The question now becomes: how do the Master and the Executors find out that the driver application has exited?

There are two different ways this can happen: saying goodbye before leaving, and leaving without a word. Each is described below.

Saying goodbye before leaving means the driver application explicitly notifies the Master and the executors that its work is done and it is about to exit; the application does this by explicitly calling SparkContext.stop:

def stop() {
    postApplicationEnd()
    ui.stop()
    // Do this only if not stopped already - best case effort.
    // prevent NPE if stopped more than once.
    val dagSchedulerCopy = dagScheduler
    dagScheduler = null
    if (dagSchedulerCopy != null) {
      metadataCleaner.cancel()
      cleaner.foreach(_.stop())
      dagSchedulerCopy.stop()
      taskScheduler = null
      // TODO: Cache.stop()?
      env.stop()
      SparkEnv.set(null)
      ShuffleMapTask.clearCache()
      ResultTask.clearCache()
      listenerBus.stop()
      eventLogger.foreach(_.stop())
      logInfo("Successfully stopped SparkContext")
    } else {
      logInfo("SparkContext already stopped")
    }
  }

One of the main effects of explicitly calling SparkContext.stop is that the executors are explicitly stopped; the code that actually issues the StopExecutor command can be found in the stop function of CoarseGrainedSchedulerBackend:

override def stop() {
    stopExecutors()
    try {
      if (driverActor != null) {
        val future = driverActor.ask(StopDriver)(timeout)
        Await.ready(future, timeout)
      }
    } catch {
      case e: Exception =>
        throw new SparkException("Error stopping standalone scheduler's driver actor", e)
    }
  }
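
stopExecutors itself simply asks the driver-side actor to fan the request out to every registered executor backend. A paraphrased sketch of that handler, modelled on the Spark 1.0-era DriverActor (message and field names may differ slightly between versions):

// Paraphrased sketch: the driver actor broadcasts StopExecutor to every registered executor.
case StopExecutors =>
  logInfo("Asking each executor to shut down")
  for (executorRef <- executorActor.values) {
    executorRef ! StopExecutor   // each CoarseGrainedExecutorBackend then shuts itself down
  }
  sender ! true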

How, then, does the Master learn that the driver application has exited? This is where Akka's communication mechanism comes in: when either side of a connection goes away, the other side receives a DisassociatedEvent, and it is in the handler for this message that the Master removes the driver application that has stopped.

case DisassociatedEvent(_, address, _) => {
      // The disconnected client could've been either a worker or an app; remove whichever it was
      logInfo(s"$address got disassociated, removing it.")
      addressToWorker.get(address).foreach(removeWorker)
      addressToApp.get(address).foreach(finishApplication)
      if (state == RecoveryState.RECOVERING && canCompleteRecovery) { completeRecovery() }
    }
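
finishApplication eventually reaches removeApplication, which is where the accounting is reversed: for every executor of the finished application the Master returns the cores and memory to the owning worker, asks that worker to kill the executor, and then calls schedule() again so the freed resources can be handed to any waiting application. A paraphrased sketch of the relevant part (modelled on the Spark 1.0-era Master, details omitted):

// Paraphrased sketch of the resource-return path inside Master.removeApplication.
for (exec <- app.executors.values) {
  exec.worker.removeExecutor(exec)   // coresUsed -= exec.cores; memoryUsed -= exec.memory
  exec.worker.actor ! KillExecutor(masterUrl, exec.application.id, exec.id)
  exec.state = ExecutorState.KILLED
}
waitingApps -= app
schedule()   // the freed cores and memory are allocatable again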

In the leaving-without-a-word case, how does an Executor know that the application it serves has already finished? The same way the Master does, via the DisassociatedEvent; see the receive function in CoarseGrainedExecutorBackend:

case x: DisassociatedEvent =>
      logError(s"Driver $x disassociated! Shutting down.")
      System.exit(1)

Resource reclamation in abnormal situations

Thanks to the heartbeat mechanism between the Master and the Workers, if a worker exits abnormally the Master notices its disappearance through the missed heartbeats and removes the resources that worker had reported.
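
On the Master side this detection is a periodic sweep over each worker's last heartbeat timestamp; workers that have been silent longer than the timeout are removed, which returns their resources to the pool and tells the affected drivers that their executors are gone. A paraphrased, simplified sketch modelled on Master.timeOutDeadWorkers in the Spark 1.0-era code:

// Paraphrased sketch of the Master's periodic worker-timeout check.
def timeOutDeadWorkers() {
  val currentTime = System.currentTimeMillis()
  val silentWorkers = workers.filter(_.lastHeartbeat < currentTime - WORKER_TIMEOUT).toArray
  for (worker <- silentWorkers if worker.state != WorkerState.DEAD) {
    logWarning(s"Removing ${worker.id} because we got no heartbeat in ${WORKER_TIMEOUT / 1000} seconds")
    removeWorker(worker)   // marks the worker DEAD and gives its cores and memory back
  }
}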

When an Executor exits abnormally, the monitoring thread in the worker's ExecutorRunner notices immediately and reports it to the Master; the Master reclaims the resources and asks the worker to launch a new executor.
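
On the Master side the failure arrives as an ExecutorStateChanged message: the dead executor's resources are handed back to its worker and schedule() is invoked again to launch a replacement. A paraphrased, simplified sketch modelled on the Spark 1.0-era Master (retry-limit handling omitted):

// Paraphrased sketch of the Master's handling of a finished or failed executor.
case ExecutorStateChanged(appId, execId, state, message, exitStatus) =>
  for (app <- idToApp.get(appId); exec <- app.executors.get(execId)) {
    exec.state = state
    exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus)
    if (ExecutorState.isFinished(state)) {
      app.removeExecutor(exec)           // application-side accounting
      exec.worker.removeExecutor(exec)   // the worker gets its cores and memory back
      schedule()                         // try to launch a replacement executor
    }
  }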
