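The biggest difference from an ordinary RDD is that there are two type parameters, [K, V], which express the notion of a pair.
The key function is combineByKey, the abstraction behind all pair-related operations.
combine is this kind of operation: "Turns an RDD[(K, V)] into a result of type RDD[(K, C)]".
C may be just a simple type, but it is often a Seq, for example (Int, Int) to (Int, Seq[Int]).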
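Now let's look at the parameters of combineByKey.
The first three are operations the user has to define; the rest control the shuffle.
createCombiner: V => C, used when no C exists yet for a key, for example to create a Seq C from a V
mergeValue: (C, V) => C, used when a C already exists and a V has to be merged into it, for example appending item V to Seq C, or adding it to a running sum
mergeCombiners: (C, C) => C, merges two Cs
partitioner: Partitioner, the Partitioner needed for the shuffle
mapSideCombine: Boolean = true, to reduce the amount of data transferred, many combines can be done on the map side first; for a sum, all values with the same key can be added up inside a partition before the shuffle
serializerClass: String = null, data has to be serialized for transfer, and the user can supply a custom serializer class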
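To make the three user-defined operations concrete, here is a minimal sketch in plain Scala collections (no Spark involved). The object name CombinerFunctionsSketch, the SumCount type and the helper combineValues are made up for illustration; the example keeps a (sum, count) pair per key, which is a case where V and C differ.

```scala
// Hypothetical stand-alone model of the three user-supplied functions.
// It uses plain Scala collections, not Spark.
object CombinerFunctionsSketch {
  type SumCount = (Int, Int) // the combined type C: (running sum, number of values)

  // createCombiner: V => C, called for the first value seen for a key
  val createCombiner: Int => SumCount = v => (v, 1)

  // mergeValue: (C, V) => C, called for every further value of that key
  val mergeValue: (SumCount, Int) => SumCount =
    (c, v) => (c._1 + v, c._2 + 1)

  // mergeCombiners: (C, C) => C, merges partial results from two partitions
  val mergeCombiners: (SumCount, SumCount) => SumCount =
    (a, b) => (a._1 + b._1, a._2 + b._2)

  // Fold the (non-empty) values of a single key within one partition into a C.
  def combineValues(values: Seq[Int]): SumCount =
    values.tail.foldLeft(createCombiner(values.head))(mergeValue)

  def main(args: Array[String]): Unit = {
    val partA = combineValues(Seq(1, 2, 3)) // (6, 3)
    val partB = combineValues(Seq(10))      // (10, 1)
    println(mergeCombiners(partA, partB))   // prints (16,4)
  }
}
```

The average for the key is then sum / count, computed only after all partial results have been merged.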
/**
 * Extra functions available on RDDs of (key, value) pairs through an implicit conversion.
 * Import `org.apache.spark.SparkContext._` at the top of your program to use these functions.
 */
class PairRDDFunctions[K: ClassManifest, V: ClassManifest](self: RDD[(K, V)])
  extends Logging
  with SparkHadoopMapReduceUtil
  with Serializable {

  /**
   * Generic function to combine the elements for each key using a custom set of aggregation
   * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
   * Note that V and C can be different -- for example, one might group an RDD of type
   * (Int, Int) into an RDD of type (Int, Seq[Int]). Users provide three functions:
   *
   * - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
   * - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
   * - `mergeCombiners`, to combine two C's into a single one.
   *
   * In addition, users can control the partitioning of the output RDD, and whether to perform
   * map-side aggregation (if a mapper can produce multiple items with the same key).
   */
  def combineByKey[C](createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializerClass: String = null): RDD[(K, C)] = {
    // 1. Aggregator
    val aggregator = new Aggregator[K, V, C](createCombiner, mergeValue, mergeCombiners)
    // If the RDD's own partitioner equals the one passed in, no re-shuffle is needed; a plain map is enough
    if (self.partitioner == Some(partitioner)) {
      // 2. mapPartitions: call combineValuesByKey directly on the map side
      self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
    } else if (mapSideCombine) {
      // Map-side combine requested: first combine inside each partition
      val combined = self.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
      // 3. ShuffledRDD: do the shuffle
      val partitioned = new ShuffledRDD[K, C, (K, C)](combined, partitioner)
        .setSerializer(serializerClass)
      // After the shuffle, combine once more on the reduce side, using combineCombinersByKey
      partitioned.mapPartitions(aggregator.combineCombinersByKey, preservesPartitioning = true)
    } else {
      // Don't apply map-side combiner. The only difference from the branch above is that
      // no map-side combine is done.
      // A sanity check to make sure mergeCombiners is not defined.
      assert(mergeCombiners == null)
      val values = new ShuffledRDD[K, V, (K, V)](self, partitioner).setSerializer(serializerClass)
      values.mapPartitions(aggregator.combineValuesByKey, preservesPartitioning = true)
    }
  }
}
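combineByKey first creates an Aggregator, which wraps two functions:
combineValuesByKey, which handles merging a V into a C, for example adding an item to a Seq; it runs on the map side, or on the shuffled values when map-side combine is disabled
combineCombinersByKey, which handles merging two Cs, for example merging two Seqs; it runs on the reduce side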
import java.util.{HashMap => JHashMap}

import scala.collection.JavaConversions._ // lets combiners.iterator yield (K, C) pairs

case class Aggregator[K, V, C] (
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C) {

  // Merge each value of a partition into the combiner of its key
  def combineValuesByKey(iter: Iterator[_ <: Product2[K, V]]) : Iterator[(K, C)] = {
    val combiners = new JHashMap[K, C]
    for (kv <- iter) {
      val oldC = combiners.get(kv._1)
      if (oldC == null) {
        combiners.put(kv._1, createCombiner(kv._2))
      } else {
        combiners.put(kv._1, mergeValue(oldC, kv._2))
      }
    }
    combiners.iterator
  }

  // Merge already-built combiners that share a key
  def combineCombinersByKey(iter: Iterator[(K, C)]) : Iterator[(K, C)] = {
    val combiners = new JHashMap[K, C]
    iter.foreach { case(k, c) =>
      val oldC = combiners.get(k)
      if (oldC == null) {
        combiners.put(k, c)
      } else {
        combiners.put(k, mergeCombiners(oldC, c))
      }
    }
    combiners.iterator
  }
}
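The two Aggregator phases can be tried out locally. The following is only a sketch under assumptions: partitions are modelled as plain Scala Seqs, the "shuffle" is just a flatMap, and the object name TwoPhaseCombineSketch is hypothetical. It reuses the per-key logic of combineValuesByKey and combineCombinersByKey on immutable Maps and counts how many records would have to be shuffled after the map-side combine.

```scala
// Hypothetical local model of the two Aggregator phases, not Spark code.
object TwoPhaseCombineSketch {
  // Map side: same per-key logic as combineValuesByKey, on an immutable Map.
  def combineValuesByKey[K, V, C](part: Seq[(K, V)])(
      createCombiner: V => C, mergeValue: (C, V) => C): Map[K, C] =
    part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val c = acc.get(k).map(old => mergeValue(old, v)).getOrElse(createCombiner(v))
      acc.updated(k, c)
    }

  // Reduce side: same per-key logic as combineCombinersByKey.
  def combineCombinersByKey[K, C](fetched: Seq[(K, C)])(
      mergeCombiners: (C, C) => C): Map[K, C] =
    fetched.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(old => mergeCombiners(old, c)).getOrElse(c))
    }

  def main(args: Array[String]): Unit = {
    // Two simulated partitions of (key, value) pairs.
    val partitions = Seq(
      Seq("a" -> 1, "b" -> 2, "a" -> 3),
      Seq("a" -> 4, "b" -> 5))

    val mapSide = partitions.map(p => combineValuesByKey[String, Int, Int](p)(v => v, _ + _))
    val shuffled = mapSide.flatMap(_.toSeq) // stand-in for what would cross the network
    val result = combineCombinersByKey(shuffled)(_ + _)

    println(s"records before combine: ${partitions.map(_.size).sum}") // 5
    println(s"records shuffled: ${shuffled.size}")                    // 4
    println(result("a") -> result("b"))                               // prints (8,7)
  }
}
```

With a sum, the map-side pass collapses duplicate keys inside each partition, so fewer records are shuffled; the saving grows with the number of repeated keys per partition.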
mapPartitions simply wraps the RDD in a MapPartitionsRDD.
All it does is apply the map function f, Iterator[T] => Iterator[U], to the current partition.
For example, applying combineValuesByKey: Iterator[_ <: Product2[K, V]] to Iterator[(K, C)]
/**
 * Return a new RDD by applying a function to each partition of this RDD.
 */
def mapPartitions[U: ClassManifest](f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false): RDD[U] =
  new MapPartitionsRDD(this, sc.clean(f), preservesPartitioning)

private[spark]
class MapPartitionsRDD[U: ClassManifest, T: ClassManifest](
    prev: RDD[T],
    f: Iterator[T] => Iterator[U],
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

  override val partitioner =
    if (preservesPartitioning) firstParent[T].partitioner else None

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  // For a map, compute just calls f on the parent partition's iterator
  override def compute(split: Partition, context: TaskContext) =
    f(firstParent[T].iterator(split, context))
}
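What "apply f to the whole partition" means can be shown without Spark. This is a rough sketch, assuming a dataset is nothing more than a Seq of partitions; the object MapPartitionsSketch and its local mapPartitions are illustrative names, not the Spark method.

```scala
// Hypothetical model: a "dataset" is just a sequence of partitions.
object MapPartitionsSketch {
  // f receives one iterator per partition, like compute() does in MapPartitionsRDD.
  def mapPartitions[T, U](partitions: Seq[Seq[T]])(f: Iterator[T] => Iterator[U]): Seq[Seq[U]] =
    partitions.map(p => f(p.iterator).toList)

  def main(args: Array[String]): Unit = {
    val data = Seq(Seq(1, 2, 3), Seq(4, 5))
    // f sees a whole partition at once, so it can emit a single value per partition.
    val sums = mapPartitions[Int, Int](data)(it => Iterator(it.sum))
    println(sums) // prints List(List(6), List(9))
  }
}
```

Because f works on an iterator rather than on single elements, it can keep state across the elements of a partition, which is exactly what combineValuesByKey needs for its hash map.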
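The shuffle itself is carried out by the system's shuffleFetcher; Spark's abstraction encapsulates it very well,
so how the shuffle is actually done is not visible here. That only becomes clear when analysing shuffleFetcher.
Every shuffle has a globally unique shuffle id,
so in compute all you see is BlockStoreShuffleFetcher fetching the already-shuffled data directly, given the shuffle id and the partition id.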
/**
 * The resulting RDD from a shuffle (e.g. repartitioning of data).
 * @param prev the parent RDD.
 * @param part the partitioner used to partition the RDD
 * @tparam K the key class.
 * @tparam V the value class.
 */
class ShuffledRDD[K, V, P <: Product2[K, V] : ClassManifest](
    @transient var prev: RDD[P],
    part: Partitioner)
  extends RDD[P](prev.context, Nil) {

  override val partitioner = Some(part)

  // ShuffledRDD repartitions the data, so the new number of partitions is taken from the
  // Partitioner, and Array.tabulate creates that many ShuffledRDDPartition objects
  override def getPartitions: Array[Partition] = {
    Array.tabulate[Partition](part.numPartitions)(i => new ShuffledRDDPartition(i))
  }

  // serializerClass is set through setSerializer, which this excerpt omits
  override def compute(split: Partition, context: TaskContext): Iterator[P] = {
    val shuffledId = dependencies.head.asInstanceOf[ShuffleDependency[K, V]].shuffleId
    SparkEnv.get.shuffleFetcher.fetch[P](shuffledId, split.index, context.taskMetrics,
      SparkEnv.get.serializerManager.get(serializerClass))
  }
}
ShuffledRDDPartition is nothing special; like any other partition, it just records an id.
private[spark] class ShuffledRDDPartition(val idx: Int) extends Partition {
  override val index = idx
  override def hashCode(): Int = idx
}
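The excerpts above do not show how a record ends up in a particular output partition, so the following is only a guess at the general idea, not the fetcher's real logic. HashLikePartitioner is a hypothetical stand-in that offers just numPartitions and getPartition; the modulo-of-hashCode rule is an assumption chosen for illustration.

```scala
// Hypothetical sketch: output partitions are identified purely by index,
// and each record is routed by a partitioner to one of those indices.
object ShufflePartitionSketch {
  // Stand-in for a Partitioner, reduced to the two members used here.
  final case class HashLikePartitioner(numPartitions: Int) {
    def getPartition(key: Any): Int = {
      val raw = key.hashCode % numPartitions
      if (raw < 0) raw + numPartitions else raw // keep the index non-negative
    }
  }

  // Mirrors getPartitions: one entry per output partition, built with Array.tabulate.
  def partitionIds(p: HashLikePartitioner): Array[Int] =
    Array.tabulate(p.numPartitions)(i => i)

  // Group map-side records by the output partition they would belong to.
  def route[K, V](records: Seq[(K, V)], p: HashLikePartitioner): Map[Int, Seq[(K, V)]] =
    records.groupBy { case (k, _) => p.getPartition(k) }
}
```

A reduce task for partition i would then only need the shuffle id and i to ask for its share of the data, which matches what compute passes to fetch.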
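Next, let's see how combineByKey is used to implement other operations.
group is a typical example, (Int, Int) to (Int, Seq[Int]).
groupByKey does not use map-side combine, because it would not reduce the amount of data transferred, so mergeCombiners does not need to be implemented.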
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Seq[V])] = {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  def createCombiner(v: V) = ArrayBuffer(v) // create the seq
  def mergeValue(buf: ArrayBuffer[V], v: V) = buf += v // append the item to the seq
  val bufs = combineByKey[ArrayBuffer[V]](
    createCombiner _, mergeValue _, null, partitioner, mapSideCombine=false)
  bufs.asInstanceOf[RDD[(K, Seq[V])]]
}
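Why a map-side combine buys nothing for grouping can be checked with a small local experiment. This sketch assumes plain Scala collections and a hypothetical helper groupLocally; it applies the same createCombiner and mergeValue as above to one simulated partition.

```scala
import scala.collection.mutable.{ArrayBuffer, LinkedHashMap}

// Hypothetical local model of grouping inside one partition, not Spark code.
object GroupByKeySketch {
  def createCombiner[V](v: V): ArrayBuffer[V] = ArrayBuffer(v)           // create the seq
  def mergeValue[V](buf: ArrayBuffer[V], v: V): ArrayBuffer[V] = buf += v // append to it

  def groupLocally[K, V](records: Seq[(K, V)]): Map[K, Seq[V]] = {
    val combiners = LinkedHashMap.empty[K, ArrayBuffer[V]]
    for ((k, v) <- records) combiners.get(k) match {
      case Some(buf) => mergeValue(buf, v)
      case None      => combiners.put(k, createCombiner(v))
    }
    combiners.map { case (k, buf) => k -> buf.toList }.toMap
  }

  def main(args: Array[String]): Unit = {
    val records = Seq("a" -> 1, "b" -> 2, "a" -> 3)
    val grouped = groupLocally(records)
    println(grouped("a"))                  // prints List(1, 3)
    println(grouped.values.map(_.size).sum) // prints 3, the same number of values as the input
  }
}
```

Every input value is still present after grouping, so doing this before the shuffle would move the same amount of data while holding an extra hash table in memory.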
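reduce is a simpler case: two values are merged into one, (Int, Int V) to (Int, Int C), for example a sum.
So createCombiner is trivial: it just returns v,
and mergeValue and mergeCombiners share the same logic; there is no difference between them.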
/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = {
  combineByKey[V]((v: V) => v, func, func, partitioner)
}
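The same wiring can be reproduced on local collections. This is a sketch only: ReduceByKeySketch and its combineByKey are hypothetical local models over a Seq of partitions, written to show that when V and C are the same type, one function can serve as both mergeValue and mergeCombiners.

```scala
// Hypothetical local model, not the Spark API.
object ReduceByKeySketch {
  def combineByKey[K, V, C](partitions: Seq[Seq[(K, V)]])(
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] =
    partitions
      // Map side: inside each partition, fold the values of every key into one C.
      .flatMap { part =>
        part.groupBy(_._1).map { case (k, kvs) =>
          val vs = kvs.map(_._2)
          k -> vs.tail.foldLeft(createCombiner(vs.head))(mergeValue)
        }
      }
      // Reduce side: merge the partial Cs that share a key.
      .groupBy(_._1)
      .map { case (k, kcs) => k -> kcs.map(_._2).reduce(mergeCombiners) }

  // createCombiner is the identity; func is used for both merge steps.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions)((v: V) => v, func, func)

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq("a" -> 1, "b" -> 2, "a" -> 3), Seq("a" -> 4))
    println(reduceByKey(partitions)(_ + _) == Map("a" -> 8, "b" -> 2)) // prints true
  }
}
```

This only gives the expected result when func is associative, as the doc comment requires, because values are merged in two stages and the grouping of the merges depends on how the data is partitioned.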