spark 源碼分析之二 -- SparkContext 的初始化過程

建立或使用現有Session

從Spark 2.0 開始,引入了 SparkSession的概念,建立或使用已有的session 代碼以下:html

1 val spark = SparkSession
2   .builder
3   .appName("SparkTC")
4   .getOrCreate()

首先,使用了 builder 模式來建立或使用已存在的SparkSession,org.apache.spark.sql.SparkSession.Builder#getOrCreate 代碼以下:java

  

 1 def getOrCreate(): SparkSession = synchronized {
 2   assertOnDriver() // 注意,spark session只能在 driver端建立並訪問
 3   // Get the session from current thread's active session.
 4 // activeThreadSession 是一個InheritableThreadLocal(繼承自ThreadLocal)方法。由於數據在 ThreadLocal中存放着,因此不須要加鎖
 5   var session = activeThreadSession.get()
 6 // 若是session不爲空,且session對應的sparkContext已經中止了,可使用現有的session
 7   if ((session ne null) && !session.sparkContext.isStopped) {
 8     options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
 9     if (options.nonEmpty) {
10       logWarning("Using an existing SparkSession; some configuration may not take effect.")
11     }
12     return session
13   }
14 
15   // 給SparkSession 對象加鎖,防止重複初始化 session
16 SparkSession.synchronized {
17     // If the current thread does not have an active session, get it from the global session.
18 // 若是默認session 中有session存在,切其sparkContext 已經中止,也可使用
19     session = defaultSession.get()
20     if ((session ne null) && !session.sparkContext.isStopped) {
21       options.foreach { case (k, v) => session.sessionState.conf.setConfString(k, v) }
22       if (options.nonEmpty) {
23         logWarning("Using an existing SparkSession; some configuration may not take effect.")
24       }
25       return session
26     }
27 
28     // 建立session
29     val sparkContext = userSuppliedContext.getOrElse { // 默認userSuppliedContext確定沒有SparkSession對象
30       val sparkConf = new SparkConf()
31       options.foreach { case (k, v) => sparkConf.set(k, v) }
32 
33       // set a random app name if not given.
34       if (!sparkConf.contains("spark.app.name")) {
35         sparkConf.setAppName(java.util.UUID.randomUUID().toString)
36       }
37 
38       SparkContext.getOrCreate(sparkConf)
39       // Do not update `SparkConf` for existing `SparkContext`, as it's shared by all sessions.
40     }
41 
42     // Initialize extensions if the user has defined a configurator class.
43     val extensionConfOption = sparkContext.conf.get(StaticSQLConf.SPARK_SESSION_EXTENSIONS)
44     if (extensionConfOption.isDefined) {
45       val extensionConfClassName = extensionConfOption.get
46       try {
47         val extensionConfClass = Utils.classForName(extensionConfClassName)
48         val extensionConf = extensionConfClass.newInstance()
49           .asInstanceOf[SparkSessionExtensions => Unit]
50         extensionConf(extensions)
51       } catch {
52         // Ignore the error if we cannot find the class or when the class has the wrong type.
53         case e @ (_: ClassCastException |
54                   _: ClassNotFoundException |
55                   _: NoClassDefFoundError) =>
56           logWarning(s"Cannot use $extensionConfClassName to configure session extensions.", e)
57       }
58     }
59    // 初始化 SparkSession,並把剛初始化的 SparkContext 傳遞給它
60     session = new SparkSession(sparkContext, None, None, extensions)
61     options.foreach { case (k, v) => session.initialSessionOptions.put(k, v) }
62 // 設置 default session
63     setDefaultSession(session)
64 // 設置 active session
65 setActiveSession(session)
66 
67     // Register a successfully instantiated context to the singleton. This should be at the
68     // end of the class definition so that the singleton is updated only if there is no
69     // exception in the construction of the instance.
70     // 設置 apark listener ,當application 結束時,default session 重置
71 sparkContext.addSparkListener(new SparkListener {
72       override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
73         defaultSession.set(null)
74       }
75     })
76   }
77 
78   return session
79 }

org.apache.spark.SparkContext#getOrCreate方法以下:web

 1 def getOrCreate(config: SparkConf): SparkContext = {
 2   // Synchronize to ensure that multiple create requests don't trigger an exception
 3   // from assertNoOtherContextIsRunning within setActiveContext
 4 // 使用Object 對象鎖
 5   SPARK_CONTEXT_CONSTRUCTOR_LOCK.synchronized {
 6 // activeContext是一個AtomicReference 實例,它的數據set或update都是原子性的
 7     if (activeContext.get() == null) {
 8 // 一個session 只有一個 SparkContext 上下文對象
 9       setActiveContext(new SparkContext(config), allowMultipleContexts = false)
10     } else {
11       if (config.getAll.nonEmpty) {
12         logWarning("Using an existing SparkContext; some configuration may not take effect.")
13       }
14     }
15     activeContext.get()
16   }
17 }

Spark Context 初始化

SparkContext 表明到 spark 集羣的鏈接,它能夠用來在spark集羣上建立 RDD,accumulator和broadcast 變量。一個JVM 只能有一個活動的 SparkContext 對象,當建立一個新的時候,必須調用stop 方法中止活動的 SparkContext。
當調用了構造方法後,會初始化類的成員變量,而後進入初始化過程。由 try catch 塊包圍,這個 try catch 塊是在執行構造函數時執行的,參照我寫的一篇文章:scala class中孤立代碼塊揭祕sql

這塊孤立的代碼塊以下:  apache

  1 try {
  2   // 1. 初始化 configuration
  3   _conf = config.clone()
  4   _conf.validateSettings()
  5 
  6   if (!_conf.contains("spark.master")) {
  7     throw new SparkException("A master URL must be set in your configuration")
  8   }
  9   if (!_conf.contains("spark.app.name")) {
 10     throw new SparkException("An application name must be set in your configuration")
 11   }
 12 
 13   // log out spark.app.name in the Spark driver logs
 14   logInfo(s"Submitted application: $appName")
 15 
 16   // System property spark.yarn.app.id must be set if user code ran by AM on a YARN cluster
 17   if (master == "yarn" && deployMode == "cluster" && !_conf.contains("spark.yarn.app.id")) {
 18     throw new SparkException("Detected yarn cluster mode, but isn't running on a cluster. " +
 19       "Deployment to YARN is not supported directly by SparkContext. Please use spark-submit.")
 20   }
 21 
 22   if (_conf.getBoolean("spark.logConf", false)) {
 23     logInfo("Spark configuration:\n" + _conf.toDebugString)
 24   }
 25 
 26   // Set Spark driver host and port system properties. This explicitly sets the configuration
 27   // instead of relying on the default value of the config constant.
 28   _conf.set(DRIVER_HOST_ADDRESS, _conf.get(DRIVER_HOST_ADDRESS))
 29   _conf.setIfMissing("spark.driver.port", "0")
 30 
 31   _conf.set("spark.executor.id", SparkContext.DRIVER_IDENTIFIER)
 32 
 33   _jars = Utils.getUserJars(_conf)
 34   _files = _conf.getOption("spark.files").map(_.split(",")).map(_.filter(_.nonEmpty))
 35     .toSeq.flatten
 36   // 2. 初始化日誌目錄並設置壓縮類
 37   _eventLogDir =
 38     if (isEventLogEnabled) {
 39       val unresolvedDir = conf.get("spark.eventLog.dir", EventLoggingListener.DEFAULT_LOG_DIR)
 40         .stripSuffix("/")
 41       Some(Utils.resolveURI(unresolvedDir))
 42     } else {
 43       None
 44     }
 45 
 46   _eventLogCodec = {
 47     val compress = _conf.getBoolean("spark.eventLog.compress", false)
 48     if (compress && isEventLogEnabled) {
 49       Some(CompressionCodec.getCodecName(_conf)).map(CompressionCodec.getShortName)
 50     } else {
 51       None
 52     }
 53   }
 54   // 3. LiveListenerBus負責將SparkListenerEvent異步地傳遞給對應註冊的SparkListener.
 55   _listenerBus = new LiveListenerBus(_conf)
 56 
 57   // Initialize the app status store and listener before SparkEnv is created so that it gets
 58   // all events.
 59   // 4. 給 app 提供一個 kv store(in-memory)
 60   _statusStore = AppStatusStore.createLiveStore(conf)
 61   // 5. 註冊 AppStatusListener 到 LiveListenerBus 中
 62   listenerBus.addToStatusQueue(_statusStore.listener.get)
 63 
 64   // Create the Spark execution environment (cache, map output tracker, etc)
 65   // 6. 建立 driver端的 env
 66   // 包含全部的spark 實例運行時對象(master 或 worker),包含了序列化器,RPCEnv,block manager, map out tracker等等。
 67   // 當前的spark 經過一個全局的變量代碼找到 SparkEnv,全部的線程能夠訪問同一個SparkEnv,
 68   // 建立SparkContext以後,能夠經過 SparkEnv.get方法來訪問它。
 69   _env = createSparkEnv(_conf, isLocal, listenerBus)
 70   SparkEnv.set(_env)
 71 
 72   // If running the REPL, register the repl's output dir with the file server.
 73   _conf.getOption("spark.repl.class.outputDir").foreach { path =>
 74     val replUri = _env.rpcEnv.fileServer.addDirectory("/classes", new File(path))
 75     _conf.set("spark.repl.class.uri", replUri)
 76   }
 77   // 7. 從底層監控 spark job 和 stage 的狀態並彙報的 API
 78   _statusTracker = new SparkStatusTracker(this, _statusStore)
 79 
 80   // 8. console 進度條
 81   _progressBar =
 82     if (_conf.get(UI_SHOW_CONSOLE_PROGRESS) && !log.isInfoEnabled) {
 83       Some(new ConsoleProgressBar(this))
 84     } else {
 85       None
 86     }
 87 
 88   // 9. spark ui, 使用jetty 實現
 89   _ui =
 90     if (conf.getBoolean("spark.ui.enabled", true)) {
 91       Some(SparkUI.create(Some(this), _statusStore, _conf, _env.securityManager, appName, "",
 92         startTime))
 93     } else {
 94       // For tests, do not enable the UI
 95       None
 96     }
 97   // Bind the UI before starting the task scheduler to communicate
 98   // the bound port to the cluster manager properly
 99   _ui.foreach(_.bind())
100 
101   // 10. 建立 hadoop configuration
102   _hadoopConfiguration = SparkHadoopUtil.get.newConfiguration(_conf)
103 
104   // 11. Add each JAR given through the constructor
105   if (jars != null) {
106     jars.foreach(addJar)
107   }
108 
109   if (files != null) {
110     files.foreach(addFile)
111   }
112   // 12. 計算 executor 的內存
113   _executorMemory = _conf.getOption("spark.executor.memory")
114     .orElse(Option(System.getenv("SPARK_EXECUTOR_MEMORY")))
115     .orElse(Option(System.getenv("SPARK_MEM"))
116     .map(warnSparkMem))
117     .map(Utils.memoryStringToMb)
118     .getOrElse(1024)
119 
120   // Convert java options to env vars as a work around
121   // since we can't set env vars directly in sbt.
122   for { (envKey, propKey) <- Seq(("SPARK_TESTING", "spark.testing"))
123     value <- Option(System.getenv(envKey)).orElse(Option(System.getProperty(propKey)))} {
124     executorEnvs(envKey) = value
125   }
126   Option(System.getenv("SPARK_PREPEND_CLASSES")).foreach { v =>
127     executorEnvs("SPARK_PREPEND_CLASSES") = v
128   }
129   // The Mesos scheduler backend relies on this environment variable to set executor memory.
130   // TODO: Set this only in the Mesos scheduler.
131   executorEnvs("SPARK_EXECUTOR_MEMORY") = executorMemory + "m"
132   executorEnvs ++= _conf.getExecutorEnv
133   executorEnvs("SPARK_USER") = sparkUser
134 
135   // We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
136   // retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
137   // 13. 建立 HeartbeatReceiver endpoint
138   _heartbeatReceiver = env.rpcEnv.setupEndpoint(
139     HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
140 
141   // Create and start the scheduler
142   // 14. 建立 task scheduler 和 scheduler backend
143   val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
144   _schedulerBackend = sched
145   _taskScheduler = ts
146   // 15. 建立DAGScheduler實例
147   _dagScheduler = new DAGScheduler(this)
148   _heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
149 
150   // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
151   // constructor
152   // 16. 啓動 task scheduler
153   _taskScheduler.start()
154 
155   // 17. 從task scheduler 獲取 application ID
156   _applicationId = _taskScheduler.applicationId()
157   // 18. 從 task scheduler 獲取 application attempt id
158   _applicationAttemptId = taskScheduler.applicationAttemptId()
159   _conf.set("spark.app.id", _applicationId)
160   if (_conf.getBoolean("spark.ui.reverseProxy", false)) {
161     System.setProperty("spark.ui.proxyBase", "/proxy/" + _applicationId)
162   }
163   // 19. 爲ui 設置 application id
164   _ui.foreach(_.setAppId(_applicationId))
165   // 20. 初始化 block manager
166   _env.blockManager.initialize(_applicationId)
167 
168   // The metrics system for Driver need to be set spark.app.id to app ID.
169   // So it should start after we get app ID from the task scheduler and set spark.app.id.
170   // 21. 啓動 metricsSystem
171   _env.metricsSystem.start()
172   // Attach the driver metrics servlet handler to the web ui after the metrics system is started.
173   // 22. 將 metricSystem 的 servlet handler 給 ui 用
174   _env.metricsSystem.getServletHandlers.foreach(handler => ui.foreach(_.attachHandler(handler)))
175 
176   // 23. 初始化 event logger listener
177   _eventLogger =
178     if (isEventLogEnabled) {
179       val logger =
180         new EventLoggingListener(_applicationId, _applicationAttemptId, _eventLogDir.get,
181           _conf, _hadoopConfiguration)
182       logger.start()
183       listenerBus.addToEventLogQueue(logger)
184       Some(logger)
185     } else {
186       None
187     }
188 
189   // Optionally scale number of executors dynamically based on workload. Exposed for testing.
190   // 24. 若是啓用了動態分配 executor, 須要實例化 executorAllocationManager 並啓動之
191   val dynamicAllocationEnabled = Utils.isDynamicAllocationEnabled(_conf)
192   _executorAllocationManager =
193     if (dynamicAllocationEnabled) {
194       schedulerBackend match {
195         case b: ExecutorAllocationClient =>
196           Some(new ExecutorAllocationManager(
197             schedulerBackend.asInstanceOf[ExecutorAllocationClient], listenerBus, _conf,
198             _env.blockManager.master))
199         case _ =>
200           None
201       }
202     } else {
203       None
204     }
205   _executorAllocationManager.foreach(_.start())
206 
207   // 25. 初始化 ContextCleaner,並啓動之
208   _cleaner =
209     if (_conf.getBoolean("spark.cleaner.referenceTracking", true)) {
210       Some(new ContextCleaner(this))
211     } else {
212       None
213     }
214   _cleaner.foreach(_.start())
215   // 26. 創建並啓動 listener bus
216   setupAndStartListenerBus()
217   // 27.  task scheduler 已就緒,發送環境已更新請求
218   postEnvironmentUpdate()
219   // 28.  發送 application start 請求事件
220   postApplicationStart()
221 
222   // Post init
223   // 29.等待 直至task scheduler backend 準備好了
224   _taskScheduler.postStartHook()
225   // 30. 註冊 dagScheduler metricsSource
226   _env.metricsSystem.registerSource(_dagScheduler.metricsSource)
227   // 31. 註冊 metric source
228   _env.metricsSystem.registerSource(new BlockManagerSource(_env.blockManager))
229   //32. 註冊 metric source
230   _executorAllocationManager.foreach { e =>
231     _env.metricsSystem.registerSource(e.executorAllocationManagerSource)
232   }
233 
234   // Make sure the context is stopped if the user forgets about it. This avoids leaving
235   // unfinished event logs around after the JVM exits cleanly. It doesn't help if the JVM
236   // is killed, though.
237   logDebug("Adding shutdown hook") // force eager creation of logger
238   // 33. 設置 shutdown hook, 在spark context 關閉時,要作的回調操做
239   _shutdownHookRef = ShutdownHookManager.addShutdownHook(
240     ShutdownHookManager.SPARK_CONTEXT_SHUTDOWN_PRIORITY) { () =>
241     logInfo("Invoking stop() from shutdown hook")
242     try {
243       stop()
244     } catch {
245       case e: Throwable =>
246         logWarning("Ignoring Exception while stopping SparkContext from shutdown hook", e)
247     }
248   }
249 } catch {
250   case NonFatal(e) =>
251     logError("Error initializing SparkContext.", e)
252     try {
253       stop()
254     } catch {
255       case NonFatal(inner) =>
256         logError("Error stopping SparkContext after init error.", inner)
257     } finally {
258       throw e
259     }
260

 

從上面能夠看出,spark context 的初始化是很是複雜的,涉及的spark 組件不少,包括 異步事務總線系統LiveListenerBus、SparkEnv、SparkUI、DAGScheduler、metrics監測系統、EventLoggingListener、TaskScheduler、ExecutorAllocationManager、ContextCleaner等等。先暫且看成是總述,後面對部分組件會有比較全面的剖析。session

相關文章
相關標籤/搜索