要想學習理解一款流行分佈式系統的源碼不是一件容易的事情,必定要屢次迭代,看無數遍而且領悟其設計思想。第一次看不要糾結於細節,每次迭代過程當中增長一點點細節的理解,最終達到豁然開朗的地步。緩存
學習優秀的源代碼是提升自身技能的最好途徑,比作無數個低水平的項目效果要顯著的多,好了,閒話少說,讓咱們試圖來理解Spark的世界吧。網絡
一、大框架框架
首先要掌握幾個基本概念,Spark是分佈式計算框架,核心思想是經過將計算任務儘可能分配到源數據一致的機器上執行,下降網絡延時;同時引入Dag圖依賴關係生成一系列計算任務,固然緩存等機制是不可避免要用到的,爲了提升性能嘛。分佈式
大部分分佈式計算的核心思想是相似的,經過成熟的分佈式框架在集羣間通訊和同步消息,提供橫向擴展能力,知足大數據計算的需求。在Spark中具體的分佈式消息傳遞是經過Akka模塊來支持的。函數
核心類:oop
SparkContext,DagSchedule,TaskScheduleImp(TaskSchedule的實現),Stage,Task,TaskDescription,TaskInfo,RDD,BlockManager等性能
(1)SparkContext學習
建立spark任務主要是經過它來完成的,Spark程序上下文,裏面包含了DagSchedule,TaskSchedule,BlockManager等等。大數據
(2)DagSchedulespa
矢量圖計算,解析整個Spark任務生成Stage調用樹,每一個Stage的劃分主要是看該Stage是否包含Shuffle過程來決定,Stage在調用的時候生成TaskSet,並經過TaskSchedule分配到具體的Executor上,每一個TaskSet包含1到多個Task,Task具體分紅ResultTask和ShuffleResultTask兩種,兩種的區別從命名上就能夠區分出來,前者直接計算獲得結果,後者是和Shuffle過程相關的。
(3)TaskScheduleImp
經過具體的TaskScheduleEndpoint在集羣間通訊,發佈任務等。
與集羣通訊是經過TaskScheduleEndpoint來執行的。最經常使用的是CoarseGrainedSchedulerBackend
(4)Executor
執行單元,通常每臺集羣機器會分配一個Executor,每一個Executor管理本地的多個TaskSet。
Executor經過ExecutorBackend來和Master的ScheduleEndpoint通訊,相應的最經常使用的Endpoint是CoarseGrainedExecutorBackend。
(5)RDD
數據集,Spark定義了不少的數據集(RDD),好比HadoopRDD,JdbcRDD等。
RDD中的具體的數據有時是經過BlockManager來管理的,RDD中能尋址到數據所在的機器。
(6)Task
具體的任務,Master定義好Task後發佈到集羣中對應機器的Exector去執行,執行結果經過DirectResult和IndirectResult返回,後者經過包含告終果數據所在的Shuffle地址或者塊地址等尋址信息。
(7)BlockManager
管理整個集羣的Block,默認Block大小是128M,內存和磁盤數據的對應關係等也是經過相關的類來管理的。
以上是初步的比較籠統的一個框架結構,主要用於增強理解,要想更好的理解Spark必需要經過不斷的讀源碼,後續時間筆者會依次和你們分享更具體的源碼心得。
(8)MapOutTracker
跟蹤整個集羣的MapStatus,不一樣集羣之間經過MapOutTrackerMaster等通訊來同步信息。
二、DagSchedule任務調度
class DAGScheduler(
private[scheduler] val sc: SparkContext,
private[scheduler] val taskScheduler: TaskScheduler,
listenerBus: LiveListenerBus,
mapOutputTracker: MapOutputTrackerMaster,
blockManagerMaster: BlockManagerMaster,
env: SparkEnv,
clock: Clock = new SystemClock())
extends Logging
(1)Dag接受到的各類命令都經過Dag內部事件的方式被執行(而不是直接執行),便於Dag內部作一些排序調度的判斷準備工做。
命令的種類:心跳、Exector的註冊註銷、Task的執行結束失敗
(2)兩種主要的stage:ResultStage,ShuffleMapStage
getShuffleMapStage:生成或者得到Shuffle類型的stage,同時要生成Shuffle的locs位置信息在裏面
getParentStage:根據Rdd各依賴Rdd的Shuffle屬性,獲取全部Stage列表,這些Stage執行完成以後才能執行本Stage。
getMissingParentStages:獲取全部缺失的父Stage,這裏只判斷Shuffle Stage的狀況,ResultStage不考慮,若是缺失則生成該ShuffleToMapStage。
其餘一系列維護Stage、Job關係的屬性和方法。
(3)主要的Submit方式
SubmitJob:提交做業
生成該做業的finalStage,而後再生成ActiveJob,組裝各狀態HashMap,提交該finalStage
SubmitStage:提交某個Stage,與SubmitJob的區別是不生成新的ActiveJob。
submitMissingTasks:真正的提交Stage,當全部父Stage都準備就緒時執行,要重點看。stage的全部partitions生成多個Task。最後將這些tasks合併成TaskSet並提交到TaskSchedule(taskScheduler.submitTasks( new TaskSet(...))).
(4)TaskCompleted事件處理
廣播事件到listenerBus;
找到Task所在的Stage:
a、若是是ResultTask,則更新該Stage屬於的Job的狀態,並判斷Stage是否全部Task都執行完成,若是是則觸發StageCompleted事件。好像也多是JobCompleted事件,還要再看一次。
b、ShuffleMapTask,則找到對應的ShuffleStage,更新對應的Task所在分區的MapStatus(或者位置信息等),更新mapOutputTracker狀態屬性,最後啓動全部知足提交狀態的等待Stage(waitingStages)。
(5)handleExecutorAdded
當有額外的Executor加進來的時候,只是執行SubmitWaitingStage命令,這時候可能會有等待Stage知足了執行條件。
三、Task
ResultTask:直接在RDD上執行func
ShuffleMapTask:執行ShuffleDependcy的shuffleHandler方法,主要是一些聚合函數的計算,而後將對應的RDD分區數據執行這些聚合計算以後的結果寫入到shuffleManager管理的writer中去,多是寫入到內存也可能寫入到disk(猜想),shuffleManager通常會用到BlockManager來管理數據的存儲。
Shuffle結果通常存儲到集羣的臨時目錄中,具體規則可參見DiskBlockManager和FileShuffleBlockResolver等類實現。
具體內容參見代碼。
shuffleManager有這麼幾種:
(1)HashShuffleManager
(2)SortShuffleManager
固然也能夠自定義