Spark 2.1.1
Our system wants to monitor the execution progress of Spark on YARN jobs. But while monitoring, we found that after a job is submitted its progress always sits at 10%, and only when the job finally succeeds or fails does it suddenly jump to 100%. Very strange.
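To make the symptom concrete, here is a minimal polling sketch against Hadoop's YarnClient API (the application id below is a placeholder, and ProgressProbe is an illustrative name, not our actual monitoring code):

import org.apache.hadoop.yarn.api.records.ApplicationId
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

object ProgressProbe {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    // Placeholder: substitute the real cluster timestamp and app sequence number.
    val appId = ApplicationId.newInstance(1500000000000L, 42)
    while (true) {
      val report = yarnClient.getApplicationReport(appId)
      // While the Spark app runs this prints progress=0.1; on success/failure it jumps to 1.0.
      println(s"state=${report.getYarnApplicationState} progress=${report.getProgress}")
      Thread.sleep(5000)
    }
  }
}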
Let's first look at how a Spark on YARN job is submitted:
When a job is submitted in yarn mode, spark-submit rewrites the main class to Client:
childMainClass = "org.apache.spark.deploy.yarn.Client"
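For context, the branch in SparkSubmit.prepareSubmitEnvironment that performs this substitution looks roughly like this (paraphrased from memory of the Spark 2.1 source, elisions mine):

// In yarn-cluster mode, wrap the user class in yarn.Client,
// which becomes the class actually launched by spark-submit.
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  ...
}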
For the details of the spark-submit process, see: http://www.javashuo.com/article/p-gzsdbzra-bo.html
Now let's look at how Client runs:
org.apache.spark.deploy.yarn.Client
def main(argStrings: Array[String]) {
  ...
  val sparkConf = new SparkConf
  // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
  // so remove them from sparkConf here for yarn mode.
  sparkConf.remove("spark.jars")
  sparkConf.remove("spark.files")
  val args = new ClientArguments(argStrings)
  new Client(args, sparkConf).run()
  ...

def run(): Unit = {
  this.appId = submitApplication()
  ...

def submitApplication(): ApplicationId = {
  ...
  val containerContext = createContainerLaunchContext(newAppResponse)
  ...

private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
  ...
  val amClass =
    if (isClusterMode) {
      Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
    } else {
      Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
    }
The call chain here is Client.main -> run -> submitApplication -> createContainerLaunchContext, which sets amClass. Either way everything ends up in ApplicationMaster, because ExecutorLauncher is just a thin wrapper around ApplicationMaster (it exists only so that the client-mode AM shows up under a distinct name in tools like ps or jps), as shown below:
org.apache.spark.deploy.yarn.ExecutorLauncher
object ExecutorLauncher {
  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }
}
Now let's look at ApplicationMaster:
org.apache.spark.deploy.yarn.ApplicationMaster
def main(args: Array[String]): Unit = {
  ...
  SparkHadoopUtil.get.runAsSparkUser { () =>
    master = new ApplicationMaster(amArgs, new YarnRMClient)
    System.exit(master.run())
  }
  ...

final def run(): Int = {
  ...
  if (isClusterMode) {
    runDriver(securityMgr)
  } else {
    runExecutorLauncher(securityMgr)
  }
  ...

private def registerAM(
    _sparkConf: SparkConf,
    _rpcEnv: RpcEnv,
    driverRef: RpcEndpointRef,
    uiAddress: String,
    securityMgr: SecurityManager) = {
  ...
  allocator = client.register(driverUrl, driverRef, yarnConf, _sparkConf, uiAddress,
    historyAddress, securityMgr, localResources)
  allocator.allocateResources()
  reporterThread = launchReporterThread()
  ...

private def launchReporterThread(): Thread = {
  // The number of failures in a row until Reporter thread give up
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  val t = new Thread {
    override def run() {
      var failureCount = 0
      while (!finished) {
        try {
          if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
            finish(FinalApplicationStatus.FAILED,
              ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
              s"Max number of executor failures ($maxNumExecutorFailures) reached")
          } else {
            logDebug("Sending progress")
            allocator.allocateResources()
          }
  ...
The call chain here is ApplicationMaster.main -> run; run calls either runDriver (cluster mode) or runExecutorLauncher (client mode), and both eventually reach registerAM, which calls YarnAllocator.allocateResources and then starts a reporter thread via launchReporterThread, which keeps calling YarnAllocator.allocateResources in a loop. Now let's look at YarnAllocator:
org.apache.spark.deploy.yarn.YarnAllocator
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  val allocateResponse = amClient.allocate(progressIndicator)
As you can see, the progress is hard-coded to 0.1, i.e. 10% (the float passed to AMRMClient.allocate is precisely the progress the ApplicationMaster reports to the ResourceManager on every heartbeat). That is why a Spark on YARN application always shows 10% progress while it runs, and why trying to monitor job progress through YARN's progress field is futile.
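If real progress is needed, it has to come from Spark itself rather than from YARN, e.g. via SparkContext.statusTracker inside the application (or the Spark UI's REST API from outside). Below is a minimal sketch using the status tracker (app name, master, and the background job are illustrative) that derives a task-level completion ratio:

import org.apache.spark.{SparkConf, SparkContext}

object RealProgress {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("progress-demo").setMaster("local[2]"))
    val tracker = sc.statusTracker

    // Kick off some work in the background so there is something to observe.
    new Thread(new Runnable {
      override def run(): Unit = sc.parallelize(1 to 1000000, 100).map(_ * 2).count()
    }).start()

    for (_ <- 1 to 20) {
      for (jobId <- tracker.getActiveJobIds(); info <- tracker.getJobInfo(jobId)) {
        // Sum task counts across the job's stages to get a completion ratio.
        val stages = info.stageIds().flatMap(tracker.getStageInfo(_))
        val total = stages.map(_.numTasks).sum
        val done = stages.map(_.numCompletedTasks).sum
        if (total > 0) println(s"job $jobId: $done/$total tasks (${done * 100 / total}%)")
      }
      Thread.sleep(1000)
    }
    sc.stop()
  }
}

Unlike YARN's hard-coded 0.1, this ratio actually advances as tasks complete.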