Apache Spark 的設計與實現(job邏輯執行圖)

Job 邏輯執行圖

General logical plan

deploy 典型的 Job 邏輯執行圖如上所示,通過下面四個步驟能夠獲得最終執行結果:
  • 從數據源(能夠是本地 file,內存數據結構, HDFS,HBase 等)讀取數據建立最初的 RDD。上一章例子中的 parallelize() 至關於 createRDD()。
  • 對 RDD 進行一系列的 transformation() 操做,每個 transformation() 會產生一個或多個包含不一樣類型 T 的 RDD[T]。T 能夠是 Scala 裏面的基本類型或數據結構,不限於 (K, V)。但若是是 (K, V),K 不能是 Array 等複雜類型(由於難以在複雜類型上定義 partition 函數)。
  • 對最後的 final RDD 進行 action() 操做,每一個 partition 計算後產生結果 result。
  • 將 result 回送到 driver 端,進行最後的 f(list[result]) 計算。例子中的 count() 實際包含了action() 和 sum() 兩步計算。
RDD 能夠被 cache 到內存或者 checkpoint 到磁盤上。RDD 中的 partition 個數不固定,一般由用戶設定。RDD 和 RDD 之間 partition 的依賴關係能夠不是 1 對 1,如上圖既有 1 對 1 關係,也有多對多的關係。

邏輯執行圖的生成

瞭解了 Job 的邏輯執行圖後,寫程序時候會在腦中造成相似上面的數據依賴圖。然而,實際生成的 RDD 個數每每比咱們想一想的個數多。 要解決邏輯執行圖生成問題,實際須要解決:
  • 如何產生 RDD,應該產生哪些 RDD?
  • 如何創建 RDD 之間的依賴關係?

1. 如何產生 RDD,應該產生哪些 RDD?

解決這個問題的初步想法是讓每個 transformation() 方法返回(new)一個 RDD。事實也基本如此,只是某些 transformation() 比較複雜,會包含多個子 transformation(),於是會生成多個 RDD。這就是 實際 RDD 個數比咱們想象的多一些 的緣由。 如何計算每一個 RDD 中的數據?邏輯執行圖其實是 computing chain,那麼 transformation() 的計算邏輯在哪裏被 perform?每一個 RDD 裏有 compute() 方法,負責接收來自上一個 RDD 或者數據源的 input records,perform transformation() 的計算邏輯,而後輸出 records。 產生哪些 RDD 與 transformation() 的計算邏輯有關,下面討論一些典型的 transformation() 及其建立的 RDD。官網上已經解釋了每一個 transformation 的含義。iterator(split) 的意思是 foreach record in the partition。這裏空了不少,是由於那些 transformation() 較爲複雜,會產生多個 RDD,具體會在下一節圖示出來。
Transformation Generated RDDs Compute()
map(func) MappedRDD iterator(split).map(f)
filter(func) FilteredRDD iterator(split).filter(f)
flatMap(func) FlatMappedRDD iterator(split).flatMap(f)
mapPartitions(func) MapPartitionsRDD f(iterator(split))
mapPartitionsWithIndex(func) MapPartitionsRDD f(split.index, iterator(split))
sample(withReplacement, fraction, seed) PartitionwiseSampledRDD PoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split))
pipe(command, [envVars]) PipedRDD
union(otherDataset)
intersection(otherDataset)
distinct([numTasks]))
groupByKey([numTasks])
reduceByKey(func, [numTasks])
sortByKey([ascending], [numTasks])
join(otherDataset, [numTasks])
cogroup(otherDataset, [numTasks])
cartesian(otherDataset)
coalesce(numPartitions)
repartition(numPartitions)

2. 如何創建 RDD 之間的聯繫?

RDD 之間的數據依賴問題實際包括三部分:
  • RDD 自己的依賴關係。要生成的 RDD(之後用 RDD x 表示)是依賴一個 parent RDD,仍是多個 parent RDDs?
  • RDD x 中會有多少個 partition ?
  • RDD x 與其 parent RDDs 中 partition 之間是什麼依賴關係?是依賴 parent RDD 中一個仍是多個 partition?
第一個問題能夠很天然的解決,好比 x = rdda.transformation(rddb) (e.g., x = a.join(b)) 就表示 RDD x 同時依賴於 RDD a 和 RDD b。 第二個問題中的 partition 個數通常由用戶指定,不指定的話通常取 max(numPartitions[parent RDD 1], .., numPartitions[parent RDD n])。 第三個問題比較複雜。須要考慮這個 transformation() 的語義,不一樣的 transformation() 的依賴關係不一樣。好比 map() 是 1:1,而 groupByKey() 邏輯執行圖中的 ShuffledRDD 中的每一個 partition 依賴於 parent RDD 中全部的 partition,還有更復雜的狀況。 再次考慮第三個問題,RDD x 中每一個 partition 能夠依賴於 parent RDD 中一個或者多個 partition。並且這個依賴能夠是徹底依賴或者部分依賴。部分依賴指的是 parent RDD 中某 partition 中一部分數據與 RDD x 中的一個 partition 相關,另外一部分數據與 RDD x 中的另外一個 partition 相關。下圖展現了徹底依賴和部分依賴。 Dependency 前三個是徹底依賴,RDD x 中的 partition 與 parent RDD 中的 partition/partitions 徹底相關。最後一個是部分依賴,RDD x 中的 partition 只與 parent RDD 中的 partition 一部分數據相關,另外一部分數據與 RDD x 中的其餘 partition 相關。 在 Spark 中,徹底依賴被稱爲 NarrowDependency,部分依賴被稱爲 ShuffleDependency。其實 ShuffleDependency 跟 MapReduce 中 shuffle 的數據依賴相同(mapper 將其 output 進行 partition,而後每一個 reducer 會將全部 mapper 輸出中屬於本身的 partition 經過 HTTP fetch 獲得)。
  • 第一種 1:1 的狀況被稱爲 OneToOneDependency。
  • 第二種 N:1 的狀況被稱爲 N:1 NarrowDependency。
  • 第三種 N:N 的狀況被稱爲 N:N NarrowDependency。不屬於前兩種狀況的徹底依賴都屬於這個類別。
  • 第四種被稱爲 ShuffleDependency。
對於 NarrowDependency,具體 RDD x 中的 partitoin i 依賴 parrent RDD 中一個 partition 仍是多個 partitions,是由 RDD x 中的 getParents(partition i) 決定(下圖中某些例子會詳細介紹)。還有一種 RangeDependency 的徹底依賴,不過該依賴目前只在 UnionRDD 中使用,下面會介紹。 因此,總結下來 partition 之間的依賴關係以下:
  • NarrowDependency (使用黑色實線或黑色虛線箭頭表示)
    • OneToOneDependency (1:1)
      • NarrowDependency (N:1)
    • NarrowDependency (N:N)
    • RangeDependency (只在 UnionRDD 中使用)
  • ShuffleDependency (使用紅色箭頭表示)
之因此要劃分 NarrowDependency 和 ShuffleDependency 是爲了生成物理執行圖,下一章會具體介紹。
須要注意的是第三種 NarrowDependency (N:N) 不多在兩個 RDD 之間出現。由於若是 parent RDD 中的 partition 同時被 child RDD 中多個 partitions 依賴,那麼最後生成的依賴圖每每與 ShuffleDependency 同樣。只是對於 parent RDD 中的 partition 來講一個是徹底依賴,一個是部分依賴,而箭頭數沒有少。因此 Spark 定義的 NarrowDependency 實際上是 「each partition of the parent RDD is used by at most one partition of the child RDD「,也就是隻有 OneToOneDependency (1:1) 和 NarrowDependency (N:1) 兩種狀況。可是,本身設計的奇葩 RDD 確實能夠呈現出 NarrowDependency (N:N) 的狀況。這裏描述的比較亂,其實看懂下面的幾個典型的 RDD 依賴便可。
如何計算獲得 RDD x 中的數據(records)?下圖展現了 OneToOneDependency 的數據依賴,雖然 partition 和 partition 之間是 1:1,但不表明計算 records 的時候也是讀一個 record 計算一個 record。 下圖右邊上下兩個 pattern 之間的差異相似於下面兩個程序的差異: Dependency code1 of iter.f()
int[] array = {1, 2, 3, 4, 5}
for(int i = 0; i < array.length; i++)
    f(array[i])
  code2 of f(iter)
int[] array = {1, 2, 3, 4, 5}
f(array)
 

3. 給出一些典型的 transformation() 的計算過程及數據依賴圖

1) union(otherRDD) union union() 將兩個 RDD 簡單合併在一塊兒,不改變 partition 裏面的數據。RangeDependency 實際上也是 1:1,只是爲了訪問 union() 後的 RDD 中的 partition 方便,保留了原始 RDD 的 range 邊界。 2) groupByKey(numPartitions) groupByKey 上一章已經介紹了 groupByKey 的數據依賴,這裏算是 溫故而知新 吧。 groupByKey() 只須要將 Key 相同的 records 聚合在一塊兒,一個簡單的 shuffle 過程就能夠完成。ShuffledRDD 中的 compute() 只負責將屬於每一個 partition 的數據 fetch 過來,以後使用 mapPartitions() 操做(前面的 OneToOneDependency 展現過)進行 aggregate,生成 MapPartitionsRDD,到這裏 groupByKey() 已經結束。最後爲了統一返回值接口,將 value 中的 ArrayBuffer[] 數據結構抽象化成 Iterable[]。
groupByKey() 沒有在 map 端進行 combine,由於 map 端 combine 只會省掉 partition 裏面重複 key 佔用的空間,當重複 key 特別多時,能夠考慮開啓 combine。 這裏的 ArrayBuffer 實際上應該是 CompactBuffer,An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
ParallelCollectionRDD 是最基礎的 RDD,直接從 local 數據結構 create 出的 RDD 屬於這個類型,好比
val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)
  生成的 pairs 就是 ParallelCollectionRDD。 2) reduceyByKey(func, numPartitions) reduceyByKey reduceyByKey() 至關於傳統的 MapReduce,整個數據流也與 Hadoop 中的數據流基本同樣。reduceyByKey() 默認在 map 端開啓 combine(),所以在 shuffle 以前先經過 mapPartitions 操做進行 combine,獲得 MapPartitionsRDD,而後 shuffle 獲得 ShuffledRDD,而後再進行 reduce(經過 aggregate + mapPartitions() 操做來實現)獲得 MapPartitionsRDD。 3) distinct(numPartitions) distinct distinct() 功能是 deduplicate RDD 中的全部的重複數據。因爲重複數據可能分散在不一樣的 partition 裏面,所以須要 shuffle 來進行 aggregate 後再去重。然而,shuffle 要求數據類型是 <K, V>。若是原始數據只有 Key(好比例子中 record 只有一個整數),那麼須要補充成 <K, null>。這個補充過程由 map() 操做完成,生成 MappedRDD。而後調用上面的 reduceByKey() 來進行 shuffle,在 map 端進行 combine,而後 reduce 進一步去重,生成 MapPartitionsRDD。最後,將 <K, null> 還原成 K,仍然由 map() 完成,生成 MappedRDD。藍色的部分就是調用的 reduceByKey()。 4) cogroup(otherRDD, numPartitions) cogroup 與 groupByKey() 不一樣,cogroup() 要 aggregate 兩個或兩個以上的 RDD。 那麼 CoGroupedRDD 與 RDD a 和 RDD b 的關係都必須是 ShuffleDependency 麼?是否存在 OneToOneDependency? 首先要明確的是 CoGroupedRDD 存在幾個 partition 能夠由用戶直接設定,與 RDD a 和 RDD b 無關。然而,若是 CoGroupedRDD 中 partition 個數與 RDD a/b 中的 partition 個數不同,那麼不可能存在 1:1 的關係。 再次,cogroup() 的計算結果放在 CoGroupedRDD 中哪一個 partition 是由用戶設置的 partitioner 肯定的(默認是 HashPartitioner)。那麼能夠推出:即便 RDD a/b 中的 partition 個數與 CoGroupedRDD 中的同樣,若是 RDD a/b 中的 partitioner 與 CoGroupedRDD 中的不同,也不可能存在 1:1 的關係。好比,在上圖的 example 裏面,RDD a 是 RangePartitioner,b 是 HashPartitioner,CoGroupedRDD 也是 RangePartitioner 且 partition 個數與 a 的相同。那麼很天然地,a 中的每一個 partition 中 records 能夠直接送到 CoGroupedRDD 中對應的 partition。RDD b 中的 records 必須再次進行劃分與 shuffle 後才能進入對應的 partition。 最後,通過上面分析, 對於兩個或兩個以上的 RDD 聚合,當且僅當聚合後的 RDD 中 partitioner 類別及 partition 個數與前面的 RDD 都相同,纔會與前面的 RDD 構成 1:1 的關係。不然,只能是 ShuffleDependency。這個算法對應的代碼能夠在 CoGroupedRDD.getDependencies() 中找到,雖然比較難理解。
Spark 代碼中如何表示 CoGroupedRDD 中的 partition 依賴於多個 parent RDDs 中的 partitions? 首先,將 CoGroupedRDD 依賴的全部 RDD 放進數組 rdds[RDD] 中。再次,foreach i,若是 CoGroupedRDD 和 rdds(i) 對應的 RDD 是 OneToOneDependency 關係,那麼 Dependecy[i] = new OneToOneDependency(rdd),不然 = new ShuffleDependency(rdd)。最後,返回與每一個 parent RDD 的依賴關係數組 deps[Dependency]。 Dependency 類中的 getParents(partition id) 負責給出某個 partition 按照該 dependency 所依賴的 parent RDD 中的 partitions: List[Int]。 getPartitions() 負責給出 RDD 中有多少個 partition,以及每一個 partition 如何序列化。
5) intersection(otherRDD) intersection intersection() 功能是抽取出 RDD a 和 RDD b 中的公共數據。先使用 map() 將 RDD[T] 轉變成 RDD[(T, null)],這裏的 T 只要不是 Array 等集合類型便可。接着,進行 a.cogroup(b),藍色部分與前面的 cogroup() 同樣。以後再使用 filter() 過濾掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 爲空的 records,獲得 FilteredRDD。最後,使用 keys() 只保留 key 便可,獲得 MappedRDD。 6) join(otherRDD, numPartitions) join join() 將兩個 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一塊兒。與 intersection() 相似,首先進行 cogroup(),獲得 <K, (Iterable[V1], Iterable[V2])>類型的 MappedValuesRDD,而後對 Iterable[V1] 和 Iterable[V2] 作笛卡爾集,並將集合 flat() 化。 這裏給出了兩個 example,第一個 example 的 RDD 1 和 RDD 2 使用 RangePartitioner 劃分,而 CoGroupedRDD 使用 HashPartitioner,與 RDD 1/2 都不同,所以是 ShuffleDependency。第二個 example 中, RDD 1 事先使用 HashPartitioner 對其 key 進行劃分,獲得三個 partition,與 CoGroupedRDD 使用的 HashPartitioner(3) 一致,所以數據依賴是 1:1。若是 RDD 2 事先也使用 HashPartitioner 對其 key 進行劃分,獲得三個 partition,那麼 join() 就不存在 ShuffleDependency 了,這個 join() 也就變成了 hashjoin()。 7) sortByKey(ascending, numPartitions) sortByKey sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的數據依賴很簡單,先使用 shuffle 將 records 彙集在一塊兒(放到對應的 partition 裏面),而後將 partition 內的全部 records 按 key 排序,最後獲得的 MapPartitionsRDD 中的 records 就有序了。
目前 sortByKey() 先使用 Array 來保存 partition 中全部的 records,再排序。
8) cartesian(otherRDD) cartesian Cartesian 對兩個 RDD 作笛卡爾集,生成的 CartesianRDD 中 partition 個數 = partitionNum(RDD a) * partitionNum(RDD b)。 這裏的依賴關係與前面的不太同樣,CartesianRDD 中每一個partition 依賴兩個 parent RDD,並且其中每一個 partition 徹底依賴 RDD a 中一個 partition,同時又徹底依賴 RDD b 中另外一個 partition。這裏沒有紅色箭頭,由於全部依賴都是 NarrowDependency。
CartesianRDD.getDependencies() 返回 rdds[RDD a, RDD b]。CartesianRDD 中的 partiton i 依賴於 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。
9) coalesce(numPartitions, shuffle = false) Coalesce coalesce() 能夠將 parent RDD 的 partition 個數進行調整,好比從 5 個減小到 3 個,或者從 5 個增長到 10 個。須要注意的是當 shuffle = false 的時候,是不能增長 partition 個數的(不能從 5 個變爲 10 個)。 coalesce() 的核心問題是 如何確立 CoalescedRDD 中 partition 和其 parent RDD 中 partition 的關係。
  • coalesce(shuffle = false) 時,因爲不能進行 shuffle,問題變爲 parent RDD 中哪些partition 能夠合併在一塊兒。合併因素除了要考慮 partition 中元素個數外,還要考慮 locality 及 balance 的問題。所以,Spark 設計了一個很是複雜的算法來解決該問題(算法部分我尚未深究)。注意Example: a.coalesce(3, shuffle = false)展現了 N:1 的 NarrowDependency。
  • coalesce(shuffle = true) 時,因爲能夠進行 shuffle,問題變爲如何將 RDD 中全部 records 平均劃分到 N 個 partition 中。很簡單,在每一個 partition 中,給每一個 record 附加一個 key,key 遞增,這樣通過 hash(key) 後,key 能夠被平均分配到不一樣的 partition 中,相似 Round-robin 算法。在第二個例子中,RDD a 中的每一個元素,先被加上了遞增的 key(如 MapPartitionsRDD 第二個 partition 中 (1, 3) 中的 1)。在每一個 partition 中,第一個元素 (Key, Value) 中的 key 由 (new Random(index)).nextInt(numPartitions) 計算獲得,index 是該 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 個數。接下來元素的 key 是遞增的,而後 shuffle 後的 ShuffledRDD 能夠獲得均分的 records,而後通過複雜算法來創建 ShuffledRDD 和 CoalescedRDD 之間的數據聯繫,最後過濾掉 key,獲得 coalesce 後的結果 MappedRDD。
10) repartition(numPartitions) 等價於 coalesce(numPartitions, shuffle = true)

Primitive transformation()

combineByKey() 分析了這麼多 RDD 的邏輯執行圖,它們之間有沒有共同之處?若是有,是怎麼被設計和實現的? 仔細分析 RDD 的邏輯執行圖會發現,ShuffleDependency 左邊的 RDD 中的 record 要求是 <key, value> 型的,通過 ShuffleDependency 後,包含相同 key 的 records 會被 aggregate 到一塊兒,而後在 aggregated 的 records 上執行不一樣的計算邏輯。實際執行時(後面的章節會具體談到)不少 transformation() 如 groupByKey(),reduceByKey() 是邊 aggregate 數據邊執行計算邏輯的,所以共同之處就是 aggregate 同時 compute()。Spark 使用 combineByKey() 來實現這個 aggregate + compute() 的基礎操做。 combineByKey() 的定義以下:
def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null): RDD[(K, C)]
  其中主要有三個參數 createCombiner,mergeValue 和 mergeCombiners。簡單解釋下這三個函數及 combineByKey() 的意義,注意它們的類型: 假設一組具備相同 K 的 <K, V> records 正在一個個流向 combineByKey(),createCombiner 將第一個 record 的 value 初始化爲 c (好比,c = value),而後從第二個 record 開始,來一個 record 就使用 mergeValue(c, record.value) 來更新 c,好比想要對這些 records 的全部 values 作 sum,那麼使用 c = c + record.value。等到 records 所有被 mergeValue(),獲得結果 c。假設還有一組 records(key 與前面那組的 key 均相同)一個個到來,combineByKey() 使用前面的方法不斷計算獲得 c'。如今若是要求這兩組 records 總的 combineByKey() 後的結果,那麼可使用 final c = mergeCombiners(c, c') 來計算。

Discussion

至此,咱們討論瞭如何生成 job 的邏輯執行圖,這些圖也是 Spark 看似簡單的 API 背後的複雜計算邏輯及數據依賴關係。 整個 job 會產生哪些 RDD 由 transformation() 語義決定。一些 transformation(), 好比 cogroup() 會被不少其餘操做用到。 RDD 自己的依賴關係由 transformation() 生成的每個 RDD 自己語義決定。如 CoGroupedRDD 依賴於全部參加 cogroup() 的 RDDs。 RDD 中 partition 依賴關係分爲 NarrowDependency 和 ShuffleDependency。前者是徹底依賴,後者是部分依賴。NarrowDependency 裏面又包含多種狀況,只有先後兩個 RDD 的 partition 個數以及 partitioner 都同樣,纔會出現 NarrowDependency。 從數據處理邏輯的角度來看,MapReduce 至關於 Spark 中的 map() + reduceByKey(),但嚴格來說 MapReduce 中的 reduce() 要比 reduceByKey() 的功能強大些,詳細差異會在 Shuffle details 一章中繼續討論。
相關文章
相關標籤/搜索