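Spark has two similar APIs, reduceByKey and groupByKey. They do similar things, but their underlying implementations differ. Why were they designed this way? Let's look at the source code to find out.
First, the call chain of each (both using the default partitioner, i.e. defaultPartitioner).
Spark version used: Spark 2.1.0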
The reduceByKey call chain
Step1
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}
Step2
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
Step3
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    // Already partitioned the right way: combine inside each partition, no shuffle.
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    // Otherwise a shuffle is needed; mapSideCombine is passed on to the ShuffledRDD.
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
Leaving the details of the method body aside for now, all we need to know is that the call ends up in combineByKeyWithClassTag. Two of its parameters deserve a closer look:
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)
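The first is the partitioner parameter, which decides how the resulting RDD is partitioned. defaultPartitioner reuses a partitioner that the parent RDD already has and otherwise falls back to a HashPartitioner. Spark ships with HashPartitioner and RangePartitioner, and users can also define their own partitioner. The source shows that the combination of array keys and a HashPartitioner is rejected: if the key class is an array and the partitioner is a HashPartitioner, a SparkException is thrown. Array keys with mapSideCombine enabled are rejected in the same way.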
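Why array keys are a problem for hash partitioning can be shown without Spark. The sketch below is plain Scala; `ArrayKeyHashing` and `partitionFor` are hypothetical names made up for this illustration, and `partitionFor` only imitates the general idea of hash partitioning (a non-negative `hashCode` modulo the partition count). A JVM array inherits identity-based `equals` and `hashCode`, so two arrays with the same contents are not treated as the same key, while a `Seq` built from the same elements is.

```scala
object ArrayKeyHashing {
  // Hypothetical helper, not Spark code: picks a partition from the key's hashCode,
  // keeping the result in the range [0, numPartitions).
  def partitionFor(key: Any, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }

  def main(args: Array[String]): Unit = {
    val k1 = Array(1, 2, 3)
    val k2 = Array(1, 2, 3)

    // Same contents, but arrays compare by reference, so they are different keys.
    println(k1.sameElements(k2))
    println(k1.equals(k2))

    // A Seq compares and hashes by content, so equal keys always land in the same partition.
    println(k1.toSeq == k2.toSeq)
    println(partitionFor(k1.toSeq, 4) == partitionFor(k2.toSeq, 4))
  }
}
```

If a key is naturally an array, converting it to a `Seq` (or another type with content-based equality) before keying the RDD is one way around the exception.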
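The second is the mapSideCombine parameter, and this is the biggest difference between reduceByKey and groupByKey. It decides whether values are first combined on the map side, that is, on the node that holds the data, before the shuffle. It is discussed in more detail further down.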
The groupByKey call chain
Step1
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
Step2
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
Step3
This is the same combineByKeyWithClassTag method already listed in full above for reduceByKey.
Comparing this with the reduceByKey call chain above, both end up calling combineByKeyWithClassTag, but with different arguments.
The reduceByKey call
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
The groupByKey call
combineByKeyWithClassTag[CompactBuffer[V]](
  createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
These different arguments are what make the two methods behave differently. Let's take them one at a time.
reduceByKey passes [V] as the type parameter, while groupByKey passes [CompactBuffer[V]]. This is why the return types differ: reduceByKey returns RDD[(K, V)], and groupByKey returns RDD[(K, Iterable[V])].
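How the combiner type C shapes the result can be sketched with ordinary Scala collections. This is not Spark's Aggregator: `CombinerTypeSketch` and `combineByKeySketch` are hypothetical names, a single in-memory `Seq` stands in for one partition (so mergeCombiners is not needed), and `Vector` stands in for Spark's internal CompactBuffer. The two calls mirror the createCombiner and mergeValue arguments used by reduceByKey and groupByKey.

```scala
object CombinerTypeSketch {
  // Hypothetical stand-in for the combine step: folds records into one combiner per key.
  def combineByKeySketch[K, V, C](
      records: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val combined = acc.get(k) match {
        case Some(c) => mergeValue(c, v)   // key seen before: fold the value in
        case None    => createCombiner(v)  // first value for this key
      }
      acc.updated(k, combined)
    }

  val records: Seq[(String, Int)] = Seq(("a", 1), ("b", 2), ("a", 3))

  // reduceByKey style: C = V, the combiner is just the running result.
  val reduced: Map[String, Int] =
    combineByKeySketch[String, Int, Int](records, v => v, _ + _)

  // groupByKey style: C = a buffer of V, the combiner keeps every value.
  val grouped: Map[String, Vector[Int]] =
    combineByKeySketch[String, Int, Vector[Int]](records, v => Vector(v), (buf, v) => buf :+ v)

  def main(args: Array[String]): Unit = {
    println(reduced)
    println(grouped)
  }
}
```

With C = V the result has one value per key. With a buffer as C it has one collection per key, which is what the Iterable[V] in groupByKey's return type reflects.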
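Then there is mapSideCombine = false. This parameter defaults to true, so reduceByKey, which does not set it, runs with map-side combine enabled, while groupByKey turns it off explicitly. As mentioned above, the parameter controls whether a preliminary merge (combine) is done on the map side before the data is shuffled.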
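The effect of a map-side combine on the amount of shuffled data can be simulated with plain Scala collections. This is only a sketch: `MapSideCombineSketch` and its methods are hypothetical names, each inner `Seq` plays the role of one map-side partition, and the "shuffle" is simply the list of records that would leave the partitions.

```scala
object MapSideCombineSketch {
  // Without map-side combine, every record leaves its partition unchanged.
  def shuffledWithoutCombine[K, V](partitions: Seq[Seq[(K, V)]]): Seq[(K, V)] =
    partitions.flatten

  // With map-side combine, each partition first merges its own values per key,
  // so at most one record per key leaves each partition.
  def shuffledWithCombine[K, V](partitions: Seq[Seq[(K, V)]], func: (V, V) => V): Seq[(K, V)] =
    partitions.flatMap { part =>
      part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(func)) }.toSeq
    }

  // Reduce side: merge whatever arrived, per key.
  def reduceSide[K, V](shuffled: Seq[(K, V)], func: (V, V) => V): Map[K, V] =
    shuffled.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).reduce(func)) }

  val partitions: Seq[Seq[(String, Int)]] = Seq(
    Seq(("a", 1), ("a", 1), ("b", 1)),
    Seq(("a", 1), ("b", 1), ("b", 1)))

  val add: (Int, Int) => Int = _ + _

  def main(args: Array[String]): Unit = {
    val plain = shuffledWithoutCombine(partitions)
    val combined = shuffledWithCombine(partitions, add)
    println(s"records shuffled without combine: ${plain.size}")
    println(s"records shuffled with combine: ${combined.size}")
    println(reduceSide(plain, add) == reduceSide(combined, add))
  }
}
```

In this toy data set the final sums are identical either way; the only difference is how many records have to be moved to get there.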
Functionally, then, reduceByKey first merges values on each node before the shuffle, and groupByKey does not.
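A small Spark program shows the two methods side by side. It is a sketch that assumes Spark 2.x is on the classpath and runs in local mode; the object name `ReduceVsGroup`, the helper `sums` and the sample data are made up for this illustration. Only standard RDD calls are used (parallelize, reduceByKey, groupByKey, mapValues, collect).

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object ReduceVsGroup {
  // Computes per-key sums twice: once with reduceByKey, once with groupByKey + sum.
  def sums(sc: SparkContext): (Map[String, Int], Map[String, Int]) = {
    val pairs: RDD[(String, Int)] =
      sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3), ("b", 4), ("a", 5)), numSlices = 2)

    // Values are merged inside each partition first, then across partitions.
    val reduced: RDD[(String, Int)] = pairs.reduceByKey(_ + _)

    // All values for a key are shipped and collected; the sum happens afterwards.
    val grouped: RDD[(String, Iterable[Int])] = pairs.groupByKey()
    val summedFromGroups: RDD[(String, Int)] = grouped.mapValues(_.sum)

    (reduced.collect().toMap, summedFromGroups.collect().toMap)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("reduce-vs-group").setMaster("local[2]")
    val sc = new SparkContext(conf)
    try {
      val (viaReduce, viaGroup) = sums(sc)
      println(viaReduce)
      println(viaGroup)
    } finally {
      sc.stop()
    }
  }
}
```

Both paths give the same sums. The difference lies in the intermediate type (Int versus Iterable[Int]) and in how much data crosses the shuffle.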
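Seen this way, reduceByKey should usually perform much better than groupByKey for aggregations, because part of the work has already been done on each node before the shuffle. So why does groupByKey exist, and what is it meant for? I am not sure. One hint is the comment in the groupByKey source above: when every value for a key has to be kept, a map-side combine does not reduce the amount of data shuffled. If you know more, please share it in the comments. Thank you very much!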