概覽
拿到系統後,部署系統是第一件事,那麼系統部署成功之後,
各個節點都啓動了哪些服務?
部署圖

從部署圖中能夠看到
- 每一個 Worker 上存在一個或者多個 ExecutorBackend 進程。每一個進程包含一個 Executor 對象,該對象持有一個線程池,每一個線程能夠執行一個 task。
- 每一個 application 包含一個 driver 和多個 executors,每一個 executor 裏面運行的 tasks 都屬於同一個 application。
- 在 Standalone 版本中,ExecutorBackend 被實例化成 CoarseGrainedExecutorBackend 進程。
在我部署的集羣中每一個 Worker 只運行了一個 CoarseGrainedExecutorBackend 進程,沒有發現如何配置多個 CoarseGrainedExecutorBackend 進程。(應該是運行多個 applications 的時候會產生多個進程,這個我尚未實驗,) 想了解 Worker 和 Executor 的關係詳情,能夠參閱
@OopsOutOfMemory 同窗寫的
Spark Executor Driver資源調度小結 。
- Worker 經過持有 ExecutorRunner 對象來控制 CoarseGrainedExecutorBackend 的啓停。
瞭解了部署圖以後,咱們先給出一個 job 的例子,而後概覽一下 job 如何生成與運行。
Job 例子
咱們使用 Spark 自帶的 examples 包中的 GroupByTest,假設在 Master 節點運行,命令是
/* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers] */
bin/run-example GroupByTest 100 10000 1000 36
GroupByTest 具體代碼以下
package org.apache.spark.examples
import java.util.Random
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.SparkContext._
/**
* Usage: GroupByTest [numMappers] [numKVPairs] [valSize] [numReducers]
*/
object GroupByTest {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("GroupBy Test")
var numMappers = 100
var numKVPairs = 10000
var valSize = 1000
var numReducers = 36
val sc = new SparkContext(sparkConf)
val pairs1 = sc.parallelize(0 until numMappers, numMappers).flatMap { p =>
val ranGen = new Random
var arr1 = new Array[(Int, Array[Byte])](numKVPairs)
for (i <- 0 until numKVPairs) {
val byteArr = new Array[Byte](valSize)
ranGen.nextBytes(byteArr)
arr1(i) = (ranGen.nextInt(Int.MaxValue), byteArr)
}
arr1
}.cache
// Enforce that everything has been calculated and in cache
pairs1.count
println(pairs1.groupByKey(numReducers).count)
sc.stop()
}
}
閱讀代碼後,用戶頭腦中 job 的執行流程是這樣的:

具體流程很簡單,這裏來估算下 data size 和執行結果:
- 初始化 SparkConf()。
- 初始化 numMappers=100, numKVPairs=10,000, valSize=1000, numReducers= 36。
- 初始化 SparkContext。這一步很重要,是要確立 driver 的地位,裏面包含建立 driver 所需的各類 actors 和 objects。
- 每一個 mapper 生成一個
arr1: Array[(Int, Byte[])]
,length 爲 numKVPairs。每個 Byte[] 的 length 爲 valSize,Int 爲隨機生成的整數。Size(arr1) = numKVPairs * (4 + valSize) = 10MB
,因此Size(pairs1) = numMappers * Size(arr1) =1000MB
。這裏的數值計算結果都是約等於。
- 每一個 mapper 將產生的 arr1 數組 cache 到內存。
- 而後執行一個 action 操做 count(),來統計全部 mapper 中 arr1 中的元素個數,執行結果是
numMappers * numKVPairs = 1,000,000
。這一步主要是爲了將每一個 mapper 產生的 arr1 數組 cache 到內存。
- 在已經被 cache 的 paris1 上執行 groupByKey 操做,groupByKey 產生的 reducer (也就是 partition) 個數爲 numReducers。理論上,若是 hash(Key) 比較平均的話,每一個 reducer 收到的<int, array[byte]="">record 個數爲
numMappers * numKVPairs / numReducer = 27,777
,大小爲 Size(pairs1) / numReducer = 27MB
。
- reducer 將收到的
<Int, Byte[]>
records 中擁有相同 Int 的 records 聚在一塊兒,獲得 <Int, list(Byte[], Byte[], ..., Byte[])>
。
- 最後 count 將全部 reducer 中 records 個數進行加和,最後結果實際就是 pairs1 中不一樣的 Int 總個數。
Job 邏輯執行圖
Job 的實際執行流程比用戶頭腦中的要複雜,須要先創建邏輯執行圖(或者叫數據依賴圖),而後劃分邏輯執行圖生成 DAG 型的物理執行圖,而後生成具體 task 執行。分析一下這個 job 的邏輯執行圖: 使用
RDD.toDebugString
能夠看到整個 logical plan (RDD 的數據依賴關係)以下
MapPartitionsRDD[3] at groupByKey at GroupByTest.scala:51 (36 partitions)
ShuffledRDD[2] at groupByKey at GroupByTest.scala:51 (36 partitions)
FlatMappedRDD[1] at flatMap at GroupByTest.scala:38 (100 partitions)
ParallelCollectionRDD[0] at parallelize at GroupByTest.scala:38 (100 partitions)
用圖表示就是:
須要注意的是 data in the partition 展現的是每一個 partition 應該獲得的計算結果,並不意味着這些結果都同時存在於內存中。
根據上面的分析可知:
- 用戶首先 init 了一個0-99 的數組:
0 until numMappers
- parallelize() 產生最初的 ParrallelCollectionRDD,每一個 partition 包含一個整數 i。
- 執行 RDD 上的 transformation 操做(這裏是 flatMap)之後,生成 FlatMappedRDD,其中每一個 partition 包含一個 Array[(Int, Array[Byte])]。
- 第一個 count() 執行時,先在每一個 partition 上執行 count,而後執行結果被髮送到 driver,最後在 driver 端進行 sum。
- 因爲 FlatMappedRDD 被 cache 到內存,所以這裏將裏面的 partition 都換了一種顏色表示。
- groupByKey 產生了後面兩個 RDD,爲何產生這兩個在後面章節討論。
- 若是 job 須要 shuffle,通常會產生 ShuffledRDD。該 RDD 與前面的 RDD 的關係相似於 Hadoop 中 mapper 輸出數據與 reducer 輸入數據之間的關係。
- MapPartitionsRDD 裏包含 groupByKey() 的結果。
- 最後將 MapPartitionsRDD 中的 每一個value(也就是Array[Byte])都轉換成 Iterable 類型。
- 最後的 count 與上一個 count 的執行方式相似。
能夠看到邏輯執行圖描述的是 job 的數據流:job 會通過哪些 transformation(),中間生成哪些 RDD 及 RDD 之間的依賴關係。
Job 物理執行圖
邏輯執行圖表示的是數據上的依賴關係,不是 task 的執行圖。在 Hadoop 中,用戶直接面對 task,mapper 和 reducer 的職責分明:一個進行分塊處理,一個進行 aggregate。Hadoop 中 整個數據流是固定的,只須要填充 map() 和 reduce() 函數便可。Spark 面對的是更復雜的數據處理流程,數據依賴更加靈活,很難將數據流和物理 task 簡單地統一在一塊兒。所以 Spark 將數據流和具體 task 的執行流程分開,並設計算法將邏輯執行圖轉換成 task 物理執行圖,轉換算法後面的章節討論。 針對這個 job,咱們先畫出它的物理執行 DAG 圖以下:

能夠看到 GroupByTest 這個 application 產生了兩個 job,第一個 job 由第一個 action(也就是
pairs1.count
)觸發產生,分析一下第一個 job:
- 整個 job 只包含 1 個 stage(不明白什麼是stage不要緊,後面章節會解釋,這裏只需知道有這樣一個概念)。
- Stage 0 包含 100 個 ResultTask。
- 每一個 task 先計算 flatMap,產生 FlatMappedRDD,而後執行 action() 也就是 count(),統計每一個 partition 裏 records 的個數,好比 partition 99 裏面只含有 9 個 records。
- 因爲 pairs1 被聲明要進行 cache,所以在 task 計算獲得 FlatMappedRDD 後會將其包含的 partitions 都 cache 到 executor 的內存。
- task 執行完後,driver 收集每一個 task 的執行結果,而後進行 sum()。
- job 0 結束。
第二個 job 由
pairs1.groupByKey(numReducers).count
觸發產生。分析一下該 job:
- 整個 job 包含 2 個 stage。
- Stage 1 包含 100 個 ShuffleMapTask,每一個 task 負責從 cache 中讀取 pairs1 的一部分數據並將其進行相似 Hadoop 中 mapper 所作的 partition,最後將 partition 結果寫入本地磁盤。
- Stage 0 包含 36 個 ResultTask,每一個 task 首先 shuffle 本身要處理的數據,邊 fetch 數據邊進行 aggregate 以及後續的 mapPartitions() 操做,最後進行 count() 計算獲得 result。
- task 執行完後,driver 收集每一個 task 的執行結果,而後進行 sum()。
- job 1 結束。
能夠看到物理執行圖並不簡單。與 MapReduce 不一樣的是,Spark 中一個 application 可能包含多個 job,每一個 job 包含多個 stage,每一個 stage 包含多個 task。
怎麼劃分 job,怎麼劃分 stage,怎麼劃分 task 等等問題會在後面的章節介紹。
Discussion
到這裏,咱們對整個系統和 job 的生成與執行有了概念,並且還探討了 cache 等特性。 接下來的章節會討論 job 生成與執行涉及到的系統核心功能,包括:
- 如何生成邏輯執行圖
- 如何生成物理執行圖
- 如何提交與調度 Job
- Task 如何生成、執行與結果處理
- 如何進行 shuffle
- cache機制
- broadcast 機制
本文轉載於:http://spark-internals.books.yourtion.com/