Spark運行原理

在大數據領域,只有深挖數據科學領域,走在學術前沿,才能在底層算法和模型方面走在前面,從而佔據領先地位。算法

Spark的這種學術基因,使得它從一開始就在大數據領域創建了必定優點。不管是性能,仍是方案的統一性,對比傳統的Hadoop,優點都很是明顯。Spark提供的基於RDD的一體化解決方案,將MapReduce、Streaming、SQL、Machine Learning、Graph Processing等模型統一到一個平臺下,並以一致的API公開,並提供相同的部署方案,使得Spark的工程應用領域變得更加普遍。本文主要分如下章節:api

1、Spark專業術語定義多線程

2、Spark運行基本流程架構

3、Spark運行架構特色分佈式

4、Spark核心原理透視函數

1、Spark專業術語定義oop

一、Application:Spark應用程序性能

指的是用戶編寫的Spark應用程序,包含了Driver功能代碼和分佈在集羣中多個節點上運行的Executor代碼。大數據

Spark應用程序,由一個或多個做業JOB組成,以下圖所示。優化

Spark應用程序組成

二、Driver:驅動程序

Spark中的Driver即運行上述Application的Main()函數而且建立SparkContext,其中建立SparkContext的目的是爲了準備Spark應用程序的運行環境。在Spark中由SparkContext負責和ClusterManager通訊,進行資源的申請、任務的分配和監控等;當Executor部分運行完畢後,Driver負責將SparkContext關閉。一般SparkContext表明Driver,以下圖所示。

Driver驅動程序組成

三、Cluster Manager:資源管理器

指的是在集羣上獲取資源的外部服務,經常使用的有:Standalone,Spark原生的資源管理器,由Master負責資源的分配;Haddop Yarn,由Yarn中的ResearchManager負責資源的分配;Messos,由Messos中的Messos Master負責資源管理,以下圖所示。

Cluster Manager含義

四、Executor:執行器

Application運行在Worker節點上的一個進程,該進程負責運行Task,而且負責將數據存在內存或者磁盤上,每一個Application都有各自獨立的一批Executor,以下圖所示。

Executor運行原理

五、Worker:計算節點

集羣中任何能夠運行Application代碼的節點,相似於Yarn中的NodeManager節點。在Standalone模式中指的就是經過Slave文件配置的Worker節點,在Spark on Yarn模式中指的就是NodeManager節點,在Spark on Messos模式中指的就是Messos Slave節點,以下圖所示。

Worker運行原理

六、RDD:彈性分佈式數據集

Resillient Distributed Dataset,Spark的基本計算單元,能夠經過一系列算子進行操做(主要有Transformation和Action操做),以下圖所示。

RDD組成圖解

七、窄依賴

父RDD每個分區最多被一個子RDD的分區所用;表現爲一個父RDD的分區對應於一個子RDD的分區,或兩個父RDD的分區對應於一個子RDD 的分區。如圖所示。

窄依賴圖解

八、寬依賴

父RDD的每一個分區均可能被多個子RDD分區所使用,子RDD分區一般對應全部的父RDD分區。如圖所示。

寬依賴圖解

常見的窄依賴有:map、filter、union、mapPartitions、mapValues、join(父RDD是hash-partitioned :若是JoinAPI以前被調用的RDD API是寬依賴(存在shuffle), 並且兩個join的RDD的分區數量一致,join結果的rdd分區數量也同樣,這個時候join api是窄依賴)。

常見的寬依賴有groupByKey、partitionBy、reduceByKey、join(父RDD不是hash-partitioned :除此以外的,rdd 的join api是寬依賴)。

九、DAG:有向無環圖

Directed Acycle graph,反應RDD之間的依賴關係,如圖所示。

DAG圖解

十、DAGScheduler:有向無環圖調度器

基於DAG劃分Stage 並以TaskSet的形勢提交Stage給TaskScheduler;負責將做業拆分紅不一樣階段的具備依賴關係的多批任務;最重要的任務之一就是:計算做業和任務的依賴關係,制定調度邏輯。在SparkContext初始化的過程當中被實例化,一個SparkContext對應建立一個DAGScheduler。

DAGScheduler圖解

十一、TaskScheduler:任務調度器

將Taskset提交給worker(集羣)運行並回報結果;負責每一個具體任務的實際物理調度。如圖所示。

TaskScheduler圖解

十二、Job:做業

由一個或多個調度階段所組成的一次計算做業;包含多個Task組成的並行計算,每每由Spark Action催生,一個JOB包含多個RDD及做用於相應RDD上的各類Operation。如圖所示。

Job圖解

1三、Stage:調度階段

一個任務集對應的調度階段;每一個Job會被拆分不少組Task,每組任務被稱爲Stage,也可稱TaskSet,一個做業分爲多個階段;Stage分紅兩種類型ShuffleMapStage、ResultStage。如圖所示。

Stage圖解

1四、TaskSet:任務集

由一組關聯的,但相互之間沒有Shuffle依賴關係的任務所組成的任務集。如圖所示。

Stage圖解

提示:

1)一個Stage建立一個TaskSet;

2)爲Stage的每一個Rdd分區建立一個Task,多個Task封裝成TaskSet

1五、Task:任務

被送到某個Executor上的工做任務;單個分區數據集上的最小處理流程單元。如圖所示。

Task圖解

整體如圖所示:

彙總圖解

2、Spark運行基本流程

Spark運行基本流程

Spark運行基本流程

3、Spark運行架構特色

一、Executor進程專屬

每一個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。如圖所示。

Executor進程圖解

二、支持多種資源管理器

Spark與資源管理器無關,只要可以獲取executor進程,並能保持相互通訊就能夠了,Spark支持資源管理器包含: Standalone、On Mesos、On YARN、Or On EC2。如圖所示。

資源管理器

三、Job提交就近原則

提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack(機架)裏,由於Spark Application運行過程當中SparkContext和Executor之間有大量的信息交換;若是想在遠程集羣中運行,最好使用RPC將SparkContext提交給集羣,不要遠離Worker運行SparkContext。如圖所示。

Job提交就近原則

四、移動程序而非移動數據的原則執行

Task採用了數據本地性和推測執行的優化機制。關鍵方法:taskIdToLocations、getPreferedLocations。如圖所示。

執行原則

4、Spark核心原理透視

一、計算流程

Spark計算流程

二、從代碼構建DAG圖

Spark program

Val lines1 = sc.textFile(inputPath1). map(···)). map(···)

Val lines2 = sc.textFile(inputPath2) . map(···)

Val lines3 = sc.textFile(inputPath3)

Val dtinone1 = lines2.union(lines3)

Val dtinone = lines1.join(dtinone1)

dtinone.saveAsTextFile(···)

dtinone.filter(···).foreach(···)

Spark的計算髮生在RDD的Action操做,而對Action以前的全部Transformation,Spark只是記錄下RDD生成的軌跡,而不會觸發真正的計算。

Spark內核會在須要計算髮生的時刻繪製一張關於計算路徑的有向無環圖,也就是DAG。

構建DAG圖

三、將DAG劃分爲Stage核心算法

Application多個job多個Stage:Spark Application中能夠由於不一樣的Action觸發衆多的job,一個Application中能夠有不少的job,每一個job是由一個或者多個Stage構成的,後面的Stage依賴於前面的Stage,也就是說只有前面依賴的Stage計算完畢後,後面的Stage纔會運行。

劃分依據:Stage劃分的依據就是寬依賴,什麼時候產生寬依賴,reduceByKey, groupByKey等算子,會致使寬依賴的產生。

核心算法:從後往前回溯,遇到窄依賴加入本stage,碰見寬依賴進行Stage切分。Spark內核會從觸發Action操做的那個RDD開始從後往前推,首先會爲最後一個RDD建立一個stage,而後繼續倒推,若是發現對某個RDD是寬依賴,那麼就會將寬依賴的那個RDD建立一個新的stage,那個RDD就是新的stage的最後一個RDD。而後依次類推,繼續繼續倒推,根據窄依賴或者寬依賴進行stage的劃分,直到全部的RDD所有遍歷完成爲止。

四、將DAG劃分爲Stage剖析

從HDFS中讀入數據生成3個不一樣的RDD,經過一系列transformation操做後再將計算結果保存回HDFS。能夠看到這個DAG中只有join操做是一個寬依賴,Spark內核會以此爲邊界將其先後劃分紅不一樣的Stage. 同時咱們能夠注意到,在圖中Stage2中,從map到union都是窄依賴,這兩步操做能夠造成一個流水線操做,經過map操做生成的partition能夠不用等待整個RDD計算結束,而是繼續進行union操做,這樣大大提升了計算的效率。

DAG劃分爲Stage剖析

五、相關代碼

SparkContext.runJob(…)

DAGScheduler.runJob(...)

DAGScheduler.submitJob(...)

DAGScheduler.handleJobSubmitted(…)

DAGScheduler.submitWaitingStages(…)

DAGScheduler.submitStage(…)

六、提交Stages

調度階段的提交,最終會被轉換成一個任務集的提交,DAGScheduler經過TaskScheduler接口提交任務集,這個任務集最終會觸發TaskScheduler構建一個TaskSetManager的實例來管理這個任務集的生命週期,對於DAGScheduler來講,提交調度階段的工做到此就完成了。而TaskScheduler的具體實現則會在獲得計算資源的時候,進一步經過TaskSetManager調度具體的任務到對應的Executor節點上進行運算。

提交Stages

七、相關代碼

submitMissingTasks. submitMissingTasks

submitMissingTasks.taskIdToLocations

DAGScheduler.getPreferredLocs(…)

TaskScheduler.submitTasks(…)

TaskScheduler.createTaskSetManager(…)

TaskSetManager負責管理TaskSchedulerImpl中一個單獨TaskSet,跟蹤每個task,若是task失敗,負責重試task直到達到task重試次數的最屢次數。

調度池構建器,SchedulableBuilder決定調度順序

八、監控Job、Task、Executor

DAGScheduler監控Job與Task:要保證相互依賴的做業調度階段可以獲得順利的調度執行,DAGScheduler須要監控當前做業調度階段乃至任務的完成狀況。這經過對外暴露一系列的回調函數來實現的,對於TaskScheduler來講,這些回調函數主要包括任務的開始結束失敗、任務集的失敗,DAGScheduler根據這些任務的生命週期信息進一步維護做業和調度階段的狀態信息。

DAGScheduler監控Executor的生命狀態:TaskScheduler經過回調函數通知DAGScheduler具體的Executor的生命狀態,若是某一個Executor崩潰了,則對應的調度階段任務集的ShuffleMapTask的輸出結果也將標誌爲不可用,這將致使對應任務集狀態的變動,進而從新執行相關計算任務,以獲取丟失的相關數據。

九、獲取任務執行結果

結果DAGScheduler:一個具體的任務在Executor中執行完畢後,其結果須要以某種形式返回給DAGScheduler,根據任務類型的不一樣,任務結果的返回方式也不一樣。

兩種結果,中間結果與最終結果:對於FinalStage所對應的任務,返回給DAGScheduler的是運算結果自己,而對於中間調度階段對應的任務ShuffleMapTask,返回給DAGScheduler的是一個MapStatus裏的相關存儲信息,而非結果自己,這些存儲位置信息將做爲下一個調度階段的任務獲取輸入數據的依據。

兩種類型,DirectTaskResult與IndirectTaskResult:根據任務結果大小的不一樣,ResultTask返回的結果又分爲兩類,若是結果足夠小,則直接放在DirectTaskResult對象內中,若是超過特定尺寸則在Executor端會將DirectTaskResult先序列化,再把序列化的結果做爲一個數據塊存放在BlockManager中,而後將BlockManager返回的BlockID放在IndirectTaskResult對象中返回給TaskScheduler,TaskScheduler進而調用TaskResultGetter將IndirectTaskResult中的BlockID取出並經過BlockManager最終取得對應的DirectTaskResult。

十、任務調度整體詮釋

任務調度整體詮釋

相關文章
相關標籤/搜索