In "Spark source code analysis, part 2 -- the initialization of SparkContext", steps 14 and 16 described the initialization and the startup of the TaskScheduler, respectively.
Let's take these one at a time, starting with the initialization of the TaskScheduler:
```scala
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
```
This calls org.apache.spark.SparkContext#createTaskScheduler, whose source is as follows:
```scala
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
```
As the pattern match above shows, each kind of master URL gets its own pair of implementations: the local modes pair TaskSchedulerImpl with a LocalSchedulerBackend, a spark:// URL pairs it with a StandaloneSchedulerBackend, local-cluster mode first spins up a LocalSparkCluster, and any other master URL is handed off to an external cluster manager.
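To make the dispatch concrete, here is a small, self-contained sketch of the idea: the master string is matched against regex extractors and mapped to a scheduler/backend pairing. The regexes below only approximate Spark's SparkMasterRegex, and MasterUrlDemo is a placeholder, not a Spark class.

```scala
// Toy illustration of the master-URL dispatch above; not Spark code.
object MasterUrlDemo {
  // Rough approximations of Spark's SparkMasterRegex patterns.
  private val LocalN   = """local\[([0-9]+|\*)\]""".r
  private val SparkUrl = """spark://(.*)""".r

  def describe(master: String): String = master match {
    case "local"       => "TaskSchedulerImpl + LocalSchedulerBackend (1 thread)"
    case LocalN(n)     => s"TaskSchedulerImpl + LocalSchedulerBackend ($n thread(s))"
    case SparkUrl(url) => s"TaskSchedulerImpl + StandaloneSchedulerBackend on $url"
    case other         => s"handed off to an external cluster manager for '$other'"
  }

  def main(args: Array[String]): Unit =
    Seq("local", "local[4]", "spark://host:7077", "yarn")
      .foreach(m => println(s"$m -> ${describe(m)}"))
}
```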
With the instantiation covered, the second half of this post focuses on how the TaskScheduler is started in yarn-client mode.
In yarn-client mode, the TaskScheduler implementation is org.apache.spark.scheduler.cluster.YarnScheduler, and the SchedulerBackend implementation is org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend.
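For reference, an application typically lands on this code path when its master is set to yarn and the deploy mode to client. The sketch below is only an assumed, minimal driver program (the app name and settings are placeholders); it shows the configuration that steers createTaskScheduler into the external-cluster-manager branch discussed next.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object YarnClientDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("yarn-client-demo")           // placeholder application name
      .setMaster("yarn")                        // master URL handled by YarnClusterManager
      .set("spark.submit.deployMode", "client") // yarn-client mode: the driver runs locally
    val sc = new SparkContext(conf)             // createTaskScheduler runs inside this constructor
    println(s"running against master: ${sc.master}")
    sc.stop()
  }
}
```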
In the org.apache.spark.SparkContext#createTaskScheduler method, the relevant branch is the following:
```scala
case masterUrl =>
  val cm = getClusterManager(masterUrl) match {
    case Some(clusterMgr) => clusterMgr
    case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
  }
  try {
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)
  } catch {
    case se: SparkException => throw se
    case NonFatal(e) =>
      throw new SparkException("External scheduler cannot be instantiated", e)
  }
```
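getClusterManager locates the cluster manager through Java's ServiceLoader mechanism, picking the registered ExternalClusterManager whose canCreate accepts the master URL (YarnClusterManager answers true for "yarn"). The sketch below only imitates that plugin-lookup pattern with hypothetical types; it is not the real Spark trait or the real YARN implementation.

```scala
// A self-contained imitation of the plugin lookup; all names here are hypothetical.
trait MiniClusterManager {
  def canCreate(masterURL: String): Boolean
  def describe: String
}

class YarnLikeClusterManager extends MiniClusterManager {
  override def canCreate(masterURL: String): Boolean = masterURL == "yarn"
  override def describe: String = "YarnScheduler + YarnClientSchedulerBackend"
}

object ManagerLookup {
  // Spark discovers managers via java.util.ServiceLoader; a hard-coded list
  // keeps this sketch runnable on its own.
  private val registered = Seq(new YarnLikeClusterManager)

  def getClusterManager(url: String): Option[MiniClusterManager] =
    registered.find(_.canCreate(url))

  def main(args: Array[String]): Unit =
    println(getClusterManager("yarn").map(_.describe).getOrElse("Could not parse Master URL"))
}
```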
Here, the cm in cm.initialize(scheduler, backend) is org.apache.spark.scheduler.cluster.YarnClusterManager, the TaskScheduler implementation is org.apache.spark.scheduler.cluster.YarnScheduler, and the SchedulerBackend implementation is org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend. YarnClusterManager's initialize method is implemented as follows:
```scala
override def initialize(scheduler: TaskScheduler, backend: SchedulerBackend): Unit = {
  scheduler.asInstanceOf[TaskSchedulerImpl].initialize(backend)
}
```
YarnScheduler does not override initialize itself, so the implementation in its parent class TaskSchedulerImpl is used:
```scala
def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
          s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
```
As we can see, its key role is to set the TaskScheduler's reference to its SchedulerBackend and to build the scheduling pools.
There are two scheduling modes, FIFO and FAIR. The default is FIFO, and the mode can be set with the spark.scheduler.mode parameter. A builder pattern is used to create the Pool objects.
org.apache.spark.scheduler.FIFOSchedulableBuilder#buildPools is an empty implementation that does nothing, while org.apache.spark.scheduler.FairSchedulableBuilder#buildPools loads the scheduling allocation file. That file can be specified with the spark.scheduler.allocation.file parameter; if it is not set, the default fairscheduler.xml is loaded instead, and if that is also missing, nothing is loaded. For each pool defined in the configuration, a scheduling pool is created according to its settings and added to the root pool. Finally, the default pool is initialized and added to the root pool as well.
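As a quick illustration of the settings mentioned above, a driver might configure the scheduling mode and point the fair scheduler at a pool file roughly like this (a minimal sketch; the values and the file path are placeholders):

```scala
import org.apache.spark.SparkConf

object SchedulingConfDemo {
  val conf = new SparkConf()
    .set("spark.scheduler.mode", "FAIR")                                  // FIFO (default) or FAIR
    .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // pool definitions; placeholder path
}
```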
Back in SparkContext's initialization, once the TaskScheduler has been created, the following call notifies the HeartbeatReceiver:

```scala
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
```
First, _heartbeatReceiver is an RpcEndpointRef; the request it sends is eventually received and handled by the HeartbeatReceiver endpoint, namely in the org.apache.spark.HeartbeatReceiver#receiveAndReply method:
```scala
case TaskSchedulerIsSet =>
  scheduler = sc.taskScheduler
  context.reply(true)
```
The details of the RPC machinery will be covered in a dedicated article later, so they are not explained further here.
Starting the TaskScheduler
In the initialization of org.apache.spark.SparkContext, the TaskScheduler is started with the following line:
```scala
_taskScheduler.start()
```
In yarn-client mode this invokes the start method of org.apache.spark.scheduler.cluster.YarnScheduler, which inherits the implementation from its parent class TaskSchedulerImpl:
```scala
override def start() {
  // 1. Start the task scheduler backend
  backend.start()
  // 2. Set up the periodic speculation check
  if (!isLocal && conf.getBoolean("spark.speculation", false)) {
    logInfo("Starting speculative execution thread")
    speculationScheduler.scheduleWithFixedDelay(new Runnable {
      override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
        checkSpeculatableTasks()
      }
    }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
  }
}
```
Step 1, starting the task scheduler backend: the method org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend#start looks like this:
```scala
/**
 * Create a Yarn client to submit an application to the ResourceManager.
 * This waits until the application is running.
 */
override def start() {
  // 1. Get the driver's host and port
  val driverHost = conf.get("spark.driver.host")
  val driverPort = conf.get("spark.driver.port")
  val hostport = driverHost + ":" + driverPort
  // 2. Record the driver's web UI address
  sc.ui.foreach { ui => conf.set("spark.driver.appUIAddress", ui.webUrl) }

  val argsArrayBuf = new ArrayBuffer[String]()
  argsArrayBuf += ("--arg", hostport)

  logDebug("ClientArguments called with: " + argsArrayBuf.mkString(" "))
  val args = new ClientArguments(argsArrayBuf.toArray)
  totalExpectedExecutors = SchedulerBackendUtils.getInitialTargetExecutorNumber(conf)
  // 3. Start the deploy client: set up the driverClient RPC environment and
  //    register the master and driver RPC endpoints in it
  client = new Client(args, conf)
  // 4. Bind the submitted application id to YARN
  bindToYarn(client.submitApplication(), None)

  // SPARK-8687: Ensure all necessary properties have already been set before
  // we initialize our driver scheduler backend, which serves these properties
  // to the executors
  super.start()
  // 5. Wait for the YARN application to reach a usable state (not killed, finished, etc.)
  waitForApplication()
  // 6. Start the monitoring thread
  monitorThread = asyncMonitorApplication()
  monitorThread.start()
}
```
Step 3 deserves a closer look; the source code involved is as follows:
```scala
object Client {
  def main(args: Array[String]) {
    // scalastyle:off println
    if (!sys.props.contains("SPARK_SUBMIT")) {
      println("WARNING: This client is deprecated and will be removed in a future version of Spark")
      println("Use ./bin/spark-submit with \"--master spark://host:port\"")
    }
    // scalastyle:on println
    new ClientApp().start(args, new SparkConf())
  }
}

private[spark] class ClientApp extends SparkApplication {

  override def start(args: Array[String], conf: SparkConf): Unit = {
    val driverArgs = new ClientArguments(args)

    if (!conf.contains("spark.rpc.askTimeout")) {
      conf.set("spark.rpc.askTimeout", "10s")
    }
    Logger.getRootLogger.setLevel(driverArgs.logLevel)

    val rpcEnv =
      RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))

    val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
      map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
    rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))

    rpcEnv.awaitTermination()
  }

}
```
As shown, the Client's main method instantiates a ClientApp and calls its start method. In start, the driver arguments are parsed first, then the driver-side RPC environment is created; based on the parsed master information, endpoint refs to the masters are set up, and finally a client endpoint is registered and its endpoint ref returned.
Now back to step 2 of the start method used by org.apache.spark.scheduler.cluster.YarnScheduler. Speculative execution is disabled by default: if many tasks were lagging, launching a duplicate attempt for each of them could consume all available resources and have an uncontrollable impact on the cluster and the other jobs submitted to it. When it is enabled, a delayed timer periodically executes the checkSpeculatableTasks method, shown below:
```scala
// Check for speculatable tasks in all our active jobs.
def checkSpeculatableTasks() {
  var shouldRevive = false
  synchronized {
    // 1. Decide whether a speculative copy of some task should be run
    shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
  }
  if (shouldRevive) {
    // 2. Ask the backend to offer resources so the new attempt can run
    backend.reviveOffers()
  }
}
```
For step 1, the speculation check has two implementations, one in Pool and one in TaskSetManager; a Pool recursively asks its child pools for speculatable tasks. If speculation is warranted, the task scheduler backend's reviveOffers method is invoked; roughly, it gathers the idle resources on the executors and offers them to the speculative task attempts.
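For completeness, the speculation behaviour discussed above is governed by a handful of configuration keys; the sketch below lists the common ones with example values (the values themselves are placeholders, not recommendations):

```scala
import org.apache.spark.SparkConf

object SpeculationConfDemo {
  val conf = new SparkConf()
    .set("spark.speculation", "true")            // off by default, as noted above
    .set("spark.speculation.interval", "100ms")  // how often checkSpeculatableTasks is run
    .set("spark.speculation.multiplier", "1.5")  // how much slower than the median a task must be
    .set("spark.speculation.quantile", "0.75")   // fraction of tasks that must finish before checking
}
```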
To summarize, this post walked through how the task scheduler is started during SparkContext's initialization, using yarn-client mode as the example.
The introduction of RpcEnv was skipped here; the next post will be devoted to the overall architecture of Spark's built-in RPC mechanism and how it works.