Spark 源碼分析(二): Driver 註冊及啓動

上一篇文章已經已經執行到 Client 向 masterEndpoint 發送了 RequestSubmitDriver 信息,下面就看看 master 怎麼註冊 driver 信息,而且怎麼讓 worker 去啓動 driver 的。java

一,org.apache.spark.deploy.master.Master

這個 Master 就是前面 Client 發送的對象,是一個 ThreadSafeRpcEndpoint。內部的 receiveAndReply 這個方法在監聽外部發來信息。下面就來看這個方法。web

1,receiveAndReply 方法

這個方法內部會根據發送過來的消息作模式匹配,咱們找到 Client 發送過來的 RequestSubmitDriver 這個消息對應代碼,以下:算法

// 匹配到 Client 發送過來的消息
case RequestSubmitDriver(description) =>
  		// 判斷當前 master 的狀態是否爲 alive
      if (state != RecoveryState.ALIVE) {
        // 若是不是 alive 則回覆 driver 提交失敗
        val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
          "Can only accept driver submissions in ALIVE state."
        context.reply(SubmitDriverResponse(self, false, None, msg))
      } else {
        logInfo("Driver submitted " + description.command.mainClass)
        // 根據 client 發過來的 driver 信息建立 driver,而後持久化 driver
        // 而後將 driver 加入到等待隊列中去
        val driver = createDriver(description)
        persistenceEngine.addDriver(driver)
        waitingDrivers += driver
        // 將 driver 加入到 HashSet 中去
        drivers.add(driver)
          
        // 開始調度
        schedule()

        // TODO: It might be good to instead have the submission client poll the master to determine
        // the current status of the driver. For now it's simply "fire and forget".

        context.reply(SubmitDriverResponse(self, true, Some(driver.id),
          s"Driver successfully submitted as ${driver.id}"))
      }
複製代碼

這段代碼,作了這麼一些操做:判斷當前 master 的狀態是否爲 alive ,若是不是則回覆消息說:提交失敗,若是是則根據傳遞過來的 driver 信息建立 driver 對象(經過 createDriver 方法建立)並將其持久化,加入到等待隊列中去,而後開始執行調度算法 schduler。apache

這裏涉及到連個方法,分別能夠看一下,一個是 createDriver 方法,一個是 schduler 方法。app

2,createDriver 方法

// 建立 driver 對象
private def createDriver(desc: DriverDescription): DriverInfo = {
    val now = System.currentTimeMillis()
    val date = new Date(now)
    // 經過系統當前時間生成一個 driverId
    // 而後將系統當前時間,driverId,DriverDescription,日期 這些信息封裝成一個 DriverInfo
    new DriverInfo(now, newDriverId(date), desc, date)
  }
複製代碼

這個方法主要是經過當前時間生成一個 driverId,而後將當前時間,DriverDescription 等參數封裝成一個 DriverInfo 對象。dom

3,schduler 方法

該方法在 master 中會被屢次調用,每當 driver 的等待隊列中數據發生變更或者集羣資源發生變化都會掉用這個方法。這個方法主要是爲當前 driver 的等待隊列分配資源的。ide

private def schedule(): Unit = {
  	// 首先判斷當前 master 的狀態是否爲 alive 的,若是不是 alive 則不往下執行
    if (state != RecoveryState.ALIVE) {
      return
    }
    // Random.shuffle 這個方法主要是隨機分散各個元素,具體代碼能夠點進去看
  	// 這裏主要是將集羣中 state 爲 alive 的 worker 帥選出來,而後隨機打亂
    val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
    
    // 當前 alive 的 worker 數量
    val numWorkersAlive = shuffledAliveWorkers.size
    var curPos = 0
      
    // 將等待分配資源的 driver 隊列中的全部 driver 進行遍歷
    // 而後爲每一個 driver 遍歷一遍全部的 alive worker,當碰到 worker 的可用內存和比當前隊列中
    // 等待的 driver 所須要的內存要大而且 worker 的 core 數量也知足 driver 的需求時
    // 就會調用 launcherDriver 方法去將 driver 發送對應的 worker 上去執行
    for (driver <- waitingDrivers.toList) { 
      var launched = false
      var numWorkersVisited = 0
      while (numWorkersVisited < numWorkersAlive && !launched) {
        val worker = shuffledAliveWorkers(curPos)
        numWorkersVisited += 1
        // 找到符合條件的 worker
        if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
          launchDriver(worker, driver)
          // 將該 driver 從等待隊列中移除
          waitingDrivers -= driver
          // 標記當前 driver 爲 launched
          launched = true
        }
        
        // 移到下一個 driver 上
        curPos = (curPos + 1) % numWorkersAlive
      }
    }
  	
  	// 調用 startExecutorsOnWorkers 方法
    startExecutorsOnWorkers()
  }
複製代碼

這個 schduler 方法會遍歷等待分配資源的 driver 隊列,爲每一個 driver 遍歷一遍 alive 的 worker,找到資源知足的 worker,而後調用 launchDriver 方法,將該 driver 在這個 worker 上啓動,移除掉等待隊列中當前 driver,而後調用 startExecutorsOnWorkers 啓動 executor。ui

這裏又有兩個方法,一個是 launchDriver 方法,一個是 startExecutorsOnWorkers 方法去啓動 executor,startExecutorsOnWorkers 這個方法放到下面文章裏講,這篇文章主要講 driver 註冊和啓動。this

4,launchDriver 方法

這個方法主要是更新一些信息(worker 中的資源變動,worker 中啓動的 driver 信息記錄;driver 中添加上 worker 的信息),而後將向對應的 worker 發送 LaunchDriver 的消息。spa

private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
    logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    // 這裏是將 workerInfo 中添加上啓動 driver 的信息,內部也會減去 driver 使用掉的資源
    worker.addDriver(driver)
    // 將 driver 啓動的 worker 信息記錄到 driver 中
    driver.worker = Some(worker)
    // 給 worker 發送 LaunchDriver 的信息
    worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    // 標記當前 driver 狀態爲 running 狀態
    driver.state = DriverState.RUNNING
  }
複製代碼

經過把啓動的 driver 信息記錄到對應的 worker 信息中,再將對應的 worker 信息記錄到 driver 裏,而後給 worker 發送消息讓 worker 啓動 driver,標記當前的 driver 狀態爲 running。

這裏會給 worker 發送 LaunchDriver 的消息,下面去看下 worker 中是怎麼處理這個消息的。

二,org.apache.spark.deploy.worker.Worker

private[deploy] class Worker(
    override val rpcEnv: RpcEnv,
    webUiPort: Int,
    cores: Int,
    memory: Int,
    masterRpcAddresses: Array[RpcAddress],
    endpointName: String,
    workDirPath: String = null,
    val conf: SparkConf,
    val securityMgr: SecurityManager)
  extends ThreadSafeRpcEndpoint with Logging
複製代碼

從繼承關係上能夠看出 worker 也是 RpcEndPoint,因此直接找到它的 receive 方法,而後根據模式匹配找到 LaunchDriver 這個匹配下看操做邏輯便可。

case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
  		// 將 driver 信息封裝到一個 runner 內
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
  		// 而後將這個 runner 保存到一個 HashMap 中
      drivers(driverId) = driver
      // 啓動這個 runner
      driver.start()
			// 更新當前 worker 的資源信息
      coresUsed += driverDesc.cores
      memoryUsed += driverDesc.mem
複製代碼

這裏會將 driver 的信息封裝到一個 DriverRunner 裏面,而後再降這個 runner 保存到內存的一個 HashMap 中,而後開啓這個 ruuner,更新當前 worker 的資源信息。

到這裏咱們須要去看 DriverRunner 裏是怎麼操做的。

三,org.apache.spark.deploy.worker.DriverRunner

DriverRunner 是在 standalone cluster 部署模式下用來執行 driver 操做的,包括當 driver 掛掉以後的自動重啓。

1,start 方法

前面調用的是 runner 的 start 方法,因此咱們直接看這個 start 方法:

private[worker] def start() = {
  	// 開一個線程
    new Thread("DriverRunner for " + driverId) {
      override def run() {
        var shutdownHook: AnyRef = null
        try {
          shutdownHook = ShutdownHookManager.addShutdownHook { () =>
            logInfo(s"Worker shutting down, killing driver $driverId")
            kill()
          }

          // 準備 driver 的 jar 包而且執行 driver,並返回一個 exitCode
          val exitCode = prepareAndRunDriver()

          // 根據 exitCode 設置 finalState,一共有三種,分別爲:FINISHED,KILLED,FAILED
          finalState = if (exitCode == 0) {
            Some(DriverState.FINISHED)
          } else if (killed) {
            Some(DriverState.KILLED)
          } else {
            Some(DriverState.FAILED)
          }
        } catch {
          case e: Exception =>
            kill()
            finalState = Some(DriverState.ERROR)
            finalException = Some(e)
        } finally {
          if (shutdownHook != null) {
            ShutdownHookManager.removeShutdownHook(shutdownHook)
          }
        }

        // 而後將 driverId 和 driver 執行結果 finalState 以及一些異常信息發送給 worker
        worker.send(DriverStateChanged(driverId, finalState.get, finalException))
      }
    }.start()
  }
複製代碼

這裏主要是調用了一個 prepareAndRunDriver 這個方法,返回了一個結果碼,而後把結果碼轉換爲 finalState ,而後發送給 worker。

因此咱們直接去找 prepareAndRunDriver 這個方法。

2,prepareAndRunDriver 方法

private[worker] def prepareAndRunDriver(): Int = {
  	
    // 建立 driver 的工做目錄
    val driverDir = createWorkingDirectory()
    // 下載 driver 的 jar 包到工做目錄下
    val localJarFilename = downloadUserJar(driverDir)

    def substituteVariables(argument: String): String = argument match {
      case "{{WORKER_URL}}" => workerUrl
      case "{{USER_JAR}}" => localJarFilename
      case other => other
    }

    // 建立 ProcessBuilder
    val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
      driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

    runDriver(builder, driverDir, driverDesc.supervise)
  }
複製代碼

這個方法主要作了這些事:建立 driver 的工做目錄,將 driver 的 jar 包下載到工做目錄下,而後建立 ProcessBuilder,傳入 driver 的執行命令,而後調用 runDriver 方法。

下面咱們看下 runDriver 方法。

3,runDriver 方法

private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    builder.directory(baseDir)
    // 初始化操做
    def initialize(process: Process): Unit = {
      // 建立 stout 文件
      val stdout = new File(baseDir, "stdout")
      // 將 process 的 InputStream 流重定向爲 stout 文件
      CommandUtils.redirectStream(process.getInputStream, stdout)
			
      // 建立 stderr 文件
      val stderr = new File(baseDir, "stderr")
      // 將 builder 命令格式化處理
      val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
      val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
      Files.append(header, stderr, StandardCharsets.UTF_8)
      // 將 process 的 ErrStream 重定向到 stderr 文件
      CommandUtils.redirectStream(process.getErrorStream, stderr)
    }
  	// 調用 runCommandWithRetry
    runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
  }
複製代碼

該方法主要是定義了一個 initialize 方法,裏面會將傳入的 process 的輸入流和 err 流重定向到自定義的兩個文件中去,而後調用 runCommandWithRetry 這個方法。

看下 runCommandWithRetry 這個方法。

4,runCommandWithRetry 方法

private[worker] def runCommandWithRetry( command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
  	// 退出碼 
  	var exitCode = -1
    // 提交重試的燈帶時間
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    var keepTrying = !killed while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))

      synchronized {
        // 若是被 kill 則返回 exitcode
        if (killed) { return exitCode }
        // 執行 command 命令,啓動 driver 進程
        process = Some(command.start())
        // 調用上面定義好的 initialize 方法,將一些流的輸出文件作重定向
        initialize(process.get)
      }

      val processStart = clock.getTimeMillis()
      exitCode = process.get.waitFor()

      // check if attempting another run
      keepTrying = supervise && exitCode != 0 && !killed if (keepTrying) {
        if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000) {
          waitSeconds = 1
        }
        logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
        sleeper.sleep(waitSeconds)
        waitSeconds = waitSeconds * 2 // exponential back-off
      }
    }

    exitCode
  }
複製代碼

這裏是真正運行 driver 進程的地方,開啓 driver 進程後會使用上面 runDriver 中定義好的 initialize 方法去將 driver 進程中的一些流的輸出文件作重定向操做,並返回 exitcode。

至此,driver 就已經在 master 上註冊好了,而且 master 也分配合適的 worker 啓動了該 driver 進程。

咱們在 DriverRunner start 方法的最後會調用 worker.send(DriverStateChanged(driverId, finalState.get, finalException)) 這個方法,給 worker 發送 driver 狀態變化的消息。

四,org.apache.spark.deploy.worker.Worker

這裏咱們看下 worker 是怎麼處理的。

在 woker 的 receive 方法的模式匹配中是這麼操做的:

case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged)
複製代碼

會去調用 handleDriverStateChanged 這個方法。

1,handleDriverStateChanged 方法

咱們再看下 handleDriverStateChanged 這個方法:

private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    val driverId = driverStateChanged.driverId
    val exception = driverStateChanged.exception
    val state = driverStateChanged.state
    // 根據 state 作匹配打印日誌
    state match {
      case DriverState.ERROR =>
        logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
      case DriverState.FAILED =>
        logWarning(s"Driver $driverId exited with failure")
      case DriverState.FINISHED =>
        logInfo(s"Driver $driverId exited successfully")
      case DriverState.KILLED =>
        logInfo(s"Driver $driverId was killed by user")
      case _ =>
        logDebug(s"Driver $driverId changed state to $state")
    }
  	// 向 master 發送 driverStateChanged 消息
    sendToMaster(driverStateChanged)
    // 將該 driver 從 drivers 移除到 finishedDrivers 中去
    val driver = drivers.remove(driverId).get finishedDrivers(driverId) = driver trimFinishedDriversIfNecessary() // 更新 worker 節點的資源狀況 memoryUsed -= driver.driverDesc.mem
    coresUsed -= driver.driverDesc.cores
  }
複製代碼

主要是作了這些事:根據發送過來的 state 作模式匹配,打印對應的 log。而後把這個 driverStateChanged 消息轉發給 master,最後再更新下當前 worker 的一些存儲數據。

最後在看下 master 收到這個 driverStateChanged 消息是怎麼處理的。

五,org.apache.spark.deploy.master.Master

在其 recieve 方法中能夠找到匹配到 driverStageChanged 消息後的操做:

case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
          // 調用 removeDriver 方法
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
複製代碼

在這裏是調用了 removeDriver 方法,咱們下面就看下這個方法。

1,removeDriver 方法

private def removeDriver( driverId: String, finalState: DriverState, exception: Option[Exception]) {
  	// 根據 driver id 進行模式匹配
    drivers.find(d => d.id == driverId) match {
      case Some(driver) =>
        logInfo(s"Removing driver: $driverId")
        // 從 drivers 集合中移除當前 driver
        drivers -= driver if (completedDrivers.size >= RETAINED_DRIVERS) {
          val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
          completedDrivers.trimStart(toRemove)
        }
      	// 將 driver 添加到 completedDrivers 中去
        completedDrivers += driver
        // 從持久化引擎中移除
        persistenceEngine.removeDriver(driver)
        // 更新 driver 的狀態和 exception 並從 driver 的 worker 中移除掉當前 driver
        driver.state = finalState
        driver.exception = exception
        driver.worker.foreach(w => w.removeDriver(driver))
        schedule()
      case None =>
        logWarning(s"Asked to remove unknown driver: $driverId")
    }
  }
複製代碼

這個方法主要是將 master 中資源作恢復操做,會根據當前退出的 driver 作模式匹配,找到這個 driver,而後將其從 drivers 的集合中移除,添加到 completedDrivers 中去,而後從持久化引擎中移除掉,更新 driver 的狀態,並從 driver 持有的 worker 中移除掉結束的這個 driver。而後再調用 schedule 方法,讓釋放資源從新調度。

至此,driver 的註冊,啓動,以及退出後資源回收,都結束了。

相關文章
相關標籤/搜索