即日起開始spark源碼閱讀之旅,這個過程是至關痛苦的,也許有大量的看不懂,可是天天一個方法,一點點看,相信總歸會有極大地提升的。那麼下面開始:java
建立sparkConf對象,那麼究竟它幹了什麼了類,從代碼層面,咱們能夠看到咱們須要setMaster啊,setAppName啊,set blabla啊。。。等等~算法
val sparkConf = new SparkConf().setMaster("local").setAppName("TopActiveLocations").set("spark.executor.memory", "3g")sql
那麼咱們就一點一點看一下,SparkConf是怎麼實現的:緩存
class SparkConf(loadDefaults: Boolean) extends Cloneable with Logging { import SparkConf._ /** Create a SparkConf that loads defaults from system properties and the classpath */ def this() = this(true) private val settings = new ConcurrentHashMap[String, String]() if (loadDefaults) { // Load any spark.* system properties for ((key, value) <- Utils.getSystemProperties if key.startsWith("spark.")) { set(key, value) } }
/** Set a configuration variable. */
def set(key: String, value: String): SparkConf = {
if (key == null) {
throw new NullPointerException("null key")
}
if (value == null) {
throw new NullPointerException("null value for " + key)
}
logDeprecationWarning(key)
settings.put(key, value)
this
}安全
你會發現,它聲明瞭一個settings的ConcurrentHashMap,用的正是 java.util.concurrent.ConcurrentHashMap,從ConcurrentHashMap代碼中能夠看出,它引入了一個「分段鎖」的概念,具體能夠理解爲把一個大的Map拆分紅N個小的HashTable,根據key.hashCode()來決定把key放到哪一個HashTable中。。。。額。。。咱們是在玩spark,言歸正傳。併發
而後呢在聲明對象是,SparkConf傳入的是一個boolean類型的變量,這個變量的做用是是否加載Spark的conf下的配置信息,這個從def this() = this(true)能夠看出,默認是爲true的,這也就是爲何咱們代碼中提交集羣,不用去專門set配置項的緣由,並且大部分夥計不知道這裏還能夠傳值~app
隨後,若是爲true的狀況下,它會去getSystemProperties進行加載。異步
def getSystemProperties: Map[String, String] = {
System.getProperties.stringPropertyNames().asScala
.map(key => (key, System.getProperty(key))).toMap
}分佈式
/** * Enumerates all key/value pairs in the specified hashtable * and omits the property if the key or value is not a string. * @param h the hashtable */ private synchronized void enumerateStringProperties(Hashtable<String, String> h) { if (defaults != null) { defaults.enumerateStringProperties(h); } for (Enumeration e = keys() ; e.hasMoreElements() ;) { Object k = e.nextElement(); Object v = get(k); if (k instanceof String && v instanceof String) { h.put((String) k, (String) v); } } }
最終都存入了以前的map中,咱們繼續深刻建立sparkContext對象。oop
val sc = new SparkContext(sparkConf)
而後我發現 它幹了一大堆一大堆的變態的事情,首先咱們看一下sparkContext的構造器:
class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient { // The call site where this SparkContext was constructed. private val creationSite: CallSite = Utils.getCallSite() // If true, log warnings instead of throwing exceptions when multiple SparkContexts are active private val allowMultipleContexts: Boolean = config.getBoolean("spark.driver.allowMultipleContexts", false) // In order to prevent multiple SparkContexts from being active at the same time, mark this // context as having started construction. // NOTE: this must be placed at the beginning of the SparkContext constructor. SparkContext.markPartiallyConstructed(this, allowMultipleContexts) val startTime = System.currentTimeMillis()
首先,建立了CallSite對象,那麼這個對象是幹什麼的呢,它存儲了線程棧中最靠近棧頂的用戶類及最靠近棧底的Scala或者Spark核心類信息。
這裏,config.getBoolean("spark.driver.allowMultipleContexts", false)默認爲false,曾經我覺得只能在spark中建立一個Sparkcontext對象,其實能夠建立多個(我勒個去啊,那是否是說明能夠同時建立streaming對象以及sparkContext對象,將streaming與sparksql同時聲明,一塊兒作數據處理了,有待驗證) 若是須要建立多個,就在配置參數中設置爲true. markPartiallyConstructed會確保其惟一性。
接下來呢會拷貝config,而且進行默認值賦值,與爲空判斷,這裏能夠看到spark.master 和spark.app.name 是必須設置的,不然會拋出。
隨之調用
SparkEnv.createDriverEnv(conf, isLocal, listenerBus, SparkContext.numDriverCores(master))方法,建立SparkEnv.查閱資料,SparkEnv呢,又幹了N多事情以下:
1.建立安全管理器SecurityManager;
SecurityManager主要對權限、帳號進行設置,若是使用Hadoop YARN做爲集羣管理器,則須要使用證書生成secret key登陸,最後給當前系統設置默認的口令認證明例。
2.基於Akka的分佈式消息系統ActorSystem
Scala認爲Java線程經過共享數據以及經過鎖來維護共享數據的一致性是糟糕的作法,容易引發鎖的爭用,下降併發程序的性能,甚至會引入死鎖的問題。在Scala中只須要自定義類型繼承Actor,而且提供act方法,就如同Java裏實現Runnable接口,須要實現run方法同樣。可是不能直接調用act方法,而是經過發送消息的方式(Scala發送消息是異步的)傳遞數據。
3.下來呢,該建立MapOutputTrackerMaster或MapOutputTrackerWorker,那麼他倆是什麼呢?map任務的狀態正是由Executor向持有的MapOutputTracker-MasterActor發送消息,將map任務狀態同步到mapOutputTracker的mapStatuses,Executor到底是如何找到MapOutputTrackerMasterActor的?registerOrLookup方法經過調用AkkaUtils.makeDriverRef找到MapOutputTrackerMasterActor,利用ActorSystem提供的分佈式消息機制實現的.
4.隨之開始對ShuffleManager實例進行建立及加載。
ShuffleManager默認爲經過反射方式生成的SortShuffleManager的實例,能夠修改屬性spark.shuffle.manager爲hash來顯式控制使用HashShuffleManager。這裏再說明下,什麼是shuffle?shuffle就是個混洗的過程,同一個做業會被劃分爲多個任務在多個節點上並行執行,reduce的輸入可能存在於多個節點上,須要經過「洗牌」將全部reduce的輸入彙總起來,這個過程就是shuffle。 那麼spark是經過反射,來加載對應配置項的實體類:
ShuffleMemoryManager負責管理shuffle線程佔有內存的分配與釋放,並經過thread-Memory:mutable.HashMap[Long,Long]緩存每一個線程的內存字節數。出,shuffle全部線程佔用的最大內存的計算公式爲:
Java運行時最大內存*Spark的shuffle最大內存佔比*Spark的安全內存佔比,能夠配置屬性spark.shuffle.memoryFraction修改Spark的shuffle最大內存佔比,配置屬性spark.shuffle.safetyFraction修改Spark的安全內存,以下代碼:
5.下來,建立BlockManager,BlockManager負責對Block的管理,只有在BlockManager的初始化方法initialize被調用後,它纔是有效的。BlockManager做爲存儲系統的一部分。這麼就繼續深刻,圍繞BlockManager進行閱讀。
查閱資料,BlockManager主要由如下部分組成:
·shuffle客戶端ShuffleClient;
·BlockManagerMaster(對存在於全部Executor上的BlockManager統一管理);
·磁盤塊管理器DiskBlockManager
·磁盤存儲DiskStore;
·Tachyon存儲TachyonStore;
·非廣播Block清理器metadataCleaner和廣播Block清理器broadcastCleaner;
·壓縮算法實現
ShuffleServerId默認使用當前BlockManager的BlockManagerId。BlockManager的初始化:
那麼BlockManager的實質運行機制以下圖:
(1)表示Executor 的BlockManager中的BlockManagerMaster與Driver的BlockManagerActor進行消息通訊,好比註冊BlockManager、更新Block的信息、獲取Block所在的BlockManager、刪除Executor等。
(2)是shuffleRead與shufflewrite過程,也是BlockManager的讀寫操做。
(3)當內存不足時,寫入磁盤,寫入磁盤的數據也是由DiskBlockManager進行管理。
(4)經過訪問遠端節點的Executor的BlockManager中的TransportServer提供的RPC服務下載或者上傳Block;
(5)遠端節點的Executor的BlockManager訪問本地Executor的BlockManager中的TransportServer提供的RPC服務下載或者上傳Block;
(6)當存儲體系選擇Tachyon做爲存儲時,對於BlockManager的讀寫操做實際調用了TachyonStore的putBytes、putArray、putIterator、getBytes、getValues等。
以上過程就發生在咱們提交jar包或啓動thriftServer的時候,只要注意看日誌就會發現。好了,今天就到這裏,明天繼續玩~
參考文獻:《深刻理解Spark核心思想與源碼解析》