Spark一級資源調度Shedule機制及SpreadOut模式源碼深刻剖析

本套系列博客從真實商業環境抽取案例進行總結和分享,並給出Spark源碼解讀及商業實戰指導,請持續關注本套博客。版權聲明:本套Spark源碼解讀及商業實戰歸做者(秦凱新)全部,禁止轉載,歡迎學習。web

1 Shedule 在哪裏?幹什麼?

Shedule()發生在Master中,那麼Master都負責什麼呢?能夠看到只要發生如下任何事件,就會從新執行Shedule()算法

  • RegisterWorker
  • RegisterApplication
  • ExecutorStateChanged
  • RequestSubmitDriver
  • completeRecovery
  • relaunchDriver
  • removeApplication
  • handleRequestExecutors
  • handleKillExecutors
  • removeDriver
所以,能夠看出所謂的一級資源調度,在Local-cluster部署模式和Standalone部署模式中,其實就是基於Master實現的資源調度,更確切的說是對Driver的資源調度和對Application(參數指定數量的Executor)的資源調度。

2 Master的天子王權(除Yarn資源和K8s容器編排)

Master是Local-cluster部署模式和Standalone部署模式中,最爲核心的管理組件。Master將直接決定整個集羣的可用性,容錯性,可用性。可謂位於整個Spark集羣中最重要,最核心的位置。職責以下:apache

  • Worker的管理
  • Application的管理
  • Driver的管理
  • 統一管理和分配集羣中的資源(如內存和cpu)
  • 接收各個Worker的註冊,狀態更新,心跳
  • Driver和Application的註冊

3 Driver 的前世此生?是什麼?如何納管?

  • Driver的誕生來源於Master接收到RequestSubmitDriver請求,那麼RequestSubmitDriver來源於何處,這又要從SparkSubmit類提及,先上代碼段,看看STANDALONE_CLUSTER_SUBMIT_CLASS,就從這裏開始:數組

    private[deploy] val YARN_CLUSTER_SUBMIT_CLASS = "org.apache.spark.deploy.yarn.YarnClusterApplication"   
        private[deploy] val REST_CLUSTER_SUBMIT_CLASS = classOf[RestSubmissionClientApp].getName()
        private[deploy] val STANDALONE_CLUSTER_SUBMIT_CLASS = classOf[ClientApp].getName()
        private[deploy] val KUBERNETES_CLUSTER_SUBMIT_CLASS ="org.apache.spark.deploy.k8s.submit.KubernetesClientApplication"
    複製代碼
  • 這裏開始封裝Spark-submit提交的各個參數,同時在StandAlone模式下,咱們開始關注ClientEndpoint它是一個終端.緩存

    // In standalone cluster mode, use the REST client to submit the application (Spark 1.3+).
      // All Spark parameters are expected to be passed to the client through system properties.
      if (args.isStandaloneCluster) {
        if (args.useRest) {
          childMainClass = REST_CLUSTER_SUBMIT_CLASS
          childArgs += (args.primaryResource, args.mainClass)
        } else {
        
        
          // In legacy standalone cluster mode, use Client as a wrapper around the user class
          childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS   <= 神來之筆ClientApp
          
          
          if (args.supervise) { childArgs += "--supervise" }
          Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
          Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
          childArgs += "launch"
          childArgs += (args.master, args.primaryResource, args.mainClass)
        }
        if (args.childArgs != null) {
          childArgs ++= args.childArgs
        }
      }
    複製代碼
  • ClientApp (ClientEndpoint) 開始向Master異步發送RequestSubmitDriver請求,也就是說:一次Spark-Submit提交,就會發送一次RequestSubmitDriver請求,進而生成一個資源申請的Driver。架構

    val driverDescription = new DriverDescription(
           driverArgs.jarUrl,
           driverArgs.memory,
           driverArgs.cores,
           driverArgs.supervise,
           command)
         asyncSendToMasterAndForwardReply[SubmitDriverResponse](
           RequestSubmitDriver(driverDescription))
    複製代碼
  • Master接收到提交的資源申請,開始向本身的成員變量drivers中放入一個Driver,也即每一次任務提交的的資源申請驅動。app

    case RequestSubmitDriver(description) =>
            if (state != RecoveryState.ALIVE) {
              val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
                "Can only accept driver submissions in ALIVE state."
              context.reply(SubmitDriverResponse(self, false, None, msg))
            } else {
              logInfo("Driver submitted " + description.command.mainClass)
              val driver = createDriver(description)
              persistenceEngine.addDriver(driver)
              waitingDrivers += driver
              drivers.add(driver)
              schedule()
      
              // TODO: It might be good to instead have the submission client poll the master to determine
              //       the current status of the driver. For now it's simply "fire and forget".
      
              context.reply(SubmitDriverResponse(self, true, Some(driver.id),
                s"Driver successfully submitted as ${driver.id}"))
            }
    複製代碼
  • 封裝資源申請實體DriverInfo框架

    private def createDriver(desc: DriverDescription): DriverInfo = {
      val now = System.currentTimeMillis()
      val date = new Date(now)
      new DriverInfo(now, newDriverId(date), desc, date)
    }
    複製代碼
總結:Master的receiveAndReply方法接收ClientEndpoint發送的消息RequestSubmitDriver,將收到的Driver註冊到waitingDrivers。基於此,纔會有後面的基於Driver的一級資源調度。
RequestSubmitDriver詳情請參考這篇博客,比個人更詳細。https://blog.csdn.net/u011564172/article/details/68496848
複製代碼

4 Application 的前世此生?和Driver淵源?如何納管?

  • 差一點就瘋了,Application和Driver徹底不是一個概念。Driver的誕生髮生在Spark-submit階段。而Application的誕生髮生在DAG調度階段,也即SparkContext實例化階段。拼了非講清不可。less

  • Master 最終會根據Application的資源申請,把appDesc放入apps隊列中,並對Application進行資源調度。dom

    val appDesc = ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
           webUrl, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor, initialExecutorLimit)
          
      SparkContext:
      
      SparkContext -> StandaloneSchedulerBackend -> StandaloneAppClient.start() 
      -> registerWithMaster -> masterRef.send(RegisterApplication(appDescription, self)) -> 
      
      Master:
      
      -> apps += app ->
      shedule()[Driver啓動後,調用startExecutorsOnWorkers()->allocateWorkerResourceToExecutors]
    複製代碼
  • Master端點registerApplication

    private def registerApplication(app: ApplicationInfo): Unit = { val appAddress = app.driver.address if (addressToApp.contains(appAddress)) { logInfo("Attempted to re-register application at same address: " + appAddress) return }

    applicationMetricsSystem.registerSource(app.appSource)
      apps += app
      idToApp(app.id) = app
      endpointToApp(app.driver) = app
      addressToApp(appAddress) = app
      waitingApps += app
    }
    複製代碼

5 Master的職責再講

  • 首先集羣啓動以後,Worker會向Master註冊,同時攜帶身份標識和資源狀況(如ID,host,port,cpu核數,內存大小),那麼這些資源交由Master納管後,Master會按照必定的資源調度策略分配給Driver和Application。
  • Master給Driver分配完資源後,將會向Worker發送啓動Driver命令,Worker接收到命令後,開始啓動Driver。
  • Master給Application分配完資源後,將向Worker發送啓動Executor命令,Worker接收到命令後,開始啓動Executor。

6 Shedule()神祕面紗

6.1 Shedule 核心思想

  • 打亂洗牌存活的Worker,在Worker資源知足的狀況下,啓動Executor。

  • 神來之筆(Driver資源調度)==> launchDriver(worker, driver)

  • 神來之筆(Executor調度)==> startExecutorsOnWorkers()

    * Schedule the currently available resources among waiting apps. This method will be called
        * every time a new app joins or resource availability changes.
        
        private def schedule(): Unit = {
          if (state != RecoveryState.ALIVE) {
            return
          }
          // Drivers take strict precedence over executors
          val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
          val numWorkersAlive = shuffledAliveWorkers.size
          var curPos = 0
          for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
            // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
            // start from the last worker that was assigned a driver, and continue onwards until we have
            // explored all alive workers.
            var launched = false
            var numWorkersVisited = 0
            while (numWorkersVisited < numWorkersAlive && !launched) {
              val worker = shuffledAliveWorkers(curPos)
              numWorkersVisited += 1
              if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              
                launchDriver(worker, driver)      <= 神來之筆(Driver資源調度)
                
                waitingDrivers -= driver
                launched = true
              }
              curPos = (curPos + 1) % numWorkersAlive
            }
          }
          
          
          startExecutorsOnWorkers()               <= 神來之筆(Executor調度)
          
        }
    複製代碼

6.2 launchDriver 挖一挖

  • 發送到Worker開始啓動driver

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
      worker.addDriver(driver)
      driver.worker = Some(worker)
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
      driver.state = DriverState.RUNNING
    }
    複製代碼
  • Worker端的回饋

    case LaunchDriver(driverId, driverDesc) =>
            logInfo(s"Asked to launch driver $driverId")
            val driver = new DriverRunner(
              conf,
              driverId,
              workDir,
              sparkHome,
              driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
              self,
              workerUri,
              securityMgr)
            drivers(driverId) = driver
            driver.start()
      
            coresUsed += driverDesc.cores
            memoryUsed += driverDesc.mem
    複製代碼

6.3 startExecutorsOnWorkers 鑽一鑽

  • coresPerExecutor:參數設置的每個Executor所使用的內核數,默認爲1。

  • app.desc.memoryPerExecutorMB :參數設置的ExecutorMemory。

  • scheduleExecutorsOnWorkers :返回各個Worker上分配的內核數

  • allocateWorkerResourceToExecutors:

    private def startExecutorsOnWorkers(): Unit = {
          // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
          // in the queue, then the second app, etc.
          
          for (app <- waitingApps) {
          
            val coresPerExecutor = app.desc.coresPerExecutor.getOrElse(1) <= 神來之筆(Worker資源狀況判斷)
            
            // If the cores left is less than the coresPerExecutor,the cores left will not be allocated
            if (app.coresLeft >= coresPerExecutor) {
              // Filter out workers that don't have enough resources to launch an executor
              
              val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
                  worker.coresFree >= coresPerExecutor)
                .sortBy(_.coresFree).reverse                 <= 神來之筆(Worker資源狀況判斷)
                
              val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps) <= 神來之筆
      
              // Now that we've decided how many cores to allocate on each worker, let's allocate them
              for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
              
                allocateWorkerResourceToExecutors(
                  app, assignedCores(pos), app.desc.coresPerExecutor, usableWorkers(pos)) <= 神來之筆
              }
            }
          }
        }
    複製代碼

6.3 scheduleExecutorsOnWorkers 較較真

* Schedule executors to be launched on the workers.
   * Returns an array containing number of cores assigned to each worker.
   *
   * There are two modes of launching executors. The first attempts to spread out an application's
   * executors on as many workers as possible, while the second does the opposite (i.e. launch them
   * on as few workers as possible). The former is usually better for data locality purposes and is
   * the default.
   *
   * The number of cores assigned to each executor is configurable. When this is explicitly set,
   * multiple executors from the same application may be launched on the same worker if the worker
   * has enough cores and memory. Otherwise, each executor grabs all the cores available on the
   * worker by default, in which case only one executor per application may be launched on each
   * worker during one single schedule iteration.
   * Note that when `spark.executor.cores` is not set, we may still launch multiple executors from
   * the same application on the same worker. Consider appA and appB both have one executor running
   * on worker1, and appA.coresLeft > 0, then appB is finished and release all its cores on worker1,
   * thus for the next schedule iteration, appA launches a new executor that grabs all the free
   * cores on worker1, therefore we get multiple executors from appA running on worker1.
   *
   * It is important to allocate coresPerExecutor on each worker at a time (instead of 1 core
   * at a time). Consider the following example: cluster has 4 workers with 16 cores each.
   * User requests 3 executors (spark.cores.max = 48, spark.executor.cores = 16). If 1 core is
   * allocated at a time, 12 cores from each worker would be assigned to each executor.
   * Since 12 < 16, no executors would launch [SPARK-8881].
複製代碼
  • spreadOutApps 決定了Executor的分配是集中的,仍是按照順序分散的。

  • oneExecutorPerWorker :若是沒有指定coresPerExecutor,那麼就說明每個Worker上只有一個Executor,不然就是多個

  • assignedCores(pos)是返回的數組,其中freeWorkers就是索引0,1,2。對應的可分配的Cores就會是指定Worker上可以分配的。

  • allocateWorkerResourceToExecutors:就是根據打散後的Worker索引,進行Executor的啓動,玄機在於每個Worker是否須要啓動多個Executor

    private def scheduleExecutorsOnWorkers(
           app: ApplicationInfo,
           usableWorkers: Array[WorkerInfo],
           spreadOutApps: Boolean): Array[Int] = {
         val coresPerExecutor = app.desc.coresPerExecutor
         val minCoresPerExecutor = coresPerExecutor.getOrElse(1)
         val oneExecutorPerWorker = coresPerExecutor.isEmpty
         val memoryPerExecutor = app.desc.memoryPerExecutorMB
         val numUsable = usableWorkers.length
         val assignedCores = new Array[Int](numUsable) // Number of cores to give to each worker
         val assignedExecutors = new Array[Int](numUsable) // Number of new executors on each worker
         var coresToAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
     
         /** Return whether the specified worker can launch an executor for this app. */
         def canLaunchExecutor(pos: Int): Boolean = {
           val keepScheduling = coresToAssign >= minCoresPerExecutor
           val enoughCores = usableWorkers(pos).coresFree - assignedCores(pos) >= minCoresPerExecutor
     
           // If we allow multiple executors per worker, then we can always launch new executors.
           // Otherwise, if there is already an executor on this worker, just give it more cores.
           val launchingNewExecutor = !oneExecutorPerWorker || assignedExecutors(pos) == 0
           if (launchingNewExecutor) {
             val assignedMemory = assignedExecutors(pos) * memoryPerExecutor
             val enoughMemory = usableWorkers(pos).memoryFree - assignedMemory >= memoryPerExecutor
             val underLimit = assignedExecutors.sum + app.executors.size < app.executorLimit
             keepScheduling && enoughCores && enoughMemory && underLimit
           } else {
             // We're adding cores to an existing executor, so no need
             // to check memory and executor limits
             keepScheduling && enoughCores
           }
         }
     
         // Keep launching executors until no more workers can accommodate any
         // more executors, or if we have reached this application's limits
         var freeWorkers = (0 until numUsable).filter(canLaunchExecutor)
         while (freeWorkers.nonEmpty) {
           freeWorkers.foreach { pos =>
             var keepScheduling = true
             while (keepScheduling && canLaunchExecutor(pos)) {
               coresToAssign -= minCoresPerExecutor
               assignedCores(pos) += minCoresPerExecutor
     
               // If we are launching one executor per worker, then every iteration assigns 1 core
               // to the executor. Otherwise, every iteration assigns cores to a new executor.
               if (oneExecutorPerWorker) {
                 assignedExecutors(pos) = 1
               } else {
                 assignedExecutors(pos) += 1
               }
     
               // Spreading out an application means spreading out its executors across as
               // many workers as possible. If we are not spreading out, then we should keep
               // scheduling executors on this worker until we use all of its resources.
               // Otherwise, just move on to the next worker.
               if (spreadOutApps) {
                 keepScheduling = false
               }
             }
           }
           freeWorkers = freeWorkers.filter(canLaunchExecutor)
         }
         assignedCores
       }
    複製代碼

6.3 allocateWorkerResourceToExecutors 探究竟

  • 通知Worker根據Application的要求,也即根據應用提交時的要求,開始啓動Executor。

    private def allocateWorkerResourceToExecutors(
            app: ApplicationInfo,
            assignedCores: Int,
            coresPerExecutor: Option[Int],
            worker: WorkerInfo): Unit = {
          // If the number of cores per executor is specified, we divide the cores assigned
          // to this worker evenly among the executors with no remainder.
          // Otherwise, we launch a single executor that grabs all the assignedCores on this worker.
          val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
          val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
          for (i <- 1 to numExecutors) {
            val exec = app.addExecutor(worker, coresToAssign)
            launchExecutor(worker, exec)
            app.state = ApplicationState.RUNNING
          }
    }
    複製代碼
  • Master 終端發送 LaunchExecutor

    private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
          logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
          worker.addExecutor(exec)
          worker.endpoint.send(LaunchExecutor(masterUrl,
            exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory))
          exec.application.driver.send(
            ExecutorAdded(exec.id, worker.id, worker.hostPort, exec.cores, exec.memory))
        }
    複製代碼

7 總結

至此,一級資源調度Shedule機制剖析完畢,真的是剖析的體無完膚啊。貼一張圖,該休息了。由於已經0:18了。

秦凱新 於深圳 香港太平山全景 人定勝天

再見 2018 11 12

相關文章
相關標籤/搜索