SparkContext can be thought of as the engine of a Spark application: the initialization of the Spark Driver revolves around the initialization of SparkContext.
SparkContext is made up of a number of internal components; the sections below walk through how each of them is created during SparkContext initialization.
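As a point of reference for everything that follows, here is a minimal, hedged sketch of the user-side code that triggers this whole initialization (the application name and master URL are arbitrary examples):

import org.apache.spark.{SparkConf, SparkContext}

object Bootstrap {
  def main(args: Array[String]): Unit = {
    // Constructing a SparkContext runs the constructor walked through below:
    // config validation, SparkEnv, SparkUI, the schedulers, and so on.
    val conf = new SparkConf()
      .setAppName("sparkcontext-init-demo")   // hypothetical app name
      .setMaster("local[2]")                  // run driver and executor threads in one JVM
    val sc = new SparkContext(conf)
    try {
      println(sc.parallelize(1 to 10).sum())  // a tiny job to exercise the context
    } finally {
      sc.stop()
    }
  }
}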
// This function allows components created by SparkEnv to be mocked in unit tests:
private[spark] def createSparkEnv(
    conf: SparkConf,
    isLocal: Boolean,
    listenerBus: LiveListenerBus): SparkEnv = {
  SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master, conf))
}
private[spark] def env: SparkEnv = _env
The createSparkEnv() method shown above delegates to SparkEnv.createDriverEnv() to build the driver-side SparkEnv.
/* ------------------------------------------------------------------------------------- *
 | Initialization. This code initializes the context in a manner that is exception-safe. |
 | All internal fields holding state are initialized here, and any error prompts the     |
 | stop() method to be called.                                                           |
 * ------------------------------------------------------------------------------------- */

private def warnSparkMem(value: String): String = {
  logWarning("Using SPARK_MEM to set amount of memory to use per executor process is " +
    "deprecated, please use spark.executor.memory instead.")
  value
}

/** Control our logLevel. This overrides any user-defined log settings.
 * @param logLevel The desired log level as a string.
 * Valid log levels include: ALL, DEBUG, ERROR, FATAL, INFO, OFF, TRACE, WARN
 */
def setLogLevel(logLevel: String) {
  // let's allow lowercase or mixed case too
  val upperCased = logLevel.toUpperCase(Locale.ROOT)
  require(SparkContext.VALID_LOG_LEVELS.contains(upperCased),
    s"Supplied level $logLevel did not match one of:" +
      s" ${SparkContext.VALID_LOG_LEVELS.mkString(",")}")
  Utils.setLogLevel(org.apache.log4j.Level.toLevel(upperCased))
}

try {
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten

  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  _statusStore = AppStatusStore.createLiveStore(conf)
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)
Because many of SparkEnv's components post events to the LiveListenerBus event queues, the LiveListenerBus is created first. Its main job is to receive SparkListenerEvents and dispatch them asynchronously to the registered listeners.
SparkUI involves too many components to analyze in depth here; it will be covered separately later. The code that creates the SparkUI is shown below.
_statusTracker = new SparkStatusTracker(this, _statusStore)

_progressBar =
  if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
    Some(new ConsoleProgressBar(this))
  } else {
    None
  }

_ui =
  if (conf.getBoolean("spark.ui.enabled", true)) {
    Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
      startTime))
  } else {
    // For tests, do not enable the UI
    None
  }
// Bind the UI before starting the task scheduler to communicate
// the bound port to the cluster manager properly
_ui.foreach(_.bind())
In local mode, the Driver and Executor run on the same node and can interact locally, so failures are easy to observe. In production, however, Executors and the Driver are usually launched on different nodes, so in order to keep track of its Executors, the Driver creates a heartbeat receiver.
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
  HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
This code calls the setupEndpoint() method of NettyRpcEnv, one of SparkEnv's sub-components. The method registers the HeartbeatReceiver with the RpcEnv's Dispatcher and returns a NettyRpcEndpointRef that references the HeartbeatReceiver.
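To make the registration pattern concrete, here is a schematic sketch of a custom endpoint. The endpoint name and messages are made up, and because the RPC API (RpcEnv, RpcEndpoint, SecurityManager) is private[spark], the sketch only compiles when placed inside the org.apache.spark namespace; treat it as an illustration, not production code.

import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint: replies to String messages with an echo.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    // RpcEnv.create builds a NettyRpcEnv under the hood in Spark 2.x.
    val rpcEnv = RpcEnv.create("echo-demo", "localhost", 0, conf, new SecurityManager(conf))
    // setupEndpoint registers the endpoint with the Dispatcher and returns an RpcEndpointRef,
    // which is what other components (like SparkContext's _heartbeatReceiver) hold on to.
    val echoRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
    println(echoRef.askSync[String]("ping"))   // request/response through the ref
    rpcEnv.shutdown()
  }
}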
TaskScheduler is responsible for asking the cluster manager to allocate and launch Executors for the application (first-level scheduling) and for assigning tasks to those Executors and running them (second-level scheduling); it can be seen as the client of task scheduling.
DAGScheduler handles the preparation that happens before tasks are formally handed to TaskSchedulerImpl: creating Jobs, dividing the DAG of RDDs into Stages, submitting Stages, and so on.
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
The createTaskScheduler() method returns a pair of SchedulerBackend and TaskScheduler (a Scala 2-tuple; see the small sketch below). Once SparkContext's _taskScheduler field holds the TaskScheduler reference, the TaskSchedulerIsSet message is sent; on receiving it, HeartbeatReceiver reads SparkContext's _taskScheduler property and stores it in its own scheduler field.
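For readers less familiar with Scala, the pattern val (sched, ts) = ... simply destructures a 2-tuple; a minimal example:

// A function returning a 2-tuple, and a pattern-matching assignment that splits it,
// mirroring `val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)`.
def makePair(): (String, Int) = ("backend", 1)
val (name, count) = makePair()   // name = "backend", count = 1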
/**
 * Create a task scheduler based on a given master URL.
 * Return a 2-tuple of the scheduler backend and the task scheduler.
 */
private def createTaskScheduler(
    sc: SparkContext,
    master: String,
    deployMode: String): (SchedulerBackend, TaskScheduler) = {
  import SparkMasterRegex._

  // When running locally, don't try to re-execute tasks on failure.
  val MAX_LOCAL_TASK_FAILURES = 1

  master match {
    case "local" =>
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_REGEX(threads) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      if (threadCount <= 0) {
        throw new SparkException(s"Asked to run locally with $threadCount threads")
      }
      val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
      def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
      // local[*, M] means the number of cores on the computer with M failures
      // local[N, M] means exactly N threads with M failures
      val threadCount = if (threads == "*") localCpuCount else threads.toInt
      val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
      val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
      scheduler.initialize(backend)
      (backend, scheduler)

    case SPARK_REGEX(sparkUrl) =>
      val scheduler = new TaskSchedulerImpl(sc)
      val masterUrls = sparkUrl.split(",").map("spark://" + _)
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      (backend, scheduler)

    case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
      // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
      val memoryPerSlaveInt = memoryPerSlave.toInt
      if (sc.executorMemory > memoryPerSlaveInt) {
        throw new SparkException(
          "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
            memoryPerSlaveInt, sc.executorMemory))
      }

      val scheduler = new TaskSchedulerImpl(sc)
      val localCluster = new LocalSparkCluster(
        numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
      val masterUrls = localCluster.start()
      val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
      scheduler.initialize(backend)
      backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
        localCluster.stop()
      }
      (backend, scheduler)

    case masterUrl =>
      val cm = getClusterManager(masterUrl) match {
        case Some(clusterMgr) => clusterMgr
        case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
      }
      try {
        val scheduler = cm.createTaskScheduler(sc, masterUrl)
        val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
        cm.initialize(scheduler, backend)
        (backend, scheduler)
      } catch {
        case se: SparkException => throw se
        case NonFatal(e) =>
          throw new SparkException("External scheduler cannot be instantiated", e)
      }
  }
}
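The branch taken above is driven entirely by the master URL. A hedged illustration of which URLs hit which branch (host names and sizes are placeholders; each setMaster call below simply overwrites the previous one):

import org.apache.spark.SparkConf

val conf = new SparkConf().setAppName("master-url-demo")
conf.setMaster("local")                    // one thread, no task retries (MAX_LOCAL_TASK_FAILURES = 1)
conf.setMaster("local[4]")                 // LOCAL_N_REGEX: exactly 4 threads
conf.setMaster("local[*]")                 // LOCAL_N_REGEX: one thread per available core
conf.setMaster("local[4,2]")               // LOCAL_N_FAILURES_REGEX: 4 threads, up to 2 task failures
conf.setMaster("spark://master:7077")      // SPARK_REGEX: standalone cluster, StandaloneSchedulerBackend
conf.setMaster("local-cluster[2,1,1024]")  // LOCAL_CLUSTER_REGEX: 2 workers, 1 core and 1024 MB each
conf.setMaster("yarn")                     // falls through to getClusterManager(masterUrl)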
BlockManager is one of SparkEnv's components; it ties together all of the components and functionality of Spark's storage system and is the most important part of that system. The storage system will be studied later.
_applicationId = _taskScheduler.applicationId()
_env.blockManager.initialize(_applicationId)
Spark has its own monitoring framework; a system's monitoring facilities support testability, performance tuning, operations, and statistics. Spark's metrics system is built on Metrics, the third-party library from Codahale (Dropwizard Metrics).
Spark's metrics system revolves around three important concepts: Source, Sink, and MetricsSystem.
MetricsSystem wraps Sources and Sinks, routing the data produced by Sources to the configured Sinks.
MetricsSystem is one of SparkEnv's internal components and serves as the metrics system for the entire Spark application.
// The metrics system for Driver need to be set spark.app.id to app ID.
// So it should start after we get app ID from the task scheduler and set spark.app.id.
_env.metricsSystem.start()
// Attach the driver metrics servlet handler to the web ui after the metrics system is started.
_env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
This attaches the metrics system's ServletContextHandlers to the SparkUI.
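As an illustration of the Source concept, here is a hedged sketch of a custom Source backed by Dropwizard Metrics. It assumes access to Spark's private[spark] Source trait, and the source name and gauge are made up for the example.

import com.codahale.metrics.{Gauge, MetricRegistry}
import org.apache.spark.metrics.source.Source

// Hypothetical Source exposing a single gauge; the MetricsSystem pulls values from the
// registry and pushes them to its configured Sinks (console, JMX, Graphite, ...).
class DemoSource extends Source {
  override val sourceName: String = "demo"
  override val metricRegistry: MetricRegistry = new MetricRegistry()

  metricRegistry.register(MetricRegistry.name("answer"), new Gauge[Int] {
    override def getValue: Int = 42
  })
}

// Registration then looks like the calls at the end of SparkContext initialization:
// SparkEnv.get.metricsSystem.registerSource(new DemoSource)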
_eventLogger =
  if (isEventLogEnabled) {
    val logger =
      new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
        _conf, _hadoopConfiguration)
    logger.start()
    listenerBus.addToEventLogQueue(logger)
    Some(logger)
  } else {
    None
  }
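EventLoggingListener is only created when event logging is enabled. A hedged configuration sketch (the log directory is a placeholder):

import org.apache.spark.SparkConf

// With these settings, isEventLogEnabled is true and the listener above is started,
// so the application later shows up in the Spark History Server.
val conf = new SparkConf()
  .set("spark.eventLog.enabled", "true")
  .set("spark.eventLog.dir", "hdfs:///tmp/spark-events")   // the directory must already exist
  .set("spark.eventLog.compress", "true")                  // optional, uses the codec chosen above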
ExecutorAllocationManager is an agent that dynamically allocates and removes Executors based on the workload.
It periodically computes the number of Executors required by the current workload: if the demand exceeds what has already been requested from the cluster manager, it asks for additional Executors; otherwise it asks the cluster manager to release some.
It also periodically asks the cluster manager to remove (kill) Executors whose idle timeout has expired.
// Optionally scale number of executors dynamically based on workload. Exposed for testing.
val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
_executorAllocationManager =
  if (dynamicAllocationEnabled) {
    schedulerBackend match {
      case b: ExecutorAllocationClient =>
        Some(new ExecutorAllocationManager(
          schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
          _env.blockManager.master))
      case _ =>
        None
    }
  } else {
    None
  }
_executorAllocationManager.foreach(_.start())
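Dynamic allocation only kicks in when enabled in the configuration; a hedged sketch of the relevant settings (the values are placeholders):

import org.apache.spark.SparkConf

// With these settings, Utils.isDynamicAllocationEnabled(_conf) returns true and the
// ExecutorAllocationManager above is created and started.
val conf = new SparkConf()
  .set("spark.dynamicAllocation.enabled", "true")
  .set("spark.dynamicAllocation.minExecutors", "1")
  .set("spark.dynamicAllocation.maxExecutors", "20")
  .set("spark.dynamicAllocation.executorIdleTimeout", "60s")
  .set("spark.shuffle.service.enabled", "true")   // the external shuffle service is required in Spark 2.x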
ContextCleaner is used to clean up out-of-scope RDDs, the map task state of shuffles, shuffle metadata, Broadcast objects, and RDD checkpoint data.
_cleaner =
  if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
    Some(new ContextCleaner(this))
  } else {
    None
  }
_cleaner.foreach(_.start())
/** Start the cleaner. */
def start(): Unit = {
  cleaningThread.setDaemon(true)
  cleaningThread.setName("Spark Context Cleaner")
  cleaningThread.start()
  periodicGCService.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = System.gc()
  }, periodicGCInterval, periodicGCInterval, TimeUnit.SECONDS)
}
Apart from the periodic GC timer, ContextCleaner works the same way as the listenerBus: it follows the listener pattern, with an asynchronous thread doing the processing.
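The following is a simplified sketch of the pattern ContextCleaner relies on, not Spark's actual implementation: tracked objects are wrapped in weak references bound to a reference queue, and a daemon thread runs the registered cleanup once the object has been garbage collected.

import java.lang.ref.{ReferenceQueue, WeakReference}
import java.util.concurrent.ConcurrentLinkedQueue

object CleanerSketch {
  private val queue = new ReferenceQueue[AnyRef]()

  // Pair a weak reference with the cleanup action to run once the referent is collected.
  private class CleanupRef(obj: AnyRef, val cleanup: () => Unit)
    extends WeakReference[AnyRef](obj, queue)

  // Keep the CleanupRef wrappers strongly reachable until their cleanup has run.
  private val buffer = new ConcurrentLinkedQueue[CleanupRef]()

  def register(obj: AnyRef)(cleanup: () => Unit): Unit = buffer.add(new CleanupRef(obj, cleanup))

  private val cleaningThread = new Thread(() => {
    while (true) {
      queue.remove() match {       // blocks until the GC enqueues a collected reference
        case ref: CleanupRef =>
          buffer.remove(ref)
          ref.cleanup()
        case _ =>
      }
    }
  })
  cleaningThread.setDaemon(true)
  cleaningThread.setName("cleaner-sketch")
  cleaningThread.start()
}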
During initialization, SparkContext also reads the JAR files and other files specified by the user.
_jars = Utils.getUserJars(_conf)
_files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
  .toSeq.flatten
The JAR files are read first, followed by the other files configured by the user.
In YARN mode, _jars is the union of the JARs specified by spark.jars and spark.yarn.dist.jars.
In other modes, only the JARs specified by spark.jars are used.
def jars: Seq[String] = _jars
def files: Seq[String] = _files
// Add each JAR given through the constructor
if (jars != null) {
jars.foreach(addJar)
}
if (files != null) {
files.foreach(addFile)
}
addJar adds a JAR file to the Driver's RPC environment so that Executors can fetch it.
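A hedged usage example of the same APIs from user code (the paths are hypothetical); files added with addFile are resolved on executors through SparkFiles.get:

import org.apache.spark.SparkFiles

sc.addJar("/opt/jobs/extra-dependency.jar")   // shipped to executors for task class loading
sc.addFile("/opt/jobs/lookup.txt")            // copied into each executor's working directory

val withLocalPath = sc.parallelize(Seq("a", "b")).map { key =>
  val localPath = SparkFiles.get("lookup.txt") // local copy of the added file on this executor
  (key, localPath)
}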
Because addJar and addFile may affect the application's environment, postEnvironmentUpdate() is called at the end of SparkContext initialization to publish the updated environment:
postEnvironmentUpdate()
postEnvironmentUpdate()
postApplicationStart()

// Post init
_taskScheduler.postStartHook()   // wait for the SchedulerBackend to be ready

// Register Sources with the metrics system
_env.metricsSystem.registerSource(_dagScheduler.metricsSource)
_env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
_executorAllocationManager.foreach { e =>
  _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
}

// Make sure the context is stopped if the user forgets about it. This avoids leaving
// unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
// is killed, though.
// Add SparkContext's shutdown hook
logDebug("Adding shutdown hook") // force eager creation of logger
_shutdownHookRef = ShutdownHookManager.addShutdownHook(
  ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
  logInfo("Invoking stop() from shutdown hook")
  try {
    stop()
  } catch {
    case e: Throwable =>
      logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
  }
}
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}

// In order to prevent multiple SparkContexts from being active at the same time, mark this
// context as having finished construction.
// NOTE: this must be placed at the end of the SparkContext constructor.
SparkContext.setActiveContext(this, allowMultipleContexts)
/**
 * Broadcast a read-only variable to the cluster, returning a
 * [[org.apache.spark.broadcast.Broadcast]] object for reading it in distributed functions.
 * The variable will be sent to each cluster only once.
 *
 * @param value value to broadcast to the Spark nodes
 * @return `Broadcast` object, a read-only variable cached on each machine
 */
def broadcast[T: ClassTag](value: T): Broadcast[T] = {
  assertNotStopped()
  require(!classOf[RDD[_]].isAssignableFrom(classTag[T].runtimeClass),
    "Can not directly broadcast RDDs; instead, call collect() and broadcast the result.")
  val bc = env.broadcastManager.newBroadcast[T](value, isLocal)
  val callSite = getCallSite
  logInfo("Created broadcast " + bc.id + " from " + callSite.shortForm)
  cleaner.foreach(_.registerBroadcastForCleanup(bc))
  bc
}
Essentially, broadcast() calls the newBroadcast() method of SparkEnv's BroadcastManager to create the broadcast object.
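A hedged usage example: broadcast a small lookup table once per executor instead of shipping it inside every task closure (the data is made up):

// The broadcast value is cached read-only on each executor; tasks read it via bc.value.
val lookup = Map("a" -> 1, "b" -> 2)
val bc = sc.broadcast(lookup)

val total = sc.parallelize(Seq("a", "b", "a", "c"))
  .map(key => bc.value.getOrElse(key, 0))
  .sum()

bc.unpersist()   // optionally release the executor-side copies when no longer needed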
addSparkListener is used to add listeners that implement the SparkListenerInterface trait to the LiveListenerBus.
/**
 * :: DeveloperApi ::
 * Register a listener to receive up-calls from events that happen during execution.
 */
@DeveloperApi
def addSparkListener(listener: SparkListenerInterface) {
  listenerBus.addToSharedQueue(listener)
}
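A hedged sketch of a custom listener registered this way; the listener name and log lines are made up, and SparkListener (a no-op adapter for SparkListenerInterface) is used as the base class:

import org.apache.spark.scheduler.{SparkListener, SparkListenerJobEnd, SparkListenerJobStart}

// Logs job boundaries; events are delivered asynchronously by the LiveListenerBus.
class JobLoggingListener extends SparkListener {
  override def onJobStart(jobStart: SparkListenerJobStart): Unit =
    println(s"Job ${jobStart.jobId} started with ${jobStart.stageIds.size} stage(s)")

  override def onJobEnd(jobEnd: SparkListenerJobEnd): Unit =
    println(s"Job ${jobEnd.jobId} finished: ${jobEnd.jobResult}")
}

sc.addSparkListener(new JobLoggingListener)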
SparkContext overloads the runJob method several times; every overload eventually calls the runJob shown below.
/**
 * Run a function on a given set of partitions in an RDD and pass the results to the given
 * handler function. This is the main entry point for all actions in Spark.
 *
 * @param rdd target RDD to run tasks on
 * @param func a function to run on each partition of the RDD
 * @param partitions set of partitions to run on; some jobs may not want to compute on all
 * partitions of the target RDD, e.g. for operations like `first()`
 * @param resultHandler callback to pass each result to
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  // Call runJob() on the DAGScheduler that was created earlier during SparkContext initialization
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()  // save checkpoint
}
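A hedged illustration of how an action funnels into runJob: count(), for example, is implemented by running a job that returns each partition's size and summing the results on the driver (the RDD here is arbitrary):

val rdd = sc.parallelize(1 to 100, numSlices = 4)

// One of the simpler runJob overloads: apply a function to every partition and
// collect the per-partition results into an Array on the driver.
val partitionSizes: Array[Long] = sc.runJob(rdd, (it: Iterator[Int]) => it.size.toLong)

val total = partitionSizes.sum   // same value as rdd.count()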
Specifying the directory where checkpoints of the RDDs in a job will be saved is a prerequisite for enabling the checkpoint mechanism.
/**
 * Set the directory under which RDDs are going to be checkpointed.
 * @param directory path to the directory where checkpoint files will be stored
 * (must be HDFS path if running in cluster)
 */
def setCheckpointDir(directory: String) {

  // If we are running on a cluster, log a warning if the directory is local.
  // Otherwise, the driver may attempt to reconstruct the checkpointed RDD from
  // its own local file system, which is incorrect because the checkpoint files
  // are actually on the executor machines.
  if (!isLocal && Utils.nonLocalPaths(directory).isEmpty) {
    logWarning("Spark is not running in local mode, therefore the checkpoint directory " +
      s"must not be on the local filesystem. Directory '$directory' " +
      "appears to be on the local filesystem.")
  }

  checkpointDir = Option(directory).map { dir =>
    val path = new Path(dir, UUID.randomUUID().toString)
    val fs = path.getFileSystem(hadoopConfiguration)
    fs.mkdirs(path)
    fs.getFileStatus(path).getPath.toString
  }
}
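A hedged usage example of checkpointing (the directory is a placeholder):

sc.setCheckpointDir("hdfs:///tmp/spark-checkpoints")   // must be set before calling checkpoint()

val rdd = sc.parallelize(1 to 10).map(_ * 2)
rdd.checkpoint()   // only marks the RDD; nothing is written yet
rdd.count()        // the first action materializes the RDD and writes the checkpoint files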
References
1. The Art of Spark Kernel Design: Architecture Design and Implementation (《Spark內核設計的藝術:架構設計與實現》)
2. Spark 2.4.3 source code