A Spark application is submitted to a Spark cluster via spark-submit. So what exactly does spark-submit submit, and how does the cluster schedule and run it? The details are walked through below.
0. Submitting a job with spark-submit
0.1 Launch script analysis
Reading the spark-submit script shows that it ultimately runs ./bin/spark-class. The class started by ./bin/spark-class is org.apache.spark.launcher.Main; in spark-submit mode that class uses SparkSubmitCommandBuilder.java to build the launch command, and the class that is finally invoked is org.apache.spark.deploy.SparkSubmit (SparkSubmit.scala).
/**
 * Main gateway of launching a Spark application.
 *
 * This program handles setting up the classpath with relevant Spark dependencies and provides
 * a layer over the different cluster managers and deploy modes that Spark supports.
 */
object SparkSubmit {
0.2 Job resolution -- running the main class specified at submission
Depending on the deploy mode, the application's main class is run on different nodes (a submission sketch follows the two cases below).
a. CLIENT mode (local from the submitter's perspective): the main class specified for the job runs directly on the submitting node.
b. CLUSTER mode: the cluster schedules the job, and the main class runs on a node allocated by the cluster (the allocation details are skipped here).
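For reference, the deploy mode is chosen at submission time. Below is a minimal sketch using the programmatic launcher API, org.apache.spark.launcher.SparkLauncher, which forks spark-submit under the hood; the jar path, main class and master URL are placeholders, not values from this article.

import org.apache.spark.launcher.SparkLauncher

object SubmitSketch {
  def main(args: Array[String]): Unit = {
    val process = new SparkLauncher()
      .setAppResource("/path/to/my-app.jar")   // placeholder application jar
      .setMainClass("com.example.MyApp")       // placeholder main class
      .setMaster("spark://master:7077")        // placeholder standalone master URL
      .setDeployMode("cluster")                // "client" would run the main class on this node instead
      .launch()                                // forks a spark-submit process
    process.waitFor()
  }
}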
1. At long last -- SparkContext
/**
 * Main entry point for Spark functionality. A SparkContext represents the connection to a Spark
 * cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster.
 *
 * Only one SparkContext may be active per JVM. You must `stop()` the active SparkContext before
 * creating a new one. This limitation may eventually be removed; see SPARK-2243 for more details.
 *
 * @param config a Spark Config object describing the application configuration. Any settings in
 *   this config overrides the default configs as well as system properties.
 */
class SparkContext(config: SparkConf) extends Logging {
In Spark, the SparkContext is responsible for communicating with the cluster, requesting resources, and assigning and monitoring tasks. One SparkContext instance corresponds to one Spark driver program (a Spark application) and exists for the application's entire lifetime.
Developing a Spark application starts with obtaining a SparkContext; both SparkSession and StreamingContext create (or reuse) a SparkContext before they themselves are instantiated.
def getOrCreate(): SparkSession = synchronized {
  ...
  val sparkContext = userSuppliedContext.getOrElse {
    // set app name if not given
    val randomAppName = java.util.UUID.randomUUID().toString
    val sparkConf = new SparkConf()
    options.foreach { case (k, v) => sparkConf.set(k, v) }
    if (!sparkConf.contains("spark.app.name")) {
      sparkConf.setAppName(randomAppName)
    }
    val sc = SparkContext.getOrCreate(sparkConf)
private[streaming] val sc: SparkContext = {
  if (_sc != null) {
    _sc
  } else if (isCheckpointPresent) {
    SparkContext.getOrCreate(_cp.createSparkConf())
  } else {
    throw new SparkException("Cannot create StreamingContext without a SparkContext")
  }
}
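Seen from the application side, the same rule holds: whichever entry point is used, a SparkContext is obtained first. A minimal application-level sketch (not Spark source; the app name and master URL are placeholders):

import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.{Seconds, StreamingContext}

object ContextSketch {
  def main(args: Array[String]): Unit = {
    // SparkSession.getOrCreate() internally calls SparkContext.getOrCreate()
    val spark = SparkSession.builder()
      .appName("context-sketch")   // placeholder app name
      .master("local[2]")          // placeholder master
      .getOrCreate()
    val sc = spark.sparkContext    // the single active SparkContext in this JVM

    // StreamingContext reuses the existing SparkContext rather than creating a second one
    val ssc = new StreamingContext(sc, Seconds(5))

    ssc.stop(stopSparkContext = false)
    spark.stop()
  }
}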
SparkContext contains four core objects: DAGScheduler, TaskScheduler, SchedulerBackend and MapOutputTrackerMaster. They are described in detail later.
SchedulerBackend is a trait; a different implementation is instantiated depending on how the application is run. Taking StandaloneSchedulerBackend as an example, it has three main responsibilities (a simplified sketch of the abstraction follows this list):
1. Communicating with the Master and registering the current application (registerWithMaster);
2. Receiving and managing the Executors that the cluster allocates to the current application as they register;
3. Sending Tasks to specific Executors for execution.
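As a rough illustration of why this is modelled as a trait, the sketch below shows a simplified SchedulerBackend-style abstraction with one fake standalone implementation. The method names mirror the shape of the real trait, but both types are illustrative only, not Spark source.

// Simplified sketch of the SchedulerBackend abstraction; illustrative only.
trait SimpleSchedulerBackend {
  def start(): Unit            // e.g. register the application with the Master
  def stop(): Unit
  def reviveOffers(): Unit     // ask for resource offers so pending tasks can be launched
  def defaultParallelism(): Int
}

class FakeStandaloneBackend(masterUrl: String) extends SimpleSchedulerBackend {
  def start(): Unit = println(s"registering application with $masterUrl")
  def stop(): Unit = println("unregistering application")
  def reviveOffers(): Unit = println("offering resources to the task scheduler")
  def defaultParallelism(): Int = 2
}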
1.1 The SparkContext instantiation process
1. When SparkContext is instantiated, every statement that is not inside a method is executed. createTaskScheduler sits in SparkContext's constructor body, so it runs during instantiation;
2. createTaskScheduler returns a (SchedulerBackend, TaskScheduler) pair that depends on the cluster type; for Standalone it returns (StandaloneSchedulerBackend, TaskSchedulerImpl) (a condensed sketch of this dispatch follows step 5);
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
3. _taskScheduler.start() is executed, which in turn calls the start method of the SchedulerBackend (here StandaloneSchedulerBackend);
4. StandaloneSchedulerBackend's start method packages up the application information and calls StandaloneAppClient's start method (this only registers the application information; it does not submit the job yet);
// Start executors with a few necessary configs for registering with the scheduler
val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
val javaOpts = sparkJavaOpts ++ extraJavaOpts
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
  args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
val coresPerExecutor = conf.getOption("spark.executor.cores").map(_.toInt)
...
client = new StandaloneAppClient(sc.env.rpcEnv, masters, appDesc, this, conf)
client.start()
5. StandaloneAppClient (the client that registers the job with the Master; it is not the Driver) creates a ClientEndpoint object in its start method; ClientEndpoint's onStart method then calls registerWithMaster to register with the Master.
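The dispatch in step 2 boils down to a pattern match on the master URL. A condensed, illustrative sketch of that shape (not the actual Spark source, which constructs and wires the real backend and scheduler instances and handles more URL forms):

// Illustrative only: a condensed view of the master-URL dispatch done by
// SparkContext.createTaskScheduler, returning class names instead of instances.
object SchedulerDispatchSketch {
  def pickScheduler(master: String): (String, String) = master match {
    case m if m.startsWith("local")    => ("LocalSchedulerBackend", "TaskSchedulerImpl")
    case m if m.startsWith("spark://") => ("StandaloneSchedulerBackend", "TaskSchedulerImpl")
    case other => sys.error(s"Master URL not covered in this sketch: $other")
  }
}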
2. The Master registers the application
The Master's receive method handles the registration message from StandaloneAppClient and does three things:
1. Registers the application information;
2. Sends the success message driver.send(RegisteredApplication(app.id, self)) back to the registering client, StandaloneAppClient (at this point the application registration is complete);
3. Calls schedule() to allocate resources to the application.
case RegisterApplication(description, driver) =>
  // TODO Prevent repeated registrations from some driver
  if (state == RecoveryState.STANDBY) {
    // ignore, don't send response
  } else {
    logInfo("Registering app " + description.name)
    val app = createApplication(description, driver)
    registerApplication(app)
    logInfo("Registered app " + description.name + " with ID " + app.id)
    persistenceEngine.addApplication(app)
    driver.send(RegisteredApplication(app.id, self))
    schedule()
  }
The schedule method allocates resources for drivers and executors; the actual Driver and Executors run on Workers. It assigns Worker resources to the application and launches the corresponding Driver and Executors to run it.
/**
 * Schedule the currently available resources among waiting apps. This method will be called
 * every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
    // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
    // start from the last worker that was assigned a driver, and continue onwards until we have
    // explored all alive workers.
    var launched = false
    var numWorkersVisited = 0
    while (numWorkersVisited < numWorkersAlive && !launched) {
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        launchDriver(worker, driver) // launch the Driver on this Worker
        waitingDrivers -= driver
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  startExecutorsOnWorkers() // start Executors on the Workers
}
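To make the round-robin assignment concrete, the following self-contained sketch isolates just that loop; the Worker and Driver case classes and all resource numbers are made up for illustration, and the real code sends a LaunchDriver message where the sketch only prints.

import scala.util.Random

// Stand-ins for WorkerInfo / DriverDescription, with made-up resources.
case class Worker(id: String, memFree: Int, coresFree: Int)
case class Driver(id: String, mem: Int, cores: Int)

object RoundRobinSketch extends App {
  val aliveWorkers = Random.shuffle(Seq(
    Worker("w1", memFree = 4096, coresFree = 4),
    Worker("w2", memFree = 1024, coresFree = 1),
    Worker("w3", memFree = 8192, coresFree = 8)))
  val drivers = Seq(Driver("d1", mem = 2048, cores = 2), Driver("d2", mem = 512, cores = 1))

  var curPos = 0
  for (driver <- drivers) {
    var launched = false
    var visited = 0
    while (visited < aliveWorkers.size && !launched) {
      val w = aliveWorkers(curPos)
      visited += 1
      if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
        println(s"launch ${driver.id} on ${w.id}") // the real code sends LaunchDriver here
        launched = true
      }
      curPos = (curPos + 1) % aliveWorkers.size
    }
  }
}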
3. The Worker starts the Driver
3.1 The Driver launch process
The Master sends a LaunchDriver message to a Worker. On receiving it, the Worker creates a DriverRunner object and then calls that object's start method.
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  val driver = new DriverRunner
The start method ends up calling prepareAndRunDriver, which launches a Driver process via runDriver.
The downloadUserJar method downloads the user's jar to the local node.
private[worker] def prepareAndRunDriver(): Int = {
  val driverDir = createWorkingDirectory()
  val localJarFilename = downloadUserJar(driverDir)

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}" => localJarFilename
    case other => other
  }

  // TODO: If we add ability to submit multiple jars they should also be added here
  val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
    driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)

  runDriver(builder, driverDir, driverDesc.supervise)
}
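The substituteVariables helper simply rewrites placeholder arguments before the process builder is created. A tiny worked example with made-up URLs, paths and class names:

object SubstitutionSketch extends App {
  // Made-up values, only to show what substituteVariables does to the argument list.
  val workerUrl = "spark://Worker@10.0.0.5:35000"
  val localJarFilename = "/data/spark/work/driver-0001/my-app.jar"

  def substituteVariables(argument: String): String = argument match {
    case "{{WORKER_URL}}" => workerUrl
    case "{{USER_JAR}}"   => localJarFilename
    case other            => other
  }

  val rawArgs = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "--class", "com.example.MyApp")
  println(rawArgs.map(substituteVariables))
  // List(spark://Worker@10.0.0.5:35000, /data/spark/work/driver-0001/my-app.jar, --class, com.example.MyApp)
}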
3.2 A few questions about the Driver
a. The Driver refers to the submitted application, as the definition in DriverDescription below shows.
override def toString: String = s"DriverDescription (${command.mainClass})"
b. The Driver is loaded and run inside a Worker; which Worker it runs on is decided by the Master via worker.endpoint.send(LaunchDriver(driver.id, driver.desc)).
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  worker.addDriver(driver)
  driver.worker = Some(worker)
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  driver.state = DriverState.RUNNING
}
c. From the Worker's point of view, the Driver is not the submitted program as such: the Worker instantiates a DriverRunner, which runs the Driver as a separate process inside the Worker and manages its execution, including restarting it on failure.
/**
 * Manages the execution of one driver, including automatically restarting the driver on failure.
 * This is currently only used in standalone cluster deploy mode.
 */
private[deploy] class DriverRunner(
d. As the Master's application-registration flow and its schedule method show, one application corresponds to one Driver and can use multiple Workers (Executors).
4. The Worker starts Executors
The Master's schedule method calls startExecutorsOnWorkers, which eventually sends a LaunchExecutor message to the Worker; the Worker instantiates an ExecutorRunner via val manager = new ExecutorRunner to run the executor.
ExecutorRunner then calls fetchAndRunExecutor to create a new process that runs the executor.
/**
 * Download and run the executor described in our ApplicationDescription
 */
private def fetchAndRunExecutor() {
  try {
    // Launch the process
    val builder = CommandUtils.buildProcessBuilder(appDesc.command, new SecurityManager(conf),
** appDesc.command is the command passed in by StandaloneSchedulerBackend, which means that when the Worker node starts the ExecutorRunner, the ExecutorRunner launches a CoarseGrainedExecutorBackend process.
val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
In CoarseGrainedExecutorBackend's onStart method, a RegisterExecutor registration request is sent to the Driver.
override def onStart() {
  logInfo("Connecting to driver: " + driverUrl)
  rpcEnv.asyncSetupEndpointRefByURI(driverUrl).flatMap { ref =>
    // This is a very fast action so we can use "ThreadUtils.sameThread"
    driver = Some(ref)
    ref.ask[Boolean](RegisterExecutor(executorId, self, hostname, cores, extractLogUrls))
After handling the registration request, the Driver returns the result to CoarseGrainedExecutorBackend; on success, CoarseGrainedExecutorBackend creates a new Executor, and at this point the Executor is fully set up.
override def receive: PartialFunction[Any, Unit] = {
  case RegisteredExecutor =>
    logInfo("Successfully registered with driver")
    try {
      executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
    } catch {
      case NonFatal(e) =>
        exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
    }
* Each Worker can launch multiple Executors, and each Executor is an independent process (see the configuration sketch at the end of this section).
* The figure below shows the overall flow of submitting a Spark job.
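As a closing note on executor sizing in standalone mode: each executor takes spark.executor.cores cores and spark.executor.memory memory, bounded by the application's spark.cores.max total, which is what allows one Worker to host several executors. A configuration sketch with made-up numbers and a placeholder master URL:

import org.apache.spark.SparkConf

object ExecutorSizingSketch {
  // Illustrative numbers only: with 16 cores allowed in total and 4 cores per executor,
  // the standalone Master will try to start 4 executors, possibly several on one Worker.
  val conf = new SparkConf()
    .setAppName("executor-sizing-sketch")   // placeholder app name
    .setMaster("spark://master:7077")       // placeholder master URL
    .set("spark.cores.max", "16")
    .set("spark.executor.cores", "4")
    .set("spark.executor.memory", "2g")
}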