Spark源碼分析:多種部署方式之間的區別與聯繫(1)

 從官方的文檔咱們能夠知道,Spark的部署方式有不少種:local、Standalone、Mesos、YARN.....不一樣部署方式的後臺處理進程是不同的,可是若是咱們從代碼的角度來看,其實流程都差很少。
  從代碼中,咱們能夠得知其實Spark的部署方式其實比官方文檔中介紹的還要多,這裏我來列舉一下: 微信

  一、local:這種方式是在本地啓動一個線程來運行做業;
  二、local[N]:也是本地模式,可是啓動了N個線程;
  三、local[*]:仍是本地模式,可是用了系統中全部的核;
  四、local[N,M]:這裏有兩個參數,第一個表明的是用到的核個數;第二個參數表明的是允許該做業失敗M次。上面的幾種模式沒有指定M參數,其默認值都是1;
  五、local-cluster[N, cores, memory]:本地僞集羣模式,參數的含義我就不說了,看名字就知道;式;
  六、spark:// :這是用到了Spark的Standalone模
  七、(mesos|zk)://:這是Mesos模式;
  八、yarn-standalone\yarn-cluster\yarn-client:這是YARN模式。前面兩種表明的是集羣模式;後面表明的是客戶端模式;
  九、simr://:這種你就不知道了吧?simr實際上是Spark In MapReduce的縮寫。咱們知道MapReduce 1中是沒有YARN的,若是你在MapReduce 1中使用Spark,那麼就用這種模式吧。

  整體來講,上面列出的各類部署方式運行的流程大體同樣:都是從SparkContext切入,在SparkContext的初始化過程當中主要作了如下幾件事:
  一、根據SparkConf建立SparkEnv oop

01 // Create the Spark execution environment (cache, map output tracker, etc)
02   private[spark] val env = SparkEnv.create(
03     conf,
04     "<driver>",
05     conf.get("spark.driver.host"),
06     conf.get("spark.driver.port").toInt,
07     isDriver = true,
08     isLocal = isLocal,
09     listenerBus = listenerBus)
10   SparkEnv.set(env)

  二、初始化executor的環境變量executorEnvs
  這個步驟代碼太多了,我就不貼出來。
  三、建立TaskScheduler post

1 // Create and start the scheduler
2   private[spark] var taskScheduler = SparkContext.createTaskScheduler(this, master)

  四、建立DAGScheduler this

1 @volatile private[spark] var dagScheduler: DAGScheduler = _
2   try {
3     dagScheduler = new DAGScheduler(this)
4   } catch {
5     case e: Exception => throw
6       newSparkException("DAGScheduler
7                      cannot be initialized due to %s".format(e.getMessage))
8   }

  五、啓動TaskScheduler spa

1 // start TaskScheduler after taskScheduler
2 // sets DAGScheduler reference in DAGScheduler's
3   // constructor
4   taskScheduler.start()

  那麼,DAGScheduler和TaskScheduler都是什麼?
  DAGScheduler稱爲做業調度,它基於Stage的高層調度模塊的實現,它爲每一個Job的Stages計算DAG,記錄哪些RDD和Stage的輸出已經實物化,而後找到最小的調度方式來運行這個Job。而後以Task Sets的形式提交給底層的任務調度模塊來具體執行。
  TaskScheduler稱爲任務調度。它是低層次的task調度接口,目前僅僅被TaskSchedulerImpl實現。這個接口能夠以插件的形式應用在不一樣的task調度器中。每一個TaskScheduler只給一個SparkContext調度task,這些調度器接受來自DAGScheduler中的每一個stage提交的tasks,並負責將這些tasks提交給cluster運行。若是提交失敗了,它將會重試;並處理stragglers。全部的事件都返回到DAGScheduler中。
  在建立DAGScheduler的時候,程序已經將taskScheduler做爲參數傳進去了,代碼以下: 插件

01 def this(sc: SparkContext, taskScheduler: TaskScheduler) = {
02     this(
03       sc,
04       taskScheduler,
05       sc.listenerBus,
06       sc.env.mapOutputTracker.asInstanceOf[MapOutputTrackerMaster],
07       sc.env.blockManager.master,
08       sc.env)
09   }
10  
11   def this(sc: SparkContext) = this(sc, sc.taskScheduler)

也就是DAGScheduler封裝了TaskScheduler。TaskScheduler中有兩個比較重要的方法: 線程

1 // Submit a sequence of tasks to run.
2 def submitTasks(taskSet: TaskSet): Unit
3  
4 // Cancel a stage.
5 def cancelTasks(stageId: Int, interruptThread: Boolean)

  這些方法在DAGScheduler中被調用,而TaskSchedulerImpl實現了TaskScheduler,爲各類調度模式提供了任務調度接口,在TaskSchedulerImpl中還實現了resourceOffers和statusUpdate兩個接口給Backend調用,用於提供調度資源和更新任務狀態。
  在YARN模式中,還提供了YarnClusterScheduler類,他只是簡單地繼承TaskSchedulerImpl類,主要重寫了getRackForHost(hostPort: String)和postStartHook() 方法。繼承圖以下: orm


若是想及時瞭解Spark、Hadoop或者Hbase相關的文章,歡迎關注微信公共賬號: iteblog_hadoop


在下篇文章中,我將介紹上面九種部署模式涉及到的各類類及其之間的關係。歡迎關注本博客!這裏先列出下篇文章用到的類圖
相關文章
相關標籤/搜索