spark combineByKey

查看源代碼會發現combineByKey定義以下:app

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C)
    : RDD[(K, C)] = {
    combineByKey(createCombiner, mergeValue, mergeCombiners, defaultPartitioner(self))
  }

例子:ide

spark分組計算平均值函數

object ColumnValueAvg extends App {
  /**
    * ID,Name,ADDRESS,AGE
    * 001,zhangsan,chaoyang,20
    * 002,zhangsa,chaoyang,27
    * 003,zhangjie,chaoyang,35
    * 004,lisi,haidian,24
    * 005,lier,haidian,40
    * 006,wangwu,chaoyang,90
    * 007,wangchao,haidian,80
    */
  val conf = new SparkConf().setAppName("test column value sum and avg").setMaster("local[1]")
  val sc = new SparkContext(conf)

  val textRdd = sc.textFile(args(0))

  //be careful the toInt here is necessary ,if no cast ,then it will be age string append
  val addressAgeMap = textRdd.map(x => (x.split(",")(2), x.split(",")(3).toInt))

  val sumAgeResult = addressAgeMap.reduceByKey(_ + _).collect().foreach(println)

  val avgAgeResult = addressAgeMap.combineByKey(
    (v) => (v, 1),
    (accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1),
    (accu1: (Int, Int), accu2: (Int, Int)) => (accu1._1 + accu2._1, accu1._2 + accu2._2)
  ).mapValues(x => (x._1 / x._2).toDouble).collect().foreach(println)

  println("Sum and Avg calculate successfuly")

  sc.stop()

}

combineByKey函數須要傳遞三個函數作爲參數,分別爲createCombiner、mergeValue、mergeCombiner,須要理解這三個函數的意義性能

結合數據來說的話,combineByKey默認按照key來進行元素的combine,這裏三個參數都是對value的一些操做spa

1>第一個參數createCombiner,如代碼中定義的是 : (v) => (v, 1)code

這裏是建立了一個combiner,做用是當遍歷rdd的分區時,遇到第一次出現的key值,那麼生成一個(v,1)的combiner,好比這裏key爲address,當遇到第一個ip

chaoyang,20 的時候,(v,1)中的v就是age的值20,1是address出現的次數
 
2>第2個參數是mergeValue,顧名思義就是合併value,如代碼中定義的是:(accu: (Int, Int), v) => (accu._1 + v, accu._2 + 1)
這裏的做用是當處理當前分區時,遇到已經出現過的key,那麼合併combiner中的value,注意這裏accu: (Int, Int)對應第一個參數中出現的combiner,即(v,1),注意類型要一致
那麼(accu._1 + v, accu._2 + 1)就很好理解了,accu._1即便須要合併的age的值,而acc._2是須要合併的key值出現的次數,出現一次即加1
 
3>第三個參數是mergeCombiners,用來合併各個分區上的累加器,由於各個分區分別運行了前2個函數後須要最後合併分區結果.
 
ok,運行代碼,結果以下,分別按照address來計算出age的平均值
 
(haidian,48.0)
(chaoyang,43.0)
 
因爲combineByKey抽象程度很高,能夠本身custom一些函數作爲計算因子,所以能夠靈活的完成更多的計算功能.reduceByKey、groupByKey都是基於combineByKey實現的。字符串

combineByKey

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)]

def combineByKey[C](createCombiner: V => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializerClass: String = null): RDD[(K, C)]

1,第一個參數,createCombiner: V => C。。這個表示當combineByKey第一次遇到值爲k的key時,調用createCombiner函數,將V轉換爲C。 (這一步相似於初始化操做)
2,第二個參數,mergeValue: (C, V) => C。。這個表示當combineByKey不是第一次遇到值爲k的Key時,調用mergeValue函數,將v累加到c中。。(這個操做在每一個分區內進行)
3,第三個參數,mergeCombiners: (C, C) => C。 這個表示將兩個C合併爲一個C類型。 (這個操做在不一樣分區間進行)
4,算子的返回值最後爲RDD[(K,C)]類型。表示根據相同的k,將value值由原來的V類型最後轉換爲C類型。string

val a = sc.parallelize(List("dog","cat","gnu","salmon","rabbit","turkey","wolf","bear","bee"), 3)
val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)
val c = b.zip(a)  //利用拉練操做將兩個rdd合併爲一個值爲pair類型的rdd。
 
val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x, (x:List[String], y:List[String]) => x ::: y)
//在這個combineByKey中,能夠看到首先每次遇到第一個值,就將其變爲一個加入到一個List中去。
//第二個函數指的是在key相同的狀況下,當每次遇到新的value值,就把這個值添加到這個list中去。
//最後是一個merge函數,表示將key相同的兩個list進行合併。
 
d.collect
res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu, rabbit, salmon, bee, bear, wolf)))
val initialScores = Array(("Fred", 88.0), ("Fred", 95.0), ("Fred", 91.0), ("Wilma", 93.0), ("Wilma", 95.0), ("Wilma", 98.0))  
val d1 = sc.parallelize(initialScores)  
type MVType = (Int, Double) //定義一個元組類型(科目計數器,分數)  。type的意思是之後再這個代碼中全部的類型爲(Int, Double)均可以被記爲MVType。
d1.combineByKey(  
  score => (1, score),  
  //score => (1, score),咱們把分數做爲參數,並返回了附加的元組類型。 以"Fred"爲列,當前其分數爲88.0 =>(1,88.0)  1表示當前科目的計數器,此時只有一個科目
  (c1: MVType, newScore) => (c1._1 + 1, c1._2 + newScore),  
  //注意這裏的c1就是createCombiner初始化獲得的(1,88.0)。在一個分區內,咱們又碰到了"Fred"的一個新的分數91.0。固然咱們要把以前的科目分數和當前的分數加起來即//c1._2 + newScore,而後把科目計算器加1即c1._1 + 1
 
  (c1: MVType, c2: MVType) => (c1._1 + c2._1, c1._2 + c2._2)  
  //注意"Fred"多是個學霸,他選修的科目可能過多而分散在不一樣的分區中。全部的分區都進行mergeValue後,接下來就是對分區間進行合併了,分區間科目數和科目數相加分數和分數相加就獲得了總分和總科目數
).map 
{ 
case (name, (num, socre)) 
=> (name, socre / num)
 }.collect

reduceByKey

def reduceByKey(func: (V, V) => V): RDD[(K, V)]

def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]

def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)]

做用於鍵值對類型的數據,根據有相同鍵的數據,進行彙總。傳入一個函數,這個函數做用於有兩個相同的key的鍵值對,而後對value值進行函數操做it

val a = sc.parallelize(List("dog", "cat", "owl", "gnu", "ant"), 2)
val b = a.map(x => (x.length, x)) //生成一個鍵值對類型的數據,鍵爲字符串長度,值爲字符串。
b.reduceByKey(_ + _).collect  //對於有相同的鍵的元祖進行累加,因爲全部的數據的長度都是3,因此最後獲得了以下的結果
res86: Array[(Int, String)] = Array((3,dogcatowlgnuant))
 
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", "eagle"), 2)
val b = a.map(x => (x.length, x)) //一樣的,將數據變爲元祖。
b.reduceByKey(_ + _).collect //長度爲3的數據有dog,cat,長度爲4的數據有lion。長度爲5的有tiger和eagle。長度爲7的有一個panther。

groupByKey

def groupByKey(): RDD[(K, Iterable[V])]  //講一個rdd進行有鍵值,進行group操做,最後返回的value值是一個迭代器,其內容包含全部key值爲K的元祖的value值。
 
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]

def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "spider", "eagle"), 2)
val b = a.keyBy(_.length) //keyBy算子的意思是以_.length這個值做爲key。其中value的返回值爲ArrayBuffer。
b.groupByKey().collect()
 
res11: Array[(Int, Seq[String])] = Array((4,ArrayBuffer(lion)), (6,ArrayBuffer(spider)), (3,ArrayBuffer(dog, cat)), (5,ArrayBuffer(tiger, eagle)))  //

groupByKey數據沒有進行合併,因此性能最低。

相關文章
相關標籤/搜索