[Original] Uncle's Experience Sharing (19): spark on yarn job progress stuck at 10% after submission

spark 2.1.1

We wanted to monitor the execution progress of spark on yarn jobs, but found that after submission the reported progress always stays at 10% until the job succeeds or fails, at which point it suddenly jumps to 100%. Curious.
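To make the symptom concrete, here is a minimal sketch of the monitoring side. It assumes the standard YARN ResourceManager REST API (GET /ws/v1/cluster/apps/{appId}, whose app object carries a progress field); the application id in the sample payload is made up. A poller like this keeps seeing 10.0 for a running Spark job no matter how far the job has actually gotten:

```python
import json

def app_progress(response_body: str) -> float:
    """Extract the progress percentage from a YARN RM REST API
    /ws/v1/cluster/apps/{appId} response body."""
    return json.loads(response_body)["app"]["progress"]

# A response captured mid-run looks like this for a Spark job:
# progress sits at 10.0 until the application finishes.
sample = ('{"app": {"id": "application_1500000000000_0001",'
          ' "state": "RUNNING", "progress": 10.0}}')
print(app_progress(sample))  # 10.0
```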

 

Let's walk through the spark on yarn submission process:

 

When a job is submitted in yarn mode, spark-submit replaces the mainClass with Client:

childMainClass = "org.apache.spark.deploy.yarn.Client"

For details of the spark-submit process, see: http://www.javashuo.com/article/p-gzsdbzra-bo.html

 

Now look at the Client execution process:

org.apache.spark.deploy.yarn.Client

  def main(argStrings: Array[String]) {
...
    val sparkConf = new SparkConf
    // SparkSubmit would use yarn cache to distribute files & jars in yarn mode,
    // so remove them from sparkConf here for yarn mode.
    sparkConf.remove("spark.jars")
    sparkConf.remove("spark.files")
    val args = new ClientArguments(argStrings)
    new Client(args, sparkConf).run()
...

  def run(): Unit = {
    this.appId = submitApplication()
...

  def submitApplication(): ApplicationId = {
...
      val containerContext = createContainerLaunchContext(newAppResponse)
...

  private def createContainerLaunchContext(newAppResponse: GetNewApplicationResponse)
    : ContainerLaunchContext = {
...
    val amClass =
      if (isClusterMode) {
        Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
      } else {
        Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
      }

The call chain here is Client.main -> run -> submitApplication -> createContainerLaunchContext, which then sets amClass. Either way, execution ends up in ApplicationMaster, because ExecutorLauncher simply delegates to it, as follows:

org.apache.spark.deploy.yarn.ExecutorLauncher

object ExecutorLauncher {

  def main(args: Array[String]): Unit = {
    ApplicationMaster.main(args)
  }

}

 

Now look at ApplicationMaster:

org.apache.spark.deploy.yarn.ApplicationMaster

  def main(args: Array[String]): Unit = {
...
    SparkHadoopUtil.get.runAsSparkUser { () =>
      master = new ApplicationMaster(amArgs, new YarnRMClient)
      System.exit(master.run())
    }
...

  final def run(): Int = {
...
      if (isClusterMode) {
        runDriver(securityMgr)
      } else {
        runExecutorLauncher(securityMgr)
      }
...

  private def registerAM(
      _sparkConf: SparkConf,
      _rpcEnv: RpcEnv,
      driverRef: RpcEndpointRef,
      uiAddress: String,
      securityMgr: SecurityManager) = {
...
    allocator = client.register(driverUrl,
      driverRef,
      yarnConf,
      _sparkConf,
      uiAddress,
      historyAddress,
      securityMgr,
      localResources)

    allocator.allocateResources()
    reporterThread = launchReporterThread()
...
  private def launchReporterThread(): Thread = {
    // The number of failures in a row until Reporter thread give up
    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)

    val t = new Thread {
      override def run() {
        var failureCount = 0
        while (!finished) {
          try {
            if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
              finish(FinalApplicationStatus.FAILED,
                ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
                s"Max number of executor failures ($maxNumExecutorFailures) reached")
            } else {
              logDebug("Sending progress")
              allocator.allocateResources()
            }
...

The call chain here is ApplicationMaster.main -> run; run calls either runDriver or runExecutorLauncher, and both eventually reach registerAM, which calls YarnAllocator.allocateResources and then, via launchReporterThread, starts a thread that keeps calling YarnAllocator.allocateResources. Now look at YarnAllocator:
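The reporter loop's logic can be sketched as follows (a simplified Python model, not Spark's actual Scala; the FakeAllocator and its fields are invented purely for the demo). On every tick it either fails the application when too many executors have died, or heartbeats via allocateResources(), which is exactly where the hardcoded progress value gets sent:

```python
def run_reporter(allocator, max_executor_failures: int):
    """Simplified model of the AM reporter thread: fail the app if too many
    executors died, otherwise heartbeat via allocate_resources()."""
    while not allocator.finished:
        if allocator.num_executors_failed >= max_executor_failures:
            allocator.finish(
                "FAILED",
                f"Max number of executor failures ({max_executor_failures}) reached")
        else:
            allocator.allocate_resources()  # doubles as the heartbeat

class FakeAllocator:
    """Stand-in for YarnAllocator, just to exercise the loop above."""
    def __init__(self, failures: int):
        self.num_executors_failed = failures
        self.finished = False
        self.heartbeats = 0
        self.status = None
    def allocate_resources(self):
        self.heartbeats += 1
        if self.heartbeats >= 3:   # stop the demo loop after a few ticks
            self.finished = True
    def finish(self, status, msg):
        self.status = status
        self.finished = True

healthy = FakeAllocator(failures=0)
run_reporter(healthy, max_executor_failures=3)
print(healthy.heartbeats)  # 3

broken = FakeAllocator(failures=5)
run_reporter(broken, max_executor_failures=3)
print(broken.status)  # FAILED
```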

org.apache.spark.deploy.yarn.YarnAllocator

  def allocateResources(): Unit = synchronized {
    updateResourceRequests()

    val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
    // requests.
    val allocateResponse = amClient.allocate(progressIndicator)

As you can see, the progress is hardcoded to 0.1, i.e. 10%, so a spark on yarn job's reported progress stays at 10% for its whole lifetime; trying to monitor job progress through yarn is therefore futile.
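Since the yarn-reported progress is useless, one workaround is to compute progress yourself from Spark's own monitoring REST API: GET /api/v1/applications/{appId}/jobs returns job objects with numTasks and numCompletedTasks fields (field names as in Spark 2.x's REST API; the sample job list below is made up). A minimal sketch:

```python
def spark_progress(jobs) -> float:
    """Compute overall task-level progress (in percent) from the job list
    returned by Spark's REST API: GET /api/v1/applications/{appId}/jobs.
    Each job dict carries numTasks and numCompletedTasks."""
    total = sum(j["numTasks"] for j in jobs)
    done = sum(j["numCompletedTasks"] for j in jobs)
    return 100.0 * done / total if total else 0.0

# Hypothetical mid-run snapshot: job 0 finished, job 1 half done.
jobs = [
    {"jobId": 0, "numTasks": 200, "numCompletedTasks": 200},
    {"jobId": 1, "numTasks": 100, "numCompletedTasks": 50},
]
print(round(spark_progress(jobs), 1))  # 83.3
```

Note this only covers jobs already submitted to the DAGScheduler; stages that Spark has not planned yet are invisible, so the figure can still jump, but it is far more informative than yarn's constant 10%.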
