前一篇文章介紹了 Executor 進程的啓動,最後啓動的是 CoarseGrainedExecutorBackend,執行啓動命令後會執行它的 main 方法,啓動 CoarseGrainedExecutorBackend 進程。java
CoarseGrainedExecutorBackend 進程是 Executor 的守護進程,用戶 Executor 的建立和維護。app
首先咱們先看下 main 方法,主要就是獲取相關參數,而後調用 run 方法。async
def main(args: Array[String]) {
// 申明一些變量
var driverUrl: String = null
var executorId: String = null
var hostname: String = null
var cores: Int = 0
var appId: String = null
var workerUrl: Option[String] = None
val userClassPath = new mutable.ListBuffer[URL]()
var argv = args.toList while (!argv.isEmpty) {
argv match {
case ("--driver-url") :: value :: tail =>
driverUrl = value
argv = tail case ("--executor-id") :: value :: tail =>
executorId = value
argv = tail case ("--hostname") :: value :: tail =>
hostname = value
argv = tail case ("--cores") :: value :: tail =>
cores = value.toInt
argv = tail case ("--app-id") :: value :: tail =>
appId = value
argv = tail case ("--worker-url") :: value :: tail =>
// Worker url is used in spark standalone mode to enforce fate-sharing with worker
workerUrl = Some(value)
argv = tail case ("--user-class-path") :: value :: tail =>
userClassPath += new URL(value)
argv = tail
case Nil =>
case tail =>
// scalastyle:off println
System.err.println(s"Unrecognized options: ${tail.mkString(" ")}")
// scalastyle:on println
printUsageAndExit()
}
}
// 判斷變量的合法性
if (driverUrl == null || executorId == null || hostname == null || cores <= 0 ||
appId == null) {
printUsageAndExit()
}
// 將參數傳遞給 run 方法去執行
run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
System.exit(0)
}
複製代碼
run 方法位於 CoarseGrainedExecutorBackend 的伴生對象中,這裏主要看建立了一個 Executor 的 sparkEnv,而後往這個 sparkEnv 中註冊了兩個 rpcEndpoint,一個是名爲 Executor 的 CoarseGrainedExecutorBackend 對象,一個是名爲 WorkerWatcher 的 WorkerWatcher 對象。ide
// 建立 Executor 的 SparkEnv
val env = SparkEnv.createExecutorEnv(
driverConf, executorId, hostname, port, cores, cfg.ioEncryptionKey, isLocal = false)
// 建立 CoarseGrainedExecutorBackend 實例,並註冊到上面的 sparkEnv 中
env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
// 建立 WorkerWatcher,用於檢測 worker 的狀態,碰到異常狀況就關閉 CoarseGrainedExecutorBackend
workerUrl.foreach { url =>
env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url))
}
env.rpcEnv.awaitTermination()
複製代碼
最主要的仍是看 CoarseGrainedExecutorBackend 這個 rpcEndpoint 在建立完註冊到 rpcEnv 中觸發的 onstart 方法。post
在其 onstart 方法中會向 driver 發送 RegisterExecutor 的消息。也就是向 taskSchedulerImpl 中的 StandaloneSchedulerBackend 發送消息。this
override def onStart() {
logInfo("Connecting to driver: " + driverUrl)
rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
// This is a very fast action so we can use "ThreadUtils.sameThread"
driver = Some(ref)
// 給 driver 發送 RegisterExecutor 的消息
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
}(ThreadUtils.sameThread).onComplete {
// This is a very fast action so we can use "ThreadUtils.sameThread"
case Success(msg) =>
// Always receive `true`. Just ignore it
case Failure(e) =>
exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false)
}(ThreadUtils.sameThread)
}
複製代碼
而 StandaloneSchedulerBackend 是繼承於 CoarseGrainedSchedulerBackend,因此消息接收處理都在 CoarseGrainedSchedulerBackend 這個裏面。url
下面就看 CoarseGrainedSchedulerBackend 中的 receive 方法的模式匹配到 RegisterExecutor 這條消息後會作這些操做:spa
1,判斷 executor 是否重複註冊,若是重複註冊直接回復消息;scala
2,更新內存中存儲的關於 executor 的一些數據;code
3,回覆註冊成功 executor 消息;
// 匹配到 RegisterExecutor 消息
case RegisterExecutor(executorId, executorRef, hostname, cores, logUrls) =>
// 防止重複註冊
if (executorDataMap.contains(executorId)) {
executorRef.send(RegisterExecutorFailed("Duplicate executor ID: " + executorId))
context.reply(true)
} else {
// If the executor's rpc env is not listening for incoming connections, `hostPort`
// will be null, and the client connection should be used to contact the executor.
val executorAddress = if (executorRef.address != null) {
executorRef.address
} else {
context.senderAddress
}
logInfo(s"Registered executor $executorRef ($executorAddress) with ID $executorId")
addressToExecutorId(executorAddress) = executorId
totalCoreCount.addAndGet(cores)
totalRegisteredExecutors.addAndGet(1)
val data = new ExecutorData(executorRef, executorRef.address, hostname,
cores, cores, logUrls)
// This must be synchronized because variables mutated
// in this block are read when requesting executors
CoarseGrainedSchedulerBackend.this.synchronized {
executorDataMap.put(executorId, data)
if (currentExecutorIdCounter < executorId.toInt) {
currentExecutorIdCounter = executorId.toInt
}
if (numPendingExecutors > 0) {
numPendingExecutors -= 1
logDebug(s"Decremented number of pending executors ($numPendingExecutors left)")
}
}
// 回覆註冊成功的消息
executorRef.send(RegisteredExecutor)
// Note: some tests expect the reply to come after we put the executor in the map
context.reply(true)
listenerBus.post(
SparkListenerExecutorAdded(System.currentTimeMillis(), executorId, data))
makeOffers()
}
複製代碼
而後 CoarseGrainedExecutorBackend 端收到註冊成功的消息後回去建立 Executor 對象。
case RegisteredExecutor =>
logInfo("Successfully registered with driver")
try {
// 建立 Executor 對象
executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
} catch {
case NonFatal(e) =>
exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
}
複製代碼
至此,Executor 已經在 driver 註冊完了。