[TOC]
Starting from a sketch given to me by a former colleague (a senior big data engineer at Baidu), I reorganized the material and redrew the diagram with ProcessOn to make it clearer. Note that this describes Spark versions before 2.x: in those versions the underlying communication was based on Akka Actor, whereas later versions use RPC instead. (I had recently planned to read the Spark 2.x source code carefully, but our company's systems are built on Spark 1.x and were only recently upgraded to Spark 1.6.3, so rather than churn, I will first master Spark 1.x thoroughly. That will not hinder deeper study later, since the ideas carry over.)

Also, the diagram below shows Spark standalone mode; other modes (such as Spark on YARN) will be covered in a later write-up.

The diagram is as follows:

Explanation:
1. Starting a Spark cluster means running the `start-all.sh` script to launch the master node and the worker nodes, which starts the corresponding master process and worker processes.
2. After a worker starts, it sends registration information to the master process (this exchange is based on the Akka Actor event-driven model).
3. Once registered, the worker keeps sending heartbeats to the master, so that the master can tell whether the worker node is still alive (also based on the Akka Actor event-driven model).
4. The driver submits a job to the Spark cluster via the `spark-submit` script, requesting resources from the master (Akka Actor event-driven model).
5. After receiving the driver's job request, the master assigns work to the worker nodes; in practice this means instructing them to launch the corresponding executor processes.
6. On receiving the master's instruction, a worker launches the executor process and reports success back to the master; the executor is then ready to accept tasks.
7. Once started, each executor registers itself back with the driver (reverse registration), telling the driver who can accept tasks and run the Spark job (Akka Actor event-driven model).
8. After receiving these registrations, the driver knows where to send the Spark job; the cluster now holds a dedicated group of executor processes serving this driver.
9. SparkContext's key scheduling components come into play: the DAGScheduler and the TaskScheduler. The DAGScheduler splits the job into stages at wide-dependency (shuffle) boundaries and assembles a batch of tasks, a TaskSet, for each stage (each task contains our serialized Spark transformations). It then hands each TaskSet to the TaskScheduler, which dispatches the tasks to the appropriate executors.
10. An executor receives a TaskSet from the driver, deserializes it, wraps each task in a thread called a TaskRunner, and submits it to its local thread pool, which schedules the execution of our job.

**1. Why send the TaskSet to the executors?**

Moving data costs far more than moving computation. Big data systems, whether Spark or MapReduce, follow one principle: move the computation, not the data.
**2. Since the actual computation happens in the executors on the workers, why does the driver submit the Spark job to the master instead of directly to a worker?**

A simple example illustrates this. Suppose the cluster has 8 cores and 8 GB of memory in total (two worker nodes with identical resources, so each worker has 4 cores and 4 GB), while the submitted Spark job needs 4 cores and 6 GB of memory. If you went to a worker directly, which worker could handle it? Obviously neither. So the master is needed to allocate resources sensibly, because the computation is now distributed rather than confined to a single node as in the traditional setup.
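The argument above can be made concrete with a small sketch. `Worker` and `pickWorker` are illustrative names, not Spark's actual Master internals (the real master also spreads executors across workers); the point is only that the placement decision needs a cluster-wide view that no single worker has.

```scala
// Illustrative sketch (not Spark's real Master code): the master sees the
// free resources of every worker and can tell whether any of them fits
// a resource request; the driver, talking to one worker, could not.
case class Worker(id: String, freeCores: Int, freeMemGB: Int)

def pickWorker(workers: Seq[Worker], cores: Int, memGB: Int): Option[Worker] =
  workers.find(w => w.freeCores >= cores && w.freeMemGB >= memGB)

// Two identical workers: 4 cores / 4 GB each (8 cores / 8 GB in total).
val cluster = Seq(Worker("worker-1", 4, 4), Worker("worker-2", 4, 4))

pickWorker(cluster, cores = 4, memGB = 6)  // None: no single worker fits 4 cores / 6 GB
pickWorker(cluster, cores = 4, memGB = 4)  // Some(worker-1): a feasible request is placed
```

The infeasible 4-core/6-GB request is rejected cluster-wide, which is exactly the judgment only the master can make.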
How many tasks will an executor process run against an RDD? That depends on the RDD's partition count; the official documentation explains:
> One important parameter for parallel collections is the number of partitions to cut the dataset into. Spark will run one task for each partition of the cluster. Typically you want 2-4 partitions for each CPU in your cluster. Normally, Spark tries to set the number of partitions automatically based on your cluster. However, you can also set it manually by passing it as a second parameter to parallelize (e.g. `sc.parallelize(data, 10)`). Note: some places in the code use the term slices (a synonym for partitions) to maintain backward compatibility.
Reference: http://spark.apache.org/docs/1.6.3/programming-guide.html#resilient-distributed-datasets-rdds
Of course, when creating an RDD from an external dataset with `sc.textFile`, you can also specify the partition count. This means that for a large dataset, raising the partition count appropriately can increase parallelism, although the benefit also depends on the size of your Spark cluster, or of your Hadoop cluster in YARN mode.

Let's test the effect of specifying the partition count when creating an RDD, first with `sc.parallelize` and then with `sc.textFile`.
Create the RDD:
```
scala> val rdd = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:21
```
Run an action and watch the log output:
```
scala> rdd.count()
19/01/10 11:07:09 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Got job 21 (count at <console>:24) with 3 output partitions
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Final stage: ResultStage 21 (count at <console>:24)
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21), which has no missing parents
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25 stored as values in memory (estimated size 1096.0 B, free 941.6 KB)
19/01/10 11:07:09 INFO storage.MemoryStore: Block broadcast_25_piece0 stored as bytes in memory (estimated size 804.0 B, free 942.4 KB)
19/01/10 11:07:09 INFO storage.BlockManagerInfo: Added broadcast_25_piece0 in memory on localhost:58709 (size: 804.0 B, free: 511.0 MB)
19/01/10 11:07:09 INFO spark.SparkContext: Created broadcast 25 from broadcast at DAGScheduler.scala:1006
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Submitting 3 missing tasks from ResultStage 21 (ParallelCollectionRDD[12] at parallelize at <console>:21)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Adding task set 21.0 with 3 tasks
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 21.0 (TID 65, localhost, partition 0,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 21.0 (TID 66, localhost, partition 1,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Starting task 2.0 in stage 21.0 (TID 67, localhost, partition 2,PROCESS_LOCAL, 2026 bytes)
19/01/10 11:07:09 INFO executor.Executor: Running task 1.0 in stage 21.0 (TID 66)
19/01/10 11:07:09 INFO executor.Executor: Running task 0.0 in stage 21.0 (TID 65)
19/01/10 11:07:09 INFO executor.Executor: Finished task 0.0 in stage 21.0 (TID 65). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Running task 2.0 in stage 21.0 (TID 67)
19/01/10 11:07:09 INFO executor.Executor: Finished task 2.0 in stage 21.0 (TID 67). 953 bytes result sent to driver
19/01/10 11:07:09 INFO executor.Executor: Finished task 1.0 in stage 21.0 (TID 66). 953 bytes result sent to driver
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 21.0 (TID 65) in 5 ms on localhost (1/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 21.0 (TID 67) in 4 ms on localhost (2/3)
19/01/10 11:07:09 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 21.0 (TID 66) in 6 ms on localhost (3/3)
19/01/10 11:07:09 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 21.0, whose tasks have all completed, from pool
19/01/10 11:07:09 INFO scheduler.DAGScheduler: ResultStage 21 (count at <console>:24) finished in 0.006 s
19/01/10 11:07:09 INFO scheduler.DAGScheduler: Job 21 finished: count at <console>:24, took 0.015381 s
res25: Long = 6
```
As you can see, because the RDD's partition count was set to 3, three tasks (task 0.0, task 1.0, and task 2.0) were launched on the executor when the action ran.
Create the RDD:
```
scala> val rdd = sc.textFile("/hello", 2)
rdd: org.apache.spark.rdd.RDD[String] = /hello MapPartitionsRDD[18] at textFile at <console>:21
```
Run an action and watch the log output:
```
scala> rdd.count()
19/01/10 11:14:01 INFO mapred.FileInputFormat: Total input paths to process : 1
19/01/10 11:14:01 INFO spark.SparkContext: Starting job: count at <console>:24
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Got job 24 (count at <console>:24) with 2 output partitions
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Final stage: ResultStage 24 (count at <console>:24)
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Parents of final stage: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Missing parents: List()
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21), which has no missing parents
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31 stored as values in memory (estimated size 2.9 KB, free 1638.6 KB)
19/01/10 11:14:01 INFO storage.MemoryStore: Block broadcast_31_piece0 stored as bytes in memory (estimated size 1764.0 B, free 1640.3 KB)
19/01/10 11:14:01 INFO storage.BlockManagerInfo: Added broadcast_31_piece0 in memory on localhost:58709 (size: 1764.0 B, free: 511.0 MB)
19/01/10 11:14:01 INFO spark.SparkContext: Created broadcast 31 from broadcast at DAGScheduler.scala:1006
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 24 (/hello MapPartitionsRDD[18] at textFile at <console>:21)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Adding task set 24.0 with 2 tasks
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 24.0 (TID 76, localhost, partition 0,ANY, 2129 bytes)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 24.0 (TID 77, localhost, partition 1,ANY, 2129 bytes)
19/01/10 11:14:01 INFO executor.Executor: Running task 0.0 in stage 24.0 (TID 76)
19/01/10 11:14:01 INFO executor.Executor: Running task 1.0 in stage 24.0 (TID 77)
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:0+31
19/01/10 11:14:01 INFO rdd.HadoopRDD: Input split: hdfs://localhost:9000/hello:31+31
19/01/10 11:14:01 INFO executor.Executor: Finished task 0.0 in stage 24.0 (TID 76). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO executor.Executor: Finished task 1.0 in stage 24.0 (TID 77). 2137 bytes result sent to driver
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 24.0 (TID 76) in 23 ms on localhost (1/2)
19/01/10 11:14:01 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 24.0 (TID 77) in 23 ms on localhost (2/2)
19/01/10 11:14:01 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 24.0, whose tasks have all completed, from pool
19/01/10 11:14:01 INFO scheduler.DAGScheduler: ResultStage 24 (count at <console>:24) finished in 0.023 s
19/01/10 11:14:01 INFO scheduler.DAGScheduler: Job 24 finished: count at <console>:24, took 0.035795 s
res29: Long = 6
```
The effect is the same as before, except that here the file is read from HDFS and the partition count is set to 2.
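The "one task per partition" behavior seen in both logs can be mimicked with plain Scala, no cluster required. `countWithPartitions` is a made-up helper for illustration only: it splits the data into partitions, runs one "task" per partition, and lets the "driver" sum the per-task results, like a local `count()`.

```scala
// Sketch of the one-task-per-partition model (illustrative, not Spark code):
// each partition gets exactly one "task", and the driver aggregates.
def countWithPartitions(data: Seq[Int], numPartitions: Int): Long = {
  val chunk = math.ceil(data.size.toDouble / numPartitions).toInt
  val partitions = data.grouped(chunk).toList      // one chunk per partition
  val taskResults = partitions.map(_.size.toLong)  // one "task" counts its partition
  taskResults.sum                                  // the "driver" combines the results
}

countWithPartitions(List(1, 2, 3, 4, 5, 6), 3)  // 3 "tasks" of 2 elements each -> 6
```

With 3 partitions over 6 elements, 3 tasks run, matching the `Adding task set ... with 3 tasks` line in the first log above.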