Spark 學習(七) Spark的運行流程

一,Spark中的基本概念html

二,Spark的運行流程web

三,Spark在不一樣集羣的運行架構算法

  3.1 Spark on Standalone運行流程shell

  3.2 Spark on YARN運行過程安全

 

 

正文

文章原文:https://www.cnblogs.com/qingyunzong/p/8945933.html多線程

一,Spark中的基本概念

  在進行Spark的運做流程分析前請看下圖:架構

  

  在這對上面的這些名詞進行解釋:框架

  (1)Application:表示你的應用程序異步

  (2)Driver:表示main()函數,建立SparkContext。由SparkContext負責與ClusterManager通訊,進行資源的申請,任務的分配和監控等。程序執行完畢後關閉SparkContext分佈式

  (3)Executor:某個Application運行在Worker節點上的一個進程,該進程負責運行某些task,而且負責將數據存在內存或者磁盤上。在Spark on Yarn模式下,其進程名稱爲 CoarseGrainedExecutor Backend,一個CoarseGrainedExecutor Backend進程有且僅有一個executor對象,它負責將Task包裝成taskRunner,並從線程池中抽取出一個空閒線程運行Task,這樣,每一個CoarseGrainedExecutorBackend能並行運行Task的數據就取決於分配給它的CPU的個數。

  (4)Worker:集羣中能夠運行Application代碼的節點。在Standalone模式中指的是經過slave文件配置的worker節點,在Spark on Yarn模式中指的就是NodeManager節點。

  (5)Task:在Executor進程中執行任務的工做單元,多個Task組成一個Stage

  (6)Job:包含多個Task組成的並行計算,是由Action行爲觸發的

  (7)Stage:每一個Job會被拆分不少組Task,做爲一個TaskSet,其名稱爲Stage

  (8)DAGScheduler:根據Job構建基於Stage的DAG,並提交Stage給TaskScheduler,其劃分Stage的依據是RDD之間的依賴關係

  (9)TaskScheduler:將TaskSet提交給Worker(集羣)運行,每一個Executor運行什麼Task就是在此處分配的。

  

二,Spark的運行流程

  2.1 運行流程

  

  再次結合上面的圖,對流程進行解析:

  (1)構建Spark Application的運行環境(啓動Driver),Driver向資源管理器(能夠是Standalone、Mesos或YARN)註冊並申請運行Executor資源;

  (2)資源管理器分配Executor資源並啓動StandaloneExecutorBackend,Executor運行狀況將隨着心跳發送到資源管理器上;

  (3)Driver構建成DAG圖,將DAG圖分解成Stage,並把Taskset發送給Task Scheduler。Executor向SparkContext申請Task

  (4)Task Scheduler將Task發放給Executor運行同時Driver將應用程序代碼發放給Executor。

  (5)Task在Executor上運行,運行完畢釋放全部資源。

  詳細圖解:

  

  2.2 spark運行特色:

  (1)每一個Application獲取專屬的executor進程,該進程在Application期間一直駐留,並以多線程方式運行tasks。這種Application隔離機制有其優點的,不管是從調度角度看(每一個Driver調度它本身的任務),仍是從運行角度看(來自不一樣Application的Task運行在不一樣的JVM中)。固然,這也意味着Spark Application不能跨應用程序共享數據,除非將數據寫入到外部存儲系統。

  (2)Spark與資源管理器無關,只要可以獲取executor進程,並能保持相互通訊就能夠了。

  (3)提交SparkContext的Client應該靠近Worker節點(運行Executor的節點),最好是在同一個Rack裏,由於Spark Application運行過程當中SparkContext和Executor之間有大量的信息交換;若是想在遠程集羣中運行,最好使用RPC將SparkContext提交給集羣,不要遠離Worker運行SparkContext。

  (4)Task採用了數據本地性和推測執行的優化機制。

  2.3 DAGScheduler

  Job=多個stage,Stage=多個同種task, Task分爲ShuffleMapTask和ResultTask,Dependency分爲ShuffleDependency和NarrowDependency

  面向stage的切分,切分依據爲寬依賴

  維護waiting jobs和active jobs,維護waiting stages、active stages和failed stages,以及與jobs的映射關係

  主要職能:

  一、接收提交Job的主入口,submitJob(rdd, ...)runJob(rdd, ...)。在SparkContext裏會調用這兩個方法。  

  生成一個Stage並提交,接着判斷Stage是否有父Stage未完成,如有,提交併等待父Stage,以此類推。結果是:DAGScheduler裏增長了一些waiting stage和一個runningstage。

  running stage提交後,分析stage裏Task的類型,生成一個Task描述,即TaskSet。

  調用TaskScheduler.submitTask(taskSet, ...)方法,把Task描述提交給TaskScheduler。TaskScheduler依據資源量和觸發分配條件,會爲這個TaskSet分配資源並觸發執行DAGScheduler提交job後,異步返回JobWaiter對象,可以返回job運行狀態,可以cancel job,執行成功後會處理並返回結果

  二、處理TaskCompletionEvent 

  若是task執行成功,對應的stage裏減去這個task,作一些計數工做: 

  若是task是ResultTask,計數器Accumulator加一,在job裏爲該task置true,job finish總數加一。加完後若是finish數目與partition數目相等,說明這個stage完成了,標記stage完成,從running stages裏減去這個stage,作一些stage移除的清理工做

  若是task是ShuffleMapTask,計數器Accumulator加一,在stage里加上一個output location,裏面是一個MapStatus類。MapStatusShuffleMapTask執行完成的返回,包含location信息和block size(能夠選擇壓縮或未壓縮)。同時檢查該stage完成,向MapOutputTracker註冊本stage裏的shuffleId和location信息。而後檢查stage的output location裏是否存在空,若存在空,說明一些task失敗了,整個stage從新提交;不然,繼續從waiting stages裏提交下一個須要作的stage

  若是task是重提交,對應的stage裏增長這個task

  若是task是fetch失敗,立刻標記對應的stage完成,從running stages裏減去。若是不容許retry,abort整個stage;不然,從新提交整個stage。另外,把這個fetch相關的location和map任務信息,從stage裏剔除,從MapOutputTracker註銷掉。最後,若是此次fetch的blockManagerId對象不爲空,作一次ExecutorLost處理,下次shuffle會換在另外一個executor上去執行。

  其餘task狀態會由TaskScheduler處理,如Exception, TaskResultLost, commitDenied等。

  三、其餘與job相關的操做還包括:cancel job, cancel stage, resubmit failed stage等

   2.4 TaskScheduler

  維護task和executor對應關係,executor和物理資源對應關係,在排隊的task和正在跑的task。

  內部維護一個任務隊列,根據FIFO或Fair策略,調度任務。

   TaskScheduler自己是個接口,spark裏只實現了一個TaskSchedulerImpl,理論上任務調度能夠定製。

三,Spark在不一樣集羣的運行架構

  Spark注重創建良好的生態系統,它不只支持多種外部文件存儲系統,提供了多種多樣的集羣運行模式。部署在單臺機器上時,既能夠用本地(Local)模式運行,也可使用僞分佈式模式來運行;當以分佈式集羣部署的時候,能夠根據本身集羣的實際狀況選擇Standalone模式(Spark自帶的模式)、YARN-Client模式或者YARN-Cluster模式。Spark的各類運行模式雖然在啓動方式、運行位置、調度策略上各有不一樣,但它們的目的基本都是一致的,就是在合適的位置安全可靠的根據用戶的配置和Job的須要運行和管理Task。

  3.1 Spark on Standalone運行流程

  Standalone模式是Spark實現的資源調度框架,其主要的節點有Client節點、Master節點和Worker節點。其中Driver既能夠運行在Master節點上中,也能夠運行在本地Client端。當用spark-shell交互式工具提交Spark的Job時,Driver在Master節點上運行;當使用spark-submit工具提交Job或者在Eclips、IDEA等開發平臺上使用」new SparkConf().setMaster(「spark://master:7077」)」方式運行Spark任務時,Driver是運行在本地Client端上的。

  運行過程文字說明:

  

1、咱們提交一個任務,任務就叫Application
2、初始化程序的入口SparkContext, 
  2.1 初始化DAG Scheduler
  2.2 初始化Task Scheduler
3、Task Scheduler向master去進行註冊並申請資源(CPU Core和Memory)
4、Master根據SparkContext的資源申請要求和Worker心跳週期內報告的信息決定在哪一個Worker上分配資源,而後在該Worker上獲取資源,而後啓動StandaloneExecutorBackend;順便初
      始化好了一個線程池
5、StandaloneExecutorBackend向Driver(SparkContext)註冊,這樣Driver就知道哪些Executor爲他進行服務了。
   到這個時候其實咱們的初始化過程基本完成了,咱們開始執行transformation的代碼,可是代碼並不會真正的運行,直到咱們遇到一個action操做。生產一個job任務,進行stage的劃分
6、SparkContext將Applicaiton代碼發送給StandaloneExecutorBackend;而且SparkContext解析Applicaiton代碼,構建DAG圖,並提交給DAG Scheduler分解成Stage
(當碰到Action操做 時,就會催生Job;每一個Job中含有1個或多個Stage,Stage通常在獲取外部數據和shuffle以前產生)。
7、將Stage(或者稱爲TaskSet)提交給Task Scheduler。Task Scheduler負責將Task分配到相應的Worker,最後提交給StandaloneExecutorBackend執行; 8、對task進行序列化,並根據task的分配算法,分配task 9、對接收過來的task進行反序列化,把task封裝成一個線程 10、開始執行Task,並向SparkContext報告,直至Task完成。 十一、資源註銷

  運行過程圖形說明:

  3.2 Spark on YARN運行過程

  YARN是一種統一資源管理機制,在其上面能夠運行多套計算框架。目前的大數據技術世界,大多數公司除了使用Spark來進行數據計算,因爲歷史緣由或者單方面業務處理的性能考慮而使用着其餘的計算框架,好比MapReduce、Storm等計算框架。Spark基於此種狀況開發了Spark on YARN的運行模式,因爲藉助了YARN良好的彈性資源管理機制,不只部署Application更加方便,並且用戶在YARN集羣中運行的服務和Application的資源也徹底隔離,更具實踐應用價值的是YARN能夠經過隊列的方式,管理同時運行在集羣中的多個服務。

 

Spark on YARN模式根據Driver在集羣中的位置分爲兩種模式:一種是YARN-Client模式,另外一種是YARN-Cluster(或稱爲YARN-Standalone模式)。

  任何框架與YARN的結合,都必須遵循YARN的開發模式。在分析Spark on YARN的實現細節以前,有必要先分析一下YARN框架的一些基本原理。

  參考:http://www.cnblogs.com/qingyunzong/p/8615096.html

  Yarn-Client模式中,Driver在客戶端本地運行,這種模式可使得Spark Application和客戶端進行交互,由於Driver在客戶端,因此能夠經過webUI訪問Driver的狀態,默認是http://hadoop1:4040訪問,而YARN經過http:// hadoop1:8088訪問。

YARN-client的工做流程分爲如下幾個步驟:

  

1.Spark Yarn Client向YARN的ResourceManager申請啓動Application Master。同時在SparkContent初始化中將建立DAGScheduler和TASKScheduler等,
因爲咱們選擇的是Yarn-Client模式,程序會選擇YarnClientClusterScheduler和YarnClientSchedulerBackend; 2.ResourceManager收到請求後,在集羣中選擇一個NodeManager,爲該應用程序分配第一個Container,要求它在這個Container中啓動應用程序的ApplicationMaster,
與YARN-Cluster區別的是在該ApplicationMaster不運行SparkContext,只與SparkContext進行聯繫進行資源的分派; 3.Client中的SparkContext初始化完畢後,與ApplicationMaster創建通信,向ResourceManager註冊,根據任務信息向ResourceManager申請資源(Container); 4.一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在得到的Container中啓動啓動CoarseGrainedExecutorBackend,
CoarseGrainedExecutorBackend啓動後會向Client中的SparkContext註冊並申請Task;
5.Client中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向Driver彙報運行的狀態和進度,
以讓Client隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務;
6.應用程序運行完成後,Client的SparkContext向ResourceManager申請註銷並關閉本身。

  圖例:

  

  

  在YARN-Cluster模式中,當用戶向YARN中提交一個應用程序後,YARN將分兩個階段運行該應用程序:第一個階段是把Spark的Driver做爲一個ApplicationMaster在YARN集羣中先啓動;第二個階段是由ApplicationMaster建立應用程序,而後爲它向ResourceManager申請資源,並啓動Executor來運行Task,同時監控它的整個運行過程,直到運行完成。

YARN-cluster的工做流程分爲如下幾個步驟:

  

1.   Spark Yarn Client向YARN中提交應用程序,包括ApplicationMaster程序、啓動ApplicationMaster的命令、須要在Executor中運行的程序等;

2.   ResourceManager收到請求後,在集羣中選擇一個NodeManager,爲該應用程序分配第一個Container,要求它在這個Container中啓動應用程序的ApplicationMaster,
其中ApplicationMaster進行SparkContext等的初始化;
3. ApplicationMaster向ResourceManager註冊,這樣用戶能夠直接經過ResourceManage查看應用程序的運行狀態,而後它將採用輪詢的方式經過RPC協議爲各個任務申請資源,
並監控它們的運行狀態直到運行結束;
4. 一旦ApplicationMaster申請到資源(也就是Container)後,便與對應的NodeManager通訊,要求它在得到的Container中啓動啓動CoarseGrainedExecutorBackend,
CoarseGrainedExecutorBackend啓動後會向ApplicationMaster中的SparkContext註冊並申請Task。這一點和Standalone模式同樣,只不過SparkContext在
Spark Application中初始化時,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler進行任務的調度,其中YarnClusterScheduler只是對TaskSchedulerImpl
的一個簡單包裝,增長了對Executor的等待邏輯等;
5. ApplicationMaster中的SparkContext分配Task給CoarseGrainedExecutorBackend執行,CoarseGrainedExecutorBackend運行Task並向ApplicationMaster
彙報運行的狀態和進度,以讓ApplicationMaster隨時掌握各個任務的運行狀態,從而能夠在任務失敗時從新啓動任務;
6. 應用程序運行完成後,ApplicationMaster向ResourceManager申請註銷並關閉本身。

  圖例:

  YARN-Client 與 YARN-Cluster 區別

  理解YARN-Client和YARN-Cluster深層次的區別以前先清楚一個概念:Application Master。在YARN中,每一個Application實例都有一個ApplicationMaster進程,它是Application啓動的第一個容器。它負責和ResourceManager打交道並請求資源,獲取資源以後告訴NodeManager爲其啓動Container。從深層次的含義講YARN-Cluster和YARN-Client模式的區別其實就是ApplicationMaster進程的區別。

一、YARN-Cluster模式下,Driver運行在AM(Application Master)中,它負責向YARN申請資源,並監督做業的運行情況。當用戶提交了做業以後,就能夠關掉Client,
做業會繼續在YARN上運行,於是YARN-Cluster模式不適合運行交互類型的做業; 二、YARN-Client模式下,Application Master僅僅向YARN請求Executor,Client會和請求的Container通訊來調度他們工做,也就是說Client不能離開。

  

  

相關文章
相關標籤/搜索