First, let's look at the source code of reduceByKey and groupByKey in PairRDDFunctions.scala:
/**
 * Merge the values for each key using an associative reduce function. This will also perform
 * the merging locally on each mapper before sending results to a reducer, similarly to a
 * "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = {
  reduceByKey(defaultPartitioner(self), func)
}

/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * Note: This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using [[PairRDDFunctions.aggregateByKey]]
 * or [[PairRDDFunctions.reduceByKey]] will provide much better performance.
 *
 * Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKey[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
reduceByKey: reduceByKey merges the values for each key locally on every mapper before the results are sent to a reducer, much like a combiner in MapReduce. The benefit is that this map-side merge greatly shrinks the amount of data that has to be shuffled, reducing network transfer and letting the reduce side compute the final result faster.
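A minimal sketch of reduceByKey in use; the local master, application name, and sample data are assumptions for illustration only:

import org.apache.spark.{SparkConf, SparkContext}

object ReduceByKeyExample {
  def main(args: Array[String]): Unit = {
    // local[*] master and the sample words are assumptions for illustration
    val sc = new SparkContext(new SparkConf().setAppName("reduceByKeyExample").setMaster("local[*]"))

    val words = sc.parallelize(Seq("one", "two", "two", "three", "three", "three"))
    val wordPairsRDD = words.map(word => (word, 1))

    // The (_ + _) function is applied first within each partition (map-side combine),
    // so only one (word, partialCount) pair per key per partition is shuffled.
    val wordCounts = wordPairsRDD.reduceByKey(_ + _)

    wordCounts.collect().foreach(println)  // e.g. (two,2), (one,1), (three,3)
    sc.stop()
  }
}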
groupByKey: groupByKey gathers all the values for each key in the RDD into a single sequence (an Iterable). This aggregation happens entirely on the reduce side, so every key-value pair has to be shuffled across the network, which is wasteful. Moreover, if a single key has a very large number of values, it can cause an OutOfMemoryError.
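For comparison, a sketch of the same word count written with groupByKey, reusing the assumed wordPairsRDD from the previous example:

// No map-side combine happens here: every (word, 1) pair is shuffled to the
// reducers, and all values for a key are first collected into a buffer
// before they can be summed.
val wordCountsWithGroup = wordPairsRDD
  .groupByKey()                       // RDD[(String, Iterable[Int])]
  .mapValues(values => values.sum)

wordCountsWithGroup.collect().foreach(println)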
From this comparison, reduceByKey is the recommended choice when performing reduce-style aggregations over large data sets: it is not only faster, but also avoids the memory-overflow problem that groupByKey can cause.
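The groupByKey docstring above also points to aggregateByKey for aggregations such as a per-key average. A sketch under the assumption of a small (key, score) RDD created for illustration:

// Hypothetical (key, score) data for illustration.
val scores = sc.parallelize(Seq(("a", 1), ("a", 3), ("b", 4)))

// aggregateByKey keeps only a running (sum, count) per key on each mapper,
// so like reduceByKey it combines on the map side before shuffling.
val avgByKey = scores
  .aggregateByKey((0, 0))(
    (acc, v) => (acc._1 + v, acc._2 + 1),       // fold a value into the accumulator
    (a, b) => (a._1 + b._1, a._2 + b._2))       // merge accumulators from different partitions
  .mapValues { case (sum, count) => sum.toDouble / count }

avgByKey.collect().foreach(println)  // e.g. (a,2.0), (b,4.0)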