Spark函數講解: combineByKey

一、背景

在數據分析中,處理Key,Value的Pair數據是極爲常見的場景,例如咱們能夠針對這樣的數據進行分組、聚合或者將兩個包含Pair數據的RDD根據key進行join。從函數的抽象層面看,這些操做具備共同的特徵,都是將類型爲RDD[(K,V)]的數據處理爲RDD[(K,C)]。這裏的V和C能夠是相同類型,也能夠是不一樣類型。這種數據處理操做並不是單純的對Pair的value進行map,而是針對不一樣的key值對原有的value進行聯合(Combine)。於是,不只類型可能不一樣,元素個數也可能不一樣。html

combineByKey()是最爲經常使用的基於鍵進行聚合的函數。大多數基於鍵聚合的函數都是用它實現的。和aggregate()同樣,combineByKey()可讓用戶返回與輸入數據的類型不一樣的返回值。git

Spark爲此提供了一個高度抽象的操做combineByKey。該方法的定義以下所示:es6

def combineByKey[C](
      //在找到給定分區中第一次碰到的key(在RDD元素中)時被調用。此方法爲這個key初始化一個累加器。
      createCombiner: V => C,
      //當累加器已經存在的時候(也就是上面那個key的累加器)調用。
      mergeValue: (C, V) => C,
      // 若是哪一個key跨多個分區,該參數就會被調用。
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null
): RDD[(K, C)] = { //實現略 }

函數式風格與命令式風格不一樣之處在於它說明了代碼作了什麼(what to do),而不是怎麼作(how to do)。combineByKey函數主要接受了三個函數做爲參數,分別爲createCombiner、mergeValue、mergeCombiners。這三個函數足以說明它究竟作了什麼。理解了這三個函數,就能夠很好地理解combineByKey。shell

二、原理

因爲combineByKey()會遍歷分區中的全部元素,所以每一個元素的鍵要麼尚未遇到過,要麼就和以前的某個元素的鍵相同。apache

  • 若是這是一個新的元素,combineByKey()會使用一個叫做createCombiner()的函數來建立那個鍵對應的累加器的初始值。須要注意的是,這一過程會在每一個分區中第一次出現各個鍵時發生,而不是在整個RDD中第一次出現一個鍵時發生。
  • 若是這是一個在處理當前分區以前已經遇到的鍵,它會使用mergeValue()方法將該鍵的累加器對應的當前值與這個新的值進行合併。
  • 因爲每一個分區都是獨立處理的,所以對於同一個鍵能夠有多個累加器。若是有兩個或者更多的分區都有對應同一個鍵的累加器,就須要使用用戶提供的mergeCombiners()方法將各個分區的結果進行合併。

三、示例:

讓咱們來計算每一項科目的平均值編程

// 關閉 spark-shell INFO/DEBUG 調試信息
scala> sc.setLogLevel("WARN")

scala> val inputrdd = sc.parallelize(Seq(
                        ("maths", 50), ("maths", 60),
                        ("english", 65),
                        ("physics", 66), ("physics", 61), ("physics", 87)), 
                        1)
inputrdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[41] at parallelize at <console>:27

scala> inputrdd.getNumPartitions                      
res55: Int = 1

scala> val reduced = inputrdd.combineByKey(
         (mark) => {
           println(s"Create combiner -> ${mark}")
           (mark, 1)
         },
         (acc: (Int, Int), v) => {
           println(s"""Merge value : (${acc._1} + ${v}, ${acc._2} + 1)""")
           (acc._1 + v, acc._2 + 1)
         },
         (acc1: (Int, Int), acc2: (Int, Int)) => {
           println(s"""Merge Combiner : (${acc1._1} + ${acc2._1}, ${acc1._2} + ${acc2._2})""")
           (acc1._1 + acc2._1, acc1._2 + acc2._2)
         }
     )
reduced: org.apache.spark.rdd.RDD[(String, (Int, Int))] = ShuffledRDD[42] at combineByKey at <console>:29

scala> reduced.collect()
Create combiner -> 50
Merge value : (50 + 60, 1 + 1)
Create combiner -> 65
Create combiner -> 66
Merge value : (66 + 61, 1 + 1)
Merge value : (127 + 87, 2 + 1)
res56: Array[(String, (Int, Int))] = Array((maths,(110,2)), (physics,(214,3)), (english,(65,1)))

scala> val result = reduced.mapValues(x => x._1 / x._2.toFloat)
result: org.apache.spark.rdd.RDD[(String, Float)] = MapPartitionsRDD[43] at mapValues at <console>:31

scala> result.collect()
res57: Array[(String, Float)] = Array((maths,55.0), (physics,71.333336), (english,65.0))

注意:本例中由於只有一個分區因此 mergeCombiners 並無用到,你也能夠經過下面的代碼從另外角度來驗證:ide

scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21

scala> rdd1.getNumPartitions    
res18: Int = 64 

scala> rdd1.combineByKey(
            (v : Int) => v + "_",   
            (c : String, v : Int) => c + "@" + v,  
            (c1 : String, c2 : String) => c1 + "$" + c2
            ).collect
res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

在此例中,由於分區多而記錄少,能夠看作每條記錄都跨分區了,因此沒有機會用到 mergeValue,最後直接 mergeCombiners  獲得結果 函數式編程

除了能夠進行group、average以外,根據傳入的函數實現不一樣,咱們還能夠利用combineByKey完成諸如aggregate、fold等操做。這是一個高度的抽象,但從聲明的角度來看,卻又不須要了解過多的實現細節。這正是函數式編程的魅力。函數

Refer:

[1] Spark函數講解:combineByKeyes5

http://bihell.com/2017/03/14/Combiner-in-Pair-RDDs-combineByKey/

[2] combineByKey操做

https://zhangyi.gitbooks.io/spark-in-action/content/chapter2/combinebykey.html

[3] Spark算子執行流程詳解之五

http://blog.csdn.net/wl044090432/article/details/59483319

[4] Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey

http://lxw1234.com/archives/2015/07/358.htm

相關文章
相關標籤/搜索