In the previous article, 《Spark(四十九):Spark On YARN啓動流程源碼分析(一)》, we covered SparkContext initialization and ApplicationMaster startup, but that coverage was clearly incomplete.
This article fills in the code walkthrough for yarn-cluster mode (--master yarn --deploy-mode cluster).
Submitting a job with spark-submit.sh:
#!/bin/sh
#LANG=zh_CN.utf8
#export LANG
export SPARK_KAFKA_VERSION=0.10
export LANG=zh_CN.UTF-8

jarspath=''
for file in `ls /home/dx/works/myapp001/sparks/*.jar`
do
  jarspath=${file},$jarspath
done
jarspath=${jarspath%?}
echo $jarspath

# --deploy-mode can also be set to client
spark-submit \
--jars $jarspath \
--properties-file ./conf/spark-properties-myapp001.conf \
--verbose \
--master yarn \
--deploy-mode cluster \
--name Streaming-$1-$2-$3-$4-$5-Agg-Parser \
--num-executors 16 \
--executor-memory 6G \
--executor-cores 2 \
--driver-memory 2G \
--driver-java-options "-XX:+TraceClassPaths" \
--class com.dx.myapp001.Main \
/home/dx/works/myapp001/lib/application-jar.jar $1 $2 $3 $4 $5
Running spark-submit.sh actually executes the main method of org.apache.spark.deploy.SparkSubmit:
1) --master yarn --deploy-mode cluster
The submission is delegated to YarnClusterApplication.
YarnClusterApplication is defined in the same source file as org.apache.spark.deploy.yarn.Client; its start method creates a Client object and calls its run method:
private[spark] class YarnClusterApplication extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    conf.remove("spark.jars")
    conf.remove("spark.files")

    new Client(new ClientArguments(args), conf).run()
  }

}
2) --master yarn --deploy-mode client [can be skimmed here]
The main function of application-jar.jar itself is invoked, via JavaMainApplication:
/**
 * Implementation of SparkApplication that wraps a standard Java class with a "main" method.
 *
 * Configuration is propagated to the application via system properties, so running multiple
 * of these in the same JVM may lead to undefined behavior due to configuration leaks.
 */
private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
    if (!Modifier.isStatic(mainMethod.getModifiers)) {
      throw new IllegalStateException("The main method in the given main class must be static")
    }

    val sysProps = conf.getAll.toMap
    sysProps.foreach { case (k, v) =>
      sys.props(k) = v
    }

    mainMethod.invoke(null, args)
  }

}
As the implementation shows, JavaMainApplication's start method simply invokes the main function of application-jar.jar via reflection.
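How SparkSubmit chooses between the two paths can be summarized by the following simplified sketch. This is not the actual SparkSubmit code (the real logic is spread across its argument handling), and SparkApplication/JavaMainApplication are Spark-internal classes, so the sketch is illustrative only:

// Simplified sketch of SparkSubmit's dispatch, assuming a yarn master.
// In cluster mode the "child main class" is YarnClusterApplication; in client mode
// it is the user's own --class, wrapped in JavaMainApplication and run by reflection.
def buildApplication(deployMode: String, userMainClass: String): SparkApplication = {
  val mainClass: Class[_] =
    if (deployMode == "cluster") {
      Class.forName("org.apache.spark.deploy.yarn.YarnClusterApplication")
    } else {
      Class.forName(userMainClass)
    }

  if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
    mainClass.getConstructor().newInstance().asInstanceOf[SparkApplication]
  } else {
    new JavaMainApplication(mainClass)
  }
}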
In yarn-cluster mode, YarnClusterApplication runs Client#run(), which implements the job submission flow:
/**
 * Submit an application to the ResourceManager.
 * If set spark.yarn.submit.waitAppCompletion to true, it will stay alive
 * reporting the application's status until the application has exited for any reason.
 * Otherwise, the client process will exit after submission.
 * If the application finishes with a failed, killed, or undefined status,
 * throw an appropriate SparkException.
 */
def run(): Unit = {
  this.appId = submitApplication()
  if (!launcherBackend.isConnected() && fireAndForget) {
    val report = getApplicationReport(appId)
    val state = report.getYarnApplicationState
    logInfo(s"Application report for $appId (state: $state)")
    logInfo(formatReportDetails(report))
    if (state == YarnApplicationState.FAILED || state == YarnApplicationState.KILLED) {
      throw new SparkException(s"Application $appId finished with status: $state")
    }
  } else {
    val YarnAppReport(appState, finalState, diags) = monitorApplication(appId)
    if (appState == YarnApplicationState.FAILED || finalState == FinalApplicationStatus.FAILED) {
      diags.foreach { err =>
        logError(s"Application diagnostics message: $err")
      }
      throw new SparkException(s"Application $appId finished with failed status")
    }
    if (appState == YarnApplicationState.KILLED || finalState == FinalApplicationStatus.KILLED) {
      throw new SparkException(s"Application $appId is killed")
    }
    if (finalState == FinalApplicationStatus.UNDEFINED) {
      throw new SparkException(s"The final status of application $appId is undefined")
    }
  }
}
The flow of run() (a minimal sketch of the monitoring loop follows this list):
1) Call submitApplication(): initialize the YARN client, hand resource management to YARN, and kick off the rest of the submission flow: allocate a container for the driver, start the ApplicationMaster in that container, and initialize SparkContext inside the ApplicationMaster.
2) If the application succeeds, report progress and status information.
3) If the application fails, report the failure.
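The "stay alive reporting status" part of run() is handled by monitorApplication(). Stripped down, the polling it performs looks roughly like the sketch below; the real method also respects the configured report interval and logs the full report details, and only the YarnClient calls used here are the standard Hadoop API:

import org.apache.hadoop.yarn.api.records.{ApplicationId, FinalApplicationStatus, YarnApplicationState}
import org.apache.hadoop.yarn.client.api.YarnClient

// Poll the ResourceManager until the application reaches a terminal state.
def waitForCompletion(yarnClient: YarnClient, appId: ApplicationId): FinalApplicationStatus = {
  var state = YarnApplicationState.NEW
  var finalStatus = FinalApplicationStatus.UNDEFINED
  while (state != YarnApplicationState.FINISHED &&
         state != YarnApplicationState.FAILED &&
         state != YarnApplicationState.KILLED) {
    Thread.sleep(1000)                                  // report interval (configurable in Spark)
    val report = yarnClient.getApplicationReport(appId) // RPC call to the RM
    state = report.getYarnApplicationState
    finalStatus = report.getFinalApplicationStatus
  }
  finalStatus
}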
The implementation of submitApplication():
/**
 * Submit an application running our ApplicationMaster to the ResourceManager.
 *
 * The stable Yarn API provides a convenience method (YarnClient#createApplication) for
 * creating applications and setting up the application submission context. This was not
 * available in the alpha API.
 */
def submitApplication(): ApplicationId = {
  var appId: ApplicationId = null
  try {
    launcherBackend.connect()
    yarnClient.init(hadoopConf)
    yarnClient.start()

    logInfo("Requesting a new application from cluster with %d NodeManagers"
      .format(yarnClient.getYarnClusterMetrics.getNumNodeManagers))

    // Get a new application from our RM
    val newApp = yarnClient.createApplication()
    val newAppResponse = newApp.getNewApplicationResponse()
    appId = newAppResponse.getApplicationId()

    new CallerContext("CLIENT", sparkConf.get(APP_CALLER_CONTEXT),
      Option(appId.toString)).setCurrentContext()

    // Verify whether the cluster has enough resources for our AM
    verifyClusterResources(newAppResponse)

    // Set up the appropriate contexts to launch our AM
    val containerContext = createContainerLaunchContext(newAppResponse)
    val appContext = createApplicationSubmissionContext(newApp, containerContext)

    // Finally, submit and monitor the application
    logInfo(s"Submitting application $appId to ResourceManager")
    yarnClient.submitApplication(appContext)
    launcherBackend.setAppId(appId.toString)
    reportLauncherState(SparkAppHandle.State.SUBMITTED)

    appId
  } catch {
    case e: Throwable =>
      if (appId != null) {
        cleanupStagingDir(appId)
      }
      throw e
  }
}
This code requests resources from the ResourceManager, launches a container, and runs the ApplicationMaster in it.
Inside createContainerLaunchContext(newAppResponse), the AM main class (amClass) is chosen as follows:
val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
In yarn-cluster mode, Client#run() first calls Client#submitApplication() to ask YARN's ResourceManager for a container in which to start the ApplicationMaster.
An example of the command used to launch the ApplicationMaster:
[dx@hadoop143 bin]$ ps -ef|grep ApplicationMaster
# running as the yarn user
/bin/bash -c /usr/java/jdk1.8.0_171-amd64/bin/java \
-server \
-Xmx2048m \
-Djava.io.tmpdir=/mnt/data3/yarn/nm/usercache/dx/appcache/application_1554704591622_0340/container_1554704591622_0340_01_000001/tmp \
-Dspark.yarn.app.container.log.dir=/mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001 \
org.apache.spark.deploy.yarn.ApplicationMaster \
--class 'com.dx.myapp001.Main' \
--jar file:/home/dx/works/myapp001/lib/application-jar.jar \
--arg '-type' \
--arg '0' \
--properties-file /mnt/data3/yarn/nm/usercache/dx/appcache/application_1554704591622_0340/container_1554704591622_0340_01_000001/__spark_conf__/__spark_conf__.properties \
1> /mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001/stdout \
2> /mnt/data4/yarn/container-logs/application_1554704591622_0340/container_1554704591622_0340_01_000001/stderr
The ApplicationMaster process starts through the main method of the ApplicationMaster companion object, which executes:
private var master: ApplicationMaster = _

def main(args: Array[String]): Unit = {
  SignalUtils.registerLogger(log)
  val amArgs = new ApplicationMasterArguments(args)
  master = new ApplicationMaster(amArgs)
  System.exit(master.run())
}
The args are parsed by ApplicationMasterArguments; the parsed amArgs are then used to construct the ApplicationMaster instance, and master#run() is called to start it.
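As a rough illustration, a hypothetical, simplified parser in the style of ApplicationMasterArguments (not the real class, which also handles options such as --primary-py-file) would consume the arguments seen in the launch command above like this:

// Hypothetical simplified ApplicationMasterArguments-style parsing (sketch only).
class AmArgs(args: Array[String]) {
  var userClass: String = null        // --class  (set in cluster mode only)
  var userJar: String = null          // --jar
  var propertiesFile: String = null   // --properties-file
  var userArgs: List[String] = Nil    // repeated --arg values

  private var rest = args.toList
  while (rest.nonEmpty) {
    rest match {
      case "--class" :: value :: tail           => userClass = value;      rest = tail
      case "--jar" :: value :: tail             => userJar = value;        rest = tail
      case "--properties-file" :: value :: tail => propertiesFile = value; rest = tail
      case "--arg" :: value :: tail             => userArgs :+= value;     rest = tail
      case _ :: tail                            => rest = tail             // ignored in this sketch
    }
  }
}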
When the ApplicationMaster class is instantiated, it initializes the following fields:
private val isClusterMode = args.userClass != null private val sparkConf = new SparkConf() if (args.propertiesFile != null) { Utils.getPropertiesFromFile(args.propertiesFile).foreach { case (k, v) => sparkConf.set(k, v) } } private val securityMgr = new SecurityManager(sparkConf) private var metricsSystem: Option[MetricsSystem] = None // Set system properties for each config entry. This covers two use cases: // - The default configuration stored by the SparkHadoopUtil class // - The user application creating a new SparkConf in cluster mode // // Both cases create a new SparkConf object which reads these configs from system properties. sparkConf.getAll.foreach { case (k, v) => sys.props(k) = v } private val yarnConf = new YarnConfiguration(SparkHadoopUtil.newConfiguration(sparkConf)) private val userClassLoader = { val classpath = Client.getUserClasspath(sparkConf) val urls = classpath.map { entry => new URL("file:" + new File(entry.getPath()).getAbsolutePath()) } if (isClusterMode) { if (Client.isUserClassPathFirst(sparkConf, isDriver = true)) { new ChildFirstURLClassLoader(urls, Utils.getContextOrSparkClassLoader) } else { new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) } } else { new MutableURLClassLoader(urls, Utils.getContextOrSparkClassLoader) } } private val client = doAsUser { new YarnRMClient() } private var rpcEnv: RpcEnv = null // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. private val sparkContextPromise = Promise[SparkContext]()
The important fields are isClusterMode, sparkConf, securityMgr, yarnConf, userClassLoader, client (a YarnRMClient), rpcEnv, and sparkContextPromise; their roles are described by the comments in the code above.
ApplicationMaster#run() calls ApplicationMaster#runImpl(), which contains the following important branch:
if (isClusterMode) {
  runDriver()
} else {
  runExecutorLauncher()
}
Because args.userClass is not null, isClusterMode is true, so runDriver() is executed.
ApplicationMaster#runDriver is as follows:
private def runDriver(): Unit = { addAmIpFilter(None) userClassThread = startUserApplication() // This a bit hacky, but we need to wait until the spark.driver.port property has // been set by the Thread executing the user class. logInfo("Waiting for spark context initialization...") val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) try { val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { rpcEnv = sc.env.rpcEnv val userConf = sc.getConf val host = userConf.get("spark.driver.host") val port = userConf.get("spark.driver.port").toInt registerAM(host, port, userConf, sc.ui.map(_.webUrl)) val driverRef = rpcEnv.setupEndpointRef( RpcAddress(host, port), YarnSchedulerBackend.ENDPOINT_NAME) createAllocator(driverRef, userConf) } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. throw new IllegalStateException("User did not initialize spark context!") } resumeDriver() userClassThread.join() } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => logError( s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + "Please check earlier log output for errors. Failing the application.") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") } finally { resumeDriver() } }
Its execution flow:
1) userClassThread = startUserApplication() runs the user code: the main function of application_jar.jar (the class specified with --class in the spark-submit command) is invoked via reflection in a separate thread;
2) Wait for SparkContext initialization: the SparkContext created by the user code is obtained through sparkContextPromise, with a maximum wait time.
a) This also confirms that the driver runs inside the ApplicationMaster (the SparkContext effectively is the driver);
b) The SparkContext is actually initialized in the code of application_jar.jar, which is executed via reflection.
3) resumeDriver(): once SparkContext initialization is complete, resume the user thread.
4) userClassThread.join(): block until the reflectively invoked main of application_jar.jar finishes.
When a job is submitted with spark-submit --master yarn --deploy-mode cluster, the SparkContext (i.e. the driver) is initialized in a child thread of the ApplicationMaster, namely the thread held in:

@volatile private var userClassThread: Thread = _

// In cluster mode, used to tell the AM when the user's SparkContext has been initialized.
private val sparkContextPromise = Promise[SparkContext]()

while runDriver() waits for it with:

val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
  Duration(totalWaitTime, TimeUnit.MILLISECONDS))
How does sparkContextPromise get hold of the SparkContext instance created inside userClassThread (the thread that runs the user code via reflection)?
Answer:
When SparkContext initializes its TaskScheduler, yarn-cluster mode maps to YarnClusterScheduler, which has a post-start hook:
/**
 * This is a simple extension to ClusterScheduler - to ensure that appropriate initialization of
 * ApplicationMaster, etc is done
 */
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {

  logInfo("Created YarnClusterScheduler")

  override def postStartHook() {
    ApplicationMaster.sparkContextInitialized(sc)
    super.postStartHook()
    logInfo("YarnClusterScheduler.postStartHook done")
  }

}
The hook calls ApplicationMaster.sparkContextInitialized(), which hands the SparkContext instance to the Promise shown earlier:
private def sparkContextInitialized(sc: SparkContext) = {
  sparkContextPromise.synchronized {
    // Notify runDriver function that SparkContext is available
    sparkContextPromise.success(sc)
    // Pause the user class thread in order to make proper initialization in runDriver function.
    sparkContextPromise.wait()
  }
}
userClassThread, in turn, is created by startUserApplication(); after that comes the line quoted above:
val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
This line waits, within the timeout, for the Future of sparkContextPromise to yield the SparkContext instance.
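Stripped of the Spark classes, the handshake is nothing more than a Promise completed from one thread and awaited with a timeout from another, plus wait()/notify() on the Promise to pause and resume the producing thread. A minimal, self-contained illustration (the names are chosen only to mirror the Spark code):

import scala.concurrent.{Await, Promise}
import scala.concurrent.duration._

object PromiseHandshake {
  private val sparkContextPromise = Promise[String]()   // stands in for Promise[SparkContext]

  def main(args: Array[String]): Unit = {
    val userClassThread = new Thread("Driver") {
      override def run(): Unit = {
        // "user code" builds its result (in Spark: new SparkContext(conf))
        val sc = "SparkContext stand-in"
        sparkContextPromise.synchronized {
          sparkContextPromise.success(sc)   // sparkContextInitialized(sc)
          sparkContextPromise.wait()        // paused until resumeDriver() notifies
        }
        println("user thread resumed; the rest of main() runs here")
      }
    }
    userClassThread.start()

    // runDriver() side: wait at most 10s for the value produced by the user thread.
    val sc = Await.result(sparkContextPromise.future, 10.seconds)
    println(s"got: $sc")

    sparkContextPromise.synchronized { sparkContextPromise.notify() }  // resumeDriver()
    userClassThread.join()
  }
}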
In terms of the actual Spark classes, the mechanism can be understood through the following mock code:
object App { def main(args: Array[String]): Unit = { val userClassThread = startUserApplication() val totalWaitTime = 15000 try { val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { println("the sc has initialized") val rpcEnv = sc.env.rpcEnv val userConf = sc.getConf val host = userConf.get("spark.driver.host") val port = userConf.get("spark.driver.port").toInt } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. throw new IllegalStateException("User did not initialize spark context!") } resumeDriver() userClassThread.join() } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => println( s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + "Please check earlier log output for errors. Failing the application.") } finally { resumeDriver() } } /** * Start the user class, which contains the spark driver, in a separate Thread. * If the main routine exits cleanly or exits with System.exit(N) for any N * we assume it was successful, for all other cases we assume failure. * * Returns the user thread that was started. */ private def startUserApplication(): Thread = { val userThread = new Thread { override def run() { try { val conf = new SparkConf().setMaster("local[*]").setAppName("appName") val sc = new SparkContext(conf) sparkContextInitialized(sc) } catch { case e: Exception => sparkContextPromise.tryFailure(e.getCause()) } finally { sparkContextPromise.trySuccess(null) } } } userThread.setName("Driver") userThread.start() userThread } // In cluster mode, used to tell the AM when the user's SparkContext has been initialized. private val sparkContextPromise = Promise[SparkContext]() private def resumeDriver(): Unit = { // When initialization in runDriver happened the user class thread has to be resumed. sparkContextPromise.synchronized { sparkContextPromise.notify() } } private def sparkContextInitialized(sc: SparkContext) = { sparkContextPromise.synchronized { // Notify runDriver function that SparkContext is available sparkContextPromise.success(sc) // Pause the user class thread in order to make proper initialization in runDriver function. sparkContextPromise.wait() } } }
There is quite a lot going on in ApplicationMaster#runDriver, so it is covered separately in this subsection.
val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME) try { val sc = ThreadUtils.awaitResult(sparkContextPromise.future, Duration(totalWaitTime, TimeUnit.MILLISECONDS)) if (sc != null) { rpcEnv = sc.env.rpcEnv val userConf = sc.getConf val host = userConf.get("spark.driver.host") val port = userConf.get("spark.driver.port").toInt registerAM(host, port, userConf, sc.ui.map(_.webUrl)) val driverRef = rpcEnv.setupEndpointRef( RpcAddress(host, port), YarnSchedulerBackend.ENDPOINT_NAME) createAllocator(driverRef, userConf) } else { // Sanity check; should never happen in normal operation, since sc should only be null // if the user app did not create a SparkContext. throw new IllegalStateException("User did not initialize spark context!") } resumeDriver() userClassThread.join() } catch { case e: SparkException if e.getCause().isInstanceOf[TimeoutException] => logError( s"SparkContext did not initialize after waiting for $totalWaitTime ms. " + "Please check earlier log output for errors. Failing the application.") finish(FinalApplicationStatus.FAILED, ApplicationMaster.EXIT_SC_NOT_INITED, "Timed out waiting for SparkContext.") } finally { resumeDriver() }
- SparkContext initialization: startUserApplication() reflectively invokes the main of application_jar.jar (the user code), which initializes the SparkContext.
- If SparkContext initialization succeeds:
i. rpcEnv is assigned the rpcEnv of the initialized SparkContext's env;
ii. userConf (the SparkConf), the driver host, the driver port, and sc.ui are read from sc and passed to registerAM() (ApplicationMaster registration);
iii. the driver's RpcEndpointRef, driverRef, is obtained from the driver host, the driver port, and the driver RPC endpoint name YarnSchedulerBackend.ENDPOINT_NAME;
iv. createAllocator(driverRef, userConf) is called;
v. resumeDriver(): the SparkContext-initialization thread is released (the user thread is resumed);
vi. userClassThread.join(): wait for the application_jar.jar program to finish.
- If SparkContext initialization fails, throw new IllegalStateException("User did not initialize spark context!") is raised.
After SparkContext is successfully initialized, sc (the SparkContext instance) is returned; userConf (SparkConf), the driver host, the driver port, and sc.ui are read from it and passed to registerAM(), which registers the AM (ApplicationMaster).
private val client = doAsUser { new YarnRMClient() }

private def registerAM(
    host: String,
    port: Int,
    _sparkConf: SparkConf,
    uiAddress: Option[String]): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val attemptId = client.getAttemptId().getAttemptId().toString()
  val historyAddress = ApplicationMaster
    .getHistoryServerAddress(_sparkConf, yarnConf, appId, attemptId)

  client.register(host, port, yarnConf, _sparkConf, uiAddress, historyAddress)
  registered = true
}
- client is private val client = doAsUser { new YarnRMClient() }.
- Registering the ApplicationMaster means calling client#register(...), which takes the driver host, the driver port, and historyAddress.
- Internally, client#register(...) uses org.apache.hadoop.yarn.client.api.AMRMClient#registerApplicationMaster(driverHost, driverPort, trackingUrl) to register the ApplicationMaster with the YARN ResourceManager.
In the code above, client is a YarnRMClient instance; its register method is implemented as follows:
/** * Handles registering and unregistering the application with the YARN ResourceManager. */ private[spark] class YarnRMClient extends Logging { private var amClient: AMRMClient[ContainerRequest] = _ private var uiHistoryAddress: String = _ private var registered: Boolean = false /** * Registers the application master with the RM. * * @param driverHost Host name where driver is running. * @param driverPort Port where driver is listening. * @param conf The Yarn configuration. * @param sparkConf The Spark configuration. * @param uiAddress Address of the SparkUI. * @param uiHistoryAddress Address of the application on the History Server. */ def register( driverHost: String, driverPort: Int, conf: YarnConfiguration, sparkConf: SparkConf, uiAddress: Option[String], uiHistoryAddress: String): Unit = { amClient = AMRMClient.createAMRMClient() amClient.init(conf) amClient.start() this.uiHistoryAddress = uiHistoryAddress val trackingUrl = uiAddress.getOrElse { if (sparkConf.get(ALLOW_HISTORY_SERVER_TRACKING_URL)) uiHistoryAddress else "" } logInfo("Registering the ApplicationMaster") synchronized { amClient.registerApplicationMaster(driverHost, driverPort, trackingUrl) registered = true } } 。。。 }
The registration process:
1) Initialize an AMRMClient[ContainerRequest] instance amClient and start it with amClient#start();
2) In a synchronized block, call amClient#registerApplicationMaster(driverHost, driverPort, trackingUrl) to register the ApplicationMaster with the YARN ResourceManager;
3) The trackingUrl passed at registration is the address at which the application's progress can be viewed in a UI.
Note: the ApplicationMaster registers with the ResourceManager so that users can check the application's status directly through the ResourceManager; it then polls the RM over RPC to request resources for its tasks and monitors them until they finish.
Continuing with ApplicationMaster#runDriver: once SparkContext initialization has returned sc and the AM has been registered with the RM, the next steps are:
1) Obtain the driver's RpcEndpointRef, driverRef, from the driver host, the driver port, and the driver RPC endpoint name YarnSchedulerBackend.ENDPOINT_NAME, so the AM can talk to the driver. The driver host and port are also passed along when executors are launched in their containers, so each executor can build the driver endpoint ref and communicate with the driver over RPC.
2) Call createAllocator(driverRef, userConf), which uses the YarnRMClient to request containers from the RM and start executors.
createAllocator(driverRef, userConf) is defined in ApplicationMaster as follows:
private val client = doAsUser { new YarnRMClient() } private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = { val appId = client.getAttemptId().getApplicationId().toString() val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port, CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString // Before we initialize the allocator, let's log the information about how executors will // be run up front, to avoid printing this out for every single executor being launched. // Use placeholders for information that changes such as executor IDs. logInfo { val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt val executorCores = _sparkConf.get(EXECUTOR_CORES) val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>", "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources) dummyRunner.launchContextDebugInfo() } allocator = client.createAllocator( yarnConf, _sparkConf, driverUrl, driverRef, securityMgr, localResources) credentialRenewer.foreach(_.setDriverRef(driverRef)) // Initialize the AM endpoint *after* the allocator has been initialized. This ensures // that when the driver sends an initial executor request (e.g. after an AM restart), // the allocator is ready to service requests. rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef)) allocator.allocateResources() val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr) val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId) ms.registerSource(new ApplicationMasterSource(prefix, allocator)) ms.start() metricsSystem = Some(ms) reporterThread = launchReporterThread() }
This code uses the YarnAllocator created via client (the YarnRMClient) to request containers, and ExecutorRunnable to launch executors. The concrete steps:
1) Create allocator, a YarnAllocator, via client#createAllocator(yarnConf, _sparkConf, driverUrl, driverRef, securityMgr, localResources);
2) Set up an AMEndpoint (an inner class of ApplicationMaster) for RPC communication with the driver.
Note that AMEndpoint is constructed with driverRef; in its onStart() it calls driver.send(RegisterClusterManager(self)). On the driver side, this message is received by YarnSchedulerBackend (see below).
3) Call allocator.allocateResources(), which internally keeps requesting containers and launches executors through ExecutorRunnable (a conceptual sketch of this loop follows the list);
4) Start the metrics system to report measurements, and launch the reporter thread, whose allocationThreadImpl() loop collects failure information and reacts to it.
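Conceptually, the loop that allocateResources() (together with the reporter thread) keeps running boils down to the sketch below. This is not YarnAllocator itself: the real code tracks pending requests, locality preferences, blacklists, failed containers, and more. Only the AMRMClient calls are the standard Hadoop API, and launchExecutor stands in for building an ExecutorRunnable and calling its run() on a launcher thread pool:

import scala.collection.JavaConverters._
import org.apache.hadoop.yarn.api.records.Container
import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest

// Conceptual allocation step: heartbeat to the RM, take whatever containers were
// granted, and start one executor per container on its NodeManager.
def allocateOnce(amClient: AMRMClient[ContainerRequest],
                 launchExecutor: Container => Unit): Unit = {
  val response = amClient.allocate(0.1f)            // progress heartbeat + allocation request
  val granted  = response.getAllocatedContainers.asScala
  granted.foreach { container =>
    // In Spark this is: new ExecutorRunnable(...).run(), which asks the container's
    // NodeManager (via NMClient) to start the executor process.
    launchExecutor(container)
  }
}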
AMEndpoint (the inner class of ApplicationMaster mentioned above) is implemented as follows:
/** * An [[RpcEndpoint]] that communicates with the driver's scheduler backend. */ private class AMEndpoint(override val rpcEnv: RpcEnv, driver: RpcEndpointRef) extends RpcEndpoint with Logging { override def onStart(): Unit = { driver.send(RegisterClusterManager(self)) } override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = { case r: RequestExecutors => Option(allocator) match { case Some(a) => if (a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal, r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)) { resetAllocatorInterval() } context.reply(true) case None => logWarning("Container allocator is not ready to request executors yet.") context.reply(false) } case KillExecutors(executorIds) => logInfo(s"Driver requested to kill executor(s) ${executorIds.mkString(", ")}.") Option(allocator) match { case Some(a) => executorIds.foreach(a.killExecutor) case None => logWarning("Container allocator is not ready to kill executors yet.") } context.reply(true) case GetExecutorLossReason(eid) => Option(allocator) match { case Some(a) => a.enqueueGetLossReasonRequest(eid, context) resetAllocatorInterval() case None => logWarning("Container allocator is not ready to find executor loss reasons yet.") } } override def onDisconnected(remoteAddress: RpcAddress): Unit = { // In cluster mode, do not rely on the disassociated event to exit // This avoids potentially reporting incorrect exit codes if the driver fails if (!isClusterMode) { logInfo(s"Driver terminated or disconnected! Shutting down. $remoteAddress") finish(FinalApplicationStatus.SUCCEEDED, ApplicationMaster.EXIT_SUCCESS) } } }
A few things worth noting about this class:
1) In onStart(), it sends a RegisterClusterManager(self) request to the driver (the SparkContext side), telling the driver that this AM will act as its cluster manager. The driver-side code that receives this message is in YarnSchedulerBackend (the schedulerBackend field of SparkContext).
Note:
The relationship between YarnClusterSchedulerBackend, YarnSchedulerBackend, and CoarseGrainedSchedulerBackend:
YarnClusterSchedulerBackend extends YarnSchedulerBackend
YarnSchedulerBackend extends CoarseGrainedSchedulerBackend
2) receiveAndReply() handles three kinds of messages:
2.1) RequestExecutors: request executor allocation;
2.2) KillExecutors: kill the specified executors;
2.3) GetExecutorLossReason: get the reason an executor was lost.
As mentioned above, ApplicationMaster.allocator.allocateResources() internally loops requesting containers and launches executors through ExecutorRunnable. ExecutorRunnable starts CoarseGrainedExecutorBackend, which is itself a process: ExecutorRunnable wraps the CoarseGrainedExecutorBackend launch command and uses nmClient (an NMClient, i.e. the NodeManager client) to start the container, passing the launch command along so that the CoarseGrainedExecutorBackend process starts inside the container.
private[yarn] class ExecutorRunnable( container: Option[Container], conf: YarnConfiguration, sparkConf: SparkConf, masterAddress: String, executorId: String, hostname: String, executorMemory: Int, executorCores: Int, appId: String, securityMgr: SecurityManager, localResources: Map[String, LocalResource]) extends Logging { var rpc: YarnRPC = YarnRPC.create(conf) var nmClient: NMClient = _ def run(): Unit = { logDebug("Starting Executor Container") nmClient = NMClient.createNMClient() nmClient.init(conf) nmClient.start() startContainer() } def launchContextDebugInfo(): String = { val commands = prepareCommand() val env = prepareEnvironment() s""" |=============================================================================== |YARN executor launch context: | env: |${Utils.redact(sparkConf, env.toSeq).map { case (k, v) => s" $k -> $v\n" }.mkString} | command: | ${commands.mkString(" \\ \n ")} | | resources: |${localResources.map { case (k, v) => s" $k -> $v\n" }.mkString} |===============================================================================""".stripMargin } def startContainer(): java.util.Map[String, ByteBuffer] = { val ctx = Records.newRecord(classOf[ContainerLaunchContext]) .asInstanceOf[ContainerLaunchContext] val env = prepareEnvironment().asJava ctx.setLocalResources(localResources.asJava) ctx.setEnvironment(env) val credentials = UserGroupInformation.getCurrentUser().getCredentials() val dob = new DataOutputBuffer() credentials.writeTokenStorageToStream(dob) ctx.setTokens(ByteBuffer.wrap(dob.getData())) val commands = prepareCommand() ctx.setCommands(commands.asJava) ctx.setApplicationACLs( YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava) // If external shuffle service is enabled, register with the Yarn shuffle service already // started on the NodeManager and, if authentication is enabled, provide it with our secret // key for fetching shuffle files later if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) { val secretString = securityMgr.getSecretKey() val secretBytes = if (secretString != null) { // This conversion must match how the YarnShuffleService decodes our secret JavaUtils.stringToBytes(secretString) } else { // Authentication is not enabled, so just provide dummy metadata ByteBuffer.allocate(0) } ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes)) } // Send the start request to the ContainerManager try { nmClient.startContainer(container.get, ctx) } catch { case ex: Exception => throw new SparkException(s"Exception while starting container ${container.get.getId}" + s" on host $hostname", ex) } } private def prepareCommand(): List[String] = { // Extra options for the JVM val javaOpts = ListBuffer[String]() // Set the JVM memory val executorMemoryString = executorMemory + "m" javaOpts += "-Xmx" + executorMemoryString // Set extra Java options for the executor, if defined sparkConf.get(EXECUTOR_JAVA_OPTIONS).foreach { opts => val subsOpt = Utils.substituteAppNExecIds(opts, appId, executorId) javaOpts ++= Utils.splitCommandString(subsOpt).map(YarnSparkHadoopUtil.escapeForShell) } // Set the library path through a command prefix to append to the existing value of the // env variable. 
val prefixEnv = sparkConf.get(EXECUTOR_LIBRARY_PATH).map { libPath => Client.createLibraryPathPrefix(libPath, sparkConf) } javaOpts += "-Djava.io.tmpdir=" + new Path(Environment.PWD.$$(), YarnConfiguration.DEFAULT_CONTAINER_TEMP_DIR) // Certain configs need to be passed here because they are needed before the Executor // registers with the Scheduler and transfers the spark configs. Since the Executor backend // uses RPC to connect to the scheduler, the RPC settings are needed as well as the // authentication settings. sparkConf.getAll .filter { case (k, v) => SparkConf.isExecutorStartupConf(k) } .foreach { case (k, v) => javaOpts += YarnSparkHadoopUtil.escapeForShell(s"-D$k=$v") } // Commenting it out for now - so that people can refer to the properties if required. Remove // it once cpuset version is pushed out. // The context is, default gc for server class machines end up using all cores to do gc - hence // if there are multiple containers in same node, spark gc effects all other containers // performance (which can also be other spark containers) // Instead of using this, rely on cpusets by YARN to enforce spark behaves 'properly' in // multi-tenant environments. Not sure how default java gc behaves if it is limited to subset // of cores on a node. /* else { // If no java_opts specified, default to using -XX:+CMSIncrementalMode // It might be possible that other modes/config is being done in // spark.executor.extraJavaOptions, so we don't want to mess with it. // In our expts, using (default) throughput collector has severe perf ramifications in // multi-tenant machines // The options are based on // http://www.oracle.com/technetwork/java/gc-tuning-5-138395.html#0.0.0.%20When%20to%20Use // %20the%20Concurrent%20Low%20Pause%20Collector|outline javaOpts += "-XX:+UseConcMarkSweepGC" javaOpts += "-XX:+CMSIncrementalMode" javaOpts += "-XX:+CMSIncrementalPacing" javaOpts += "-XX:CMSIncrementalDutyCycleMin=0" javaOpts += "-XX:CMSIncrementalDutyCycle=10" } */ // For log4j configuration to reference javaOpts += ("-Dspark.yarn.app.container.log.dir=" + ApplicationConstants.LOG_DIR_EXPANSION_VAR) val userClassPath = Client.getUserClasspath(sparkConf).flatMap { uri => val absPath = if (new File(uri.getPath()).isAbsolute()) { Client.getClusterPath(sparkConf, uri.getPath()) } else { Client.buildPath(Environment.PWD.$(), uri.getPath()) } Seq("--user-class-path", "file:" + absPath) }.toSeq YarnSparkHadoopUtil.addOutOfMemoryErrorArgument(javaOpts) val commands = prefixEnv ++ Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend", "--driver-url", masterAddress, "--executor-id", executorId, "--hostname", hostname, "--cores", executorCores.toString, "--app-id", appId) ++ userClassPath ++ Seq( s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout", s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr") // TODO: it would be nicer to just make sure there are no null commands here commands.map(s => if (s == null) "null" else s).toList } private def prepareEnvironment(): HashMap[String, String] = { val env = new HashMap[String, String]() Client.populateClasspath(null, conf, sparkConf, env, sparkConf.get(EXECUTOR_CLASS_PATH)) // lookup appropriate http scheme for container log urls val yarnHttpPolicy = conf.get( YarnConfiguration.YARN_HTTP_POLICY_KEY, YarnConfiguration.YARN_HTTP_POLICY_DEFAULT ) val httpScheme = if (yarnHttpPolicy == "HTTPS_ONLY") "https://" else "http://" 
System.getenv().asScala.filterKeys(_.startsWith("SPARK")) .foreach { case (k, v) => env(k) = v } sparkConf.getExecutorEnv.foreach { case (key, value) => if (key == Environment.CLASSPATH.name()) { // If the key of env variable is CLASSPATH, we assume it is a path and append it. // This is kept for backward compatibility and consistency with hadoop YarnSparkHadoopUtil.addPathToEnvironment(env, key, value) } else { // For other env variables, simply overwrite the value. env(key) = value } } // Add log urls container.foreach { c => sys.env.get("SPARK_USER").foreach { user => val containerId = ConverterUtils.toString(c.getId) val address = c.getNodeHttpAddress val baseUrl = s"$httpScheme$address/node/containerlogs/$containerId/$user" env("SPARK_LOG_URL_STDERR") = s"$baseUrl/stderr?start=-4096" env("SPARK_LOG_URL_STDOUT") = s"$baseUrl/stdout?start=-4096" } } env } }
ExecutorRunnable contains the following methods:
1) prepareEnvironment(): prepare the executor's runtime environment;
2) prepareCommand(): generate the launch command for the CoarseGrainedExecutorBackend process;
3) startContainer():
a) initialize the executor's runtime environment;
b) generate the CoarseGrainedExecutorBackend launch command;
c) attach the launch command to the container launch context and call nmClient.startContainer(container.get, ctx) to start the container, which runs the command and thereby starts the executor;
4) launchContextDebugInfo(): print debug information about the launch context;
5) run(): create nmClient via NMClient.createNMClient(), initialize and start it, then call startContainer().
An example of the script that launches the CoarseGrainedExecutorBackend process:
/data3/yarn/nm/usercache/dx/appcache/application_1559203334026_0010/container_1559203334026_0010_01_000003/launch_container.sh
Contents of launch_container.sh:
#!/bin/bash 。。。 exec /bin/bash -c "$JAVA_HOME/bin/java -server -Xmx6144m -Djava.io.tmpdir=$PWD/tmp '-Dspark.driver.port=50365' '-Dspark.network.timeout=10000000' '-Dspark.port.maxRetries=32' -Dspark.yarn.app.container.log.dir=/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003 -XX:OnOutOfMemoryError='kill %p' org.apache.spark.executor.CoarseGrainedExecutorBackend --driver-url spark://CoarseGrainedScheduler@CDH-143:50365 --executor-id 2 --hostname CDH-141 --cores 2 --app-id application_1559203334026_0010 --user-class-path file:$PWD/__app__.jar --user-class-path file:$PWD/spark-sql-kafka-0-10_2.11-2.4.0.jar --user-class-path file:$PWD/spark-avro_2.11-3.2.0.jar --user-class-path file:$PWD/bijection-core_2.11-0.9.5.jar --user-class-path file:$PWD/bijection-avro_2.11-0.9.5.jar 1>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stdout 2>/data4/yarn/container-logs/application_1559203334026_0010/container_1559203334026_0010_01_000003/stderr" 。。。。
Like the SparkSubmit and ApplicationMaster companion objects, CoarseGrainedExecutorBackend also has a companion object with a main entry point. Its main function does the following:
def main(args: Array[String]) { var driverUrl: String = null var executorId: String = null var hostname: String = null var cores: Int = 0 var appId: String = null var workerUrl: Option[String] = None val userClassPath = new mutable.ListBuffer[URL]() var argv = args.toList while (!argv.isEmpty) { argv match { case ("--driver-url") :: value :: tail => driverUrl = value argv = tail case ("--executor-id") :: value :: tail => executorId = value argv = tail case ("--hostname") :: value :: tail => hostname = value argv = tail case ("--cores") :: value :: tail => cores = value.toInt argv = tail case ("--app-id") :: value :: tail => appId = value argv = tail case ("--worker-url") :: value :: tail => // Worker url is used in spark standalone mode to enforce fate-sharing with worker workerUrl = Some(value) argv = tail case ("--user-class-path") :: value :: tail => userClassPath += new URL(value) argv = tail case Nil => case tail => // scalastyle:off println System.err.println(s"Unrecognized options: ${tail.mkString(" ")}") // scalastyle:on println printUsageAndExit() } } if (driverUrl == null || executorId == null || hostname == null || cores <= 0 || appId == null) { printUsageAndExit() } run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath) System.exit(0) }
1) Parse the arguments passed in by ExecutorRunnable:
a) var driverUrl: String = null --- the driver's RPC URL
b) var executorId: String = null --- the executor's ID (assigned sequentially by the ApplicationMaster)
c) var hostname: String = null --- hostname of the cluster node the executor runs on
d) var cores: Int = 0 --- number of vcores the executor may use
e) var appId: String = null --- ID of the current application
f) var workerUrl: Option[String] = None --- worker URL (only used in standalone mode, to enforce fate-sharing with the worker)
g) val userClassPath = new mutable.ListBuffer[URL]() --- paths of the user application jar and its dependency jars.
2) Pass the parsed arguments to run(), the run method of the CoarseGrainedExecutorBackend companion object.
private def run( driverUrl: String, executorId: String, hostname: String, cores: Int, appId: String, workerUrl: Option[String], userClassPath: Seq[URL]) { Utils.initDaemon(log) SparkHadoopUtil.get.runAsSparkUser { () => // Debug code Utils.checkHost(hostname) // Bootstrap to fetch the driver's Spark properties. val executorConf = new SparkConf val fetcher = RpcEnv.create( "driverPropsFetcher", hostname, -1, executorConf, new SecurityManager(executorConf), clientMode = true) val driver = fetcher.setupEndpointRefByURI(driverUrl) val cfg = driver.askSync[SparkAppConfig](RetrieveSparkAppConfig) val props = cfg.sparkProperties ++ Seq[(String, String)](("spark.app.id", appId)) fetcher.shutdown() // Create SparkEnv using properties we fetched from the driver. val driverConf = new SparkConf() for ((key, value) <- props) { // this is required for SSL in standalone mode if (SparkConf.isExecutorStartupConf(key)) { driverConf.setIfMissing(key, value) } else { driverConf.set(key, value) } } cfg.hadoopDelegationCreds.foreach { tokens => SparkHadoopUtil.get.addDelegationTokens(tokens, driverConf) } val env = SparkEnv.createExecutorEnv( driverConf, executorId, hostname, cores, cfg.ioEncryptionKey, isLocal = false) env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend( env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env)) workerUrl.foreach { url => env.rpcEnv.setupEndpoint("WorkerWatcher", new WorkerWatcher(env.rpcEnv, url)) } env.rpcEnv.awaitTermination() } }
The logic of run():
1) Use driverUrl to establish communication with the driver endpoint and ask the driver for the Spark application's configuration, which is used to build driverConf. The RetrieveSparkAppConfig request is handled by the driver's schedulerBackend; see https://github.com/apache/spark/blob/branch-2.4/core/src/main/scala/org/apache/spark/scheduler/cluster/CoarseGrainedSchedulerBackend.scala
2) Create the SparkEnv object env via SparkEnv.createExecutorEnv(), which internally creates components such as RpcEnv, securityManager, broadcastManager, mapOutputTracker, shuffleManager, memoryManager, blockTransferService, blockManagerMaster, blockManager, metricsSystem, outputCommitCoordinator, and outputCommitCoordinatorRef.
3) Register the RPC endpoint "Executor" on env.rpcEnv; the corresponding RpcEndpoint is the CoarseGrainedExecutorBackend class.
4) If workerUrl is set, register the RPC endpoint "WorkerWatcher", which monitors the worker: it connects to the worker process and terminates the JVM if the connection is severed, providing fate-sharing between the worker process and its associated child processes.
5) Call env.rpcEnv.awaitTermination() to block until the process exits.
From its class definition it is clear that CoarseGrainedExecutorBackend is an RpcEndpoint, i.e. the class that handles RPC messages on the executor side:
private[spark] class CoarseGrainedExecutorBackend( override val rpcEnv: RpcEnv, driverUrl: String, executorId: String, hostname: String, cores: Int, userClassPath: Seq[URL], env: SparkEnv) extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging { private[this] val stopping = new AtomicBoolean(false) var executor: Executor = null @volatile var driver: Option[RpcEndpointRef] = None // If this CoarseGrainedExecutorBackend is changed to support multiple threads, then this may need // to be changed so that we don't share the serializer instance across threads private[this] val ser: SerializerInstance = env.closureSerializer.newInstance() override def onStart() { logInfo("Connecting to driver: " + driverUrl) rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref => // This is a very fast action so we can use "ThreadUtils.sameThread" driver = Some(ref) ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls)) }(ThreadUtils.sameThread).onComplete { // This is a very fast action so we can use "ThreadUtils.sameThread" case Success(msg) => // Always receive `true`. Just ignore it case Failure(e) => exitExecutor(1, s"Cannot register with driver: $driverUrl", e, notifyDriver = false) }(ThreadUtils.sameThread) } def extractLogUrls: Map[String, String] = { val prefix = "SPARK_LOG_URL_" sys.env.filterKeys(_.startsWith(prefix)) .map(e => (e._1.substring(prefix.length).toLowerCase(Locale.ROOT), e._2)) } override def receive: PartialFunction[Any, Unit] = { case RegisteredExecutor => logInfo("Successfully registered with driver") try { executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) } catch { case NonFatal(e) => exitExecutor(1, "Unable to create executor due to " + e.getMessage, e) } case RegisterExecutorFailed(message) => exitExecutor(1, "Slave registration failed: " + message) case LaunchTask(data) => if (executor == null) { exitExecutor(1, "Received LaunchTask command but executor was null") } else { val taskDesc = TaskDescription.decode(data.value) logInfo("Got assigned task " + taskDesc.taskId) executor.launchTask(this, taskDesc) } case KillTask(taskId, _, interruptThread, reason) => if (executor == null) { exitExecutor(1, "Received KillTask command but executor was null") } else { executor.killTask(taskId, interruptThread, reason) } case StopExecutor => stopping.set(true) logInfo("Driver commanded a shutdown") // Cannot shutdown here because an ack may need to be sent back to the caller. So send // a message to self to actually do the shutdown. self.send(Shutdown) case Shutdown => stopping.set(true) new Thread("CoarseGrainedExecutorBackend-stop-executor") { override def run(): Unit = { // executor.stop() will call `SparkEnv.stop()` which waits until RpcEnv stops totally. // However, if `executor.stop()` runs in some thread of RpcEnv, RpcEnv won't be able to // stop until `executor.stop()` returns, which becomes a dead-lock (See SPARK-14180). // Therefore, we put this line in a new thread. executor.stop() } }.start() case UpdateDelegationTokens(tokenBytes) => logInfo(s"Received tokens of ${tokenBytes.length} bytes") SparkHadoopUtil.get.addDelegationTokens(tokenBytes, env.conf) } override def onDisconnected(remoteAddress: RpcAddress): Unit = { if (stopping.get()) { logInfo(s"Driver from $remoteAddress disconnected during shutdown") } else if (driver.exists(_.address == remoteAddress)) { exitExecutor(1, s"Driver $remoteAddress disassociated! 
Shutting down.", null, notifyDriver = false) } else { logWarning(s"An unknown ($remoteAddress) driver disconnected.") } } override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) { val msg = StatusUpdate(executorId, taskId, state, data) driver match { case Some(driverRef) => driverRef.send(msg) case None => logWarning(s"Drop $msg because has not yet connected to driver") } } /** * This function can be overloaded by other child classes to handle * executor exits differently. For e.g. when an executor goes down, * back-end may not want to take the parent process down. */ protected def exitExecutor(code: Int, reason: String, throwable: Throwable = null, notifyDriver: Boolean = true) = { val message = "Executor self-exiting due to : " + reason if (throwable != null) { logError(message, throwable) } else { logError(message) } if (notifyDriver && driver.nonEmpty) { driver.get.send(RemoveExecutor(executorId, new ExecutorLossReason(reason))) } System.exit(code) } }
It has four fields:
- stopping --- flags whether the executor is shutting down
- executor --- the Executor object held by this CoarseGrainedExecutorBackend process
- driver --- the RpcEndpointRef used to talk to the driver
- ser --- the serializer instance currently in use
Its methods:
- onStart(): overrides RpcEndpoint#onStart(); it asynchronously obtains the driver's RpcEndpointRef from driverUrl via rpcEnv.asyncSetupEndpointRefByURI(driverUrl), assigns it to the driver field, and sends RegisterExecutor(executorId, self, hostname, cores, extractLogUrls) to the driver (the schedulerBackend).
In this deployment mode (yarn-cluster), the class that actually handles the message on the driver side is CoarseGrainedSchedulerBackend: its driverEndpoint's receiveAndReply(context: RpcCallContext) receives the message, records the executorRef, and uses it to send RegisteredExecutor back to the executor, which is then handled by CoarseGrainedExecutorBackend's receive method.
- receive(): overrides RpcEndpoint#receive(); it handles the following messages:
  - RegisteredExecutor: the driver has registered the executor (recording its executorId, address, etc.); only now does the executor side call executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false) to start the Executor, which runs tasks and reports task status, progress, resource usage, and so on.
  - RegisterExecutorFailed(message): executor registration failed.
  - LaunchTask(data): launch a task via executor.launchTask(this, taskDesc).
  - KillTask(taskId, _, interruptThread, reason): kill the given task.
  - StopExecutor: stop the executor.
  - Shutdown: shut the executor down.
  - UpdateDelegationTokens(tokenBytes): update delegation tokens.
These message types are defined in CoarseGrainedClusterMessages (a sketch of their shape follows this list); they are sent by the driver-side schedulerBackend (CoarseGrainedSchedulerBackend) of the SparkContext.
- onDisconnected(remoteAddress: RpcAddress): overrides RpcEndpoint#onDisconnected().
- statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer): implements ExecutorBackend#statusUpdate(), forwarding task status updates to the driver.
- exitExecutor(code: Int, reason: String, throwable: Throwable = null, notifyDriver: Boolean = true): exits the executor, optionally notifying the driver.
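The shape of those protocol messages can be sketched as plain case classes/objects. The payload types are simplified here (for example, the real LaunchTask carries a SerializableBuffer); see CoarseGrainedClusterMessages in the Spark source for the actual definitions:

import java.nio.ByteBuffer

// Simplified sketch of the executor-facing protocol messages handled in receive()
// (payload types abbreviated; the real ones live in CoarseGrainedClusterMessages).
sealed trait ExecutorMessage extends Serializable
case object RegisteredExecutor extends ExecutorMessage
case class RegisterExecutorFailed(message: String) extends ExecutorMessage
case class LaunchTask(data: ByteBuffer) extends ExecutorMessage   // encoded TaskDescription
case class KillTask(taskId: Long, executorId: String,
    interruptThread: Boolean, reason: String) extends ExecutorMessage
case object StopExecutor extends ExecutorMessage
case object Shutdown extends ExecutorMessage
case class UpdateDelegationTokens(tokens: Array[Byte]) extends ExecutorMessage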
To recap: in its overridden onStart(), CoarseGrainedExecutorBackend obtains the driver's RpcEndpointRef from driverUrl and sends the driver the request:
ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
The actual receiver of this message is CoarseGrainedSchedulerBackend, and the message type is defined in CoarseGrainedClusterMessages.
Next, let's look at how this driver-side schedulerBackend (CoarseGrainedSchedulerBackend) referenced by CoarseGrainedExecutorBackend is initialized.
During SparkContext initialization, schedulerBackend and taskScheduler are created:
private var _schedulerBackend: SchedulerBackend = _
private var _taskScheduler: TaskScheduler = _

private[spark] def schedulerBackend: SchedulerBackend = _schedulerBackend

private[spark] def taskScheduler: TaskScheduler = _taskScheduler
private[spark] def taskScheduler_=(ts: TaskScheduler): Unit = {
  _taskScheduler = ts
}

// Assigned during construction
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
The class that ultimately creates schedulerBackend and taskScheduler for SparkContext on YARN is YarnClusterManager:
/**
 * Cluster Manager for creation of Yarn scheduler and backend
 */
private[spark] class YarnClusterManager extends ExternalClusterManager {

  override def canCreate(masterURL: String): Boolean = {
    masterURL == "yarn"
  }

  override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
    sc.deployMode match {
      case "cluster" => new YarnClusterScheduler(sc)
      case "client" => new YarnScheduler(sc)
      case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def createSchedulerBackend(sc: SparkContext,
      masterURL: String,
      scheduler: TaskScheduler): SchedulerBackend = {
    sc.deployMode match {
      case "cluster" =>
        new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case "client" =>
        new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
      case _ =>
        throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
    }
  }

  override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
    scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
  }
}
YarnClusterManager#createTaskScheduler(...) branches on the SparkContext's deployMode:
1) client: returns a YarnScheduler instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnScheduler.scala);
2) cluster: returns a YarnClusterScheduler instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterScheduler.scala).
YarnClusterManager#createSchedulerBackend(...) likewise branches on the SparkContext's deployMode:
1) client: returns a YarnClientSchedulerBackend instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClientSchedulerBackend.scala);
2) cluster: returns a YarnClusterSchedulerBackend instance (https://github.com/apache/spark/blob/branch-2.4/resource-managers/yarn/src/main/scala/org/apache/spark/scheduler/cluster/YarnClusterSchedulerBackend.scala).