在數據分析中,處理Key,Value的Pair數據是極爲常見的場景,例如咱們能夠針對這樣的數據進行分組、聚合或者將兩個包含Pair數據的RDD根據key進行join。從函數的抽象層面看,這些操做具備共同的特徵,都是將類型爲RDD[(K,V)]的數據處理爲RDD[(K,C)]。這裏的V和C能夠是相同類型,也能夠是不一樣類型。這種數據處理操做並不是單純的對Pair的value進行map,而是針對不一樣的key值對原有的value進行聯合(Combine)。於是,不只類型可能不一樣,元素個數也可能不一樣。html
combineByKey()是最爲經常使用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和aggregate()同樣,combineByKey()可讓用戶返回與輸入數據的類型不一樣的返回值。git
Spark爲此提供了一個高度抽象的操做combineByKey。該方法的定義以下所示:es6
def combineByKey[C]( //在找到給定分區中第一次碰到的key(在RDD元素中)時被調用。此方法爲這個key初始化一個累加器。 createCombiner: V => C, //當累加器已經存在的時候(也就是上面那個key的累加器)調用。 mergeValue: (C, V) => C, // 若是哪一個key跨多個分區,該參數就會被調用。 mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null ): RDD[(K, C)] = { //實現略 }
函數式風格與命令式風格不一樣之處在於它說明了代碼作了什麼(what to do),而不是怎麼作(how to do)。combineByKey函數主要接受了三個函數做爲參數,分別爲createCombiner、mergeValue、mergeCombiners。這三個函數足以說明它究竟作了什麼。理解了這三個函數,就能夠很好地理解combineByKey。shell
因爲combineByKey()會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。apache
讓咱們來計算每一項科目的平均值編程
// 關閉 spark-shell INFO/DEBUG 調試信息 scala> sc.setLogLevel("WARN") scala> val inputrdd = sc.parallelize(Seq( ("maths", 50), ("maths", 60), ("english", 65), ("physics", 66), ("physics", 61), ("physics", 87)), 1) inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27 scala> inputrdd.getNumPartitions res55: Int = 1 scala> val reduced = inputrdd.combineByKey( (mark) => { println(s"Create combiner -> ${mark}") (mark, 1) }, (acc: (Int, Int), v) => { println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""") (acc._1 + v, acc._2 + 1) }, (acc1: (Int, Int), acc2: (Int, Int)) => { println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""") (acc1._1 + acc2._1, acc1._2 + acc2._2) } ) reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29 scala> reduced.collect() Create combiner -> 50 Merge value : (50 + 60, 1 + 1) Create combiner -> 65 Create combiner -> 66 Merge value : (66 + 61, 1 + 1) Merge value : (127 + 87, 2 + 1) res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1))) scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat) result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31 scala> result.collect() res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))
注意:本例中由於只有一個分區因此 mergeCombiners 並無用到,你也能夠經過下面的代碼從另外角度來驗證:ide
scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1))) rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21 scala> rdd1.getNumPartitions res18: Int = 64 scala> rdd1.combineByKey( (v : Int) => v + "_", (c : String, v : Int) => c + "@" + v, (c1 : String, c2 : String) => c1 + "$" + c2 ).collect res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
在此例中,由於分區多而記錄少,能夠看作每條記錄都跨分區了,因此沒有機會用到 mergeValue,最後直接 mergeCombiners 獲得結果 。函數式編程
除了能夠進行group、average以外,根據傳入的函數實現不一樣,咱們還能夠利用combineByKey完成諸如aggregate、fold等操做。這是一個高度的抽象,但從聲明的角度來看,卻又不須要了解過多的實現細節。這正是函數式編程的魅力。函數
[1] Spark函數講解:combineByKeyes5
http://bihell.com/2017/03/14/Combiner-in-Pair-RDDs-combineByKey/
[2] combineByKey操做
https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/combinebykey.html
[3] Spark算子執行流程詳解之五
http://blog.csdn.net/wl044090432/article/details/59483319
[4] Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey