從Spark應用的提交到執行完成有不少步驟,爲了便於理解,咱們把應用執行的整個過程劃分爲三個階段。而咱們知道Spark有多種運行模式,不一樣模式下這三個階段的執行流程也不相同。sql
本節介紹這三個階段的劃分,並概要介紹不一樣模式下各個階段的執行流程,各個模式的詳細流程會在後面的章節進行分析。apache
咱們知道,Spark應用能夠在多種模式下運行。所謂多種模式主要是針對資源分配方式來講的,Spark應用能夠在yarn,k8s,mesos等分佈式資源管理平臺上運行,也能夠啓動自帶的master和worker端來分配和管理資源(standalone模式)。例如:咱們能夠經過如下命令來向yarn提交一個spark任務:app
$SPARK_HOME/bin/spark-submit \
--class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode client \
$SPARK_HOME/examples/jars/spark-examples*.jar
代碼1 spark應用提交命令dom
要注意的是,在執行以上應用提交命令時yarn資源管理集羣必須已經啓動。另外,Spark應用的執行是經過Driver端和Executor端共同配合完成的。分佈式
要完成以上應用的執行,須要經歷不少步驟,爲了便於更好的理解Spark應用從提交到執行完成的整個過程,咱們把整個過程劃分紅三個階段:ide
應用的提交函數
執行環境的準備學習
任務的調度和執行ui
如圖1所示:spa
圖1 應用執行整體流程
無論以哪一種模式運行,Spark應用的執行過程均可以劃分紅這三個階段。下面對這三個階段分別進行說明。
這三個階段以及每一個階段要完成的目標如圖2所示。
圖2 Spark應用執行的3階段目標概述
咱們根據如下代碼爲例,來說解Spark應用執行的各個階段。
# HelloWorld.scala
import scala.math.random
import org.apache.spark.sql.SparkSession
object HelloWorld {
def main(args: Array[String]) {
val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
}
}
這個階段在Driver端完成,主要目標是:準備依賴包並肯定Spark應用的執行主類。具體的任務包括:
解析任務提交的參數,並對參數進行解析和保存。
準備(可能會下載)任務啓動參數指定的依賴文件或程序包。
根據Spark應用的執行模式和應用的編寫語言,來肯定執行的主類名稱。
實例化執行主類,生成SparkApplication對象,並調用SparkApplication#start()函數來運行Spark應用(如果Java或scala代碼實際上是:執行Spark應用中的main函數)。
注意:第1階段完成時,Driver端並無向資源管理平臺申請任何資源,也沒有啓動任何Spark內部的服務。
經過第1階段,已經找到了運行在Driver端的Spark應用的執行主類,並建立了SparkApplication對象:app。此時,在app.start()函數中會直接調用主類的main函數開始執行應用,從而進入第2階段。
第2階段主要目標是:建立SparkSession(包括SparkContext和SparkEnv),完成資源的申請和Executor的建立。第2階段完成後Task的執行環境就準備好了。
也就是說,第2階段不只會在Driver端進行初始化,並且還要準備好Executor。這一階段的任務主要是在Driver端執行建立SparkSession的代碼來完成,也就是執行下面一行代碼:
val spark =SparkSession.builder.appName("HelloWorld").getOrCreate()
第2階段的Driver端主要完成如下步驟:
建立SparkContext和SparkEnv對象,在建立這兩個對象時,向Cluster Manager申請資源,啓動各個服務模塊,並對服務模塊進行初始化。
這些服務模塊包括:DAG調度服務,任務調度服務,shuffle服務,文件傳輸服務,數據塊管理服務,內存管理服務等。
第2階段的Executor端主要完成如下步驟:
Driver端向Cluster Manager申請資源,如果Yarn模式會在NodeManager上建立ApplicationMaster,並由ApplicationMaster向Cluster Manager來申請資源,並啓動Container,在Container中啓動Executor。
在啓動Executor時向Driver端註冊BlockManager服務,並建立心跳服務RPC環境,經過該RPC環境向Driver彙報Executor的狀態信息。
詳細的執行步驟,會在後面介紹每種模式的運行原理時,詳細分析。第2階段執行完成後的Spark集羣狀態以下圖:
經過第2階段已經完成了Task執行環境的初始化,此時,在Driver端已經完成了SparkContext和SparkEnv的建立,資源已經申請到了,而且已經啓動了Executor。
這一階段會執行接下來的數據處理的代碼:
val rdd = spark.sparkContext.parallelize(Seq("Hello", "World"))
rdd.collect()
第3階段Driver端主要完成如下步驟:
執行Spark的處理代碼,當執行map操做時,生成新的RDD;
當執行Action操做時,觸發Job的提交,此時會執行如下步驟:
根據RDD的血緣,把Job劃分紅相互依賴的Stage;
把每一個Stage拆分紅一個或多個Task;
把這些Task提交給已經建立好的Executor去執行;
獲取Executor的執行狀態信息,直到Executor完成全部Task的執行;
獲取執行結果和最終的執行狀態。
本節介紹了Spark應用的執行過程,經過本節的學習應該對Spark應用的執行過程有一個整體的理解。接下來會根據具體的運行模式來詳細分析每一個階段的執行步驟。