Spark源碼分析心得

要想學習理解一款流行分佈式系統的源碼不是一件容易的事情,必定要屢次迭代,看無數遍而且領悟其設計思想。第一次看不要糾結於細節,每次迭代過程當中增長一點點細節的理解,最終達到豁然開朗的地步。緩存

學習優秀的源代碼是提升自身技能的最好途徑,比作無數個低水平的項目效果要顯著的多,好了,閒話少說,讓咱們試圖來理解Spark的世界吧。網絡

一、大框架框架

首先要掌握幾個基本概念,Spark是分佈式計算框架,核心思想是經過將計算任務儘可能分配到源數據一致的機器上執行,下降網絡延時;同時引入Dag圖依賴關係生成一系列計算任務,固然緩存等機制是不可避免要用到的,爲了提升性能嘛。分佈式

大部分分佈式計算的核心思想是相似的,經過成熟的分佈式框架在集羣間通訊和同步消息,提供橫向擴展能力,知足大數據計算的需求。在Spark中具體的分佈式消息傳遞是經過Akka模塊來支持的。函數

核心類:oop

SparkContext,DagSchedule,TaskScheduleImp(TaskSchedule的實現),Stage,Task,TaskDescription,TaskInfo,RDD,BlockManager等性能

(1)SparkContext學習

建立spark任務主要是經過它來完成的,Spark程序上下文,裏面包含了DagSchedule,TaskSchedule,BlockManager等等。大數據

(2)DagSchedulespa

矢量圖計算,解析整個Spark任務生成Stage調用樹,每一個Stage的劃分主要是看該Stage是否包含Shuffle過程來決定,Stage在調用的時候生成TaskSet,並經過TaskSchedule分配到具體的Executor上,每一個TaskSet包含1到多個Task,Task具體分紅ResultTask和ShuffleResultTask兩種,兩種的區別從命名上就能夠區分出來,前者直接計算獲得結果,後者是和Shuffle過程相關的。

(3)TaskScheduleImp

經過具體的TaskScheduleEndpoint在集羣間通訊,發佈任務等。

與集羣通訊是經過TaskScheduleEndpoint來執行的。最經常使用的是CoarseGrainedSchedulerBackend

(4)Executor

執行單元,通常每臺集羣機器會分配一個Executor,每一個Executor管理本地的多個TaskSet。

Executor經過ExecutorBackend來和Master的ScheduleEndpoint通訊,相應的最經常使用的Endpoint是CoarseGrainedExecutorBackend。

(5)RDD

數據集,Spark定義了不少的數據集(RDD),好比HadoopRDD,JdbcRDD等。

RDD中的具體的數據有時是經過BlockManager來管理的,RDD中能尋址到數據所在的機器。

(6)Task

具體的任務,Master定義好Task後發佈到集羣中對應機器的Exector去執行,執行結果經過DirectResult和IndirectResult返回,後者經過包含告終果數據所在的Shuffle地址或者塊地址等尋址信息。

(7)BlockManager

管理整個集羣的Block,默認Block大小是128M,內存和磁盤數據的對應關係等也是經過相關的類來管理的。

以上是初步的比較籠統的一個框架結構,主要用於增強理解,要想更好的理解Spark必需要經過不斷的讀源碼,後續時間筆者會依次和你們分享更具體的源碼心得。

(8)MapOutTracker

跟蹤整個集羣的MapStatus,不一樣集羣之間經過MapOutTrackerMaster等通訊來同步信息。

 

二、DagSchedule任務調度

class DAGScheduler(
    private[scheduler] val sc: SparkContext,
    private[scheduler] val taskScheduler: TaskScheduler,
    listenerBus: LiveListenerBus,
    mapOutputTracker: MapOutputTrackerMaster,
    blockManagerMaster: BlockManagerMaster,
    env: SparkEnv,
    clock: Clock = new SystemClock())
  extends Logging

(1)Dag接受到的各類命令都經過Dag內部事件的方式被執行(而不是直接執行),便於Dag內部作一些排序調度的判斷準備工做。

命令的種類:心跳、Exector的註冊註銷、Task的執行結束失敗

(2)兩種主要的stage:ResultStage,ShuffleMapStage

getShuffleMapStage:生成或者得到Shuffle類型的stage,同時要生成Shuffle的locs位置信息在裏面

getParentStage:根據Rdd各依賴Rdd的Shuffle屬性,獲取全部Stage列表,這些Stage執行完成以後才能執行本Stage。

getMissingParentStages:獲取全部缺失的父Stage,這裏只判斷Shuffle Stage的狀況,ResultStage不考慮,若是缺失則生成該ShuffleToMapStage。

其餘一系列維護Stage、Job關係的屬性和方法。

(3)主要的Submit方式

SubmitJob:提交做業

生成該做業的finalStage,而後再生成ActiveJob,組裝各狀態HashMap,提交該finalStage

SubmitStage:提交某個Stage,與SubmitJob的區別是不生成新的ActiveJob。

submitMissingTasks:真正的提交Stage,當全部父Stage都準備就緒時執行,要重點看。stage的全部partitions生成多個Task。最後將這些tasks合併成TaskSet並提交到TaskSchedule(taskScheduler.submitTasks( new TaskSet(...))).

(4)TaskCompleted事件處理

廣播事件到listenerBus;

找到Task所在的Stage:

a、若是是ResultTask,則更新該Stage屬於的Job的狀態,並判斷Stage是否全部Task都執行完成,若是是則觸發StageCompleted事件。好像也多是JobCompleted事件,還要再看一次。

b、ShuffleMapTask,則找到對應的ShuffleStage,更新對應的Task所在分區的MapStatus(或者位置信息等),更新mapOutputTracker狀態屬性,最後啓動全部知足提交狀態的等待Stage(waitingStages)。

(5)handleExecutorAdded

當有額外的Executor加進來的時候,只是執行SubmitWaitingStage命令,這時候可能會有等待Stage知足了執行條件。

 

三、Task

ResultTask:直接在RDD上執行func

ShuffleMapTask:執行ShuffleDependcy的shuffleHandler方法,主要是一些聚合函數的計算,而後將對應的RDD分區數據執行這些聚合計算以後的結果寫入到shuffleManager管理的writer中去,多是寫入到內存也可能寫入到disk(猜想),shuffleManager通常會用到BlockManager來管理數據的存儲。

Shuffle結果通常存儲到集羣的臨時目錄中,具體規則可參見DiskBlockManager和FileShuffleBlockResolver等類實現。

具體內容參見代碼。

shuffleManager有這麼幾種:

(1)HashShuffleManager

(2)SortShuffleManager

固然也能夠自定義

相關文章
相關標籤/搜索