Spark Job Execution Architecture Explained

[TOC]


1 Background

This is a reorganization of a rough sketch that old li (a senior big data engineer at Baidu) gave me a while back, redrawn in ProcessOn so that it is much clearer. Note that it applies to Spark versions before 2.x: in those versions the underlying communication was based on Akka Actor, whereas later versions use RPC instead. (I had recently planned to work through the Spark 2.x source code, but the systems at my company are all built on Spark 1.x and were only recently upgraded to Spark 1.6.3, so rather than churn I will first get Spark 1.x thoroughly understood. That will not hold back deeper study later, since the ideas carry over.)

Also, the diagram here covers Spark standalone mode; other modes (such as Spark on YARN) will be written up separately later.

2 Execution Architecture Diagram and Walkthrough

The diagram is as follows:

(Figure: Spark job execution architecture, standalone mode)

Explanation of the steps:

  • 1. Starting the Spark cluster means running the start-all.sh script to bring up the master and worker nodes, i.e. launching the corresponding master process and worker processes;
  • 2. Once a worker has started, it sends a registration message to the master process (this step is based on the Akka Actor event-driven model);
  • 3. After a worker has successfully registered with the master, it keeps sending heartbeats to the master to check whether the master node is still alive (based on the Akka Actor event-driven model);
  • 4. The driver submits a job to the Spark cluster through the spark-submit script, requesting resources from the master node (based on the Akka Actor event-driven model);
  • 5. When the master receives the job request submitted by the driver, it assigns work to the worker nodes, which in effect means telling them to launch the corresponding executor processes;
  • 6. When a worker node receives the master's instruction to launch executor processes, it starts them and reports back to the master that the launch succeeded and that it is ready to accept tasks;
  • 7. Once an executor process has started successfully, it registers itself back with the driver process, so the driver knows who can receive tasks and run the Spark job (based on the Akka Actor event-driven model);
  • 8. After receiving these registrations, the driver knows whom to send the Spark job to; the cluster now has a dedicated set of executor processes serving this driver;
  • 9. The key components inside SparkContext come into play: DAGScheduler and TaskScheduler. The DAGScheduler splits the job into stages at wide-dependency boundaries and assembles a batch of tasks (a TaskSet) for each stage, where each task carries the serialized Spark transformations we wrote; it then hands the TaskSet to the TaskScheduler, which dispatches the tasks to the appropriate executors;
  • 10. When an executor process receives a TaskSet from the driver, it deserializes it, wraps each task in a TaskRunner thread, and submits it to its local thread pool to schedule the execution of our job (a minimal driver-program sketch follows this list).
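
To make steps 4 and 8-10 concrete, here is a minimal driver-program sketch against the Spark 1.x RDD API. It only illustrates the flow described above: the master URL spark://master:7077, the HDFS path and the object name are assumptions for this example, not details from the original setup.

import org.apache.spark.{SparkConf, SparkContext}

object WordCountDriver {
  def main(args: Array[String]): Unit = {
    // Step 4: the driver asks the (assumed) standalone master for resources.
    val conf = new SparkConf()
      .setAppName("WordCountDriver")
      .setMaster("spark://master:7077") // assumed standalone master URL

    // Steps 5-8 (executor launch and reverse registration) happen while the
    // SparkContext is being constructed.
    val sc = new SparkContext(conf)

    // Steps 9-10: the transformations below are serialized into tasks by the
    // DAGScheduler, handed to the TaskScheduler, and run inside the executors.
    val counts = sc.textFile("hdfs://localhost:9000/hello") // assumed input path
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _) // wide dependency -> stage boundary

    counts.collect().foreach(println) // the action triggers the job submission
    sc.stop()
  }
}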

3 Questions and Answers

1. Why is the TaskSet sent to the executors?

Moving data costs far more than moving computation. In big data processing, Spark and MapReduce alike follow one principle: move the computation, not the data.

2. Since the final computation happens in the executors on the workers, why does the driver submit the Spark job to the master instead of directly to a worker?

A simple example illustrates this. Suppose the cluster has 8 cores and 8 GB of memory in total (two worker nodes with identical resources, so 4 cores and 4 GB each), and the submitted Spark job needs 4 cores and 6 GB of memory. If the driver went to a single worker, which worker could satisfy that request? Obviously neither, which is why the master is needed to allocate resources sensibly across the cluster: the computation is now distributed, no longer confined to a single node as in the traditional setup. (A sketch of how such a request is expressed follows.)
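
As a hedged illustration of how that request is expressed in standalone mode: the configuration keys spark.cores.max and spark.executor.memory are standard Spark properties, while the master URL and the exact split of memory across executors are assumptions for this example. The driver only states what it needs; the master decides where the executors actually run.

import org.apache.spark.{SparkConf, SparkContext}

// Configuration portion of a driver program for the 8-core / 8 GB example above.
val conf = new SparkConf()
  .setAppName("resource-request-demo")
  .setMaster("spark://master:7077")    // the request goes to the master, not to a worker
  .set("spark.cores.max", "4")         // total cores the application may use
  .set("spark.executor.memory", "3g")  // memory per executor; with one executor per
                                       // worker (the standalone default here) this gives
                                       // 2 x 3 GB = 6 GB in total, which no single
                                       // 4 GB worker could provide on its own
val sc = new SparkContext(conf)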

4 The Number of Tasks

How many tasks will the executor processes run to process an RDD? That depends on the number of partitions in the RDD; the official documentation puts it this way:

One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. sc.parallelize(data, 10)). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.

Reference: http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds

Of course, when creating an RDD from an external dataset with sc.textFile, you can also specify the number of partitions. This means that when your dataset is large, raising the partition count appropriately can increase parallelism, subject of course to the size of your Spark cluster (or Hadoop cluster, in YARN mode).

Let's verify the effect of specifying the partition count when creating RDDs with sc.parallelize and with sc.textFile.

4.1 Parallelized Collections

Create the RDD:

scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21

Run an action and watch the log output:

scala> rdd.count()
19/01/10 11:07:09 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Got job 21 (count at <console>:24) with 3 output partitions
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 21 (count at <console>:24)
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21), which has no missing parents
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25 stored as values in memory (estimated size 1096.0 B, free 941.6 KB)
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 804.0 B, free 942.4 KB)
19/01/10 11:07:09 INFO storage.BlockManagerInfo: Added broadcast_25_piece0 in memory on localhost:58709 (size: 804.0 B, free: 511.0 MB)
19/01/10 11:07:09 INFO spark.SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 3 tasks
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 65, localhost, partition 0,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 21.0 (TID 66, localhost, partition 1,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 21.0 (TID 67, localhost, partition 2,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO executor.Executor: Running task 1.0 in stage 21.0 (TID 66)
19/01/10 11:07:09 INFO executor.Executor: Running task 0.0 in stage 21.0 (TID 65)
19/01/10 11:07:09 INFO executor.Executor: Finished task 0.0 in stage 21.0 (TID 65). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Running task 2.0 in stage 21.0 (TID 67)
19/01/10 11:07:09 INFO executor.Executor: Finished task 2.0 in stage 21.0 (TID 67). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Finished task 1.0 in stage 21.0 (TID 66). 953 bytes result sent to driver
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 65) in 5 ms on localhost (1/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 21.0 (TID 67) in 4 ms on localhost (2/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 21.0 (TID 66) in 6 ms on localhost (3/3)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
19/01/10 11:07:09 INFO scheduler.DAGScheduler: ResultStage 21 (count at <console>:24) finished in 0.006 s
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Job 21 finished: count at <console>:24, took 0.015381 s
res25: Long = 6

As you can see, because the RDD was created with 3 partitions, the action spawns 3 tasks on the executor: task 0.0, task 1.0 and task 2.0.
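
Instead of digging through the logs, the partition count can also be read directly off the RDD. The line below is an extra check, not part of the original session, and should report 3 for the RDD created above:

// In the same spark-shell session as above.
rdd.partitions.size // expected to return 3, matching the three tasks in the log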

4.2 External Datasets

Create the RDD:

scala> val rdd = sc.textFile("/hello", 2)
rdd: org.apache.spark.rdd.RDD[String] = /hello MapPartitionsRDD[18] at textFile at <console>:21

Run an action and watch the log output:

scala> rdd.count()
19/01/10 11:14:01 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/10 11:14:01 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Got job 24 (count at <console>:24) with 2 output partitions
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 24 (count at <console>:24)
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21), which has no missing parents
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31 stored as values in memory (estimated size 2.9 KB, free 1638.6 KB)
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 1764.0 B, free 1640.3 KB)
19/01/10 11:14:01 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on localhost:58709 (size: 1764.0 B, free: 511.0 MB)
19/01/10 11:14:01 INFO spark.SparkContext: Created broadcast 31 from broadcast at DAGScheduler.scala:1006
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Adding task set 24.0 with 2 tasks
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 24.0 (TID 76, localhost, partition 0,ANY, 2129 bytes)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 24.0 (TID 77, localhost, partition 1,ANY, 2129 bytes)
19/01/10 11:14:01 INFO executor.Executor: Running task 0.0 in stage 24.0 (TID 76)
19/01/10 11:14:01 INFO executor.Executor: Running task 1.0 in stage 24.0 (TID 77)
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:0+31
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:31+31
19/01/10 11:14:01 INFO executor.Executor: Finished task 0.0 in stage 24.0 (TID 76). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO executor.Executor: Finished task 1.0 in stage 24.0 (TID 77). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 24.0 (TID 76) in 23 ms on localhost (1/2)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 24.0 (TID 77) in 23 ms on localhost (2/2)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
19/01/10 11:14:01 INFO scheduler.DAGScheduler: ResultStage 24 (count at <console>:24) finished in 0.023 s
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Job 24 finished: count at <console>:24, took 0.035795 s
res29: Long = 6

The effect is the same as before, except that the file is read from HDFS and the number of partitions is set to 2.
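
One caveat worth adding (my note, not from the original run): the second argument to sc.textFile is a minimum number of partitions (minPartitions in the Spark 1.x API), so for a large file spanning many HDFS blocks the actual partition count, and therefore the number of tasks, can come out higher than the value you pass. A quick check:

// The path here is illustrative; substitute a real HDFS file.
val bigRdd = sc.textFile("hdfs://localhost:9000/some/large/file", 2)
bigRdd.partitions.size // may be 2 or more, depending on the input splits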
