SparkContext的初始化(伯篇)——執行環境與元數據清理器

《深入理解Spark:核心思想與源碼分析》一書前言的內容請看鏈接《深入理解SPARK:核心思想與源碼分析》一書正式出版上市

《深入理解Spark:核心思想與源碼分析》一書第一章的內容請看鏈接《第1章 環境準備》

《深入理解Spark:核心思想與源碼分析》一書第二章的內容請看鏈接《第2章 SPARK設計理念與基本架構》

由於本書的第3章內容較多,所以打算分別開闢三篇隨筆分別展現。本文展現第3章第一部分的內容:


第3章 SparkContext的初始化


「道生一,一生二,二生三,三生萬物。」——《道德經》

本章導讀:

       SparkContext的初始化是Driver應用程序提交執行的前提,本章內容以local模式爲主,並按照代碼執行順序講解,這將有助於首次接觸Spark的讀者理解源碼。讀者朋友如果能邊跟蹤代碼,邊學習本章內容,也許是快速理解SparkContext初始化過程的便捷途徑。已經熟練使用Spark的開發人員可以選擇跳過本章內容。

       本章將在介紹SparkContext初始化過程的同時,向讀者介紹各個組件的作用,爲閱讀後面的章節打好基礎。Spark中的組件很多,就其功能而言涉及到網絡通信、分佈式、消息、存儲、計算、緩存、測量、清理、文件服務、Web UI的方方面面。

 

3.1 SparkContext概述

        Spark Driver用於提交用戶應用程序,實際可以看作Spark的客戶端。瞭解Spark Driver的初始化,有助於讀者理解用戶應用程序在客戶端的處理過程。

        Spark Driver的初始化始終圍繞着SparkContext的初始化。SparkContext可以算得上是所有Spark應用程序的發動機引擎,轎車要想跑起來,發動機首先要啓動。SparkContext初始化完畢,才能向Spark集羣提交任務。在平坦的公路上,發動機只需以較低的轉速,較低的功率就可以遊刃有餘;在山區,你可能需要一臺能夠提供大功率的發動機,這樣才能滿足你轉山的體驗。這些參數都是通過駕駛員操作油門、檔位等傳送給發動機的,而SparkContext的配置參數則由SparkConf負責,SparkConf就是你的操作面板。

SparkConf的構造很簡單,主要是通過ConcurrentHashMap來維護各種Spark的配置屬性。SparkConf代碼結構見代碼清單3-1。Spark的配置屬性都是以「spark.」開頭的字符串。

代碼清單3-1  SparkConf代碼結構

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging {  
  2.   importSparkConf._  
  3.   def this()= this(true)  
  4.   private val settings = newConcurrentHashMap[String, String]()  
  5.   if(loadDefaults) {  
  6.     // 加載任何以spark.開頭的系統屬性  
  7.     for ((key, value) <-Utils.getSystemProperties if key.startsWith("spark.")) {  
  8.       set(key, value)  
  9.     }  
  10.   }  
  11. //其餘代碼省略  

現在開始介紹SparkContext,SparkContext的初始化步驟如下:

1)        創建Spark執行環境SparkEnv;

2)        創建RDD清理器metadataCleaner;

3)        創建並初始化SparkUI;

4)        Hadoop相關配置及Executor環境變量的設置

5)        創建任務調度TaskScheduler;

6)        創建和啓動DAGScheduler;

7)        TaskScheduler的啓動;

8)        初始化塊管理器BlockManager(BlockManager是存儲體系的主要組件之一,將在第4章介紹);

9)        啓動測量系統MetricsSystem;

10)     創建和啓動Executor分配管理器ExecutorAllocationManager;

11)     ContextCleaner的創建與啓動;

12)     Spark環境更新;

13)     創建DAGSchedulerSource和BlockManagerSource;

14)     將SparkContext標記爲激活。

 

SparkContext的主構造器參數爲SparkConf,其實現如下。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. class SparkContext(config: SparkConf) extends Logging withExecutorAllocationClient {  
  2. private val creationSite: CallSite = Utils.getCallSite()  
  3.   private val allowMultipleContexts:Boolean =  
  4.    config.getBoolean("spark.driver.allowMultipleContexts"false)  
  5.  SparkContext.markPartiallyConstructed(this, allowMultipleContexts)  

上面代碼中的CallSite存儲了線程棧中最靠近棧頂的用戶類及最靠近棧底的Scala或者Spark核心類信息。Utils.getCallSite的詳細信息見附錄A。SparkContext默認只有一個實例(由屬性spark.driver.allowMultipleContexts來控制,用戶需要多個SparkContext實例時,可以將其設置爲true),方法markPartiallyConstructed用來確保實例的唯一性,並將當前SparkContext標記爲正在構建中。

         接下來會對SparkConf進行拷貝,然後對各種配置信息進行校驗,代碼如下。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. private[spark] val conf =config.clone()  
  2. conf.validateSettings()  
  3.   
  4. if (!conf.contains("spark.master")) {  
  5.   throw newSparkException("A master URL must be set in your configuration")  
  6. }  
  7. if (!conf.contains("spark.app.name")) {  
  8.   throw newSparkException("An application name must be set in yourconfiguration")  
  9. }  

從上面校驗的代碼看到必須指定屬性spark.master 和spark.app.name,否則會拋出異常,結束初始化過程。spark.master用於設置部署模式,spark.app.name指定應用程序名稱。

3.2 創建執行環境SparkEnv

       SparkEnv是Spark的執行環境對象,其中包括衆多與Executor執行相關的對象。由於在local模式下Driver會創建Executor,local-cluster部署模式或者Standalone部署模式下Worker另起的CoarseGrainedExecutorBackend進程中也會創建Executor,所以SparkEnv存在於Driver或者CoarseGrainedExecutorBackend進程中。創建SparkEnv 主要使用SparkEnv的createDriverEnvcreateDriverEnv方法有三個參數,conf、isLocal和 listenerBus。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1.  val isLocal = (master == "local" ||master.startsWith("local["))  
  2.  private[spark] vallistenerBus = newLiveListenerBus  
  3.  conf.set("spark.executor.id","driver")  
  4.   
  5.  private[spark] valenv =SparkEnv.createDriverEnv(conf,isLocal, listenerBus)  
  6. SparkEnv.set(env)  

上面代碼中的conf是對SparkConf的拷貝,isLocal標識是否是單機模式,listenerBus採用監聽器模式維護各類事件的處理,在3.14節會詳細介紹。

SparkEnv的方法createDriverEnv最終調用create創建SparkEnv。SparkEnv的構造步驟如下:

1)        創建安全管理器SecurityManager;

2)        創建基於Akka的分佈式消息系統ActorSystem;

3)        創建Map任務輸出跟蹤器mapOutputTracker;

4)        實例化ShuffleManager;

5)        創建ShuffleMemoryManager;

6)        創建塊傳輸服務BlockTransferService;

7)        創建BlockManagerMaster;

8)        創建塊管理器BlockManager;

9)        創建廣播管理器BroadcastManager;

10)    創建緩存管理器CacheManager;

11)    創建HTTP文件服務器HttpFileServer;

12)    創建測量系統MetricsSystem;

13)    創建SparkEnv;

 

3.2.1 安全管理器SecurityManager

         SecurityManager主要對權限、賬號進行設置,如果使用Hadoop YARN作爲集羣管理器,則需要使用證書生成 secret key登錄,最後給當前系統設置默認的口令認證實例,此實例採用匿名內部類實現,參見代碼清單3-2。

代碼清單3-2  SecurityManager的實現

 

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. private val secretKey =generateSecretKey()  
  2.   
  3.  // 使用HTTP連接設置口令認證  
  4.  if (authOn) {  
  5.   Authenticator.setDefault(  
  6.      newAuthenticator() {  
  7.        override defgetPasswordAuthentication(): PasswordAuthentication = {  
  8.          var passAuth:PasswordAuthentication = null  
  9.          val userInfo =getRequestingURL().getUserInfo()  
  10.          if (userInfo !=null) {  
  11.            val  parts = userInfo.split(":",2)  
  12.            passAuth = newPasswordAuthentication(parts(0),parts(1).toCharArray())  
  13.          }  
  14.          return passAuth  
  15.        }  
  16.      }  
  17.    )  
  18.  }<span style="font-family: Arial, Helvetica, sans-serif; background-color: rgb(255, 255, 255);"> </span>  

3.2.2 基於Akka的分佈式消息系統ActorSystem

         ActorSystem是Spark中最基礎的設施,Spark既使用它發送分佈式消息,又用它實現併發編程。怎麼,消息系統可以實現併發?要解釋清楚這個問題,首先應該簡單的介紹下Scala語言的Actor併發編程模型:Scala認爲Java線程通過共享數據以及通過鎖來維護共享數據的一致性是糟糕的做法,容易引起鎖的爭用,而且線程的上下文切換會帶來不少開銷,降低併發程序的性能,甚至會引入死鎖的問題。在Scala中只需要自定義類型繼承Actor,並且提供act方法,就如同Java裏實現Runnable接口,需要實現run方法一樣。但是不能直接調用act方法,而是通過發送消息的方式(Scala發送消息是異步的),傳遞數據。如:

         Actor ! message

         Akka是Actor編程模型的高級類庫,類似於JDK 1.5之後越來越豐富的併發工具包,簡化了程序員併發編程的難度。ActorSystem便是Akka提供的用於創建分佈式消息通信系統的基礎類。Akka的具體信息見附錄B。

         正式因爲Actor輕量級的併發編程、消息發送以及ActorSystem支持分佈式消息發送等特點,Spark選擇了ActorSystem。

         SparkEnv中創建ActorSystem時用到了AkkaUtils工具類,見代碼清單3-3。AkkaUtils.createActorSystem方法用於啓動ActorSystem,見代碼清單3-4。AkkaUtils使用了Utils的靜態方法startServiceOnPort, startServiceOnPort最終會回調方法startService: Int => (T, Int),此處的startService實際是方法doCreateActorSystem。真正啓動ActorSystem是由doCreateActorSystem方法完成的,doCreateActorSystem的具體實現細節請見附錄B。Spark的Driver中Akka的默認訪問地址是akka://sparkDriver,Spark的Executor中Akka的默認訪問地址是akka://sparkExecutor。如果不指定ActorSystem的端口,那麼所有節點的ActorSystem端口在每次啓動時隨機產生。關於startServiceOnPort的實現,請見附錄A。

代碼清單3-3  使用AkkaUtils工具類創建和啓動[計算機3] [初霖4] ActorSystem

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. val(actorSystem, boundPort) =  
  2.  Option(defaultActorSystem) match {  
  3.     case Some(as)=> (as, port)  
  4.     case None =>  
  5.       valactorSystemName =if (isDriver) driverActorSystemNameelse executorActorSystemName  
  6.      AkkaUtils.createActorSystem(actorSystemName, hostname, port, conf,securityManager)  
  7.   }  

代碼清單3-4  ActorSystem的創建和啓動

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. def createActorSystem(  
  2.     name:String,  
  3.     host:String,  
  4.     port:Int,  
  5.     conf:SparkConf,  
  6.     securityManager: SecurityManager):(ActorSystem, Int) = {  
  7.   val startService: Int=> (ActorSystem, Int) = { actualPort =>  
  8.    doCreateActorSystem(name, host, actualPort, conf, securityManager)  
  9.   }  
  10.  Utils.startServiceOnPort(port, startService, conf, name)  
  11. }  

3.2.3 map任務輸出跟蹤器mapOutputTracker

         mapOutputTracker用於跟蹤map階段任務的輸出狀態,此狀態便於reduce階段任務獲取地址及中間輸出結果。每個map任務或者reduce任務都會有其唯一標識,分別爲mapId和reduceId。每個reduce任務的輸入可能是多個map任務的輸出,reduce會到各個map任務的所在節點上拉取Block,這一過程叫做shuffle。每批shuffle過程都有唯一的標識shuffleId。

         這裏先介紹下MapOutputTrackerMaster。MapOutputTrackerMaster內部使用mapStatuses:TimeStampedHashMap[Int,Array[MapStatus]]來維護跟蹤各個map任務的輸出狀態。其中key對應shuffleId,Array存儲各個map任務對應的狀態信息MapStatus。由於MapStatus維護了map輸出Block的地址BlockManagerId,所以reduce任務知道從何處獲取map任務的中間輸出。MapOutputTrackerMaster還使用cachedSerializedStatuses:TimeStampedHashMap[Int, Array[Byte]]維護序列化後的各個map任務的輸出狀態。其中key對應shuffleId,Array存儲各個序列化MapStatus生成的字節數組。

         Driver和Executor處理MapOutputTrackerMaster的方式有所不同:

  • 如果當前應用程序是Driver,則創建MapOutputTrackerMaster,然後創建MapOutputTrackerMasterActor,並且註冊到ActorSystem中。
  • 如果當前應用程序是Executor,則創建MapOutputTrackerWorker,並從ActorSystem中找到MapOutputTrackerMasterActor。

無論是Driver還是Executor,最後都由mapOutputTracker的屬性trackerActor持有MapOutputTrackerMasterActor的引用,參見代碼清單3-5。

代碼清單3-5  registerOrLookup方法用於查找或者註冊Actor的實現

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. def registerOrLookup(name: String, newActor: => Actor): ActorRef ={  
  2.       if (isDriver) {  
  3.        logInfo("Registering" + name)  
  4.         actorSystem.actorOf(Props(newActor),name = name)  
  5.       } else {  
  6.        AkkaUtils.makeDriverRef(name, conf, actorSystem)  
  7.       }  
  8.     }  
  9.    
  10.     val mapOutputTracker=  if (isDriver) {  
  11.       newMapOutputTrackerMaster(conf)  
  12.     } else {  
  13.       newMapOutputTrackerWorker(conf)  
  14. }  
  15.    
  16.     mapOutputTracker.trackerActor= registerOrLookup(  
  17.      "MapOutputTracker",  
  18.       newMapOutputTrackerMasterActor(mapOutputTracker.asInstanceOf[MapOutputTrackerMaster], conf))  

在後面章節大家會知道map任務的狀態正是由Executor向持有的MapOutputTrackerMasterActor發送消息,將map任務狀態同步到mapOutputTracker的mapStatuses和cachedSerializedStatuses的。Executor究竟是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法通過調用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,實際正是利用ActorSystem提供的分佈式消息機制實現的,具體細節參見附錄B。這裏第一次使用到了Akka提供的功能,以後大家會漸漸感覺到使用Akka的便捷。

3.2.4 實例化ShuffleManager

         ShuffleManager負責管理本地及遠程的block數據的shuffle操作。ShuffleManager默認爲通過反射方式生成的SortShuffleManager的實例,可以修改屬性spark.shuffle.manager爲hash來顯式[計算機5] [初霖6] 使用HashShuffleManager。SortShuffleManager通過持有的IndexShuffleBlockManager間接操作BlockManager中的DiskBlockManager將map結果寫入本地,並根據shuffleId、mapId寫入索引文件,也能通過MapOutputTrackerMaster中維護的mapStatuses從本地或者其他遠程節點讀取文件。有讀者可能會問,爲什麼需要shuffle?Spark作爲並行計算框架,同一個作業會被劃分爲多個任務在多個節點上並行執行,reduce的輸入可能存在於多個節點上,因此需要通過「洗牌」將所有reduce的輸入彙總起來,這個過程就是shuffle。這個問題以及對ShuffleManager的具體使用會在第5章和第6章詳述。ShuffleManager的實例化見代碼清單3-6。代碼清單3-6最後創建的ShuffleMemoryManager,將在3.2.5節介紹。

代碼清單3-6  ShuffleManager的實例化及ShuffleMemoryManager的創建

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. val shortShuffleMgrNames =Map(  
  2.   "hash"-> "org.apache.spark.shuffle.hash.HashShuffleManager",  
  3.   "sort"-> "org.apache.spark.shuffle.sort.SortShuffleManager")  
  4. val shuffleMgrName = conf.get("spark.shuffle.manager""sort")  
  5. val shuffleMgrClass = shortShuffleMgrNames.get  
  6. se(shuffleMgrName.toLowerCase, shuffleMgrName)  
  7. val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)  
  8.   
  9. val shuffleMemoryManager =new ShuffleMemoryManager(conf)  
 

3.2.5 shuffle線程內存管理器ShuffleMemoryManager

         ShuffleMemoryManager負責管理shuffle線程佔有內存的分配與釋放,並通過threadMemory:mutable.HashMap[Long, Long]緩存每個線程的內存字節數,見代碼清單3-7。

代碼清單3-7  ShuffleMemoryManager的數據結構

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. private[spark] class ShuffleMemoryManager(maxMemory: Long)extends Logging {  
  2.   private val threadMemory = newmutable.HashMap[Long, Long]() // threadId -> memory bytes  
  3.   def this(conf: SparkConf) = this(ShuffleMemoryManager.getMaxMemory(conf))  
 

getMaxMemory方法用於獲取shuffle所有線程佔用的最大內存,實現如下。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. def getMaxMemory(conf: SparkConf): Long = {  
  2.     val memoryFraction =conf.getDouble("spark.shuffle.memoryFraction"0.2)  
  3.     val safetyFraction =conf.getDouble("spark.shuffle.safetyFraction"0.8)  
  4.    (Runtime.getRuntime.maxMemory * memoryFraction *safetyFraction).toLong  
  5.   }  

從上面代碼可以看出,shuffle所有線程佔用的最大內存的計算公式爲:

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. def getMaxMemory(conf: SparkConf): Long = {  
  2.     val memoryFraction =conf.getDouble("spark.shuffle.memoryFraction"0.2)  
  3.     val safetyFraction =conf.getDouble("spark.shuffle.safetyFraction"0.8)  
  4.    (Runtime.getRuntime.maxMemory * memoryFraction *safetyFraction).toLong  
  5.   }  

從上面代碼可以看出,shuffle所有線程佔用的最大內存的計算公式爲:

Java運行時最大內存 * Spark的shuffle最大內存佔比 * Spark的安全內存佔比

可以配置屬性spark.shuffle.memoryFraction修改Spark的shuffle最大內存佔比,配置屬性spark.shuffle.safetyFraction修改Spark的安全內存佔比。

注意:ShuffleMemoryManager通常運行在Executor中, Driver中的ShuffleMemoryManager 只有在local模式下才起作用。

 

3.2.6 塊傳輸服務BlockTransferService

         BlockTransferService默認爲NettyBlockTransferService(可以配置屬性spark.shuffle.blockTransferService使用NioBlockTransferService),它使用Netty提供的異步事件驅動的網絡應用框架,提供web服務及客戶端,獲取遠程節點上Block的集合。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. val blockTransferService =  
  2.      conf.get("spark.shuffle.blockTransferService","netty").toLowerCase match {  
  3.         case "netty"=>  
  4.         case "netty"=>  
  5.           newNettyBlockTransferService(conf, securityManager, numUsableCores)  
  6.         case "nio"=>  
  7.           newNioBlockTransferService(conf, securityManager)  
  8.       }  

NettyBlockTransferService的具體實現將在第4章詳細介紹。這裏大家可能覺得奇怪,這樣的網絡應用爲何也要放在存儲體系?大家不妨先帶着疑問,直到你真正瞭解存儲體系。

3.2.7 BlockManagerMaster介紹

BlockManagerMaster負責對Block的管理和協調,具體操作依賴於BlockManagerMasterActor。Driver和Executor處理BlockManagerMaster的方式不同:

  • 如果當前應用程序是Driver,則創建BlockManagerMasterActor,並且註冊到ActorSystem中。
  • 如果當前應用程序是Executor,則從ActorSystem中找到BlockManagerMasterActor。

無論是Driver還是Executor,最後BlockManagerMaster的屬性driverActor將持有對BlockManagerMasterActor的引用。BlockManagerMaster的創建代碼如下。

[java]  view plain  copy
  在CODE上查看代碼片 派生到我的代碼片
  1. val blockManagerMaster = new BlockManagerMaster(registerOrLookup(  
  2.      "BlockManagerMaster",  
  3.       newBlockManagerMasterActor(isLocal, conf, listenerBus)), conf, isDriver)  

registerOrLookup已在3.2.3節介紹過了,不再贅述。BlockManagerMaster及BlockManagerMasterActor的具體實現將在第4章詳細介紹。

3.2.8 創建塊管理器BlockManager

         BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize被調用後,它纔是有效的。BlockManager作爲存儲系統的一部分,具體實現見第4章。BlockManager的創建代碼如下。

[java] 

[java] 
相關文章
相關標籤/搜索