本系列文章源自JerryLead的SparkInternals,本文只是在做者的原文基礎上加入本身的理解,批註,和部分源碼,做爲學習之用
注:原文是基於Spark 1.0.2 , 而本篇筆記是基於spark 2.2.0, 對比後發現核心部分變化不大,依舊值得參考html
典型的 Job 邏輯執行圖如上所示,通過下面四個步驟能夠獲得最終執行結果:java
RDD 能夠被 cache 到內存或者 checkpoint 到磁盤上。RDD 中的 partition 個數不固定,一般由用戶設定。RDD 和 RDD 之間 partition 的依賴關係能夠不是 1 對 1,如上圖既有 1 對 1 關係,也有多對多的關係。git
瞭解了 Job 的邏輯執行圖後,寫程序時候會在腦中造成相似上面的數據依賴圖。然而,實際生成的 RDD 個數每每比咱們想一想的個數多。github
要解決邏輯執行圖生成問題,實際須要解決:算法
解決這個問題的初步想法是讓每個 transformation() 方法返回(new)一個 RDD。事實也基本如此,只是某些 transformation() 比較複雜,會包含多個子 transformation(),於是會生成多個 RDD。這就是實際 RDD 個數比咱們想象的多一些 的緣由。apache
如何計算每一個 RDD 中的數據?邏輯執行圖其實是 computing chain,那麼 transformation() 的計算邏輯在哪裏被 perform?每一個 RDD 裏有 compute() 方法,負責接收來自上一個 RDD 或者數據源的 input records,perform transformation() 的計算邏輯,而後輸出 records。數組
/** * RDD.scala * :: DeveloperApi :: * Implemented by subclasses to compute a given partition. */
@DeveloperApi
def compute(split: Partition, context: TaskContext): Iterator[T]
//MapPartitionsRDD.scala
override def compute(split: Partition, context: TaskContext): Iterator[U] =
f(context, split.index, firstParent[T].iterator(split, context))
複製代碼
產生哪些 RDD 與 transformation() 的計算邏輯有關,下面討論一些典型的 transformation() 及其建立的 RDD。官網上已經解釋了每一個 transformation 的含義。iterator(split) 的意思是 foreach record in the partition。這裏空了不少,是由於那些 transformation() 較爲複雜,會產生多個 RDD,具體會在下一節圖示出來。數據結構
Transformation | Generated RDDs | Compute() |
---|---|---|
map(func) | MappedRDD | iterator(split).map(f) |
filter(func) | FilteredRDD | iterator(split).filter(f) |
flatMap(func) | FlatMappedRDD | iterator(split).flatMap(f) |
mapPartitions(func) | MapPartitionsRDD | f(iterator(split)) |
mapPartitionsWithIndex(func) | MapPartitionsRDD | f(split.index, iterator(split)) |
sample(withReplacement, fraction, seed) | PartitionwiseSampledRDD | PoissonSampler.sample(iterator(split)) BernoulliSampler.sample(iterator(split)) |
pipe(command, [envVars]) | PipedRDD | |
union(otherDataset) | ||
intersection(otherDataset) | ||
distinct([numTasks])) | ||
groupByKey([numTasks]) | ||
reduceByKey(func, [numTasks]) | ||
sortByKey([ascending], [numTasks]) | ||
join(otherDataset, [numTasks]) | ||
cogroup(otherDataset, [numTasks]) | ||
cartesian(otherDataset) | ||
coalesce(numPartitions) | ||
repartition(numPartitions) |
spark 2.x 沒找到MappedRDD,FilteredRDD,FlatMappedRDD,都統一改成了MapPartitionsRDD,在執行邏輯上沒有改變閉包
RDD 之間的數據依賴問題實際包括三部分:app
第一個問題能夠很天然的解決,好比x = rdda.transformation(rddb)
(e.g., x = a.join(b)) 就表示 RDD x 同時依賴於 RDD a 和 RDD b。
第二個問題中的 partition 個數通常由用戶指定,不指定的話通常取max(numPartitions[parent RDD 1], .., numPartitions[parent RDD n])
。
object Partitioner {
/** * Choose a partitioner to use for a cogroup-like operation between a number of RDDs. * * If any of the RDDs already has a partitioner, choose that one. * * Otherwise, we use a default HashPartitioner. For the number of partitions, if * spark.default.parallelism is set, then we'll use the value from SparkContext * defaultParallelism, otherwise we'll use the max number of upstream partitions. * * Unless spark.default.parallelism is set, the number of partitions will be the * same as the number of partitions in the largest upstream RDD, as this should * be least likely to cause out-of-memory errors. * feng:取最大的RDD partitions * We use two method parameters (rdd, others) to enforce callers passing at least 1 RDD. */
def defaultPartitioner(rdd: RDD[_], others: RDD[_]*): Partitioner = {
val rdds = (Seq(rdd) ++ others)
//feng:先過濾出有partition的rdd
val hasPartitioner = rdds.filter(_.partitioner.exists(_.numPartitions > 0))
if (hasPartitioner.nonEmpty) {
//直接取rdd中partitions數量最大的rdd的partitioner
hasPartitioner.maxBy(_.partitions.length).partitioner.get
} else {
//使用默認的HashPartitioner,並行度自定義優先,不然就取rdd中最大的partition數量,其實就是rdd中最大的並行度
if (rdd.context.conf.contains("spark.default.parallelism")) {
new HashPartitioner(rdd.context.defaultParallelism)
} else {
new HashPartitioner(rdds.map(_.partitions.length).max)
}
}
}
}
複製代碼
第三個問題比較複雜。須要考慮這個 transformation() 的語義,不一樣的 transformation() 的依賴關係不一樣。好比 map() 是 1:1,而 groupByKey() 邏輯執行圖中的 ShuffledRDD 中的每一個 partition 依賴於 parent RDD 中全部的 partition,還有更復雜的狀況。
再次考慮第三個問題,RDD x 中每一個 partition 能夠依賴於 parent RDD 中一個或者多個 partition。並且這個依賴能夠是徹底依賴或者部分依賴。部分依賴指的是 parent RDD 中某 partition 中一部分數據與 RDD x 中的一個 partition 相關,另外一部分數據與 RDD x 中的另外一個 partition 相關。下圖展現了徹底依賴和部分依賴。
前三個是徹底依賴,RDD x 中的 partition 與 parent RDD 中的 partition/partitions 徹底相關。最後一個是部分依賴,RDD x 中的 partition 只與 parent RDD 中的 partition 一部分數據相關,另外一部分數據與 RDD x 中的其餘 partition 相關。
在 Spark 中,徹底依賴被稱爲 NarrowDependency,部分依賴被稱爲 ShuffleDependency。其實 ShuffleDependency 跟 MapReduce 中 shuffle 的數據依賴相同(mapper 將其 output 進行 partition,而後每一個 reducer 會將全部 mapper 輸出中屬於本身的 partition 經過 HTTP fetch 獲得)。
對於 NarrowDependency,具體 RDD x 中的 partitoin i 依賴 parrent RDD 中一個 partition 仍是多個 partitions,是由 NarrowDependency中的 getParents(partition i)
決定(下圖中某些例子會詳細介紹)。還有一種 RangeDependency 的徹底依賴,不過該依賴目前只在 UnionRDD 中使用,下面會介紹。
//map, filter等就是OneToOneDependency
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
//判斷partitionId的合理性,必須在child RDD的合理partition範圍內
if (partitionId >= outStart && partitionId < outStart + length) {
//算出parent RDD中對應的partition id
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
複製代碼
因此,總結下來 partition 之間的依賴關係以下:
對於NarrowDependency (N:1)的依賴關係,repartition和coalesce有用到,但NarrowDependency (N:N)具體體如今哪裏?
之因此要劃分 NarrowDependency 和 ShuffleDependency 是爲了生成物理執行圖,下一章會具體介紹。
須要注意的是第三種 NarrowDependency (N:N) 不多在兩個 RDD 之間出現。由於若是 parent RDD 中的 partition 同時被 child RDD 中多個 partitions 依賴,那麼最後生成的依賴圖每每與 ShuffleDependency 同樣。只是對於 parent RDD 中的 partition 來講一個是徹底依賴,一個是部分依賴,而箭頭數沒有少。因此 Spark 定義的 NarrowDependency 實際上是 「each partition of the parent RDD is used by at most one partition of the child RDD「,也就是隻有 OneToOneDependency (1:1) 和 NarrowDependency (N:1) 兩種狀況。可是,本身設計的奇葩 RDD 確實能夠呈現出 NarrowDependency (N:N) 的狀況。這裏描述的比較亂,其實看懂下面的幾個典型的 RDD 依賴便可。
在spark 2.1.1 中NarrowDependency的定義是:
Base class for dependencies where each partition of the child RDD depends on a small number of partitions of the parent RDD. Narrow dependencies allow for pipelined execution.
只說明每一個child RDD partition依賴於一小組parent RDD的partition,並無寫明parent RDD中的每一個partition最多被child RDD中的一個partition使用,先略過NarrowDependency (N:N) 的狀況吧
如何計算獲得 RDD x 中的數據(records)?下圖展現了 OneToOneDependency 的數據依賴,雖然 partition 和 partition 之間是 1:1,但不表明計算 records 的時候也是讀一個 record 計算一個 record。 下圖右邊上下兩個 pattern 之間的差異相似於下面兩個程序的差異:
code1 of iter.f()
int[] array = {1, 2, 3, 4, 5}
for(int i = 0; i < array.length; i++)
f(array[i])
複製代碼
code2 of f(iter)
int[] array = {1, 2, 3, 4, 5}
f(array)
複製代碼
1) union(otherRDD)
/** Build the union of a list of RDDs passed as variable-length arguments. */
def union[T: ClassTag](first: RDD[T], rest: RDD[T]*): RDD[T] = withScope {
union(Seq(first) ++ rest)
}
/** Build the union of a list of RDDs. */
def union[T: ClassTag](rdds: Seq[RDD[T]]): RDD[T] = withScope {
val partitioners = rdds.flatMap(_.partitioner).toSet //合併同類
//feng:若是要進行合併的兩個RDD都包含有partitioner,同時這兩個RDD引用的是相同的partitioner時
if (rdds.forall(_.partitioner.isDefined) && partitioners.size == 1) {
//能夠當作是OneToOneDependency,假設RDD a,b,各有p各partitions,用同一個partitioner合併後仍是p個partitions
new PartitionerAwareUnionRDD(this, rdds)
} else {
//有一個RDD不包含partitioner,或者兩個RDD的partitioner的算子不相同時
new UnionRDD(this, rdds) //RangeDependency
}
}
複製代碼
union() 將兩個 RDD 簡單合併在一塊兒,不改變 partition 裏面的數據。RangeDependency 實際上也是 1:1,只是爲了訪問 union() 後的 RDD 中的 partition 方便,保留了原始 RDD 的 range 邊界。
從代碼上看,union也能夠是OneToOneDependency的
2) groupByKey(numPartitions)
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
groupByKey(defaultPartitioner(self)) //默認使用HashPartitioner
}
/** * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`. */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
// groupByKey shouldn't use map side combine because map side combine does not
// reduce the amount of data shuffled and requires all map side data be inserted
// into a hash table, leading to more objects in the old gen.
// feng:map side combine沒有用,還可能浪費多一倍空間
//CompactBuffer相似ArrayBuffer,但比ArrayBuffer更加高效,只是取出前兩個元素的值單獨存起來,這種優化對短數組有用
val createCombiner = (v: V) => CompactBuffer(v)
val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
/** * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list) * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list) * - `mergeCombiners`, to combine two C's into a single one. */
@Experimental
def combineByKeyWithClassTag[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C,
partitioner: Partitioner,
mapSideCombine: Boolean = true,
serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
if (keyClass.isArray) {
if (mapSideCombine) {
throw new SparkException("Cannot use map-side combining with array keys.")
}
if (partitioner.isInstanceOf[HashPartitioner]) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
}
//閉包處理
val aggregator = new Aggregator[K, V, C](
self.context.clean(createCombiner),
self.context.clean(mergeValue),
self.context.clean(mergeCombiners))
//若是父子分區函數相同
if (self.partitioner == Some(partitioner)) {
//這裏調用的是mapPartitions.生成MapPartitionsRDD
self.mapPartitions(iter => {
val context = TaskContext.get()
new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
}, preservesPartitioning = true)
} else { //partitioner不一樣才shuffle,生成ShuffledRDD
new ShuffledRDD[K, V, C](self, partitioner)
.setSerializer(serializer)
.setAggregator(aggregator)
.setMapSideCombine(mapSideCombine)
}
}
複製代碼
上一章已經介紹了 groupByKey 的數據依賴,這裏算是溫故而知新 吧。
groupByKey() 只須要將 Key 相同的 records 聚合在一塊兒,一個簡單的 shuffle 過程就能夠完成。ShuffledRDD 中的 compute() 只負責將屬於每一個 partition 的數據 fetch 過來,以後使用 mapPartitions() 操做(前面的 OneToOneDependency 展現過)進行 aggregate,生成 MapPartitionsRDD,到這裏 groupByKey() 已經結束。最後爲了統一返回值接口,將 value 中的 ArrayBuffer[] 數據結構抽象化成 Iterable[]。
groupByKey() 沒有在 map 端進行 combine,由於 map 端 combine 只會省掉 partition 裏面重複 key 佔用的空間,當重複 key 特別多時,能夠考慮開啓 combine。
這裏的 ArrayBuffer 實際上應該是 CompactBuffer,An append-only buffer similar to ArrayBuffer, but more memory-efficient for small buffers.
在spark 2.1.1中groupByKey並無能夠開啓mapSideCombine的設置,設置死就是mapSideCombine = false
另外,在哪裏體現出最後的結果是MapPartitionsRDD?
ParallelCollectionRDD 是最基礎的 RDD,直接從 local 數據結構 create 出的 RDD 屬於這個類型,好比
val pairs = sc.parallelize(List(1, 2, 3, 4, 5), 3)
複製代碼
生成的 pairs 就是 ParallelCollectionRDD。
/** SparkContext.scala * @note Parallelize acts lazily. If `seq` is a mutable collection and is altered after the call * to parallelize and before the first action on the RDD, the resultant RDD will reflect the * modified collection. Pass a copy of the argument to avoid this. * 懶執行,seq最好是不可變的,不然若在執行前改變了seq,那麼parallelize的值也會改變 */
def parallelize[T: ClassTag](
seq: Seq[T],
numSlices: Int = defaultParallelism): RDD[T] = withScope {
assertNotStopped()
new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
}
複製代碼
2) reduceByKey(func, numPartitions)
/** * Merge the values for each key using an associative and commutative reduce function. This will * also perform the merging locally on each mapper before sending results to a reducer, similarly * to a "combiner" in MapReduce. */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
//默認是mapSideCombine: Boolean = true
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
複製代碼
reduceByKey() 至關於傳統的 MapReduce,整個數據流也與 Hadoop 中的數據流基本同樣。reduceByKey() 默認在 map 端開啓 combine(),所以在 shuffle 以前先經過 mapPartitions 操做進行 combine,獲得 MapPartitionsRDD,而後 shuffle 獲得 ShuffledRDD,而後再進行 reduce(經過 aggregate + mapPartitions() 操做來實現)獲得 MapPartitionsRDD。
和groupByKey同樣,partitioner相同是能夠生成MapPartitionsRDD,不必定是ShuffledRDD
3) distinct(numPartitions)
def distinct(): RDD[T] = withScope {
distinct(partitions.length)
}
/** * Return a new RDD containing the distinct elements in this RDD. * feng:distinct直接使用了reduceByKey */
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
map(x => (x, null)).reduceByKey((x, y) => x, ).map(_._1)
}
複製代碼
distinct() 功能是 deduplicate RDD 中的全部的重複數據。因爲重複數據可能分散在不一樣的 partition 裏面,所以須要 shuffle 來進行 aggregate 後再去重。然而,shuffle 要求數據類型是 <K, V>
。若是原始數據只有 Key(好比例子中 record 只有一個整數),那麼須要補充成 <K, null>
。這個補充過程由 map() 操做完成,生成 MappedRDD。而後調用上面的 reduceByKey() 來進行 shuffle,在 map 端進行 combine,而後 reduce 進一步去重,生成 MapPartitionsRDD。最後,將 <K, null>
還原成 K,仍然由 map() 完成,生成 MappedRDD。藍色的部分就是調用的 reduceByKey()。
4) cogroup(otherRDD, numPartitions)
def cogroup[W](
other: RDD[(K, W)],
numPartitions: Int): RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
//默認HashPartitioner
cogroup(other, new HashPartitioner(numPartitions))
}
def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
: RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
throw new SparkException("HashPartitioner cannot partition array keys.")
}
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
//MapPartitionsRDD,將cg的value轉化爲Iterable
cg.mapValues { case Array(vs, w1s) =>
(vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}
複製代碼
與 groupByKey() 不一樣,cogroup() 要 aggregate 兩個或兩個以上的 RDD。那麼 CoGroupedRDD 與 RDD a 和 RDD b 的關係都必須是 ShuffleDependency 麼?是否存在 OneToOneDependency?
首先要明確的是 CoGroupedRDD 存在幾個 partition 能夠由用戶直接設定,與 RDD a 和 RDD b 無關。然而,若是 CoGroupedRDD 中 partition 個數與 RDD a/b 中的 partition 個數不同,那麼不可能存在 1:1 的關係。
再次,cogroup() 的計算結果放在 CoGroupedRDD 中哪一個 partition 是由用戶設置的 partitioner 肯定的(默認是 HashPartitioner)。那麼能夠推出:即便 RDD a/b 中的 partition 個數與 CoGroupedRDD 中的同樣,若是 RDD a/b 中的 partitioner 與 CoGroupedRDD 中的不同,也不可能存在 1:1 的關係。好比,在上圖的 example 裏面,RDD a 是 RangePartitioner,b 是 HashPartitioner,CoGroupedRDD 也是 RangePartitioner 且 partition 個數與 a 的相同。那麼很天然地,a 中的每一個 partition 中 records 能夠直接送到 CoGroupedRDD 中對應的 partition。RDD b 中的 records 必須再次進行劃分與 shuffle 後才能進入對應的 partition。
最後,通過上面分析,對於兩個或兩個以上的 RDD 聚合,當且僅當聚合後的 RDD 中 partitioner 類別及 partition 個數與前面的 RDD 都相同,纔會與前面的 RDD 構成 1:1 的關係。不然,只能是 ShuffleDependency.這個算法對應的代碼能夠在CoGroupedRDD.getDependencies()
中找到,雖然比較難理解。
/*CoGroupedRDD.scala*/
override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
//scala中,若是比較的對象是null那麼是判斷引用否相等,不然調用的是equals
//當前rdd的partitioner和聚合後的partitioner對比,若是是hash partitioner,equals方法要對比類型和分區數
if (rdd.partitioner == Some(part)) {
logDebug("Adding one-to-one dependency with " + rdd)
new OneToOneDependency(rdd)
} else {
logDebug("Adding shuffle dependency with " + rdd)
new ShuffleDependency[K, Any, CoGroupCombiner](
rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
}
}
}
//負責給出 RDD 中有多少個 partition,以及每一個 partition 如何序列化
override def getPartitions: Array[Partition] = {
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.length) {
// Each CoGroupPartition will have a dependency per contributing RDD
array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
// Assume each RDD contributed a single dependency, and get it
dependencies(j) match {
case s: ShuffleDependency[_, _, _] =>
None
case _ =>
Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
}
}.toArray)
}
array
}
複製代碼
Spark 代碼中如何表示 CoGroupedRDD 中的 partition 依賴於多個 parent RDDs 中的 partitions?
首先,將 CoGroupedRDD 依賴的全部 RDD 放進數組 rdds[RDD] 中。再次,foreach i,若是 CoGroupedRDD 和 rdds(i) 對應的 RDD 是 OneToOneDependency 關係,那麼 Dependecy[i] = new OneToOneDependency(rdd),不然 = new ShuffleDependency(rdd)。最後,返回與每一個 parent RDD 的依賴關係數組 deps[Dependency]。
Dependency 類中的 getParents(partition id) 負責給出某個 partition 按照該 dependency 所依賴的 parent RDD 中的 partitions: List[Int]。
getPartitions() 負責給出 RDD 中有多少個 partition,以及每一個 partition 如何序列化。
5) intersection(otherRDD)
def intersection(other: RDD[T]): RDD[T] = withScope {
//MapPartitionsRDD => CoGroupedRDD
//兩個RDD分別先map
this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
// => MapPartitionsRDD
.filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
.keys // => MapPartitionsRDD
}
複製代碼
intersection() 功能是抽取出 RDD a 和 RDD b 中的公共數據。先使用 map() 將 RDD[T] 轉變成 RDD[(T, null)],這裏的 T 只要不是 Array 等集合類型便可。接着,進行 a.cogroup(b),藍色部分與前面的 cogroup() 同樣。以後再使用 filter() 過濾掉 [iter(groupA()), iter(groupB())] 中 groupA 或 groupB 爲空的 records,獲得 FilteredRDD。最後,使用 keys() 只保留 key 便可,獲得 MappedRDD。
/** * Return an RDD containing all pairs of elements with matching keys in `this` and `other`. Each * pair of elements will be returned as a (k, (v1, v2)) tuple, where (k, v1) is in `this` and * (k, v2) is in `other`. Uses the given Partitioner to partition the output RDD. */
def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
//flatMapValues,不是flatMap
this.cogroup(other, partitioner).flatMapValues( pair =>
for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
//遍歷邏輯同下
// while(pair._1.iterator.hasNext){
// v <- pair._1.iterator.next
// while(pair._2.iterator.hasNext){
// w <- pair._2.iterator.next
// yield (v, w)
// }
// }
)
}
複製代碼
join() 將兩個 RDD[(K, V)] 按照 SQL 中的 join 方式聚合在一塊兒。與 intersection() 相似,首先進行 cogroup(),獲得<K, (Iterable[V1], Iterable[V2])>
類型的 MappedValuesRDD,而後對 Iterable[V1] 和 Iterable[V2] 作笛卡爾集,並將集合 flat() 化。
這裏給出了兩個 example,第一個 example 的 RDD 1 和 RDD 2 使用 RangePartitioner 劃分,而 CoGroupedRDD 使用 HashPartitioner,與 RDD 1/2 都不同,所以是 ShuffleDependency。第二個 example 中, RDD 1 事先使用 HashPartitioner 對其 key 進行劃分,獲得三個 partition,與 CoGroupedRDD 使用的 HashPartitioner(3) 一致,所以數據依賴是 1:1。若是 RDD 2 事先也使用 HashPartitioner 對其 key 進行劃分,獲得三個 partition,那麼 join() 就不存在 ShuffleDependency 了,這個 join() 也就變成了 hashjoin()。
7) sortByKey(ascending, numPartitions)
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
: RDD[(K, V)] = self.withScope{
val part = new RangePartitioner(numPartitions, self, ascending)
new ShuffledRDD[K, V, V](self, part)
.setKeyOrdering(if (ascending) ordering else ordering.reverse)
}
複製代碼
sortByKey() 將 RDD[(K, V)] 中的 records 按 key 排序,ascending = true 表示升序,false 表示降序。目前 sortByKey() 的數據依賴很簡單,先使用 shuffle 將 records 彙集在一塊兒(放到對應的 partition 裏面),而後將 partition 內的全部 records 按 key 排序,最後獲得的 MapPartitionsRDD 中的 records 就有序了。
目前 sortByKey() 先使用 Array 來保存 partition 中全部的 records,再排序。
8) cartesian(otherRDD)
/*RDD.scala*/
def cartesian[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope {
new CartesianRDD(sc, this, other)
}
複製代碼
Cartesian 對兩個 RDD 作笛卡爾集,生成的 CartesianRDD 中 partition 個數 = partitionNum(RDD a) * partitionNum(RDD b)。
這裏的依賴關係與前面的不太同樣,CartesianRDD 中每一個partition 依賴兩個 parent RDD,並且其中每一個 partition 徹底依賴 RDD a 中一個 partition,同時又徹底依賴 RDD b 中另外一個 partition。這裏沒有紅色箭頭,由於全部依賴都是 NarrowDependency。
CartesianRDD.getDependencies() 返回 rdds[RDD a, RDD b]。CartesianRDD 中的 partiton i 依賴於 (RDD a).List(i / numPartitionsInRDDb) 和 (RDD b).List(i % numPartitionsInRDDb)。
override def getPartitions: Array[Partition] = {
// create the cross product split
//feng:Cartesian 對兩個 RDD 作笛卡爾集
// 生成的 CartesianRDD 中 partition 個數 = partitionNum(RDD a) * partitionNum(RDD b)
val array = new Array[Partition](rdd1.partitions.length * rdd2.partitions.length)
for (s1 <- rdd1.partitions; s2 <- rdd2.partitions) {
val idx = s1.index * numPartitionsInRdd2 + s2.index
array(idx) = new CartesianPartition(idx, rdd1, rdd2, s1.index, s2.index)
}
array
}
override def getDependencies: Seq[Dependency[_]] = List(
new NarrowDependency(rdd1) {
def getParents(id: Int): Seq[Int] = List(id / numPartitionsInRdd2)
},
new NarrowDependency(rdd2) {
def getParents(id: Int): Seq[Int] = List(id % numPartitionsInRdd2)
}
)
複製代碼
9) coalesce(numPartitions, shuffle = false)
/** RDD.scala *當shuffle: Boolean = false,擴增partition數量不會生效 */
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null)
: RDD[T] = withScope {
require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
//在每一個 partition 中,第一個元素的key
var position = (new Random(index)).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
position = position + 1 //key遞增
(position, t)
}
} : Iterator[(Int, T)]
// include a shuffle step so that our upstream tasks are still distributed
new CoalescedRDD(
new ShuffledRDD[Int, T, T](mapPartitionsWithIndex(distributePartition),
new HashPartitioner(numPartitions)),
numPartitions,
partitionCoalescer).values
} else {
//不shuffle就生成CoalescedRDD
new CoalescedRDD(this, numPartitions, partitionCoalescer)
}
}
複製代碼
coalesce() 能夠將 parent RDD 的 partition 個數進行調整,好比從 5 個減小到 3 個,或者從 5 個增長到 10 個。須要注意的是當 shuffle = false 的時候,是不能增長 partition 個數的(不能從 5 個變爲 10 個)。
coalesce() 的核心問題是如何確立 CoalescedRDD 中 partition 和其 parent RDD 中 partition 的關係。
Example: a.coalesce(3, shuffle = false)
展現了 N:1 的 NarrowDependency。var position = (new Random(index)).nextInt(numPartitions);position = position + 1
計算獲得,index 是該 partition 的索引,numPartitions 是 CoalescedRDD 中的 partition 個數。接下來元素的 key 是遞增的,而後 shuffle 後的 ShuffledRDD 能夠獲得均分的 records,而後通過複雜算法來創建 ShuffledRDD 和 CoalescedRDD 之間的數據聯繫,最後過濾掉 key,獲得 coalesce 後的結果 MappedRDD。10) repartition(numPartitions)
等價於 coalesce(numPartitions, shuffle = true)
/* * If you are decreasing the number of partitions in this RDD, consider using `coalesce`, * which can avoid performing a shuffle. * feng:減低partition數量可用coalesce代替,避免shuffle */
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
coalesce(numPartitions, shuffle = true)
}
複製代碼
combineByKey()
分析了這麼多 RDD 的邏輯執行圖,它們之間有沒有共同之處?若是有,是怎麼被設計和實現的?
仔細分析 RDD 的邏輯執行圖會發現,ShuffleDependency 左邊的 RDD 中的 record 要求是 <key, value> 型的,通過 ShuffleDependency 後,包含相同 key 的 records 會被 aggregate 到一塊兒,而後在 aggregated 的 records 上執行不一樣的計算邏輯。實際執行時(後面的章節會具體談到)不少 transformation() 如 groupByKey(),reduceByKey() 是邊 aggregate 數據邊執行計算邏輯的,所以共同之處就是 aggregate 同時 compute()。Spark 使用 combineByKey() 來實現這個 aggregate + compute() 的基礎操做。
combineByKey() 的定義以下:
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners)(null)
}
複製代碼
其中主要有三個參數 createCombiner,mergeValue 和 mergeCombiners。簡單解釋下這三個函數及 combineByKey() 的意義,注意它們的類型:
假設一組具備相同 K 的 <K, V> records 正在一個個流向 combineByKey(),createCombiner 將第一個 record 的 value 初始化爲 c (好比,c = value),而後從第二個 record 開始,來一個 record 就使用 mergeValue(c, record.value) 來更新 c,好比想要對這些 records 的全部 values 作 sum,那麼使用 c = c + record.value。等到 records 所有被 mergeValue(),獲得結果 c。假設還有一組 records(key 與前面那組的 key 均相同)一個個到來,combineByKey() 使用前面的方法不斷計算獲得 c'。如今若是要求這兩組 records 總的 combineByKey() 後的結果,那麼可使用 final c = mergeCombiners(c, c') 來計算。
至此,咱們討論瞭如何生成 job 的邏輯執行圖,這些圖也是 Spark 看似簡單的 API 背後的複雜計算邏輯及數據依賴關係。
整個 job 會產生哪些 RDD 由 transformation() 語義決定。一些 transformation(), 好比 cogroup() 會被不少其餘操做用到。
RDD 自己的依賴關係由 transformation() 生成的每個 RDD 自己語義決定。如 CoGroupedRDD 依賴於全部參加 cogroup() 的 RDDs。
RDD 中 partition 依賴關係分爲 NarrowDependency 和 ShuffleDependency。前者是徹底依賴,後者是部分依賴。NarrowDependency 裏面又包含多種狀況,只有先後兩個 RDD 的 partition 個數以及 partitioner 都同樣,纔會出現 NarrowDependency。
從數據處理邏輯的角度來看,MapReduce 至關於 Spark 中的 map() + reduceByKey(),但嚴格來說 MapReduce 中的 reduce() 要比 reduceByKey() 的功能強大些,詳細差異會在 Shuffle details 一章中繼續討論。
@DeveloperApi
abstract class Dependency[T] extends Serializable {
def rdd: RDD[T]
}
/** * :: DeveloperApi :: * Base class for dependencies where each partition of the child RDD depends on a small number * of partitions of the parent RDD. Narrow dependencies allow for pipelined execution. * * 這裏只說明每一個child RDD partition依賴於一小組parent RDD的partition * 並無寫明parent RDD中的每一個partition最多被child RDD中的一個partition使用,須要NarrowDependency的更準肯定義 * NarrowDependency不須要shuffle */
@DeveloperApi
abstract class NarrowDependency[T](_rdd: RDD[T]) extends Dependency[T] {
/** * Get the parent partitions for a child partition. * @param partitionId a partition of the child RDD * @return the partitions of the parent RDD that the child partition depends upon */
def getParents(partitionId: Int): Seq[Int]
override def rdd: RDD[T] = _rdd
}
/** * :: DeveloperApi :: * Represents a dependency on the output of a shuffle stage. Note that in the case of shuffle, * the RDD is transient since we don't need it on the executor side. * * @param _rdd the parent RDD * @param partitioner partitioner used to partition the shuffle output * @param serializer [[org.apache.spark.serializer.Serializer Serializer]] to use. If not set * explicitly then the default serializer, as specified by `spark.serializer` * config option, will be used. * @param keyOrdering key ordering for RDD's shuffles * @param aggregator map/reduce-side aggregator for RDD's shuffle * @param mapSideCombine whether to perform partial aggregation (also known as map-side combine) */
@DeveloperApi
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag]( @transient private val _rdd: RDD[_ <: Product2[K, V]], val partitioner: Partitioner, val serializer: Serializer = SparkEnv.get.serializer, val keyOrdering: Option[Ordering[K]] = None, val aggregator: Option[Aggregator[K, V, C]] = None, val mapSideCombine: Boolean = false)
extends Dependency[Product2[K, V]] {
override def rdd: RDD[Product2[K, V]] = _rdd.asInstanceOf[RDD[Product2[K, V]]]
private[spark] val keyClassName: String = reflect.classTag[K].runtimeClass.getName
private[spark] val valueClassName: String = reflect.classTag[V].runtimeClass.getName
// Note: It's possible that the combiner class tag is null, if the combineByKey
// methods in PairRDDFunctions are used instead of combineByKeyWithClassTag.
private[spark] val combinerClassName: Option[String] =
Option(reflect.classTag[C]).map(_.runtimeClass.getName)
val shuffleId: Int = _rdd.context.newShuffleId()
val shuffleHandle: ShuffleHandle = _rdd.context.env.shuffleManager.registerShuffle(
shuffleId, _rdd.partitions.length, this)
_rdd.sparkContext.cleaner.foreach(_.registerShuffleForCleanup(this))
}
/** * :: DeveloperApi :: * Represents a one-to-one dependency between partitions of the parent and child RDDs. * feng:parent和child裏面的partitions是一一對應 eg:map, filter * partitionId就是partition在RDD中的序號, 因此若是是一一對應, 那麼parent和child中的partition的序號應該是同樣的 */
@DeveloperApi
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = List(partitionId)
}
/** * :: DeveloperApi :: * Represents a one-to-one dependency between ranges of partitions in the parent and child RDDs. * @param rdd the parent RDD * @param inStart the start of the range in the parent RDD * @param outStart the start of the range in the child RDD * @param length the length of the range * parent RDD中的某個區間的partitions對應到child RDD中的某個區間的partitions * 因爲是range, 因此直接記錄起點和length就能夠了, 沒有必要加入每一箇中間rdd * eg: 目前只在union中使用,多個parent RDD合併到一個child RDD, 故每一個parent RDD都對應到child RDD中的一個區間 */
@DeveloperApi
class RangeDependency[T](rdd: RDD[T], inStart: Int, outStart: Int, length: Int)
extends NarrowDependency[T](rdd) {
override def getParents(partitionId: Int): List[Int] = {
//判斷partitionId的合理性,必須在child RDD的合理partition範圍內
if (partitionId >= outStart && partitionId < outStart + length) {
//算出parent RDD中對應的partition id
List(partitionId - outStart + inStart)
} else {
Nil
}
}
}
複製代碼