Apache Spark源碼分析-- Job的提交與運行

本文以wordCount爲例,詳細說明spark建立和運行job的過程,重點是在進程及線程的建立。java

實驗環境搭建 node

在進行後續操做前,確保下列條件已知足。web

1. 下載spark binary 0.9.1shell

2. 安裝scalaapache

3. 安裝sbt瀏覽器

4. 安裝java微信

啓動spark-shell單機模式運行,即local模式 jvm

local模式運行很是簡單,只要運行如下命令便可,假設當前目錄是$SPARK_HOME函數

MASTER=local bin/spark-shell 學習

"MASTER=local"就是代表當前運行在單機模式

local cluster方式運行

localcluster模式是一種僞cluster模式,在單機環境下模擬standalone的集羣,啓動順序分別以下

1. 啓動master

2. 啓動worker

3. 啓動spark-shell

master$SPARK_HOME/sbin/start-master.sh

注意運行時的輸出,日誌默認保存在$SPARK_HOME/logs目錄。

master主要是運行類 org.apache.spark.deploy.master.Master在8080端口啓動監聽,日誌以下圖所示

修改配置

1. 進入$SPARK_HOME/conf目錄

2. 將spark-env.sh.template重命名爲spark-env.sh

3. 修改spark-env.sh,添加以下內容

export SPARK_MASTER_IP=localhostexport SPARK_LOCAL_IP=localhost運行workerbin/spark-class org.apache.spark.deploy.worker.Worker spark://localhost:7077 -i 127.0.0.1  -c 1 -m 512M

worker啓動完成,鏈接到master。打開maser的webui能夠看到鏈接上來的worker. Master WEb UI的監聽地址是http://localhost:8080

啓動spark-shellMASTER=spark://localhost:7077 bin/spark-shell

若是一切順利,將看到下面的提示信息。

Created spark context..Spark context available as sc.

能夠用瀏覽器打開localhost:4040來查看以下內容

1. stages

2. storage

3. environment

4. executors

wordcount

上述環境準備穩當以後,咱們在sparkshell中運行一下最簡單的例子,在spark-shell中輸入以下代碼

scala>sc.textFile("README.md").filter(_.contains("Spark")).count

上述代碼統計在README.md中含有Spark的行數有多少

部署過程詳解

Spark佈置環境中組件構成以下圖所示。

 



  • Driver Program 簡要來講在spark-shell中輸入的wordcount語句對應於上圖的Driver Program.

  • Cluster Manager 就是對應於上面提到的master,主要起到deploy management的做用

  • Worker Node 與Master相比,這是slave node。上面運行各個executor,executor能夠對應於線程。executor處理兩種基本的業務邏輯,一種就是driver     programme,另外一種就是job在提交以後拆分紅各個stage,每一個stage能夠運行一到多個task

Notes: 在集羣(cluster)方式下, Cluster Manager運行在一個jvm進程之中,而worker運行在另外一個jvm進程中。在local cluster中,這些jvm進程都在同一臺機器中,若是是真正的standalone或Mesos及Yarn集羣,worker與master或分佈於不一樣的主機之上。

JOB的生成和運行

job生成的簡單流程以下

1. 首先應用程序建立SparkContext的實例,如實例爲sc

2. 利用SparkContext的實例來建立生成RDD

3. 通過一連串的transformation操做,原始的RDD轉換成爲其它類型的RDD

4. 當action做用於轉換以後RDD時,會調用SparkContext的runJob方法

5. sc.runJob的調用是後面一連串反應的起點,關鍵性的躍變就發生在此處

調用路徑大體以下

1. sc.runJob->dagScheduler.runJob->submitJob

2. DAGScheduler::submitJob會建立JobSummitted的event發送給內嵌類eventProcessActor

3. eventProcessActor在接收到JobSubmmitted以後調用processEvent處理函數

4. job到stage的轉換,生成finalStage並提交運行,關鍵是調用submitStage

5. 在submitStage中會計算stage之間的依賴關係,依賴關係分爲寬依賴窄依賴兩種

6. 若是計算中發現當前的stage沒有任何依賴或者全部的依賴都已經準備完畢,則提交task

7. 提交task是調用函數submitMissingTasks來完成

8. task真正運行在哪一個worker上面是由TaskScheduler來管理,也就是上面的submitMissingTasks會調用TaskScheduler::submitTasks

9. TaskSchedulerImpl中會根據Spark的當前運行模式來建立相應的backend,若是是在單機運行則建立LocalBackend

10. LocalBackend收到TaskSchedulerImpl傳遞進來的ReceiveOffers事件

11. receiveOffers->executor.launchTask->TaskRunner.run

代碼片斷executor.lauchTask

def launchTask(context: ExecutorBackend, taskId: Long, serializedTask: ByteBuffer) {    val tr = new TaskRunner(context, taskId, serializedTask)    runningTasks.put(taskId, tr)    threadPool.execute(tr)  }

說了這麼一大通,也就是講最終的邏輯處理切切實實是發生在TaskRunner這麼一個executor以內。

運算結果是包裝成爲MapStatus而後經過一系列的內部消息傳遞,反饋到DAGScheduler,這一個消息傳遞路徑不是過於複雜,有興趣能夠自行勾勒。

更多精彩內容請關注:http://bbs.superwu.cn

關注超人學院微信二維碼:

關注超人學院java免費學習交流羣:

相關文章
相關標籤/搜索