Spark Source Code Analysis (6): Registering the Executor with the Driver

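The previous article covered how the Executor process is launched. What ultimately gets started is CoarseGrainedExecutorBackend: running the launch command invokes its main method, which brings up the CoarseGrainedExecutorBackend process.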

The CoarseGrainedExecutorBackend process acts as the Executor's daemon: it is responsible for creating and maintaining the Executor.

Let's start with the main method. It mostly parses the command-line arguments and then calls the run method.

def main(args: Array[String]) {
  
    // Declare the variables that will hold the parsed arguments
    var driverUrl: String = null
    var executorId: String = null
    var hostname: String = null
    var cores: Int = 0
    var appId: String = null
    var workerUrl: Option[String] = None
    val userClassPath = new mutable.ListBuffer[URL]()

    var argv = args.toList
    while (!argv.isEmpty) {
      argv match {
        case ("--driver-url") :: value :: tail =>
          driverUrl = value
          argv = tail
        case ("--executor-id") :: value :: tail =>
          executorId = value
          argv = tail
        case ("--hostname") :: value :: tail =>
          hostname = value
          argv = tail
        case ("--cores") :: value :: tail =>
          cores = value.toInt
          argv = tail
        case ("--app-id") :: value :: tail =>
          appId = value
          argv = tail
        case ("--worker-url") :: value :: tail =>
          // Worker url is used in spark standalone mode to enforce fate-sharing with worker
          workerUrl = Some(value)
          argv = tail
        case ("--user-class-path") :: value :: tail =>
          userClassPath += new URL(value)
          argv = tail
        case Nil =>
        case tail =>
          // scalastyle:off println
          System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
          // scalastyle:on println
          printUsageAndExit()
      }
    }
		
    // Validate the parsed arguments
    if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
      appId == null) {
      printUsageAndExit()
    }

    // Pass the arguments on to the run method
    run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
    System.exit(0)
  }
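
The argument loop above can also be written as a recursive pattern match over the token list. The following is a minimal sketch of that idea, not Spark code: BackendArgs, ArgParsingSketch and the example URL are hypothetical names introduced for illustration, and only three of the options are modelled.

```scala
import scala.annotation.tailrec

// Hypothetical, simplified container for the parsed options (not a Spark class).
final case class BackendArgs(
    driverUrl: Option[String] = None,
    executorId: Option[String] = None,
    cores: Int = 0)

object ArgParsingSketch {
  // Consume the list two tokens at a time, mirroring the while/match loop in main.
  @tailrec
  def parse(argv: List[String], acc: BackendArgs = BackendArgs()): Either[String, BackendArgs] =
    argv match {
      case "--driver-url" :: value :: tail =>
        parse(tail, acc.copy(driverUrl = Some(value)))
      case "--executor-id" :: value :: tail =>
        parse(tail, acc.copy(executorId = Some(value)))
      case "--cores" :: value :: tail =>
        // Like the original, this throws if the value is not a number.
        parse(tail, acc.copy(cores = value.toInt))
      case Nil =>
        Right(acc)
      case unknown =>
        // Anything else (including a flag with no value) is reported as unrecognized.
        Left(s"Unrecognized options: ${unknown.mkString(" ")}")
    }

  def main(args: Array[String]): Unit = {
    // The URL is a placeholder, not a real driver address.
    println(parse(List("--driver-url", "spark://example-host:7077", "--executor-id", "1", "--cores", "2")))
    println(parse(List("--bogus")))
  }
}
```

Returning an Either instead of calling printUsageAndExit keeps the sketch testable; the real main exits the process on a bad argument.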

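The run method lives in the companion object of CoarseGrainedExecutorBackend. The part to focus on is that it creates a SparkEnv for the Executor and then registers two RpcEndpoints with it: a CoarseGrainedExecutorBackend instance named Executor, and a WorkerWatcher instance named WorkerWatcher (the latter only when a worker URL was passed in).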

      // Create the Executor's SparkEnv
      val env = SparkEnv.createExecutorEnv(
        driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
      // Create the CoarseGrainedExecutorBackend instance and register it with the RpcEnv of that SparkEnv
      env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
        env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      // Create the WorkerWatcher, which monitors the worker and shuts down
      // CoarseGrainedExecutorBackend when something goes wrong
      workerUrl.foreach { url =>
        env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
      }
      env.rpcEnv.awaitTermination()

What matters most is the onStart method of the CoarseGrainedExecutorBackend RpcEndpoint, which is triggered once the endpoint has been created and registered with the rpcEnv.
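
To make the "register, then onStart fires" lifecycle concrete, here is a minimal sketch. SketchEndpoint and SketchRpcEnv are hypothetical stand-ins, not Spark's RpcEndpoint and RpcEnv, and the sketch calls onStart synchronously for simplicity, whereas Spark delivers it through its own message dispatching.

```scala
import scala.collection.mutable

// Hypothetical stand-in for an RPC endpoint; only the onStart hook is modelled.
trait SketchEndpoint {
  def onStart(): Unit = ()
}

// Hypothetical stand-in for an RPC environment that keeps endpoints by name.
final class SketchRpcEnv {
  private val endpoints = mutable.LinkedHashMap.empty[String, SketchEndpoint]

  // Register the endpoint under a unique name, then fire its onStart callback.
  def setupEndpoint(name: String, endpoint: SketchEndpoint): Unit = {
    require(!endpoints.contains(name), s"There is already an endpoint called $name")
    endpoints(name) = endpoint
    endpoint.onStart()
  }

  // Names in registration order.
  def registeredNames: List[String] = endpoints.keys.toList
}

object LifecycleSketch {
  def main(args: Array[String]): Unit = {
    val env = new SketchRpcEnv
    env.setupEndpoint("Executor", new SketchEndpoint {
      override def onStart(): Unit = println("Connecting to driver (sketch)")
    })
    println(env.registeredNames)
  }
}
```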

In onStart, the backend sends a RegisterExecutor message to the driver. In standalone mode, that means the message goes to the StandaloneSchedulerBackend used by TaskSchedulerImpl.

override def onStart() {
    logInfo("Connecting to driver: " + driverUrl)
    rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      driver = Some(ref)
      // Send the RegisterExecutor message to the driver
      ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
    }(ThreadUtils.sameThread).onComplete {
      // This is a very fast action so we can use "ThreadUtils.sameThread"
      case Success(msg) =>
        // Always receive `true`. Just ignore it
      case Failure(e) =>
        exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
    }(ThreadUtils.sameThread)
  }
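
The shape of onStart is a two-step asynchronous chain: resolve the driver reference, then ask it to register, and handle failure at the end. A minimal sketch of that chain with plain Scala futures follows. DriverRef, lookupDriver and register are hypothetical names, the URLs are placeholders, and the sketch runs on the global execution context rather than ThreadUtils.sameThread.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object RegisterFlowSketch {
  // Hypothetical stand-in for a reference to the driver endpoint.
  final class DriverRef {
    // The driver in this sketch always answers true, like the real reply.
    def ask(msg: String): Future[Boolean] = Future.successful(true)
  }

  // Hypothetical lookup: succeeds only for URLs that use the spark:// scheme.
  def lookupDriver(url: String): Future[DriverRef] =
    if (url.startsWith("spark://")) Future.successful(new DriverRef)
    else Future.failed(new IllegalArgumentException(s"Bad driver URL: $url"))

  // Step 1: resolve the reference. Step 2: ask it to register.
  // A failure in either step ends up in recover, like the Failure branch in onStart.
  def register(url: String): Future[String] =
    lookupDriver(url)
      .flatMap(ref => ref.ask("RegisterExecutor"))
      .map(_ => "registered")
      .recover { case e => s"Cannot register with driver: $url (${e.getMessage})" }

  def main(args: Array[String]): Unit = {
    // Placeholder URLs, used only to exercise both branches.
    println(Await.result(register("spark://example-host:7077"), 5.seconds))
    println(Await.result(register("http://example-host"), 5.seconds))
  }
}
```

Where the sketch turns the failure into a string, the real backend calls exitExecutor, so an executor that cannot register terminates.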

StandaloneSchedulerBackend extends CoarseGrainedSchedulerBackend, so the message is received and handled inside CoarseGrainedSchedulerBackend.

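When the message handler in CoarseGrainedSchedulerBackend matches the RegisterExecutor message (the handler is receiveAndReply, since the executor sent the message with ask and is waiting for a reply), it does the following:

1. Checks whether the executor is already registered; if it is, replies immediately with a RegisterExecutorFailed message.

2. Updates the in-memory data it keeps about executors.

3. Replies to the executor that registration succeeded.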

// Matched a RegisterExecutor message
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
        // Guard against duplicate registration
        if (executorDataMap.contains(executorId)) {
          executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
          context.reply(true)
        } else {
          // If the executor's rpc env is not listening for incoming connections, `hostPort`
          // will be null, and the client connection should be used to contact the executor.
          val executorAddress = if (executorRef.address != null) {
              executorRef.address
            } else {
              context.senderAddress
            }
          logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
          addressToExecutorId(executorAddress) = executorId
          totalCoreCount.addAndGet(cores)
          totalRegisteredExecutors.addAndGet(1)
          val data = new ExecutorData(executorRef, executorRef.address, hostname,
            cores, cores, logUrls)
          // This must be synchronized because variables mutated
          // in this block are read when requesting executors
          CoarseGrainedSchedulerBackend.this.synchronized {
            executorDataMap.put(executorId, data)
            if (currentExecutorIdCounter < executorId.toInt) {
              currentExecutorIdCounter = executorId.toInt
            }
            if (numPendingExecutors > 0) {
              numPendingExecutors -= 1
              logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
            }
          }
          // Tell the executor that registration succeeded
          executorRef.send(RegisteredExecutor)
          // Note: some tests expect the reply to come after we put the executor in the map
          context.reply(true)
          listenerBus.post(
            SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
          makeOffers()
        }
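
Stripped of the Spark specifics, the driver-side handling comes down to a guarded insert into a map plus a running core count. The following is a minimal sketch of that bookkeeping. RegistrySketch, DriverRegistry and the Reply types are hypothetical, and address tracking, the listener bus and makeOffers are all left out.

```scala
import scala.collection.mutable

object RegistrySketch {
  // Hypothetical reply types standing in for RegisteredExecutor / RegisterExecutorFailed.
  sealed trait Reply
  case object Registered extends Reply
  final case class RegistrationFailed(reason: String) extends Reply

  final class DriverRegistry {
    private val executorCores = mutable.HashMap.empty[String, Int]
    private var totalCores = 0

    // Reject a duplicate ID; otherwise record the executor and update the totals.
    // The whole check-then-insert runs under one lock so it stays atomic.
    def register(executorId: String, cores: Int): Reply = synchronized {
      if (executorCores.contains(executorId)) {
        RegistrationFailed("Duplicate executor ID: " + executorId)
      } else {
        executorCores.put(executorId, cores)
        totalCores += cores
        Registered
      }
    }

    def totalCoreCount: Int = synchronized(totalCores)
  }

  def main(args: Array[String]): Unit = {
    val registry = new DriverRegistry
    println(registry.register("1", 4))
    println(registry.register("1", 4)) // second attempt with the same ID is rejected
    println(registry.totalCoreCount)
  }
}
```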

Once CoarseGrainedExecutorBackend receives the registration-success message, it goes on to create the Executor object.

case RegisteredExecutor =>
      logInfo("Successfully registered with driver")
      try {
        // Create the Executor object
        executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      } catch {
        case NonFatal(e) =>
          exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
      }

At this point, the Executor has finished registering with the driver.
