Application:咱們本身的 Spark 程序。算法
TaskRunner:將咱們編寫的代碼,也就是要執行的算子以及函數拷貝,反序列化,而後執行 task。緩存
Task:task 有兩種 ShuffleMapTask 和 ResultTask,只有最後一個 Stage 是 ResultTask,其他是 ShuffleMapTask。網絡
1.submit 提交,在當前節點啓動一個 Driver 進程函數
2 . Driver執行Application(Application老是先建立SparkContext)構造SparkContext在初始化的時候作的最重要的兩件事情就是構建DAGScheduler和TaskScheduler源碼分析
3.在構造出 TaskScheduler 的時候,TaskScheduler 會你負責鏈接 Master,並向Master 註冊 Application。
4.Master 通知 Worker 啓動 Executor;Master 接收到 Application 註冊的請求後會使用本身的資源調度算在 Spark 集羣的 Worker 上讓 Worker 爲這個 Application啓動多個 Executor。(調度算法)性能
5. Executor 啓動以後會反向註冊到 TaskScheduler 上。當全部 Executor 都註冊到Driver 上Driver 的 SparkContext 初始化結束,會繼續執行咱們的代碼。優化
6.每執行到一個 Action 操做就會建立一個 job;並把 job 提交給 DAGScheduler。線程
7.DAGScheduler 會將 job 劃分爲多個 stage,而後每一個 stage 建立一個 TaskSet(裏面包含了不少 task)。並把每個 Taskset 給 TaskScheduler。(stage 劃分算法)blog
8.TaskScheduler 會把 TaskSet 裏的每個 Task 提交到 Executor 上運行(task 分配算法)遞歸
9.Executor 每收到一個 Task 都會用一個 TaskRunner 來封裝 task,而後從線程池裏取出一個線程來執行 task。因此最後整個 Spark 應用程序的執行及時 Stage 分批次做爲 TaskSet 提交到Executor 執行。每一個 Task 針對 RDD 的一個 partition 執行咱們定義的算子和函數。
Master
1.主備切換機制
2.註冊機制
Worker 的註冊:
第一步 Worker 啓動以後就會主動向 Master 進行註冊
第二步過濾,將狀態是 dead 的 Worker 過濾掉,對於狀態爲 unknown 的Worker,清理舊的 Worker 信息替換爲新的 Worker 信息
第三步把 Worker 加入內存緩存中(HashMap)
第四步用持久化引擎將 Worker 信息進行持久化
第五步調用 scheduler()方法進行調度
Driver 的註冊:
第一步用 Spark-submit 提交 Application 時首先就會註冊 Driver
第二步把 Driver 信息加入內存緩存中(HashMap)
第三步加入等待調度隊列(ArrayBuffer)
第四步用持久化引擎將 Worker 信息進行持久化
第五步調用 scheduler()方法進行調度
Application 的註冊(很重要):
第一步 Driver 啓動好以後會執行咱們的 Application 代碼,初始化SparkContext,底層的 SparkDeploySchedulerBackend 會向 Master 註冊
第二步把 Application 信息加入內存緩存中(HashMap)
第三步加入等待調度 Application 隊列(ArrayBuffer)
第四步用持久化引擎將 Application 信息進行持久化
第五步調用 scheduler()方法進行調度
三、狀態改變處理機制源碼分析
四、資源調度機制源碼分析(schedule(),兩種資源調度算法)
Application 的調度
spreadOutApps:每一個可用 Worker 均分
非 spreadOutApps:儘量少的 Worker 數量
DAGScheduler
Stage 劃分算法:
從觸發 Action 操做的那個 rdd 往前倒推,首先爲最後一個 rdd 建立一個 stage,而後往前倒推,若是發現某個 rdd 是寬依賴,那麼就會將寬依賴的那個 rdd 劃分爲一個新的 stage,而後以此類推,直到全部 rdd 遍歷完爲止。源代碼歸結爲如下三部:
從 finalStage 倒推
經過寬依賴來進行新的 Stage 的劃分
使用遞歸優先提交父 Stage
對於每一種有 shuffle 的操做,好比:groupByKey、reduceByKey、countByKey底層都對應了三個 RDD:MapPartitionsRDD、ShuffleRDD、MapPartitionsRDD,第一對應一個 Stage,後兩個對應一個 Stage
Task 的最佳位置
Stage 從最後一個 RDD 開始找,哪一個 RDD 的 Partition 被 cache 或者 checkpoint了,那麼 task 的最佳位置就是 cache 或者 checkpoint 的位置。好處:這樣 task在該節點執行,直接找緩存或者 checkpoint 的數據,不用再計算以前的 RDD 了。找完全部的 RDD 若是都沒有緩存或者 checkpoint,那麼久沒有最佳位置,那麼此task 就要遵循 TaskScheduler 的分配。
TaskScheduler
給每一個 TaskSet 都會建立一個 TaskSetManager,TaskSetManager 會負責他的那個 TaskSet 的運行情況的監視和管理。
Task 的分配:Spark 用本地化級別這種模型去優化 Task 的分配和啓動,優先但願在最佳本地化的地方啓動 Task
PROCESS_LOCAL:進程本地化,代碼和數據在同一個進程中,也就是在同一個 executor 中;計算數據的 task 由 executor 執行,數據在 executor 的 BlockManager中;性能最好NODE_LOCAL:節點本地化,代碼和數據在同一個節點中;好比說,數據做爲一個 HDFS block 塊,就在節點上,而 task 在節點上某個 executor 中運行;或者是,數據和 task 在一個節點上的不一樣 executor 中;數據須要在進程間進行傳輸
NO_PREF:對於 task 來講,數據從哪裏獲取都同樣,沒有好壞之分
RACK_LOCAL:機架本地化,數據和 task 在一個機架的兩個節點上;數據須要經過網絡在節點之間進行傳輸
ANY:數據和 task 可能在集羣中的任何地方,並且不在一個機架中,性能最差