Spark 2.0 introduced the SparkSession. The code to create a session, or reuse an existing one, looks like this:
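```scala
import org.apache.spark.sql.SparkSession

// No .master(...) here: the master URL is normally supplied by spark-submit (--master).
val spark = SparkSession
  .builder
  .appName("SparkTC")
  .getOrCreate()
```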
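To make the builder step concrete: each builder call only records a key-value option. In Spark 2.x's Builder, `appName(name)` is shorthand for `config("spark.app.name", name)` and `master(url)` for `config("spark.master", url)`. The sketch below models just that option-collecting part with the standard library; `MiniBuilder` and its `build()` method are hypothetical names for illustration, not Spark API.

```scala
import scala.collection.mutable

// Hypothetical, stripped-down model of a session builder: it only collects options.
// A real builder would hand these options to getOrCreate().
class MiniBuilder {
  private val options = mutable.LinkedHashMap.empty[String, String]

  def config(key: String, value: String): MiniBuilder = {
    options(key) = value // a later call with the same key overwrites the earlier value
    this                 // return this so calls can be chained
  }

  // Convenience setters are just named config(...) calls.
  def appName(name: String): MiniBuilder = config("spark.app.name", name)
  def master(url: String): MiniBuilder = config("spark.master", url)

  // Immutable snapshot of everything collected so far.
  def build(): Map[String, String] = options.toMap
}
```

Chaining `new MiniBuilder().appName("SparkTC").master("local[*]").build()` yields a map with `spark.app.name -> SparkTC` and `spark.master -> local[*]`.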
The builder pattern is used to create a SparkSession or reuse an existing one. Here is `org.apache.spark.sql.SparkSession.Builder#getOrCreate`:
```scala
def getOrCreate(): SparkSession = synchronized {
  assertOnDriver() // Note: a SparkSession can only be created and accessed on the driver
  // Get the session from current thread's active session.
  // activeThreadSession is an InheritableThreadLocal (a subclass of ThreadLocal). The value is
  // stored per thread, so reading it needs no global lock.
  var session = activeThreadSession.get()
  // If the session is non-null and its SparkContext has NOT been stopped, reuse it
  if ((session ne null) && !session.sparkContext.isStopped) {
    options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
    if (options.nonEmpty) {
      logWarning("Using an existing SparkSession; some configuration may not take effect.")
    }
    return session
  }

  // Lock on the SparkSession companion object so that concurrent callers
  // cannot initialize the session twice
  SparkSession.synchronized {
    // If the current thread does not have an active session, get it from the global session.
    // Likewise, reuse the default session if it exists and its SparkContext has not been stopped
    session = defaultSession.get()
    if ((session ne null) && !session.sparkContext.isStopped) {
      options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
      if (options.nonEmpty) {
        logWarning("Using an existing SparkSession; some configuration may not take effect.")
      }
      return session
    }

    // No usable session: create one.
    // userSuppliedContext is None by default, so the SparkContext comes from SparkContext.getOrCreate
    val sparkContext = userSuppliedContext.getOrElse {
      val sparkConf = new SparkConf()
      options.foreach { case (k, v) => sparkConf.set(k, v) }

      // set a random app name if not given.
      if (!sparkConf.contains("spark.app.name")) {
        sparkConf.setAppName(java.util.UUID.randomUUID().toString)
      }

      SparkContext.getOrCreate(sparkConf)
      // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
    }

    // Initialize extensions if the user has defined a configurator class.
    val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
    if (extensionConfOption.isDefined) {
      val extensionConfClassName = extensionConfOption.get
      try {
        val extensionConfClass = Utils.classForName(extensionConfClassName)
        val extensionConf = extensionConfClass.newInstance()
          .asInstanceOf[SparkSessionExtensions => Unit]
        extensionConf(extensions)
      } catch {
        // Ignore the error if we cannot find the class or when the class has the wrong type.
        case e @ (_: ClassCastException |
                  _: ClassNotFoundException |
                  _: NoClassDefFoundError) =>
          logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
      }
    }
    // Create the SparkSession and hand it the SparkContext obtained above
    session = new SparkSession(sparkContext, None, None, extensions)
    options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
    // Register it as the default session
    setDefaultSession(session)
    // Register it as the current thread's active session
    setActiveSession(session)

    // Register a successfully instantiated context to the singleton. This should be at the
    // end of the class definition so that the singleton is updated only if there is no
    // exception in the construction of the instance.
    // Add a Spark listener that resets the default session when the application ends
    sparkContext.addSparkListener(new SparkListener {
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        defaultSession.set(null)
      }
    })
  }

  return session
}
```
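The lookup order in `getOrCreate` (the current thread's active session, then the global default session, then a newly created one) can be sketched without Spark. This is a simplified model under stated assumptions: `FakeSession` and `SessionRegistry` are hypothetical names, the `stopped` flag stands in for `sparkContext.isStopped`, and a single `synchronized` replaces the two locks (the Builder and the `SparkSession` companion) used in the real code.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for a session; `stopped` plays the role of sparkContext.isStopped.
final class FakeSession(val id: Int) {
  @volatile var stopped: Boolean = false
}

// Sketch of the lookup order: thread-local active session -> global default -> new session.
object SessionRegistry {
  // Per-thread active session; child threads inherit the parent's value when they are created.
  private val activeThreadSession = new InheritableThreadLocal[FakeSession]
  // JVM-wide default session.
  private val defaultSession = new AtomicReference[FakeSession]()
  private var nextId = 0

  def getOrCreate(): FakeSession = synchronized {
    val active = activeThreadSession.get()
    if ((active ne null) && !active.stopped) active // 1) reuse this thread's session
    else {
      val global = defaultSession.get()
      if ((global ne null) && !global.stopped) global // 2) fall back to the default session
      else {
        nextId += 1 // 3) nothing usable: create and register a new session
        val created = new FakeSession(nextId)
        defaultSession.set(created)
        activeThreadSession.set(created)
        created
      }
    }
  }

  // Forget the current thread's active session (the global default is untouched).
  def clearActive(): Unit = activeThreadSession.remove()
}
```

A thread with no active session of its own still gets the default session, and a new session is created only once the existing one is marked stopped.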
`org.apache.spark.SparkContext#getOrCreate` looks like this:
```scala
def getOrCreate(config: SparkConf): SparkContext = {
  // Synchronize to ensure that multiple create requests don't trigger an exception
  // from assertNoOtherContextIsRunning within setActiveContext
  // SPARK_CONTEXT_CONSTRUCTOR_LOCK is a plain Object used as the monitor
  SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
    // activeContext is an AtomicReference, so reads and updates of it are atomic
    if (activeContext.get() == null) {
      // At most one active SparkContext per JVM (allowMultipleContexts = false)
      setActiveContext(new SparkContext(config), allowMultipleContexts = false)
    } else {
      if (config.getAll.nonEmpty) {
        logWarning("Using an existing SparkContext; some configuration may not take effect.")
      }
    }
    activeContext.get()
  }
}
```
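SparkContext represents the connection to a Spark cluster and is used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext may be active per JVM: you must call `stop()` on the active SparkContext before creating a new one.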
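The one-active-context-per-JVM rule and the lock-guarded `getOrCreate` can be modeled with an `AtomicReference` and a plain lock object. This is a hypothetical stand-in (`FakeContext`, `ContextRegistry`), not Spark code; in particular it throws `IllegalStateException` where Spark's own check raises a `SparkException`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical stand-in for SparkContext: it only records its configuration.
final class FakeContext(val conf: Map[String, String])

object ContextRegistry {
  private val lock = new Object // plays the role of SPARK_CONTEXT_CONSTRUCTOR_LOCK
  private val activeContext = new AtomicReference[FakeContext]()

  // Direct construction: refuse if another context is still active.
  def create(conf: Map[String, String]): FakeContext = lock.synchronized {
    if (activeContext.get() != null)
      throw new IllegalStateException("Only one context may be running in this JVM")
    val ctx = new FakeContext(conf)
    activeContext.set(ctx)
    ctx
  }

  // Reuse the active context if there is one; otherwise create it under the same lock.
  def getOrCreate(conf: Map[String, String]): FakeContext = lock.synchronized {
    val existing = activeContext.get()
    if (existing == null) create(conf) // JVM monitors are re-entrant, so nesting is safe
    else {
      if (conf.nonEmpty) println("Using an existing context; some configuration may not take effect.")
      existing
    }
  }

  // Clearing the slot is what allows a new context to be created afterwards.
  def stop(): Unit = lock.synchronized { activeContext.set(null) }
}
```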
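Calling the constructor first initializes the class's member fields and then runs the initialization logic. That logic is a try/catch block written directly in the class body, so it executes as part of the constructor; see my article "Demystifying standalone code blocks in Scala classes".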
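A small illustration of the mechanism described above: statements written directly in a Scala class body belong to the primary constructor, so a try/catch placed there runs on every `new`. The hypothetical `Component` mimics SparkContext's error handling by stopping what it already started and rethrowing; the class, `InitLog`, and the step names are made up for illustration.

```scala
import scala.collection.mutable.ArrayBuffer
import scala.util.control.NonFatal

// Hypothetical log so the effects of construction can be inspected.
object InitLog {
  val events = ArrayBuffer.empty[String]
}

class Component(failAt: Option[String]) {
  private var started = List.empty[String] // field initializers run before the block below

  // This block is part of the primary constructor: it runs when `new Component(...)` is called.
  try {
    for (step <- Seq("listenerBus", "env", "ui")) {
      if (failAt.contains(step)) throw new RuntimeException(s"$step failed")
      started = step :: started
      InitLog.events += s"start $step"
    }
  } catch {
    case NonFatal(e) =>
      stop() // tear down whatever was already started
      throw e // then let the caller see the original failure
  }

  def stop(): Unit = {
    started.foreach(s => InitLog.events += s"stop $s") // most recently started first
    started = Nil
  }
}
```

If construction fails at `"ui"`, the log ends with `stop env` and `stop listenerBus`, and the exception still propagates out of `new`.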
In SparkContext, this standalone block looks like this:
```scala
try {
  // 1. Initialize the configuration
  _conf = config.clone()
  _conf.validateSettings()

  if (!_conf.contains("spark.master")) {
    throw new SparkException("A master URL must be set in your configuration")
  }
  if (!_conf.contains("spark.app.name")) {
    throw new SparkException("An application name must be set in your configuration")
  }

  // log out spark.app.name in the Spark driver logs
  logInfo(s"Submitted application: $appName")

  // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
  if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
    throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
      "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
  }

  if (_conf.getBoolean("spark.logConf", false)) {
    logInfo("Spark configuration:\n" + _conf.toDebugString)
  }

  // Set Spark driver host and port system properties. This explicitly sets the configuration
  // instead of relying on the default value of the config constant.
  _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
  _conf.setIfMissing("spark.driver.port", "0")

  _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)

  _jars = Utils.getUserJars(_conf)
  _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
    .toSeq.flatten

  // 2. Resolve the event log directory and the event log compression codec
  _eventLogDir =
    if (isEventLogEnabled) {
      val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
        .stripSuffix("/")
      Some(Utils.resolveURI(unresolvedDir))
    } else {
      None
    }

  _eventLogCodec = {
    val compress = _conf.getBoolean("spark.eventLog.compress", false)
    if (compress && isEventLogEnabled) {
      Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
    } else {
      None
    }
  }

  // 3. LiveListenerBus delivers SparkListenerEvents asynchronously to the registered SparkListeners
  _listenerBus = new LiveListenerBus(_conf)

  // Initialize the app status store and listener before SparkEnv is created so that it gets
  // all events.
  // 4. Create an in-memory key-value store for application status
  _statusStore = AppStatusStore.createLiveStore(conf)
  // 5. Register the AppStatusListener with the LiveListenerBus
  listenerBus.addToStatusQueue(_statusStore.listener.get)

  // Create the Spark execution environment (cache, map output tracker, etc)
  // 6. Create the driver-side SparkEnv.
  // SparkEnv holds all runtime objects of a running Spark instance (master or worker):
  // the serializer, RpcEnv, block manager, map output tracker, and so on.
  // Spark finds the SparkEnv through a global variable, so all threads share the same SparkEnv;
  // once the SparkContext has been created, it can be accessed with SparkEnv.get.
  _env = createSparkEnv(_conf, isLocal, listenerBus)
  SparkEnv.set(_env)

  // If running the REPL, register the repl's output dir with the file server.
  _conf.getOption("spark.repl.class.outputDir").foreach { path =>
    val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
    _conf.set("spark.repl.class.uri", replUri)
  }
  // 7. Low-level API for monitoring and reporting job and stage status
  _statusTracker = new SparkStatusTracker(this, _statusStore)

  // 8. Console progress bar
  _progressBar =
    if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
      Some(new ConsoleProgressBar(this))
    } else {
      None
    }

  // 9. Spark UI, implemented with Jetty
  _ui =
    if (conf.getBoolean("spark.ui.enabled", true)) {
      Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
        startTime))
    } else {
      // For tests, do not enable the UI
      None
    }
  // Bind the UI before starting the task scheduler to communicate
  // the bound port to the cluster manager properly
  _ui.foreach(_.bind())

  // 10. Create the Hadoop configuration
  _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)

  // 11. Add each JAR given through the constructor
  if (jars != null) {
    jars.foreach(addJar)
  }

  if (files != null) {
    files.foreach(addFile)
  }
  // 12. Compute the executor memory (in MB)
  _executorMemory = _conf.getOption("spark.executor.memory")
    .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
    .orElse(Option(System.getenv("SPARK_MEM"))
    .map(warnSparkMem))
    .map(Utils.memoryStringToMb)
    .getOrElse(1024)

  // Convert java options to env vars as a work around
  // since we can't set env vars directly in sbt.
  for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
    value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
    executorEnvs(envKey) = value
  }
  Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
    executorEnvs("SPARK_PREPEND_CLASSES") = v
  }
  // The Mesos scheduler backend relies on this environment variable to set executor memory.
  // TODO: Set this only in the Mesos scheduler.
  executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
  executorEnvs ++= _conf.getExecutorEnv
  executorEnvs("SPARK_USER") = sparkUser

  // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
  // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
  // 13. Set up the HeartbeatReceiver RPC endpoint
  _heartbeatReceiver = env.rpcEnv.setupEndpoint(
    HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))

  // Create and start the scheduler
  // 14. Create the TaskScheduler and the SchedulerBackend
  val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
  _schedulerBackend = sched
  _taskScheduler = ts
  // 15. Create the DAGScheduler
  _dagScheduler = new DAGScheduler(this)
  _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // 16. Start the TaskScheduler
  _taskScheduler.start()

  // 17. Get the application ID from the TaskScheduler
  _applicationId = _taskScheduler.applicationId()
  // 18. Get the application attempt ID from the TaskScheduler
  _applicationAttemptId = taskScheduler.applicationAttemptId()
  _conf.set("spark.app.id", _applicationId)
  if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
    System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
  }
  // 19. Set the application ID on the UI
  _ui.foreach(_.setAppId(_applicationId))
  // 20. Initialize the BlockManager
  _env.blockManager.initialize(_applicationId)

  // The metrics system for Driver need to be set spark.app.id to app ID.
  // So it should start after we get app ID from the task scheduler and set spark.app.id.
  // 21. Start the metrics system
  _env.metricsSystem.start()
  // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
  // 22. Expose the metrics system's servlet handlers through the UI
  _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))

  // 23. Create the EventLoggingListener (if event logging is enabled)
  _eventLogger =
    if (isEventLogEnabled) {
      val logger =
        new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
          _conf, _hadoopConfiguration)
      logger.start()
      listenerBus.addToEventLogQueue(logger)
      Some(logger)
    } else {
      None
    }

  // Optionally scale number of executors dynamically based on workload. Exposed for testing.
  // 24. If dynamic allocation is enabled, create and start the ExecutorAllocationManager
  val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
  _executorAllocationManager =
    if (dynamicAllocationEnabled) {
      schedulerBackend match {
        case b: ExecutorAllocationClient =>
          Some(new ExecutorAllocationManager(
            schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
            _env.blockManager.master))
        case _ =>
          None
      }
    } else {
      None
    }
  _executorAllocationManager.foreach(_.start())

  // 25. Create and start the ContextCleaner
  _cleaner =
    if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
      Some(new ContextCleaner(this))
    } else {
      None
    }
  _cleaner.foreach(_.start())
  // 26. Set up and start the listener bus
  setupAndStartListenerBus()
  // 27. The task scheduler is ready: post an environment-update event
  postEnvironmentUpdate()
  // 28. Post the application-start event
  postApplicationStart()

  // Post init
  // 29. Wait until the scheduler backend is ready
  _taskScheduler.postStartHook()
  // 30. Register the DAGScheduler's metrics source
  _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
  // 31. Register the BlockManager's metrics source
  _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
  // 32. Register the ExecutorAllocationManager's metrics source (if any)
  _executorAllocationManager.foreach { e =>
    _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
  }

  // Make sure the context is stopped if the user forgets about it. This avoids leaving
  // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
  // is killed, though.
  logDebug("Adding shutdown hook") // force eager creation of logger
  // 33. Add a shutdown hook that calls stop() on this SparkContext when the JVM shuts down
  _shutdownHookRef = ShutdownHookManager.addShutdownHook(
    ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
    logInfo("Invoking stop() from shutdown hook")
    try {
      stop()
    } catch {
      case e: Throwable =>
        logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
    }
  }
} catch {
  case NonFatal(e) =>
    logError("Error initializing SparkContext.", e)
    try {
      stop()
    } catch {
      case NonFatal(inner) =>
        logError("Error stopping SparkContext after init error.", inner)
    } finally {
      throw e
    }
}
```
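Step 12 resolves the executor memory through a chain of `Option`s: `spark.executor.memory`, then the `SPARK_EXECUTOR_MEMORY` environment variable, then the deprecated `SPARK_MEM` (passed through `warnSparkMem` in the original), and finally a default of 1024 MB. The sketch below reproduces that fallback order with injectable inputs so it can be tested. `ExecutorMemory` and its `toMb` parser are hypothetical and deliberately simpler than Spark's `Utils.memoryStringToMb`: they accept only a whole number followed by a k/m/g/t suffix.

```scala
// Hypothetical model of the step-12 fallback chain; not Spark code.
object ExecutorMemory {
  // Simplified parser: "512m" -> 512, "2g" -> 2048, "1t" -> 1048576 (all in MB).
  def toMb(s: String): Int = {
    val (digits, suffix) = s.trim.toLowerCase.span(_.isDigit)
    val n = digits.toLong
    val mb = suffix match {
      case "k" | "kb" => n / 1024
      case "m" | "mb" => n
      case "g" | "gb" => n * 1024
      case "t" | "tb" => n * 1024 * 1024
      case _          => throw new IllegalArgumentException(s"Unsupported memory string: $s")
    }
    mb.toInt
  }

  // conf stands in for SparkConf, env for System.getenv wrapped in Option.
  def resolve(conf: Map[String, String], env: String => Option[String]): Int =
    conf.get("spark.executor.memory")
      .orElse(env("SPARK_EXECUTOR_MEMORY"))
      .orElse(env("SPARK_MEM")) // deprecated; the original logs a warning via warnSparkMem
      .map(toMb)
      .getOrElse(1024)          // default when nothing is configured
}
```

With no configuration and no environment variables, `resolve` returns 1024; setting `spark.executor.memory` to `2g` yields 2048 and takes precedence over both environment variables.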
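As the code shows, SparkContext initialization is complex and involves many Spark components, including the asynchronous event bus LiveListenerBus, SparkEnv, SparkUI, DAGScheduler, the metrics system, EventLoggingListener, TaskScheduler, ExecutorAllocationManager and ContextCleaner. Treat this post as an overview; later posts will analyze some of these components in more depth.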
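LiveListenerBus is created early in the sequence (step 3): posting an event only enqueues it, and a separate thread delivers it to the registered listeners. The sketch below shows that idea with a single queue and one dispatcher thread. It is a simplification under stated assumptions: `MiniListenerBus` is hypothetical, and the real bus keeps several named queues (the excerpt above uses `addToStatusQueue` and `addToEventLogQueue`).

```scala
import java.util.concurrent.LinkedBlockingQueue

// Hypothetical single-queue asynchronous bus; not Spark's LiveListenerBus.
final class MiniListenerBus[E] {
  // Events waiting to be delivered; None is used as the stop marker.
  private val queue = new LinkedBlockingQueue[Option[E]]()
  @volatile private var listeners = Vector.empty[E => Unit]

  // The dispatcher takes events in FIFO order and hands each one to every listener.
  private val dispatchLoop: Runnable = () => {
    var running = true
    while (running) {
      queue.take() match {
        case Some(event) => listeners.foreach(listener => listener(event))
        case None        => running = false
      }
    }
  }
  private val dispatcher = new Thread(dispatchLoop, "mini-listener-bus")
  dispatcher.setDaemon(true)

  def addListener(listener: E => Unit): Unit = synchronized { listeners = listeners :+ listener }
  def start(): Unit = dispatcher.start()
  // post() only enqueues, so the caller never waits for listeners to run.
  def post(event: E): Unit = queue.put(Some(event))
  // Enqueue the stop marker behind pending events, then wait for the dispatcher to finish.
  def stop(): Unit = { queue.put(None); dispatcher.join() }
}
```

Because `stop()` places its marker behind any pending events and then joins the dispatcher, every event posted before `stop()` has been delivered by the time it returns.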