Spark version 2.2. Source code: github.com/apache/spar…
In YARN-Cluster mode, after a user submits an application to YARN, YARN runs it in two phases: in the first phase, Spark's Driver is launched inside the YARN cluster as an ApplicationMaster; in the second phase, the ApplicationMaster creates the application, requests resources from the ResourceManager on its behalf, and launches Executors to run Tasks, monitoring the whole run until it completes.
The YARN-cluster workflow breaks down into the following steps:
1. The Spark Yarn Client submits the application to YARN, including the ApplicationMaster program, the command to launch the ApplicationMaster, the program to be run inside the Executors, and so on.
2. On receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first Container for the application, and asks it to launch the application's ApplicationMaster in that Container; the ApplicationMaster then performs the initialization of the SparkContext and related components.
3. The ApplicationMaster registers with the ResourceManager, so the user can check the application's status directly through the ResourceManager; it then polls over RPC to request resources for the tasks and monitors their status until the run finishes.
4. Once the ApplicationMaster has obtained resources (i.e., Containers), it communicates with the corresponding NodeManagers and asks them to launch CoarseGrainedExecutorBackend in the acquired Containers. After starting, each CoarseGrainedExecutorBackend registers with the SparkContext inside the ApplicationMaster and requests Tasks. This is the same as in Standalone mode, except that when the SparkContext is initialized in the Spark Application, it uses CoarseGrainedSchedulerBackend together with YarnClusterScheduler for task scheduling, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl.
5. The SparkContext inside the ApplicationMaster assigns Tasks to the CoarseGrainedExecutorBackends; each CoarseGrainedExecutorBackend runs its Tasks and reports their status and progress to the ApplicationMaster, so the ApplicationMaster always knows how each task is doing and can restart a task when it fails.
6. Finally, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.
The user submits the job by invoking the ${SPARK_HOME}/bin/spark-submit script, which executes the main function of the SparkSubmit class and starts the main process:
exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
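For illustration, the same yarn-cluster submission can also be driven programmatically through SparkLauncher; this is a minimal sketch, and the jar path, main class, and object name below are placeholders rather than anything from the article:
import org.apache.spark.launcher.SparkLauncher

// Minimal sketch: programmatic equivalent of spark-submit --master yarn --deploy-mode cluster.
// The application jar and main class are placeholders.
object SubmitSketch {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setMaster("yarn")
      .setDeployMode("cluster")
      .setAppResource("/path/to/app.jar")   // becomes the primaryResource / --jar argument
      .setMainClass("com.example.MyApp")    // becomes the --class argument handed to yarn.Client
      .launch()
    process.waitFor()
  }
}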
SparkSubmit parses the script arguments, in particular the master and deployMode parameters. If the mode is yarn-cluster, the class to be launched next (childMainClass) is set to org.apache.spark.deploy.yarn.Client, and the main class set by the user (mainClass) together with its jar are passed along as the --class and --jar arguments. In other words, a NodeManager is first requested to host the ApplicationMaster, and the Driver (the user's main class) is then started inside the ApplicationMaster.
org.apache.spark.deploy.SparkSubmit 584-603
// In yarn-cluster mode, use yarn.Client as a wrapper around the user class
if (isYarnCluster) {
  childMainClass = "org.apache.spark.deploy.yarn.Client"
  if (args.isPython) {
    childArgs += ("--primary-py-file", args.primaryResource)
    childArgs += ("--class", "org.apache.spark.deploy.PythonRunner")
  } else if (args.isR) {
    val mainFile = new Path(args.primaryResource).getName
    childArgs += ("--primary-r-file", mainFile)
    childArgs += ("--class", "org.apache.spark.deploy.RRunner")
  } else {
    if (args.primaryResource != SparkLauncher.NO_RESOURCE) {
      childArgs += ("--jar", args.primaryResource)
    }
    childArgs += ("--class", args.mainClass)
  }
  if (args.childArgs != null) {
    args.childArgs.foreach { arg => childArgs += ("--arg", arg) }
  }
}
org.apache.spark.deploy.yarn.Client: the childMainClass resolved during argument parsing is then run via reflection.
org.apache.spark.deploy.SparkSubmit#main
119:case SparkSubmitAction.SUBMIT => submit(appArgs)
->org.apache.spark.deploy.SparkSubmit:submit
162:runMain(childArgs, childClasspath, sysProps, childMainClass, args.verbose)
->org.apache.spark.deploy.SparkSubmit#runMain
// Note: childMainClass here is org.apache.spark.deploy.yarn.Client, which asks YARN's RM for an NM and starts the AM on it
// The AM then launches the mainClass set by the user
712:mainClass = Utils.classForName(childMainClass)
739:val mainMethod = mainClass.getMethod("main", new Array[String](0).getClass)
755:mainMethod.invoke(null, childArgs.toArray)
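The reflective launch in runMain boils down to the standard classForName / getMethod / invoke pattern; the following stand-alone sketch illustrates it (the class name is just a stand-in for whatever childMainClass was resolved to, and it would need spark-yarn on the classpath to actually load):
// Minimal sketch of the reflective launch performed by SparkSubmit#runMain.
object ReflectiveLaunchSketch {
  def main(args: Array[String]): Unit = {
    val mainClass = Class.forName("org.apache.spark.deploy.yarn.Client")   // Spark uses Utils.classForName
    val mainMethod = mainClass.getMethod("main", classOf[Array[String]])
    mainMethod.invoke(null, args)                                          // static main, so the receiver is null
  }
}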
org.apache.spark.deploy.yarn.Client then uses org.apache.hadoop.yarn.client.api.YarnClient to submit the application to YARN:
org.apache.spark.deploy.yarn.Client#main
1150:new Client(args, sparkConf).run()
->org.apache.spark.deploy.yarn.Client#run
1091:this.appId = submitApplication()
org.apache.spark.deploy.yarn.Client:submitApplication
161:val containerContext = createContainerLaunchContext(newAppResponse)
174:yarnClient.submitApplication(appContext)
appContext here encapsulates the command used to launch the ApplicationMaster
org.apache.spark.deploy.yarn.Client#createContainerLaunchContext
887:val userClass =
  if (isClusterMode) {
    Seq("--class", YarnSparkHadoopUtil.escapeForShell(args.userClass))
  } else {
    Nil
  }
910:val amClass =
  if (isClusterMode) {
    Utils.classForName("org.apache.spark.deploy.yarn.ApplicationMaster").getName
  } else {
    Utils.classForName("org.apache.spark.deploy.yarn.ExecutorLauncher").getName
  }
923:val amArgs =
  Seq(amClass) ++ userClass ++ userJar ++ primaryPyFile ++ primaryRFile ++ userArgs ++
  Seq("--properties-file", buildPath(Environment.PWD.$$(), LOCALIZED_CONF_DIR, SPARK_CONF_FILE))
val commands = prefixEnv ++
  Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++ javaOpts ++ amArgs ++
  Seq(
    "1>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stdout",
    "2>", ApplicationConstants.LOG_DIR_EXPANSION_VAR + "/stderr")
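For context, the raw YARN client API that Spark's Client wraps follows the pattern sketched below. This is a bare-bones illustration only: a real submission would also have to fill in the AM container launch context and resource capability, which is exactly what createContainerLaunchContext does.
import org.apache.hadoop.yarn.client.api.YarnClient
import org.apache.hadoop.yarn.conf.YarnConfiguration

// Bare-bones sketch of a YARN submission: create a YarnClient, ask the RM for a new application,
// fill in the submission context (where Spark puts the AM launch command), and submit it.
object RawYarnSubmitSketch {
  def main(args: Array[String]): Unit = {
    val yarnClient = YarnClient.createYarnClient()
    yarnClient.init(new YarnConfiguration())
    yarnClient.start()

    val app = yarnClient.createApplication()
    val appContext = app.getApplicationSubmissionContext
    appContext.setApplicationName("sketch")
    // Spark's createContainerLaunchContext would set the AM command and resources here.

    val appId = yarnClient.submitApplication(appContext)
    println(s"submitted $appId")
    yarnClient.stop()
  }
}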
Depending on its arguments, the ApplicationMaster determines whether it is running in cluster mode or client mode. In cluster mode it launches the mainClass the user originally passed to the spark-submit script, starting the user process, and then waits for the SparkContext to be injected (normally the user process instantiates a SparkContext, and during that instantiation the SparkContext injects itself into the ApplicationMaster via the SchedulerBackend; section 6 covers the details).
org.apache.spark.deploy.yarn.ApplicationMaster#main
763:master = new ApplicationMaster(amArgs, new YarnRMClient)
System.exit(master.run())
->org.apache.spark.deploy.yarn.ApplicationMaster#run
253:if (isClusterMode) {
runDriver(securityMgr)
} else {
runExecutorLauncher(securityMgr)
}
->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
// Launch the user process
394:userClassThread = startUserApplication()
// Wait for the SparkContext to be injected
401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
->org.apache.spark.deploy.yarn.ApplicationMaster#startUserApplication
629:val mainMethod = userClassLoader.loadClass(args.userClass).getMethod("main", classOf[Array[String]])
635:mainMethod.invoke(null, userArgs.toArray)
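The wait in runDriver is essentially a Promise/Future handshake between the AM thread and the user-class thread. A minimal stand-alone sketch of the same pattern (not Spark's code; a String stands in for the SparkContext):
import java.util.concurrent.TimeUnit
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.Duration

// Sketch of the handshake: the user-class thread completes the promise
// (ApplicationMaster.sparkContextInitialized), and the AM thread blocks on it (runDriver).
object ContextHandshakeSketch {
  private val sparkContextPromise = Promise[String]()   // Spark's promise carries a SparkContext

  def main(args: Array[String]): Unit = {
    val userClassThread = new Thread(new Runnable {
      override def run(): Unit = {
        // ... user code would create the SparkContext here ...
        sparkContextPromise.success("SparkContext ready")
      }
    })
    userClassThread.start()
    val sc = Await.result(sparkContextPromise.future, Duration(100, TimeUnit.SECONDS))
    println(sc)
    userClassThread.join()
  }
}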
At this point, the first NodeManager has been obtained from the RM and the ApplicationMaster and Driver processes have been started on it. Next, the user-specified mainClass runs; when it reaches new SparkContext, a SparkContext instance is created. During that process two particularly important objects are created, taskScheduler and schedulerBackend. Working together, they submit the task sets built by the DAGScheduler to the Executors, request task resources as needed, and monitor task status.
The user program executes new SparkContext():
org.apache.spark.SparkContext#new (the try-catch block at lines 371-583)
397 // Set basic information such as the jar files
_conf.set(DRIVER_HOST_ADDRESS,_conf.get(DRIVER_HOST_ADDRESS))
_conf.setIfMissing("spark.driver.port", "0")
_conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty)).toSeq.flatten
428 // Create the listener bus, used for rendering the Spark UI
_jobProgressListener = new JobProgressListener(_conf)
listenerBus.addListener(jobProgressListener)
432 // Create SparkEnv
_env = createSparkEnv(_conf, isLocal, listenerBus)
SparkEnv.set(_env)
501 // Create the taskScheduler and schedulerBackend
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
540 // Create the dynamic allocation manager (covered in detail in section 9)
_executorAllocationManager =
if (dynamicAllocationEnabled) {
schedulerBackend match {
case b: ExecutorAllocationClient =>
Some(new ExecutorAllocationManager(
schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf))
case _ =>
None
}
} else {
None
}
_executorAllocationManager.foreach(_.start())
->org.apache.spark.SparkContext#createTaskScheduler
2757 // In yarn mode, masterUrl is "yarn" and the corresponding ClusterManager is org.apache.spark.scheduler.cluster.YarnClusterManager
case masterUrl =>
val cm = getClusterManager(masterUrl) match {
case Some(clusterMgr) => clusterMgr
case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
}
try {
// The corresponding TaskScheduler is org.apache.spark.scheduler.cluster.YarnClusterScheduler
val scheduler = cm.createTaskScheduler(sc, masterUrl)
// The corresponding backend is org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend
val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
cm.initialize(scheduler, backend)
(backend, scheduler)
} catch {
case se: SparkException => throw se
case NonFatal(e) =>
throw new SparkException("External scheduler cannot be instantiated", e)
}
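getClusterManager resolves the "yarn" master URL to YarnClusterManager through java.util.ServiceLoader; the sketch below illustrates that lookup pattern with a made-up trait standing in for Spark's ExternalClusterManager interface:
import java.util.ServiceLoader
import scala.collection.JavaConverters._

// Illustration of the ServiceLoader-based lookup behind getClusterManager(masterUrl).
// ClusterManagerLike is a placeholder for org.apache.spark.scheduler.ExternalClusterManager.
trait ClusterManagerLike {
  def canCreate(masterURL: String): Boolean
}

object ClusterManagerLookupSketch {
  def getClusterManager(url: String): Option[ClusterManagerLike] =
    ServiceLoader.load(classOf[ClusterManagerLike]).asScala.find(_.canCreate(url))

  def main(args: Array[String]): Unit =
    // In Spark's version of this lookup, "yarn" yields YarnClusterManager when spark-yarn is on the classpath.
    println(getClusterManager("yarn"))
}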
YarnClusterScheduler mainly inherits TaskSchedulerImpl (Spark's core implementation) through YarnScheduler; the most important thing this subclass does is inject the SparkContext into the ApplicationMaster (picking up from section 5).
org.apache.spark.scheduler.cluster.YarnClusterManager#createTaskScheduler
32: // A different subclass is instantiated depending on the deploy mode
override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
sc.deployMode match {
case "cluster" => new YarnClusterScheduler(sc)
case "client" => new YarnScheduler(sc)
case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
}
}
org.apache.spark.scheduler.cluster.YarnClusterScheduler#postStartHook
private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
logInfo("Created YarnClusterScheduler")
override def postStartHook() {
ApplicationMaster.sparkContextInitialized(sc)
super.postStartHook()
logInfo("YarnClusterScheduler.postStartHook done")
}
}
YarnClusterSchedulerBackend likewise inherits org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend (Spark's core implementation) through YarnSchedulerBackend. The main things this subclass adds are the bridge through which Spark requests resources from YARN (covered in detail in section 9) and the creation of the clientEndpoint and driverEndpoint used for communication.
org.apache.spark.scheduler.cluster.YarnClusterManager#createSchedulerBackend
// Instantiate a different subclass depending on the deploy mode
40:override def createSchedulerBackend(sc: SparkContext,
masterURL: String,
scheduler: TaskScheduler): SchedulerBackend = {
sc.deployMode match {
case "cluster" =>
new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case "client" =>
new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
case _ =>
throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' forYarn")
}
}
-> org.apache.spark.scheduler.cluster.YarnClusterSchedulerBackend#start
36:super.start()
-> org.apache.spark.scheduler.cluster.YarnSchedulerBackend#start
// Create the clientEndpoint
52:private val yarnSchedulerEndpoint = new YarnSchedulerEndpoint(rpcEnv)
-> org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receive
case RegisterClusterManager(am) =>
logInfo(s"ApplicationMaster registered as $am")
// amEndpoint is injected here; this happens after the SparkContext has been injected into the ApplicationMaster (section 8)
amEndpoint = Option(am)
if (!shouldResetOnAmRegister) {
shouldResetOnAmRegister = true
} else {
// AM is already registered before, this potentially means that AM failed and
// a new one registered after the failure. This will only happen in yarn-client mode.
reset()
}
86:super.start()
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#start
// Create the driverEndpoint
378:driverEndpoint = createDriverEndpointRef(properties)
At the end of section 6, ApplicationMaster.sparkContextInitialized injected the SparkContext into the ApplicationMaster; this releases the wait that the ApplicationMaster entered in section 4.
org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
769:master.sparkContextInitialized(sc)
->org.apache.spark.deploy.yarn.ApplicationMaster#sparkContextInitialized
326:sparkContextPromise.success(sc)
->org.apache.spark.deploy.yarn.ApplicationMaster#runDriver
// sc now has a value; proceed with the next steps
401:val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
Duration(totalWaitTime, TimeUnit.MILLISECONDS))
if (sc != null) {
rpcEnv = sc.env.rpcEnv
// Create the AMEndpoint here and inject it into org.apache.spark.scheduler.cluster.YarnSchedulerBackend (see section 7)
val driverRef = runAMEndpoint(
sc.getConf.get("spark.driver.host"),
sc.getConf.get("spark.driver.port"),
isClusterMode = true)
registerAM(sc.getConf, rpcEnv, driverRef, sc.ui.map(_.webUrl), securityMgr)
}
->org.apache.spark.deploy.yarn.ApplicationMaster#runAMEndpoint
387:amEndpoint = rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverEndpoint, isClusterMode))
681:override def onStart(): Unit = {
driver.send(RegisterClusterManager(self))
}
->org.apache.spark.deploy.yarn.ApplicationMaster#registerAM
// Create org.apache.spark.deploy.yarn.YarnAllocator
// YarnAllocator requests containers from the YARN ResourceManager and decides what to do with containers once YARN grants them
359:allocator = client.register(driverUrl,
driverRef,
yarnConf,
_sparkConf,
uiAddress,
historyAddress,
securityMgr,
localResources)
// Initial resource request
368:allocator.allocateResources()
// Create the reporter thread, which requests resources on demand while tasks run; it also calls allocator.allocateResources()
369:reporterThread = launchReporterThread()
Resource requests are mainly handled by org.apache.spark.deploy.yarn.YarnAllocator and are triggered by org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which sends org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors messages to update the required number of Executors, targetNumExecutors.
// This method is the main entry point for requesting containers and launching Executors on NodeManagers
org.apache.spark.deploy.yarn.YarnAllocator#allocateResources
// First compute how many resources still need to be requested
260:updateResourceRequests()
-> org.apache.spark.deploy.yarn.YarnAllocator#updateResourceRequests
297:val missing = targetNumExecutors - numPendingAllocate - numExecutorsRunning
// At initialization, targetNumExecutors is the maximum of
// spark.dynamicAllocation.minExecutors
// spark.dynamicAllocation.initialExecutors
// and spark.executor.instances (see the code below)
109: @volatile private var targetNumExecutors = YarnSparkHadoopUtil.getInitialTargetExecutorNumber(sparkConf)
275: val initialNumExecutors = Utils.getDynamicAllocationInitialExecutors(conf)
-> org.apache.spark.util.Utils#getDynamicAllocationInitialExecutors
2538:val initialExecutors = Seq(
conf.get(DYN_ALLOCATION_MIN_EXECUTORS),
conf.get(DYN_ALLOCATION_INITIAL_EXECUTORS),
conf.get(EXECUTOR_INSTANCES).getOrElse(0)).max
276:handleAllocatedContainers(allocatedContainers.asScala)
-> org.apache.spark.deploy.yarn.YarnAllocator#handleAllocatedContainers
442:runAllocatedContainers(containersToUse)
-> org.apache.spark.deploy.yarn.YarnAllocator#runAllocatedContainers
511:new ExecutorRunnable(
Some(container),
conf,
sparkConf,
driverUrl,
executorId,
executorHostname,
executorMemory,
executorCores,
appAttemptId.getApplicationId.toString,
securityMgr,
localResources
).run()
-> org.apache.spark.deploy.yarn.ExecutorRunnable#run
65:startContainer()
-> org.apache.spark.deploy.yarn.ExecutorRunnable#startContainer
// The command to be run on the NodeManager
98:val commands = prepareCommand()
// Start the container on the NodeManager
122:nmClient.startContainer(container.get, ctx)
-> org.apache.spark.deploy.yarn.ExecutorRunnable#prepareCommand
201:val commands = prefixEnv ++
Seq(Environment.JAVA_HOME.$$() + "/bin/java", "-server") ++
javaOpts ++
// This line is the key point: it launches this class, covered in detail in section 10
Seq("org.apache.spark.executor.CoarseGrainedExecutorBackend",
"--driver-url", masterAddress,
"--executor-id", executorId,
"--hostname", hostname,
"--cores", executorCores.toString,
"--app-id", appId) ++
userClassPath ++
Seq(
s"1>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stdout",
s"2>${ApplicationConstants.LOG_DIR_EXPANSION_VAR}/stderr")
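As a toy illustration of the arithmetic in updateResourceRequests (not Spark code, numbers made up): only the shortfall between the target and what is already pending or running turns into new container requests.
// Toy sketch of the updateResourceRequests calculation.
object MissingExecutorsSketch {
  def missing(targetNumExecutors: Int, numPendingAllocate: Int, numExecutorsRunning: Int): Int =
    targetNumExecutors - numPendingAllocate - numExecutorsRunning

  def main(args: Array[String]): Unit =
    // e.g. a RequestExecutors message raised the target to 10, 3 containers are pending, 4 executors are running
    println(missing(targetNumExecutors = 10, numPendingAllocate = 3, numExecutorsRunning = 4))  // => 3 new requests
}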
Section 5 mentioned that an executorAllocationManager, the dynamic allocation manager, is created while the SparkContext is being built. It takes effect when spark.dynamicAllocation.enabled is set to true and requests resources dynamically while the job runs.
org.apache.spark.ExecutorAllocationManager requests resources by calling the requestTotalExecutors method of org.apache.spark.ExecutorAllocationClient. org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend implements that interface, and it in turn delegates to the doRequestTotalExecutors method of its subclass org.apache.spark.scheduler.cluster.YarnSchedulerBackend, which ultimately performs the resource request.
org.apache.spark.SparkContext#new
552:_executorAllocationManager.foreach(_.start())
-> org.apache.spark.ExecutorAllocationManager#start
222:val scheduleTask = new Runnable() {
override def run(): Unit = {
try {
schedule()
} catch {
case ct: ControlThrowable =>
throw ct
case t: Throwable =>
logWarning(s"Uncaught exception in thread ${Thread.currentThread().getName}", t)
}
}
}
// Schedule a recurring task that drives resource scheduling
executor.scheduleWithFixedDelay(scheduleTask, 0, intervalMillis, TimeUnit.MILLISECONDS)
-> org.apache.spark.ExecutorAllocationManager#schedule
281: updateAndSyncNumExecutorsTarget(now)
-> org.apache.spark.ExecutorAllocationManager#updateAndSyncNumExecutorsTarget
// maxNumExecutorsNeeded is computed from
// listener.totalPendingTasks (updated at CoarseGrainedSchedulerBackend line 261)
// listener.totalRunningTasks (updated at DAGScheduler line 996)
// and spark.executor.cores (user-defined, default 1); the exact calculation and update path are still to be filled in
310: val maxNeeded = maxNumExecutorsNeeded
331: val delta = addExecutors(maxNeeded)
380: val addRequestAcknowledged = testing ||
client.requestTotalExecutors(numExecutorsTarget, localityAwareTasks, hostToLocalTaskCount)
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestTotalExecutors
548: doRequestTotalExecutors(numExecutors)
->org.apache.spark.scheduler.cluster.YarnSchedulerBackend#doRequestTotalExecutors
// Build an org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.RequestExecutors message and send it via the clientEndpoint
138:yarnSchedulerEndpointRef.ask[Boolean](prepareRequestExecutors(requestedTotal))
->org.apache.spark.scheduler.cluster.YarnSchedulerBackend.YarnSchedulerEndpoint#receiveAndReply
// Finally, forward the RequestExecutors message to the ApplicationMaster
280:case r: RequestExecutors =>
amEndpoint match {
case Some(am) =>
am.ask[Boolean](r).andThen {
case Success(b) => context.reply(b)
case Failure(NonFatal(e)) =>
logError(s"Sending $r to AM was unsuccessful", e)
context.sendFailure(e)
}(ThreadUtils.sameThread)
case None =>
logWarning("Attempted to request executors before the AM has registered!")
context.reply(false)
}
->org.apache.spark.deploy.yarn.ApplicationMaster.AMEndpoint#receiveAndReply
696: a.requestTotalExecutorsWithPreferredLocalities(r.requestedTotal,
r.localityAwareTasks, r.hostToLocalTaskCount, r.nodeBlacklist)
-> org.apache.spark.deploy.yarn.YarnAllocator#requestTotalExecutorsWithPreferredLocalities
// In the end targetNumExecutors is updated, so the next call to org.apache.spark.deploy.yarn.YarnAllocator#allocateResources sees a different targetNumExecutors; requestedTotal ultimately comes from the calculation in org.apache.spark.ExecutorAllocationManager#schedule
218: targetNumExecutors = requestedTotal
A small open question: org.apache.spark.SparkContext#requestExecutors can also request resources by calling org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#requestExecutors, and the log line logInfo(s"Requesting $numAdditionalExecutors additional executor(s) from the cluster manager") from that method does show up in the Spark Driver log, but it is not clear who calls this method.
In section 8, once resources have been granted, the org.apache.spark.executor.CoarseGrainedExecutorBackend process is started on the NodeManager. These processes have a many-to-one relationship with org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend. As the name suggests, CoarseGrainedExecutorBackend is the Executor's manager: it holds an Executor object, and the Executor runs Tasks on a thread pool it starts. With that, the whole flow is connected end to end.
org.apache.spark.executor.CoarseGrainedExecutorBackend#main
284:run(driverUrl, executorId, hostname, cores, appId, workerUrl, userClassPath)
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#run
226:env.rpcEnv.setupEndpoint("Executor", new CoarseGrainedExecutorBackend(
env.rpcEnv, driverUrl, executorId, hostname, cores, userClassPath, env))
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#onStart
// Register itself with CoarseGrainedSchedulerBackend
63:ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
-> org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend.DriverEndpoint#receiveAndReply
// If the checks pass, tell CoarseGrainedExecutorBackend to create the Executor
195: executorRef.send(RegisteredExecutor)
-> org.apache.spark.executor.CoarseGrainedExecutorBackend#receive
83: executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
-> org.apache.spark.executor.Executor#new
101:Executors.newCachedThreadPool(threadFactory).asInstanceOf[ThreadPoolExecutor]
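At its core, this last step is just a cached thread pool running task closures; a toy sketch of that pattern (not Spark's TaskRunner):
import java.util.concurrent.{Executors, TimeUnit}

// Toy sketch of the Executor pattern: each launched task becomes a Runnable on a cached thread pool.
object TaskRunnerSketch {
  def main(args: Array[String]): Unit = {
    val threadPool = Executors.newCachedThreadPool()
    (1 to 3).foreach { taskId =>
      threadPool.execute(new Runnable {
        override def run(): Unit =
          println(s"running task $taskId on ${Thread.currentThread().getName}")
      })
    }
    threadPool.shutdown()
    threadPool.awaitTermination(10, TimeUnit.SECONDS)
  }
}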
At this point the SparkContext creation flow is fully covered. Once the SparkContext has been created successfully, the user code starts to run: it first builds the logical RDD graph, then, when an action is encountered, the job is split into stages to form the physical execution plan, which is executed through SparkContext.runJob. For a detailed analysis see github.com/JerryLead/S…
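For completeness, a minimal user program of the kind this walkthrough assumes (names are illustrative): new SparkContext() kicks off the whole chain above, and the count() action is what eventually drives the DAGScheduler and TaskScheduler.
import org.apache.spark.{SparkConf, SparkContext}

// Minimal example user application; in yarn-cluster mode this main class is what spark-submit's
// --class points at and what the ApplicationMaster launches as the Driver.
object WordCountSketch {
  def main(args: Array[String]): Unit = {
    // The master ("yarn") is supplied by spark-submit, so only the app name is set here.
    val sc = new SparkContext(new SparkConf().setAppName("yarn-cluster-demo"))
    val distinctWords = sc.parallelize(Seq("a", "b", "a"))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .count()                    // action: stages are formed and tasks are submitted here
    println(s"distinct words: $distinctWords")
    sc.stop()
  }
}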