前面已經分析到了 driver 進程成功在某臺 worker 上啓動了,下面就開始執行咱們寫的那些代碼了。以一個 wordcount 程序爲例,代碼以下:java
val conf = new SparkConf()
.setAppName("WordCount")
.setMaster("local")
val sc = new SparkContext(conf)
val lines = sc.textFile("./file/localfile")
val words = lines.flatMap(line => line.split(" "))
val wordPairs = words.map(word => (word, 1))
val wordCounts = wordPairs.reduceByKey(_ + _)
wordCounts.foreach(wordCount => println(wordCount._1 + " " + wordCount._2))
複製代碼
首先會去初始化咱們的 SparkContext 對象,在初始化 SparkContext 對象前會先建立一個 SparkConf 對象用來配置各類參數。SparkContext 對象的初始化代碼在 org.apache.spark.SparkContext 的 374-594 行(spark 2.1.1 的源碼中),這塊代碼大概作了這些事情:算法
1,建立 Spark 的執行環境 SparkEnv;apache
2,建立並初始化 SparkUI;後端
3,Hadoop 相關配置和 Executor 環境變量的設置;緩存
4,建立心跳接收器,用來和 Executor 作通訊;ide
5,建立和啓動 TaskScheduler;函數
6,建立和啓動 DAGScheduler;oop
7,初始化 BlockManager;ui
8,啓動測量系統 MetricsSystem;this
9,建立和啓動 ExecutorAllocationManager;
10,建立和啓動 ContextCleaner;
11,Spark Environment Update;
12,向系統的測量系統註冊 DAGSchedulerSource,BlockManagerSource,executorAllocationManagerSource;
13,標記當前 SparkContext 爲激活狀態(這個代碼在 SparkContext 類的最後,2237 行);
以上就是 SparkContext 初始化過程,咱們最主要的是分析 TaskScheduler 和 DAGScheduler 的建立和啓動過程,這篇主要來看 TaskScheduler。
建立和啓動 TaskScheduler 的代碼從 501 行開始,代碼以下:
// 建立 TaskScheduler,DAGScheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// 開啓 taskScheduler
_taskScheduler.start()
複製代碼
上面代碼能夠看出經過調用 SparkContext.createTaskScheduler 方法去建立 TaskScheduler。
createTaskScheduler 方法會根據傳入的 master 參數進行模式匹配,咱們使用的是 spark 的 standalone cluster 模式,對應的 master 確定是這種的:「spark://xxxx」。
因此會匹配到 SPARK_REGEX(sparkUrl)
這裏,這裏的 SPARK_REGEX 的匹配規則是 val SPARK_REGEX = """spark://(.*)""".r
。
匹配成功後,會去建立 TaskSchedulerImpl 對象、StandaloneSchedulerBackend 對象,而後而後將 StandaloneSchedulerBackend 對象帶入到 TaskSchedulerImpl 的 initialize 方法中進行初始化操做。最後返回 StandaloneSchedulerBackend 和 TaskSchedulerImpl 對象。
這裏的 TaskSchedulerImpl 是 TaskScheduler trait 的實現類,做用是:從 DAGScheduler 接收不一樣的 stage 任務,並向集羣提交這些任務。
以上分析對應的代碼以下:
case SPARK_REGEX(sparkUrl) =>
val scheduler = new TaskSchedulerImpl(sc)
val masterUrls = sparkUrl.split(",").map("spark://" + _)
val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
scheduler.initialize(backend)
(backend, scheduler)
複製代碼
下面主要看 scheduler.initialize(backend) 這裏,執行 TaskSchedulerImpl 的 initialize 方法。這個方法要傳入一個 SchedulerBackend 對象。
initialize 的過程當中首先會將 StandaloneSchedulerBackend 對象持有到該 TaskSchedulerImpl 對象裏。
而後去建立調度池 rootPool,這裏面緩存了調度隊列等相關信息。
而後再去根據 schedulingMode 去進行模式匹配使用哪一種調度算法,schedulingMode 初始化時候默認值是 FIFO,因此這裏默認的調度算法會匹配到 FIFO 模式。
模式匹配成功後建立 FIFOSchedulableBuilder,用來操做 rootPool 中的調度隊列。
def initialize(backend: SchedulerBackend) {
this.backend = backend
// 建立調度池
rootPool = new Pool("", schedulingMode, 0, 0)
// 模式匹配調度算法
// schedulingMode 有個默認值爲 FIFO,具體能夠點到對應的代碼看
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}
複製代碼
初始化完成後就會返回 (backend, scheduler)
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
複製代碼
而後爲 SparkContext 對象中對應的 _schedulerBackend 和 _taskScheduler 對象賦值,這裏的 _schedulerBackend 是 StandaloneSchedulerBackend,_taskScheduler 是 TaskSchedulerImpl。
_schedulerBackend = sched
_taskScheduler = ts
複製代碼
接下來就是建立 DAGScheduler。
DAGScheduler 主要做用是:分析用戶提交的應用,根據 RDD 之間的依賴關係建立 DAG 圖,而後根據 DAG 圖劃分紅多個 stage,爲每一個 stage 分一組 task 去處理一批數據。而後將這些 task 交給 TaskScheduler,TaskScheduler 會經過 ClusterManager 在集羣中找到符合要求的 Worker 上的 Executor 去啓動這些 task。
具體源碼能夠看後面的文章。
_dagScheduler = new DAGScheduler(this)
複製代碼
在 new DAGScheduler(this)
這個構造函數裏 this 就是當前的 SparkContext 對象,後續代碼裏會經過 SparkContext.taskScheduler
方法拿到裏面的 _taskScheduler 對象,而後會調用 TaskScheduler.setDAGScheduler
方法設置好 DAGScheduler 的引用。這裏設置好以後就會開啓 TaskScheduler。
_taskScheduler.start()
複製代碼
_taskScheduler.start()
方法中回去調用 backend.start
方法,在這裏就是 StandaloneSchedulerBackend
中的 start 方法。
override def start() {
// 這裏會去調用 StandaloneSchedulerBackend 的 start 方法
backend.start()
if (!isLocal && conf.getBoolean("spark.speculation", false)) {
logInfo("Starting speculative execution thread")
speculationScheduler.scheduleAtFixedRate(new Runnable {
override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
checkSpeculatableTasks()
}
}, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
}
}
複製代碼
StandaloneSchedulerBackend 是一個 SchedulerBackend trait 的實現類,是 TaskScheduler 調度後端接口,不一樣的集羣部署模式會有不一樣的實現。
TaskScheduler 給 task 分配資源的時候其實是經過 SchedulerBackend 去完成的。StandaloneSchedulerBackend 是用於 standalone cluster 模式下的 SchedulerBackend。做用於 driver 內,用於和 Executor 通訊,Task 的資源分配。
StandaloneSchedulerBackend 的 start 方法會去調用其父類的 start 方法,也就是 CoarseGrainedSchedulerBackend 的 start 方法。
這個方法內會去建立一個 DriverEndPointRef。
override def start() {
val properties = new ArrayBuffer[(String, String)]
for ((key, value) <- scheduler.sc.conf.getAll) {
if (key.startsWith("spark.")) {
properties += ((key, value))
}
}
// 根據配置參數屬性去建立 DriverEndPointRef
driverEndpoint = createDriverEndpointRef(properties)
}
複製代碼
createDriverEndpointRef 這個方法內部會先想 rpcEnv 上註冊一個建立好的 DriverEndpoint。
protected def createDriverEndpointRef( properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
// 向 rpcEnv 註冊 createDriverEndpoint 這個方法返回的 DriverEndpoint
rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
}
複製代碼
createDriverEndpoint 這個方法是用來建立 DriverEndpoint 的,這個 DriverEndpoint 就是用來 提交 task 到 Executor,並接收 Executor 的返回結果的。
protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
// 建立 DriverEndpoint
new DriverEndpoint(rpcEnv, properties)
}
複製代碼
至此,TaskScheduler 建立和啓動已經完成。