如何理解Spark應用執行的階段

從Spark應用的提交到執行完成有不少步驟,爲了便於理解,咱們把應用執行的整個過程劃分爲三個階段。而咱們知道Spark有多種運行模式,不一樣模式下這三個階段的執行流程也不相同。sql

本節介紹這三個階段的劃分,並概要介紹不一樣模式下各個階段的執行流程,各個模式的詳細流程會在後面的章節進行分析。apache

應用執行的階段劃分

咱們知道,Spark應用能夠在多種模式下運行。所謂多種模式主要是針對資源分配方式來講的,Spark應用能夠在yarn,k8s,mesos等分佈式資源管理平臺上運行,也能夠啓動自帶的master和worker端來分配和管理資源(standalone模式)。例如:咱們能夠經過如下命令來向yarn提交一個spark任務:app

 $SPARK_HOME/bin/spark-submit \
 --class org.apache.spark.examples.SparkPi \
 --master yarn \
 --deploy-mode client \
 $SPARK_HOME/examples/jars/spark-examples*.jar

                                    代碼1 spark應用提交命令dom

要注意的是,在執行以上應用提交命令時yarn資源管理集羣必須已經啓動。另外,Spark應用的執行是經過Driver端和Executor端共同配合完成的。分佈式

要完成以上應用的執行,須要經歷不少步驟,爲了便於更好的理解Spark應用從提交到執行完成的整個過程,咱們把整個過程劃分紅三個階段:ide

  • 應用的提交函數

  • 執行環境的準備學習

  • 任務的調度和執行ui

如圖1所示:spa

image.png

                                        圖1 應用執行整體流程

無論以哪一種模式運行,Spark應用的執行過程均可以劃分紅這三個階段。下面對這三個階段分別進行說明。

三個階段概要說明

這三個階段以及每一個階段要完成的目標如圖2所示。

image.png

                             圖2 Spark應用執行的3階段目標概述

咱們根據如下代碼爲例,來說解Spark應用執行的各個階段。

 # HelloWorld.scala
 import scala.math.random
 import org.apache.spark.sql.SparkSession
 
 object HelloWorld {
   def main(argsArray[String]) {
 val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()
 val rdd = spark.sparkContext.parallelize(Seq("Hello""World"))
 rdd.collect()
 }
 }

第1階段:應用的提交

這個階段在Driver端完成,主要目標是:準備依賴包並肯定Spark應用的執行主類。具體的任務包括:

  • 解析任務提交的參數,並對參數進行解析和保存。

  • 準備(可能會下載)任務啓動參數指定的依賴文件或程序包。

  • 根據Spark應用的執行模式和應用的編寫語言,來肯定執行的主類名稱。

  • 實例化執行主類,生成SparkApplication對象,並調用SparkApplication#start()函數來運行Spark應用(如果Java或scala代碼實際上是:執行Spark應用中的main函數)。

注意:第1階段完成時,Driver端並無向資源管理平臺申請任何資源,也沒有啓動任何Spark內部的服務。

第2階段:執行環境的準備

經過第1階段,已經找到了運行在Driver端的Spark應用的執行主類,並建立了SparkApplication對象:app。此時,在app.start()函數中會直接調用主類的main函數開始執行應用,從而進入第2階段。

第2階段主要目標是:建立SparkSession(包括SparkContext和SparkEnv),完成資源的申請和Executor的建立。第2階段完成後Task的執行環境就準備好了。

也就是說,第2階段不只會在Driver端進行初始化,並且還要準備好Executor。這一階段的任務主要是在Driver端執行建立SparkSession的代碼來完成,也就是執行下面一行代碼:

 val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()

第2階段的Driver端主要完成如下步驟:

  • 建立SparkContext和SparkEnv對象,在建立這兩個對象時,向Cluster Manager申請資源,啓動各個服務模塊,並對服務模塊進行初始化。

  • 這些服務模塊包括:DAG調度服務,任務調度服務,shuffle服務,文件傳輸服務,數據塊管理服務,內存管理服務等。

第2階段的Executor端主要完成如下步驟:

  • Driver端向Cluster Manager申請資源,如果Yarn模式會在NodeManager上建立ApplicationMaster,並由ApplicationMaster向Cluster Manager來申請資源,並啓動Container,在Container中啓動Executor。

  • 在啓動Executor時向Driver端註冊BlockManager服務,並建立心跳服務RPC環境,經過該RPC環境向Driver彙報Executor的狀態信息。

詳細的執行步驟,會在後面介紹每種模式的運行原理時,詳細分析。第2階段執行完成後的Spark集羣狀態以下圖:

image.png

第3階段:任務的調度和執行

經過第2階段已經完成了Task執行環境的初始化,此時,在Driver端已經完成了SparkContext和SparkEnv的建立,資源已經申請到了,而且已經啓動了Executor。

這一階段會執行接下來的數據處理的代碼:

 val rdd = spark.sparkContext.parallelize(Seq("Hello""World"))
 rdd.collect()

第3階段Driver端主要完成如下步驟:

  • 執行Spark的處理代碼,當執行map操做時,生成新的RDD;

  • 當執行Action操做時,觸發Job的提交,此時會執行如下步驟:

    • 根據RDD的血緣,把Job劃分紅相互依賴的Stage;

    • 把每一個Stage拆分紅一個或多個Task;

    • 把這些Task提交給已經建立好的Executor去執行;

    • 獲取Executor的執行狀態信息,直到Executor完成全部Task的執行;

    • 獲取執行結果和最終的執行狀態。

小結

本節介紹了Spark應用的執行過程,經過本節的學習應該對Spark應用的執行過程有一個整體的理解。接下來會根據具體的運行模式來詳細分析每一個階段的執行步驟。

相關文章
相關標籤/搜索