Spark Source Code Analysis – Deploy

Reference: Spark源碼分析之-deploy模塊 (Spark source code analysis: the deploy module)

 

Client

The Client is created when SparkDeploySchedulerBackend is started; it represents one application in its communication with the Spark cluster.
The Client's logic is simple: it wraps a ClientActor and is responsible for starting and stopping that actor.
The key part of ClientActor is that in preStart it registers the application with the master, and while the application runs it receives the events the master sends back.

/**
 * The main class used to talk to a Spark deploy cluster. Takes a master URL, an app description,
 * and a listener for cluster events, and calls back the listener when various events occur.
 */
private[spark] class Client(
    actorSystem: ActorSystem,
    masterUrl: String,
    appDescription: ApplicationDescription,
    listener: ClientListener)
  extends Logging {
  var actor: ActorRef = null
  var appId: String = null

  class ClientActor extends Actor with Logging {
    var master: ActorRef = null
    var masterAddress: Address = null
    var alreadyDisconnected = false  // To avoid calling listener.disconnected() multiple times

    override def preStart() {
      try {
        master = context.actorFor(Master.toAkkaUrl(masterUrl)) // Create the master ActorRef used to talk to the master
        masterAddress = master.path.address
        master ! RegisterApplication(appDescription) // Register this application with the master
        context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
        context.watch(master)  // Doesn't work with remote actors, but useful for testing
      } catch {
        case e: Exception =>
          markDisconnected()
          context.stop(self)
      }
    }

    override def receive = { // Handle the various events sent by the master; handler bodies are elided in this excerpt
      case RegisteredApplication(appId_) =>
      case ApplicationRemoved(message) =>
      case ExecutorAdded(id: Int, workerId: String, hostPort: String, cores: Int, memory: Int) =>
      case ExecutorUpdated(id, state, message, exitStatus) =>
      case Terminated(actor_) if actor_ == master =>
      case RemoteClientDisconnected(transport, address) if address == masterAddress =>
      case RemoteClientShutdown(transport, address) if address == masterAddress =>
      case StopClient =>
    }

  }

  def start() {
    // Just launch an actor; it will call back into the listener.
    actor = actorSystem.actorOf(Props(new ClientActor))
  }

  def stop() {
    if (actor != null) {
      try {
        val future = actor.ask(StopClient)(timeout)
        Await.result(future, timeout)
      } catch {
        case e: TimeoutException =>
          logInfo("Stop request to Master timed out; it may already be shut down.")
      }
      actor = null
    }
  }
}
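For context, here is a rough sketch of how SparkDeploySchedulerBackend builds an ApplicationDescription and hands it to Client in its own start(). This is a simplification of the 0.8-era code, not a verbatim copy; names such as driverUrl, appName, maxCores, executorMemory, sparkHome and appUiUrl stand for fields of the backend / SparkContext and are assumptions here.

// Sketch only: how the scheduler backend creates and starts the Client.
// SparkDeploySchedulerBackend itself implements ClientListener, so the
// ClientActor's events come back to it via connected/disconnected/
// executorAdded/executorRemoved callbacks.
override def start() {
  super.start()

  // The command the Worker will eventually run for each executor; the
  // {{...}} placeholders are filled in later by ExecutorRunner.
  val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
  val command = Command("org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)

  // What the application asks the cluster for (name, max cores, memory per executor, ...);
  // the exact constructor arguments are approximated here.
  val appDesc = new ApplicationDescription(appName, maxCores, executorMemory, command, sparkHome, appUiUrl)

  client = new Client(sc.env.actorSystem, masterUrl, appDesc, this) // "this" is the ClientListener
  client.start()
}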

Master

The client submits an application to the master, and workers also register themselves with the master.
The Master is therefore the entry point of the Spark cluster: it accepts application requests from clients and allocates worker resources to each app.
The key messages it handles, RegisterWorker, RegisterApplication and ExecutorStateChanged, all end up calling schedule.
schedule is its core function. For now it schedules purely on workers' CPU cores (memory is only checked as an admission condition, via canUse) and ignores other resources; depending on spreadOutApps it either spreads an app across as many workers as possible or packs it onto as few as possible.
Finally it sends LaunchExecutor to the worker actor, which actually starts the ExecutorBackend.

private[spark] class Master(host: String, port: Int, webUiPort: Int) extends Actor with Logging { 
  var nextAppNumber = 0
  val workers = new HashSet[WorkerInfo] // track workers
  val idToWorker = new HashMap[String, WorkerInfo]
  val actorToWorker = new HashMap[ActorRef, WorkerInfo]
  val addressToWorker = new HashMap[Address, WorkerInfo]

  val apps = new HashSet[ApplicationInfo] // track apps 
  val idToApp = new HashMap[String, ApplicationInfo]
  val actorToApp = new HashMap[ActorRef, ApplicationInfo]
  val addressToApp = new HashMap[Address, ApplicationInfo]

  val waitingApps = new ArrayBuffer[ApplicationInfo] // Apps that are not finished yet; these are what schedule() works on
  val completedApps = new ArrayBuffer[ApplicationInfo]
  override def receive = {
    case RegisterWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress) => {
      logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
        host, workerPort, cores, Utils.megabytesToString(memory)))
      if (idToWorker.contains(id)) {
        sender ! RegisterWorkerFailed("Duplicate worker ID")
      } else {
        addWorker(id, host, workerPort, cores, memory, worker_webUiPort, publicAddress)
        context.watch(sender)  // This doesn't work with remote actors but helps for testing
        sender ! RegisteredWorker("http://" + masterPublicAddress + ":" + webUi.boundPort.get)
        schedule() // Reschedule
      }
    }

    case RegisterApplication(description) => {
      logInfo("Registering app " + description.name)
      val app = addApplication(description, sender)
      logInfo("Registered app " + description.name + " with ID " + app.id)
      waitingApps += app
      context.watch(sender)  // This doesn't work with remote actors but helps for testing
      sender ! RegisteredApplication(app.id)
      schedule() // Reschedule
    }

    // When an executor's state changes; only the failure cases matter here
    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => {
      val execOption = idToApp.get(appId).flatMap(app => app.executors.get(execId))
      execOption match {
        case Some(exec) => { // The executor is known and legitimate
          exec.state = state
          exec.application.driver ! ExecutorUpdated(execId, state, message, exitStatus) // Send an ExecutorUpdated event to the driver actor
          if (ExecutorState.isFinished(state)) { // isFinished means KILLED, FAILED or LOST, i.e. failure cases; the name is somewhat misleading
            val appInfo = idToApp(appId)
            // Remove this executor from the worker and app, freeing up coresLeft, then reschedule
            logInfo("Removing executor " + exec.fullId + " because it is " + state)
            appInfo.removeExecutor(exec)
            exec.worker.removeExecutor(exec)

            // Only retry a certain number of times so we don't go into an infinite loop.
            if (appInfo.incrementRetryCount < ApplicationState.MAX_NUM_RETRY) {
              schedule() // Still within the retry limit: reschedule
            } else {
              // Retry limit exceeded: the whole application has failed
              logError("Application %s with ID %s failed %d times, removing it".format(
                appInfo.desc.name, appInfo.id, appInfo.retryCount))
              removeApplication(appInfo, ApplicationState.FAILED)
            }
          }
        }
        case None =>
          logWarning("Got status update for unknown executor " + appId + "/" + execId)
      }
    }

    case Heartbeat(workerId) => { // Update the worker's heartbeat timestamp
      idToWorker.get(workerId) match {
        case Some(workerInfo) =>
          workerInfo.lastHeartbeat = System.currentTimeMillis()
        case None =>
          logWarning("Got heartbeat from unregistered worker " + workerId)
      }
    }

    // The remaining cases are elided in this excerpt
    case Terminated(actor) =>
    case RemoteClientDisconnected(transport, address) =>
    case RemoteClientShutdown(transport, address) =>
    case RequestMasterState =>
    case CheckForWorkerTimeOut =>
    case RequestWebUIPort =>
  }
  
  /**
   * Schedule the currently available resources among waiting apps. This method will be called
   * every time a new app joins or resource availability changes.
   */
  def schedule() {
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) { // Spread the app across as many workers as possible
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) { // coresLeft > 0 means the app still wants more cores (its degree of parallelism)
        // Usable workers: alive, able to run this app, sorted by coresFree in descending order.
        // canUse is defined as: worker.memoryFree >= app.desc.memoryPerSlave && !worker.hasExecutor(app)
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
                                   .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        // Total cores to assign: the minimum of what the app still needs and what is available
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        // Distribute the cores round-robin across the usable workers
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) { // Never assign more than a worker has free
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable // If one round is not enough, keep looping
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            launchExecutor(usableWorkers(pos), exec, app.desc.sparkHome) // Launch the executor backend
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else { // Pack the app onto as few workers as possible; the logic is simpler
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              launchExecutor(worker, exec, app.desc.sparkHome)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }
 
  def launchExecutor(worker: WorkerInfo, exec: ExecutorInfo, sparkHome: String) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    worker.addExecutor(exec)
    worker.actor ! LaunchExecutor( // Send a LaunchExecutor event to the worker actor
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory, sparkHome)
    exec.application.driver ! ExecutorAdded( // Send an ExecutorAdded event to the driver actor
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }
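To make the spread-out branch of schedule() concrete, here is a tiny standalone illustration of the same round-robin core assignment on made-up numbers. It is not Spark code; the coresFree values and the worker count are invented for the example.

// Toy illustration of the spread-out assignment in schedule(), on plain numbers.
object SpreadOutDemo extends App {
  val coresFree = Array(8, 5, 3)        // free cores of three hypothetical ALIVE workers
  val coresLeft = 10                    // cores the app still wants
  val assigned  = new Array[Int](coresFree.length)
  var toAssign  = math.min(coresLeft, coresFree.sum)
  var pos = 0
  while (toAssign > 0) {
    if (coresFree(pos) - assigned(pos) > 0) {  // never exceed a worker's free cores
      toAssign -= 1
      assigned(pos) += 1
    }
    pos = (pos + 1) % coresFree.length         // wrap around until everything is assigned
  }
  println(assigned.mkString(", "))             // prints 4, 3, 3
}

The app asks for 10 cores, 16 are free in total, and the loop hands them out one at a time per worker, so the result is as even as the per-worker limits allow.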

 

Worker

The Worker runs as an actor process. On startup it first creates its work directory and then registers itself with the master.
Its main job is to handle the LaunchExecutor event and use an ExecutorRunner to run the executor backend.

private[spark] class Worker(
    host: String,
    port: Int,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterUrl: String,
    workDirPath: String = null)
  extends Actor with Logging {
 
  override def preStart() {
    sparkHome = new File(Option(System.getenv("SPARK_HOME")).getOrElse("."))
    logInfo("Spark home: " + sparkHome)
    createWorkDir() // Create the work directory under the configured sparkHome
    connectToMaster() // Register this worker with the master
  }
  override def receive = {
    case RegisteredWorker(url) => // The master's acknowledgement of a successful registration
      masterWebUiUrl = url
      logInfo("Successfully registered with master")
      context.system.scheduler.schedule(0 millis, HEARTBEAT_MILLIS millis) {
        master ! Heartbeat(workerId) // Periodically send a heartbeat to the master
      }

    case RegisterWorkerFailed(message) =>
      logError("Worker registration failed: " + message)
      System.exit(1)

    case LaunchExecutor(appId, execId, appDesc, cores_, memory_, execSparkHome_) =>
      logInfo("Asked to launch executor %s/%d for %s".format(appId, execId, appDesc.name))
      val manager = new ExecutorRunner( // Create an ExecutorRunner and start it
        appId, execId, appDesc, cores_, memory_, self, workerId, host, new File(execSparkHome_), workDir)
      executors(appId + "/" + execId) = manager
      manager.start()
      coresUsed += cores_
      memoryUsed += memory_
      master ! ExecutorStateChanged(appId, execId, ExecutorState.RUNNING, None, None) // Report ExecutorState.RUNNING to the master via an ExecutorStateChanged event

    case ExecutorStateChanged(appId, execId, state, message, exitStatus) => // Receive the ExecutorStateChanged event sent by the ExecutorRunner
      master ! ExecutorStateChanged(appId, execId, state, message, exitStatus) // Forward it to the master
      val fullId = appId + "/" + execId
      if (ExecutorState.isFinished(state)) {
        val executor = executors(fullId)
        logInfo("Executor " + fullId + " finished with state " + state +
          message.map(" message " + _).getOrElse("") +
          exitStatus.map(" exitStatus " + _).getOrElse(""))
        finishedExecutors(fullId) = executor
        executors -= fullId
        coresUsed -= executor.cores
        memoryUsed -= executor.memory
      }

    case KillExecutor(appId, execId) =>
      val fullId = appId + "/" + execId
      executors.get(fullId) match {
        case Some(executor) =>
          logInfo("Asked to kill executor " + fullId)
          executor.kill()
        case None =>
          logInfo("Asked to kill unknown executor " + fullId)
      }

    case Terminated(_) | RemoteClientDisconnected(_, _) | RemoteClientShutdown(_, _) =>
      masterDisconnected()

    case RequestWorkerState => {
      sender ! WorkerStateResponse(host, port, workerId, executors.values.toList,
        finishedExecutors.values.toList, masterUrl, cores, memory,
        coresUsed, memoryUsed, masterWebUiUrl)
    }
  }
}
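The connectToMaster called in preStart above is not shown in this excerpt. Roughly, it resolves the master actor and sends it the RegisterWorker message; the sketch below is an approximation of the same code base, and the exact field list is inferred from the Master's RegisterWorker case shown earlier.

  // Sketch of connectToMaster(): resolve the master actor and register this worker.
  def connectToMaster() {
    logInfo("Connecting to master " + masterUrl)
    master = context.actorFor(Master.toAkkaUrl(masterUrl))
    // Field list approximated from the RegisterWorker message handled by the Master above
    master ! RegisterWorker(workerId, host, port, cores, memory, webUi.boundPort.get, publicAddress)
    context.system.eventStream.subscribe(self, classOf[RemoteClientLifeCycleEvent])
    context.watch(master) // Doesn't work with remote actors, but useful for testing
  }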

 

ExecutorRunner
The ExecutorRunner creates a thread that runs fetchAndRunExecutor,
and inside that thread it uses ProcessBuilder to launch the StandaloneExecutorBackend child process described by the application's Command:

val args = Seq(driverUrl, "{{EXECUTOR_ID}}", "{{HOSTNAME}}", "{{CORES}}")
val command = Command("org.apache.spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)

Why not create the child process directly?

 

/**
 * Manages the execution of one executor process.
 */
private[spark] class ExecutorRunner(
    val appId: String,
    val execId: Int,
    val appDesc: ApplicationDescription,
    val cores: Int,
    val memory: Int,
    val worker: ActorRef,
    val workerId: String,
    val host: String,
    val sparkHome: File,
    val workDir: File)
  extends Logging {
 
  val fullId = appId + "/" + execId
  var workerThread: Thread = null
  var process: Process = null
  var shutdownHook: Thread = null
 
  def start() {
    workerThread = new Thread("ExecutorRunner for " + fullId) {
      override def run() { fetchAndRunExecutor() } // Create a thread that runs fetchAndRunExecutor
    }
    workerThread.start()
  }
 
  def buildCommandSeq(): Seq[String] = {
    val command = appDesc.command
    val runner = getAppEnv("JAVA_HOME").map(_ + "/bin/java").getOrElse("java")
    // SPARK-698: do not call the run.cmd script, as process.destroy()
    // fails to kill a process tree on Windows
    Seq(runner) ++ buildJavaOpts() ++ Seq(command.mainClass) ++ // Run command.mainClass with java
      command.arguments.map(substituteVariables)
  }
 
  /**
   * Download and run the executor described in our ApplicationDescription
   */
  def fetchAndRunExecutor() {
    try {
      // Launch the process: use ProcessBuilder to run the command in a separate child process
      val command = buildCommandSeq()
      val builder = new ProcessBuilder(command: _*).directory(executorDir)
      val env = builder.environment()
      for ((key, value) <- appDesc.command.environment) {
        env.put(key, value)
      }
      // In case we are running this from within the Spark Shell, avoid creating a "scala"
      // parent process for the executor command
      env.put("SPARK_LAUNCH_WITH_SCALA", "0")
      process = builder.start()

      // Wait for it to exit; this is actually a bad thing if it happens, because we expect to run
      // long-lived processes only. However, in the future, we might restart the executor a few
      // times on the same machine.
      val exitCode = process.waitFor()
      val message = "Command exited with code " + exitCode
      worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), Some(exitCode))
    } catch {
      case e: Exception =>
        logError("Error running executor", e)
        if (process != null) {
          process.destroy()
        }
        val message = e.getClass + ": " + e.getMessage
        worker ! ExecutorStateChanged(appId, execId, ExecutorState.FAILED, Some(message), None)
    }
  }
}
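buildCommandSeq above maps command.arguments through substituteVariables, which is where the {{EXECUTOR_ID}}, {{HOSTNAME}} and {{CORES}} placeholders from the driver's argument list get replaced with this executor's concrete values. A sketch of what that method looks like in the same code base (treat the exact cases as an approximation):

  // Sketch: replace placeholder arguments such as {{EXECUTOR_ID}} with this executor's values.
  def substituteVariables(argument: String): String = argument match {
    case "{{EXECUTOR_ID}}" => execId.toString
    case "{{HOSTNAME}}"    => host
    case "{{CORES}}"       => cores.toString
    case other             => other // Leave every other argument untouched
  }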