Spark Source Code Walkthrough: The SparkContext Creation Process (YARN Cluster Mode)

Spark version 2.2. Source code: github.com/apache/spar…

  • Overview

    In YARN-Cluster mode, after a user submits an application to YARN, YARN runs the application in two phases: in the first phase, the Spark Driver is started inside the YARN cluster as an ApplicationMaster; in the second phase, the ApplicationMaster creates the application, requests resources from the ResourceManager on its behalf, and launches Executors to run the Tasks, monitoring the whole run until it completes.

  • Textual description

    The YARN-cluster workflow consists of the following steps:

    1. The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command used to launch the ApplicationMaster, and the program that will run in the Executors;
    2. After receiving the request, the ResourceManager selects a NodeManager in the cluster and allocates the first Container for the application, asking the NodeManager to launch the application's ApplicationMaster in that Container; the ApplicationMaster then performs initialization work such as creating the SparkContext;
    3. The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager; it then polls over RPC to request resources for the individual tasks and monitors their status until the run finishes;
    4. Once the ApplicationMaster has obtained resources (i.e. Containers), it communicates with the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the acquired Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This matches Standalone mode, except that when the SparkContext is initialized in the Spark Application, task scheduling is handled by CoarseGrainedSchedulerBackend together with YarnClusterScheduler, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl;
    5. The SparkContext in the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends; each CoarseGrainedExecutorBackend runs its Tasks and reports status and progress back to the ApplicationMaster, so the ApplicationMaster always knows the state of every task and can restart a task when it fails;
    6. After the application has finished, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.
  • Diagram

  • Source code walkthrough

    1. The submit command

      The user invokes the ${SPARK_HOME}/bin/spark-submit script; this runs the main method of the SparkSubmit class and starts the main process.

      exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    2. Parsing the command-line arguments

      SparkSubmit parses the script arguments, in particular master and deployMode. In yarn-cluster mode the class to launch next (childMainClass) is set to org.apache.spark.deploy.yarn.Client, and the user's main class (mainClass) is passed along through the --class and --jar arguments. In other words, a NodeManager is first requested to host the ApplicationMaster, and the Driver (the user's main class) is then started inside that ApplicationMaster.

      org.apache.spark.deploy.SparkSubmit 584-603
      // In yarn-cluster mode, use yarn.Client as a wrapper around the user class
      if (isYarnCluster) {
        childMainClass = "org.apache.spark.deploy.yarn.Client"
        if (args.isPython) {
          childArgs += ("--primary-py-file", args.primaryResource)
          childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
        } else if (args.isR) {
          val mainFile = new Path(args.primaryResource).getName
          childArgs += ("--primary-r-file", mainFile)
          childArgs += ("--class", "org.apache.spark.deploy.RRunner")
        } else {
          if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
            childArgs += ("--jar", args.primaryResource)
          }
          childArgs += ("--class", args.mainClass)
        }
        if (args.childArgs != null) {
          args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
        }
      }
    3. Launching org.apache.spark.deploy.yarn.Client

      The childMainClass resolved from the script arguments is invoked via reflection (a small self-contained sketch of this pattern follows the trace below).

      org.apache.spark.deploy.SparkSubmit#main
          119:case SparkSubmitAction.SUBMIT => submit(appArgs) 
      ->org.apache.spark.deploy.SparkSubmit#submit
          162:runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
      ->org.apache.spark.deploy.SparkSubmit#runMain
          // note that childMainClass here is org.apache.spark.deploy.yarn.Client; this Client asks YARN's RM for an NM and starts the AM on that NM
          // the AM then launches the mainClass specified by the user
          712:mainClass = Utils.classForName(childMainClass)
          739:val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
          755:mainMethod.invoke(null, childArgs.toArray)
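
      As a rough, self-contained illustration of this reflective dispatch (the class name com.example.Demo is made up; Spark uses Utils.classForName and the parsed childArgs instead):

      object ReflectiveMainSketch {
        def invokeMain(className: String, args: Array[String]): Unit = {
          val mainClass  = Class.forName(className)                          // plain-JDK stand-in for Utils.classForName
          val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
          mainMethod.invoke(null, args)                                      // null receiver because main is static
        }

        def main(args: Array[String]): Unit =
          invokeMain("com.example.Demo", Array("--arg", "value"))
      }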

      org.apache.spark.deploy.yarn.Client uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN (a bare-bones sketch of this API is shown after the trace below).

      org.apache.spark.deploy.yarn.Client#main
          1150:new Client(args, sparkConf).run()
      ->org.apache.spark.deploy.yarn.Client#run
          1091:this.appId = submitApplication()
      ->org.apache.spark.deploy.yarn.Client#submitApplication
          161:val containerContext = createContainerLaunchContext(newAppResponse)
          174:yarnClient.submitApplication(appContext)
          
      The appContext wraps the command that launches the ApplicationMaster:
      org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
          887:val userClass =
                  if (isClusterMode) {
                    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
                  } else {
                    Nil
                  }
          910:val amClass =
                  if (isClusterMode) {
                    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
                  } else {
                    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
                  }        
          923: val amArgs =
                  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
                  Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
              val commands = prefixEnv ++
                  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ amArgs ++
                  Seq(
                    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
                    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr") 
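
      For comparison, a bare-bones submission through the same Hadoop YarnClient API might look roughly like the sketch below; the application name, AM class and resource numbers are placeholders, not what Spark actually sets:

      import java.util.Collections

      import org.apache.hadoop.yarn.api.records.{ContainerLaunchContext, Resource}
      import org.apache.hadoop.yarn.client.api.YarnClient
      import org.apache.hadoop.yarn.conf.YarnConfiguration
      import org.apache.hadoop.yarn.util.Records

      object YarnSubmitSketch {
        def main(args: Array[String]): Unit = {
          val yarnClient = YarnClient.createYarnClient()
          yarnClient.init(new YarnConfiguration())
          yarnClient.start()

          val app        = yarnClient.createApplication()
          val appContext = app.getApplicationSubmissionContext
          appContext.setApplicationName("sketch-app")

          // the command the NodeManager will run inside the first container (the AM)
          val amContainer = Records.newRecord(classOf[ContainerLaunchContext])
          amContainer.setCommands(Collections.singletonList(
            "{{JAVA_HOME}}/bin/java -server com.example.SketchApplicationMaster " +
              "1><LOG_DIR>/stdout 2><LOG_DIR>/stderr"))
          appContext.setAMContainerSpec(amContainer)

          // resources for the AM container
          val capability = Records.newRecord(classOf[Resource])
          capability.setMemory(1024)
          capability.setVirtualCores(1)
          appContext.setResource(capability)

          val appId = yarnClient.submitApplication(appContext)
          println(s"submitted application $appId")
        }
      }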
    4. Starting the ApplicationMaster and the Driver

      Based on its arguments the ApplicationMaster decides whether it runs in cluster or client mode. In cluster mode it launches the mainClass the user originally passed to spark-submit, starting the user process, and then waits for a SparkContext to be injected (normally the user process instantiates a SparkContext, and during that instantiation the SparkContext injects itself into the ApplicationMaster through the SchedulerBackend; step 6 covers the details, and a minimal sketch of this wait pattern follows the trace below).

      org.apache.spark.deploy.yarn.ApplicationMaster#main
          763:master = new ApplicationMaster(amArgs, new YarnRMClient)
          System.exit(master.run())
      ->org.apache.spark.deploy.yarn.ApplicationMaster#run 
          253:if (isClusterMode) {
                runDriver(securityMgr)
              } else {
                runExecutorLauncher(securityMgr)
              }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // start the user process
          394:userClassThread = startUserApplication()
          // wait for the SparkContext to be injected
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))  
      ->org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
          629:val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
          635:mainMethod.invoke(null, userArgs.toArray)
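
      A minimal, framework-free sketch of this "start the user thread, wait for the context to be published" pattern (a String stands in for SparkContext; plain Await replaces Spark's ThreadUtils.awaitResult):

      import scala.concurrent.duration._
      import scala.concurrent.{Await, Promise}

      object AmWaitSketch {
        // stands in for ApplicationMaster.sparkContextPromise
        private val contextPromise: Promise[String] = Promise[String]()

        private def startUserApplication(): Thread = {
          val t = new Thread(new Runnable {
            override def run(): Unit = {
              // the user's main() would run here; creating its "context" publishes
              // it back to the AM thread that is blocked on the promise
              contextPromise.success("SparkContext placeholder")
            }
          }, "Driver")
          t.start()
          t
        }

        def main(args: Array[String]): Unit = {
          val userThread = startUserApplication()
          val sc = Await.result(contextPromise.future, 100.seconds)
          println(s"driver published: $sc")
          userThread.join()
        }
      }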
    5. Creating the SparkContext

      At this point the first NodeManager has been obtained from the RM, and the ApplicationMaster and Driver processes are running on it. Next the user's mainClass executes; when it reaches new SparkContext, a SparkContext instance is created. During its construction two particularly important objects are built, taskScheduler and schedulerBackend. Working together, they submit the task sets produced by the DAGScheduler to the Executors, request task resources as needed, and track the state of running tasks (a small sketch of the cluster-manager lookup used here follows the trace below).

      The user program executes new SparkContext():
      org.apache.spark.SparkContext#new (the try-catch block at lines 371-583)
          397 // set basic information such as the jars
              _conf.set(DRIVER_HOST_ADDRESS,_conf.get(DRIVER_HOST_ADDRESS))
              _conf.setIfMissing("spark.driver.port", "0")
              _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
              _jars = Utils.getUserJars(_conf)
              _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
          428 // create the ListenerBus, used for rendering the Spark UI
              _jobProgressListener = new JobProgressListener(_conf)
              listenerBus.addListener(jobProgressListener)
          432 // create the SparkEnv
              _env = createSparkEnv(_conf, isLocal, listenerBus)
              SparkEnv.set(_env)
          501 // create the taskScheduler and schedulerBackend
              val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
              _schedulerBackend = sched
              _taskScheduler = ts
              _dagScheduler = new DAGScheduler(this)
              _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
          540 // create the dynamic resource allocation manager, covered in step 9
              _executorAllocationManager =
                if (dynamicAllocationEnabled) {
                  schedulerBackend match {
                    case b: ExecutorAllocationClient =>
                      Some(new ExecutorAllocationManager(
                        schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
                    case _ =>
                      None
                  }
                } else {
                  None
                }
              _executorAllocationManager.foreach(_.start())
      ->org.apache.spark.SparkContext#createTaskScheduler
          2757 // in YARN mode the masterUrl is "yarn" and the matching ClusterManager is org.apache.spark.scheduler.cluster.YarnClusterManager
              case masterUrl =>
              val cm = getClusterManager(masterUrl) match {
                  case Some(clusterMgr) => clusterMgr
                  case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
              }
              try {
                // the resulting TaskScheduler is org.apache.spark.scheduler.cluster.YarnClusterScheduler
                val scheduler = cm.createTaskScheduler(sc, masterUrl)
                // the resulting backend is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
                val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
                cm.initialize(scheduler, backend)
                (backend, scheduler)
              } catch {
                case se: SparkException => throw se
                case NonFatal(e) =>
                  throw new SparkException("External scheduler cannot be instantiated", e)
              }
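
      The getClusterManager call above relies on Java's ServiceLoader; the sketch below shows the same lookup pattern against a made-up DemoClusterManager trait (the real interface is org.apache.spark.scheduler.ExternalClusterManager, registered through META-INF/services):

      import java.util.ServiceLoader
      import scala.collection.JavaConverters._

      trait DemoClusterManager {
        def canCreate(masterURL: String): Boolean
      }

      object ClusterManagerLookup {
        // implementations register themselves in META-INF/services and are asked
        // whether they can handle the given master URL (e.g. "yarn")
        def find(masterURL: String): Option[DemoClusterManager] = {
          val loader = Thread.currentThread().getContextClassLoader
          ServiceLoader.load(classOf[DemoClusterManager], loader)
            .asScala
            .find(_.canCreate(masterURL))
        }
      }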
    6. Creating the YarnClusterScheduler

      YarnClusterScheduler extends TaskSchedulerImpl (Spark's main implementation) through YarnScheduler; the most important thing this subclass does is inject the SparkContext into the ApplicationMaster, continuing from step 5 (a stripped-down sketch of this hook follows the code below).

      org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
          32: a different subclass is instantiated depending on the deploy mode
              override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
                  sc.deployMode match {
                    case "cluster" => new YarnClusterScheduler(sc)
                    case "client" => new YarnScheduler(sc)
                    case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
                  }
              }
      org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
              private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
              
               logInfo("Created YarnClusterScheduler")
      
               override def postStartHook() {
                 
                 ApplicationMaster.sparkContextInitialized(sc)
                 super.postStartHook()
                 logInfo("YarnClusterScheduler.postStartHook done")
               }
      
              }
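
      Stripped of Spark's types, the hook pattern amounts to the following sketch (hypothetical classes; the real chain is YarnClusterScheduler -> YarnScheduler -> TaskSchedulerImpl):

      abstract class SchedulerSketch {
        def postStartHook(): Unit = ()   // base implementation: nothing extra to do
      }

      // the cluster-mode subclass overrides the hook purely to hand the freshly
      // created context back to the ApplicationMaster that is waiting on it (step 4)
      class ClusterSchedulerSketch(publishToAm: String => Unit, sc: String) extends SchedulerSketch {
        override def postStartHook(): Unit = {
          publishToAm(sc)
          super.postStartHook()
        }
      }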
    7. Creating the YarnClusterSchedulerBackend

      YarnClusterSchedulerBackend likewise extends org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's main implementation) through YarnSchedulerBackend. The subclass's main job is to bridge Spark's resource requests over to YARN (described in detail in step 9) and to create the clientEndpoint and driverEndpoint used for communication (a framework-free sketch of this handshake follows the trace below).

      org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
          // instantiate a different subclass depending on the deploy mode
          40:override def createSchedulerBackend(sc: SparkContext,
            masterURL: String,
            scheduler: TaskScheduler): SchedulerBackend = {
              sc.deployMode match {
                case "cluster" =>
                  new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case "client" =>
                  new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
                case  _ =>
                  throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
              }
          }
          
      -> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
          36:super.start()
      -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
          // create the clientEndpoint
          52:private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
              -> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
                  case RegisterClusterManager(am) =>
                      logInfo(s"ApplicationMaster registered as $am")
                      // the amEndpoint is injected here; this happens in step 8, after the SparkContext has been injected into the ApplicationMaster
                      amEndpoint = Option(am)
                      if (!shouldResetOnAmRegister) {
                        shouldResetOnAmRegister = true
                      } else {
                        // AM is already registered before, this potentially means that AM failed and
                        // a new one registered after the failure. This will only happen in yarn-client mode.
                        reset()
                      }
          86:super.start()
          -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
              // create the driverEndpoint
              378:driverEndpoint = createDriverEndpointRef(properties)
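
      Reduced to plain Scala, the handshake looks roughly like this (the case classes stand in for the real CoarseGrainedClusterMessages, and a String stands in for the AM endpoint reference):

      case class RegisterClusterManager(am: String)
      case class RequestExecutors(requestedTotal: Int)

      class YarnSchedulerEndpointSketch {
        private var amEndpoint: Option[String] = None

        def receive(msg: Any): Unit = msg match {
          case RegisterClusterManager(am) =>
            // remember the AM so later RequestExecutors messages can be forwarded to it
            amEndpoint = Some(am)
          case RequestExecutors(n) =>
            amEndpoint match {
              case Some(am) => println(s"forwarding a request for $n executors to $am")
              case None     => println("AM has not registered yet; cannot request executors")
            }
        }
      }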
    8. Follow-up after the SparkContext is injected into the ApplicationMaster

      At the end of step 6, ApplicationMaster.sparkContextInitialized injected the SparkContext into the ApplicationMaster, which wakes up the wait from step 4 (a sketch of the reporter-thread loop started afterwards follows the trace below).

      org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          769:master.sparkContextInitialized(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
          326:sparkContextPromise.success(sc)
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
          // sc now has a value; continue with the next steps
          401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
          Duration(totalWaitTime, TimeUnit.MILLISECONDS))
          if (sc != null) {
              rpcEnv = sc.env.rpcEnv
              // create the AMEndpoint and inject it into org.apache.spark.scheduler.cluster.YarnSchedulerBackend (see step 7)
              val driverRef = runAMEndpoint(
                sc.getConf.get("spark.driver.host"),
                sc.getConf.get("spark.driver.port"),
                isClusterMode = true)
              registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
          387:amEndpoint =rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
          681:override def onStart(): Unit = {
              driver.send(RegisterClusterManager(self))
          }
      ->org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
          // create the org.apache.spark.deploy.yarn.YarnAllocator
          // YarnAllocator is responsible for requesting containers from the YARN ResourceManager and deciding what to do with them once YARN grants the requests
          359:allocator = client.register(driverUrl,
                  driverRef,
                  yarnConf,
                  _sparkConf,
                  uiAddress,
                  historyAddress,
                  securityMgr,
                  localResources)
          // initial resource request
          368:allocator.allocateResources() 
          // start a reporter thread that requests resources on demand while the job runs; it also calls allocator.allocateResources()
          369:reporterThread = launchReporterThread()
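
      The reporter thread boils down to a periodic call into the allocator; a rough sketch under a hypothetical Allocator trait:

      object ReporterThreadSketch {
        trait Allocator { def allocateResources(): Unit }

        // a daemon thread that keeps asking the allocator for containers, roughly
        // what launchReporterThread sets up after registerAM
        def launchReporterThread(allocator: Allocator, intervalMs: Long): Thread = {
          val t = new Thread(new Runnable {
            override def run(): Unit =
              try {
                while (true) {
                  allocator.allocateResources()   // tops up containers if the target changed
                  Thread.sleep(intervalMs)
                }
              } catch {
                case _: InterruptedException => () // interrupted: shut down quietly
              }
          }, "Reporter")
          t.setDaemon(true)
          t.start()
          t
        }
      }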
    9. Requesting resources

      Resource requests are handled mainly by org.apache.spark.deploy.yarn.YarnAllocator and are triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which sends an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message to update the desired number of Executors, targetNumExecutors (a sketch of the initial-target computation follows the trace below).

      // the main method through which containers are requested and Executors are started on the NodeManagers
      org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
          // first compute how many resources need to be requested
          260:updateResourceRequests()
              -> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
                  297:val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
                  // at initialization time, targetNumExecutors is the max of
                  //   spark.dynamicAllocation.minExecutors
                  //   spark.dynamicAllocation.initialExecutors
                  //   spark.executor.instances
                  109: @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
                  275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
                  
              -> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
                  2538:val initialExecutors = Seq(
                      conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
                      conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
                      conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
                      
                      
          276:handleAllocatedContainers(allocatedContainers.asScala)
              -> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
                  442:runAllocatedContainers(containersToUse)
              -> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
                  511:new ExecutorRunnable(
                            Some(container),
                            conf,
                            sparkConf,
                            driverUrl,
                            executorId,
                            executorHostname,
                            executorMemory,
                            executorCores,
                            appAttemptId.getApplicationId.toString,
                            securityMgr,
                            localResources
                          ).run()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#run
                  65:startContainer()
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
                  // the command to run on the NodeManager
                  98:val commands = prepareCommand()
                  // start the container on the NodeManager
                  122:nmClient.startContainer(container.get, ctx)
              -> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
                  201:val commands = prefixEnv ++
                      Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
                      javaOpts ++
                      // this is the key line: the container will launch this class, described in step 10
                      Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
                        "--driver-url", masterAddress,
                        "--executor-id", executorId,
                        "--hostname", hostname,
                        "--cores", executorCores.toString,
                        "--app-id", appId) ++
                      userClassPath ++
                      Seq(
                        s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
                        s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
                        
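
      The initial target described above can be sketched as the max of three settings (config keys written out literally; defaults assumed to be 0, which simplifies the real getDynamicAllocationInitialExecutors logic):

      def initialTargetExecutors(conf: Map[String, String]): Int = {
        def intOf(key: String): Int = conf.get(key).map(_.toInt).getOrElse(0)

        Seq(
          intOf("spark.dynamicAllocation.minExecutors"),
          intOf("spark.dynamicAllocation.initialExecutors"),
          intOf("spark.executor.instances")
        ).max
      }

      // e.g. minExecutors=2, initialExecutors=5, spark.executor.instances unset => initial target of 5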

      Step 5 mentioned that an executorAllocationManager (the dynamic resource allocation manager) is created while the SparkContext is being built; it takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job runs.

      org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient.

      org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements this interface.

      That class in turn delegates to the doRequestTotalExecutors method of its YARN subclass, org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which finally issues the resource request (a small sketch of the resulting shortfall arithmetic follows the trace below).

      org.apache.spark.SparkContext#new
          552:_executorAllocationManager.foreach(_.start())
      -> org.apache.spark.ExecutorAllocationManager#start
          222:val scheduleTask = new Runnable() {
                override def run(): Unit = {
                  try {
                    schedule()
                  } catch {
                    case ct: ControlThrowable =>
                      throw ct
                    case t: Throwable =>
                      logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
                  }
                }
              }
              // schedule a recurring task that drives resource scheduling
              executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
      -> org.apache.spark.ExecutorAllocationManager#schedule
          281: updateAndSyncNumExecutorsTarget(now)
      -> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
          // maxNumExecutorsNeeded is computed from
          //   listener.totalPendingTasks (updated from CoarseGrainedSchedulerBackend line 261)
          //   listener.totalRunningTasks (updated from DAGScheduler line 996)
          //   spark.executor.cores (user-defined, default 1)
          // the exact computation and update paths are left for a follow-up
          310: val maxNeeded = maxNumExecutorsNeeded
          331: val delta = addExecutors(maxNeeded)
          380: val addRequestAcknowledged = testing ||
        client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
          
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
          548: doRequestTotalExecutors(numExecutors)
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
          // build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message and send it through the clientEndpoint
          138:yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
      
      ->org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply 
          // finally forward the RequestExecutors message to the ApplicationMaster
          280:case r: RequestExecutors =>
                 amEndpoint match {
                   case Some(am) =>
                     am.ask[Boolean](r).andThen {
                       case Success(b) => context.reply(b)
                       case Failure(NonFatal(e)) =>
                         logError(s"Sending $r to AM was unsuccessful", e)
                         context.sendFailure(e)
                     }(ThreadUtils.sameThread)
                   case None =>
                     logWarning("Attempted to request executors before the AM has registered!")
                     context.reply(false)
                 }
      ->org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
          696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
                r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
      -> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
          // this ultimately updates targetNumExecutors, so the next call to org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a different target; requestedTotal originates from the value computed in org.apache.spark.ExecutorAllocationManager#schedule
          218: targetNumExecutors = requestedTotal
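
      Putting the two ends together, the shortfall arithmetic from updateResourceRequests (line 297) can be sketched as:

      case class AllocatorState(targetNumExecutors: Int,
                                numPendingAllocate: Int,
                                numExecutorsRunning: Int) {
        // only the gap between the (possibly just-updated) target and what is
        // already pending or running becomes new container requests to the RM
        def missing: Int = targetNumExecutors - numPendingAllocate - numExecutorsRunning
      }

      // e.g. target raised to 10 by the ExecutorAllocationManager, 3 pending, 4 running
      // => 3 new containers are requested
      assert(AllocatorState(10, 3, 4).missing == 3)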

      One open question: org.apache.spark.SparkContext#requestExecutors can also request resources by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors, and that method's logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") message does appear in the Spark Driver logs, but it is unclear what calls this method.

    10. The org.apache.spark.executor.CoarseGrainedExecutorBackend class

      After the resources requested in steps 8-9 have been granted, an org.apache.spark.executor.CoarseGrainedExecutorBackend process is started on the NodeManager. These processes have a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend acts as the Executor's manager: it holds an Executor object, and the Executor runs Tasks on a thread pool, which ties the whole flow together (a tiny thread-pool sketch follows the trace below).

      org.apache.spark.executor.CoarseGrainedExecutorBackend#main
          284:run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
          226:env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
          env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
          // register itself with the CoarseGrainedSchedulerBackend
          63:ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
      -> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply 
          // after the checks pass, tell the CoarseGrainedExecutorBackend to create the Executor
          195: executorRef.send(RegisteredExecutor)
      -> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
          83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
      -> org.apache.spark.executor.Executor#new
          101:Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
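
      The "Executor runs Tasks on a thread pool" part can be pictured with a plain cached thread pool, as in Executor's construction (line 101 above); everything below is a simplified stand-in, not Spark code:

      import java.util.concurrent.{Executors, TimeUnit}

      object ExecutorPoolSketch {
        def main(args: Array[String]): Unit = {
          val pool = Executors.newCachedThreadPool()
          (1 to 4).foreach { taskId =>
            // each launched task becomes a Runnable submitted to the pool
            pool.submit(new Runnable {
              override def run(): Unit =
                println(s"running task $taskId on ${Thread.currentThread().getName}")
            })
          }
          pool.shutdown()
          pool.awaitTermination(10, TimeUnit.SECONDS)
        }
      }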
    11. This completes the SparkContext creation flow. Once the SparkContext has been created, the user code starts executing: the logical RDD dependency graph is built first, then an action triggers stage splitting into a physical execution graph, which runs via SparkContext.runJob. A detailed analysis can be found at github.com/JerryLead/S…
