The Spark Deploy Module

In the article on the Spark Scheduler module, we saw that Spark separates low-level resource management from high-level task scheduling. In practice, a third-party platform such as YARN or Mesos usually handles resource management. To make testing and experimentation convenient, Spark also implements a simple resource management platform of its own: the Deploy module covered in this article.

Some experienced readers may already have used this feature.

This article draws on: http://jerryshao.me/architecture/2013/04/30/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-deploy%E6%A8%A1%E5%9D%97/

Spark's RPC implementation

Attentive readers will have noticed, while reading the Scheduler code, that many places communicate via RPC, for example the message passing between the driver and the executors.

Older versions of Spark used akka.Actor directly as the basis for concurrent communication, and many modules inherited from akka.Actor. To decouple from Akka, Spark extracted an independent module, org.apache.spark.rpc, which defines RpcEndpoint and RpcEndpointRef, direct analogues of Actor and ActorRef. At this point the RPC module has only one implementation, org.apache.spark.rpc.akka, so communication still goes through Akka. The benefit is that, with the interface abstracted out, Akka can be swapped for another implementation at any time.
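To make the abstraction concrete, here is a minimal, hypothetical sketch of the endpoint/reference split; the names and signatures are simplified for illustration and do not match Spark's actual org.apache.spark.rpc API:

```scala
// Hypothetical sketch of the RpcEndpoint / RpcEndpointRef idea.
// An Endpoint defines behavior; an EndpointRef hides the transport,
// so the delivery mechanism (Akka, Netty, in-process) can be swapped
// without callers changing.
trait Endpoint {
  def onStart(): Unit = {}
  def receive: PartialFunction[Any, Unit]
}

class EndpointRef(endpoint: Endpoint) {
  // In a real transport this would serialize and send over the network;
  // here we deliver in-process for illustration.
  def send(msg: Any): Unit =
    if (endpoint.receive.isDefinedAt(msg)) endpoint.receive(msg)
}

object EndpointDemo {
  def main(args: Array[String]): Unit = {
    var last: Option[String] = None
    val ep = new Endpoint {
      def receive: PartialFunction[Any, Unit] = { case s: String => last = Some(s) }
    }
    val ref = new EndpointRef(ep)
    ref.send("hello")
    assert(last.contains("hello"))
    println(last.get)
  }
}
```

Callers only ever hold an EndpointRef, which is what makes the underlying transport replaceable.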

This seems to be Spark's style: implement everything in-house, including scheduling, storage, shuffle, and the recently announced Tungsten project (managing memory itself rather than leaving it to the JVM).

Overall architecture of the Deploy module

The deploy module consists of three parts: master, worker, and client.

Master: the cluster manager. It accepts worker registrations, accepts applications submitted by clients, and schedules all applications.

Worker: each worker hosts multiple ExecutorRunners; the executors are where tasks actually run. On startup, a worker registers itself with the master.

Client: submits applications to the master and monitors them.

Code walkthrough

Starting the master and the worker

The main function that starts the master lives in object org.apache.spark.deploy.master.Master:

private[deploy] object Master extends Logging {
  val SYSTEM_NAME = "sparkMaster"
  val ENDPOINT_NAME = "Master"

  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new MasterArguments(argStrings, conf)
    val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
      new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // start the Master and its RPC endpoint
    val portsResponse = masterEndpoint.askWithRetry[BoundPortsResponse](BoundPortsRequest)
    (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
  }
}

The key line here is:

    val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf)) // start the Master's RPC endpoint

Master extends RpcEndpoint, so the startup work is done in Master.onStart, which mainly starts a RESTful HTTP service for displaying status.

 

The main function that starts the worker lives in object org.apache.spark.deploy.worker.Worker:

private[deploy] object Worker extends Logging {
  val SYSTEM_NAME = "sparkWorker"
  val ENDPOINT_NAME = "Worker"

  // the master URL(s) must be passed in
  def main(argStrings: Array[String]) {
    SignalLogger.register(log)
    val conf = new SparkConf
    val args = new WorkerArguments(argStrings, conf)
    val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
      args.memory, args.masters, args.workDir)
    rpcEnv.awaitTermination()
  }

  def startRpcEnvAndEndpoint(
      host: String,
      port: Int,
      webUiPort: Int,
      cores: Int,
      memory: Int,
      masterUrls: Array[String],
      workDir: String,
      workerNumber: Option[Int] = None,
      conf: SparkConf = new SparkConf): RpcEnv = {

    // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
    val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
    val securityMgr = new SecurityManager(conf)
    val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
    val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
    rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
      masterAddresses, systemName, ENDPOINT_NAME, workDir, conf, securityMgr)) // start the Worker
    rpcEnv
  }
  ...
}

The worker's startup closely mirrors the master's. Then:

override def onStart() {
  assert(!registered)
  createWorkDir()  // create the working directory
  shuffleService.startIfEnabled() // start the external shuffle service if enabled
  webUi = new WorkerWebUI(this, workDir, webUiPort) // start the web UI
  webUi.bind()
  registerWithMaster()  // register this worker with the master

  metricsSystem.registerSource(workerSource) // register the worker metrics source
  metricsSystem.start()
  metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
}
private def registerWithMaster() {
  registrationRetryTimer match {
    case None =>
      registered = false
      registerMasterFutures = tryRegisterAllMasters()
      connectionAttemptCount = 0
      registrationRetryTimer = Some(forwordMessageScheduler.scheduleAtFixedRate( // keep retrying registration until it succeeds
        new Runnable {
          override def run(): Unit = Utils.tryLogNonFatalError {
            self.send(ReregisterWithMaster)
          }
        },
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        INITIAL_REGISTRATION_RETRY_INTERVAL_SECONDS,
        TimeUnit.SECONDS))
    ...
  }
}


override def receive: PartialFunction[Any, Unit] = {
  case RegisteredWorker(masterRef, masterWebUiUrl) =>  // the master confirms that registration succeeded
    logInfo("Successfully registered with master " + masterRef.address.toSparkURL)
    registered = true
    changeMaster(masterRef, masterWebUiUrl)
    forwordMessageScheduler.scheduleAtFixedRate(new Runnable { // the worker keeps sending heartbeats to the master
      override def run(): Unit = Utils.tryLogNonFatalError {
        self.send(SendHeartbeat)
      }
    }, 0, HEARTBEAT_MILLIS, TimeUnit.MILLISECONDS)
  ...
}

In this way, the master and worker stay connected through heartbeats.
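The heartbeat loop itself is just a fixed-rate scheduled task. Here is a self-contained sketch of the pattern, with a stand-in sendToMaster function replacing the real RPC send:

```scala
import java.util.concurrent.{CountDownLatch, Executors, TimeUnit}

// Simplified sketch of the worker-side heartbeat loop. sendToMaster is a
// stand-in for the real RPC send; Spark's actual code sends a SendHeartbeat
// message to itself on a similar scheduler.
object HeartbeatDemo {
  def main(args: Array[String]): Unit = {
    val heartbeats = new CountDownLatch(3)
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    def sendToMaster(): Unit = heartbeats.countDown() // stand-in for the RPC send

    scheduler.scheduleAtFixedRate(new Runnable {
      override def run(): Unit = sendToMaster()
    }, 0, 10, TimeUnit.MILLISECONDS) // fire immediately, then every 10 ms

    // Wait until at least 3 heartbeats have fired, then shut down.
    assert(heartbeats.await(1, TimeUnit.SECONDS))
    scheduler.shutdownNow()
    println("3 heartbeats sent")
  }
}
```

On the master side, a missed sequence of heartbeats is what marks a worker as dead.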

 

There are two clients here: org.apache.spark.deploy.Client, the client that spark-submit uses, and org.apache.spark.deploy.client.AppClient, the client started from within a user application. The latter is the one discussed in this article.

The client submits an application

As noted in the Spark Scheduler module article, AppClient is created in SparkDeploySchedulerBackend, which in turn is created in SparkContext.

// SparkDeploySchedulerBackend.scala
override def start() {
  super.start()

  // The endpoint for executors to talk to us
  val driverUrl = rpcEnv.uriOf(SparkEnv.driverActorSystemName,
      RpcAddress(sc.conf.get("spark.driver.host"), sc.conf.get("spark.driver.port").toInt),
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME)
  val args = Seq(
    "--driver-url", driverUrl,
    "--executor-id", "{{EXECUTOR_ID}}",
    "--hostname", "{{HOSTNAME}}",
    "--cores", "{{CORES}}",
    "--app-id", "{{APP_ID}}",
    "--worker-url", "{{WORKER_URL}}")  
  ...
  val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
  val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory,
      command, appUIAddress, sc.eventLogDir, sc.eventLogCodec, coresPerExecutor)
  client = new AppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
  client.start()
  waitForRegistration()
}

This creates a client, AppClient, which connects to the masters (spark://master:7077), specifically in the AppClient.start method:

def start() {
  // Just launch an rpcEndpoint; it will call back into the listener.
  endpoint = rpcEnv.setupEndpoint("AppClient", new ClientEndpoint(rpcEnv))
}

ClientEndpoint is a subclass of RpcEndpoint; when it is created, its onStart method is called, which registers with the masters and submits the new application request:

private def tryRegisterAllMasters(): Array[JFuture[_]] = {
  for (masterAddress <- masterRpcAddresses) yield {
    registerMasterThreadPool.submit(new Runnable {
      override def run(): Unit = try {
        if (registered) {
          return
        }
        val masterRef = rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, masterAddress, Master.ENDPOINT_NAME)
        masterRef.send(RegisterApplication(appDescription, self)) // send the application registration request to the master; appDescription includes the command used to launch executors
    ...

When the Master receives this message:

case RegisterApplication(description, driver) => {
  if (state == RecoveryState.STANDBY) {
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app) // add to the waiting list
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self)) // reply that registration succeeded
    schedule() // schedule resources and applications
  }
}

schedule is the master's core method: resource scheduling and allocation, where resources mean CPU cores and memory.

First, it tries to launch the waiting drivers wherever resources allow:

private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) { return }
  val shuffledWorkers = Random.shuffle(workers) // Randomization helps balance drivers
  for (worker <- shuffledWorkers if worker.state == WorkerState.ALIVE) {
    for (driver <- waitingDrivers) {
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver) // launch the driver first
        waitingDrivers -= driver
      }
    }
  }
  startExecutorsOnWorkers()
}

Then it allocates executors to each application:

private def startExecutorsOnWorkers(): Unit = {
  // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
  // in the queue, then the second app, etc.
  for (app <- waitingApps if app.coresLeft > 0) {
    val coresPerExecutor: Option[Int] = app.desc.coresPerExecutor
    // Filter out workers that don't have enough resources to launch an executor
    val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
      .filter(worker => worker.memoryFree >= app.desc.memoryPerExecutorMB &&
        worker.coresFree >= coresPerExecutor.getOrElse(1))
      .sortBy(_.coresFree).reverse
    // pick executors among the workers that satisfy the memory and CPU requirements
    val assignedCores = scheduleExecutorsOnWorkers(app, usableWorkers, spreadOutApps)

    // Now that we've decided how many cores to allocate on each worker, let's allocate them
    for (pos <- 0 until usableWorkers.length if assignedCores(pos) > 0) {
      allocateWorkerResourceToExecutors(
        app, assignedCores(pos), coresPerExecutor, usableWorkers(pos))
    }
  }
}
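scheduleExecutorsOnWorkers is not shown above; in essence it decides how many cores each usable worker contributes, either spreading the load round-robin across workers (spreadOutApps) or packing workers one by one. The following is a simplified illustration of that idea, not Spark's exact algorithm (which also accounts for memory and coresPerExecutor):

```scala
// Simplified sketch of the core-assignment decision: given the free cores on
// each usable worker and the cores an app still needs, assign one core at a
// time, either round-robin ("spread out") or packing the first worker full
// before moving on.
object SpreadOutDemo {
  def assignCores(freeCores: Array[Int], coresNeeded: Int, spreadOut: Boolean): Array[Int] = {
    val assigned = Array.fill(freeCores.length)(0)
    var toAssign = coresNeeded
    var pos = 0
    // Stop when the demand is met or no worker has capacity left.
    while (toAssign > 0 && assigned.zip(freeCores).exists { case (a, f) => a < f }) {
      if (assigned(pos) < freeCores(pos)) {
        assigned(pos) += 1
        toAssign -= 1
        if (spreadOut) pos = (pos + 1) % freeCores.length // move on: spread the load
      } else {
        pos = (pos + 1) % freeCores.length // this worker is full, try the next
      }
    }
    assigned
  }

  def main(args: Array[String]): Unit = {
    // Spreading 5 cores over workers with (4, 4, 4) free cores:
    assert(assignCores(Array(4, 4, 4), 5, spreadOut = true).toList == List(2, 2, 1))
    // Packing fills the first worker before touching the next:
    assert(assignCores(Array(4, 4, 4), 5, spreadOut = false).toList == List(4, 1, 0))
    println("ok")
  }
}
```

Spreading tends to improve data locality across the cluster, while packing leaves more workers entirely free for other applications.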

// allocate this worker's assigned cores to one or more executors
private def allocateWorkerResourceToExecutors(
    app: ApplicationInfo,
    assignedCores: Int,
    coresPerExecutor: Option[Int],
    worker: WorkerInfo): Unit = {
  val numExecutors = coresPerExecutor.map { assignedCores / _ }.getOrElse(1)
  val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
  for (i <- 1 to numExecutors) {
    val exec = app.addExecutor(worker, coresToAssign)
    launchExecutor(worker, exec)
    app.state = ApplicationState.RUNNING
  }
}
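The arithmetic above deserves a worked example. Assuming the same formulas: 7 assigned cores with coresPerExecutor = 2 yields 3 executors of 2 cores each (one core goes unused), while an unset coresPerExecutor yields a single executor holding all 7 cores:

```scala
// Worked example of allocateWorkerResourceToExecutors' arithmetic, extracted
// into a pure function for illustration.
object AllocArithmetic {
  // Returns (number of executors, cores per launched executor).
  def plan(assignedCores: Int, coresPerExecutor: Option[Int]): (Int, Int) = {
    val numExecutors = coresPerExecutor.map(assignedCores / _).getOrElse(1)
    val coresToAssign = coresPerExecutor.getOrElse(assignedCores)
    (numExecutors, coresToAssign)
  }

  def main(args: Array[String]): Unit = {
    assert(plan(7, Some(2)) == (3, 2)) // 3 executors x 2 cores; 1 core idle
    assert(plan(7, None) == (1, 7))    // one big executor with all 7 cores
    println("ok")
  }
}
```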

// send the launch messages
private def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc): Unit = {
  worker.addExecutor(exec) // record the executor on the master's view of the worker
  worker.endpoint.send(LaunchExecutor(masterUrl,
    exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)) // tell the worker, via RPC, to launch the executor
  exec.application.driver.send(ExecutorAdded(
    exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)) // tell the driver, via RPC, that an executor was added
}

When the worker receives the message, it creates an ExecutorRunner and reports the executor's state back to the master.

case LaunchExecutor(masterUrl, appId, execId, appDesc, cores_, memory_) =>
  val manager = new ExecutorRunner(
    appId,
    execId,
    appDesc.copy(command = Worker.maybeUpdateSSLSettings(appDesc.command, conf)),
    cores_,
    memory_,
    self,
    workerId,
    host,
    webUi.boundPort,
    publicAddress,
    sparkHome,
    executorDir,
    workerUri,
    conf,
    appLocalDirs, ExecutorState.LOADING)
  executors(appId + "/" + execId) = manager
  manager.start()
  coresUsed += cores_
  memoryUsed += memory_
  sendToMaster(ExecutorStateChanged(appId, execId, manager.state, None, None))

ExecutorRunner.start launches a dedicated thread; the actual work of running the task happens in fetchAndRunExecutor:

private def fetchAndRunExecutor() {
  try {
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
      memory, sparkHome.getAbsolutePath, substituteVariables) // prepare the new process
    val command = builder.command()
    logInfo("Launch command: " + command.mkString("\"", "\" \"", "\""))

    builder.directory(executorDir)
    builder.environment.put("SPARK_EXECUTOR_DIRS", appLocalDirs.mkString(File.pathSeparator))
    builder.environment.put("SPARK_LAUNCH_WITH_SCALA", "0")

    // Add webUI log urls
    val baseUrl =
      s"http://$publicAddress:$webUiPort/logPage/?appId=$appId&executorId=$execId&logType="
    builder.environment.put("SPARK_LOG_URL_STDERR", s"${baseUrl}stderr")
    builder.environment.put("SPARK_LOG_URL_STDOUT", s"${baseUrl}stdout")

    process = builder.start() // start a new process to run the application's tasks
    val header = "Spark Executor Command: %s\n%s\n\n".format(
      command.mkString("\"", "\" \"", "\""), "=" * 40)

    // Redirect its stdout and stderr to files
    val stdout = new File(executorDir, "stdout")
    stdoutAppender = FileAppender(process.getInputStream, stdout, conf) // capture the process's standard output

    val stderr = new File(executorDir, "stderr")
    Files.write(header, stderr, UTF_8)
    stderrAppender = FileAppender(process.getErrorStream, stderr, conf) // capture the process's standard error

    // Wait for it to exit; executor may exit with code 0 (when driver instructs it to shutdown)
    // or with nonzero exit code
    val exitCode = process.waitFor() // wait for the process to finish
    state = ExecutorState.EXITED
    val message = "Command exited with code " + exitCode
    worker.send(ExecutorStateChanged(appId, execId, state, Some(message), Some(exitCode))) // tell the worker the task finished; the worker reclaims resources and notifies the master
  } catch {
    case interrupted: InterruptedException => {
      logInfo("Runner thread for executor " + fullId + " interrupted")
      state = ExecutorState.KILLED
      killProcess(None)
    }
    case e: Exception => {
      logError("Error running executor", e)
      state = ExecutorState.FAILED
      killProcess(Some(e.toString))
    }
  }
}
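The ProcessBuilder pattern used here (build the command, set the working directory, redirect output to files, then waitFor) can be reproduced in a few lines. A minimal sketch, with an illustrative echo command standing in for the real executor command:

```scala
import java.io.File

// Minimal sketch of the fetchAndRunExecutor pattern: build a command,
// redirect its stdout/stderr to files in a working directory, start the
// process, and wait for its exit code. The command and paths here are
// illustrative only.
object RunAndRedirect {
  def main(args: Array[String]): Unit = {
    val workDir = new File(System.getProperty("java.io.tmpdir"))
    val builder = new ProcessBuilder("echo", "hello executor")
    builder.directory(workDir)
    builder.redirectOutput(new File(workDir, "stdout")) // like Spark's FileAppender on stdout
    builder.redirectError(new File(workDir, "stderr"))  // ... and on stderr

    val process = builder.start()
    val exitCode = process.waitFor() // block until the child process exits
    assert(exitCode == 0)
    println("exit code: " + exitCode)
  }
}
```

Spark uses FileAppender threads instead of redirectOutput so it can roll the log files, but the process lifecycle is the same.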

Application termination

If an application is killed abnormally, the master calls handleKillExecutors, telling the workers to kill their executors; each executor then interrupts its internal process, and every component reclaims its own resources. This flow matches the description at http://jerryshao.me/architecture/2013/04/30/Spark%E6%BA%90%E7%A0%81%E5%88%86%E6%9E%90%E4%B9%8B-deploy%E6%A8%A1%E5%9D%97/ exactly.

Summary

This concludes the tour of Spark's own Deploy module. It is a relatively simple module: just a basic resource management system, and probably not one that would be used in real production environments. Still, reading through Spark's own resource manager is a worthwhile exercise for readers not yet familiar with YARN or Mesos.
