Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey

Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey

 Spark  lxw1234@qq.com   5個月前 (07-06)   1241℃   0評論

關鍵字:Spark算子、Spark RDD鍵值轉換、combineByKey、foldByKey es6

combineByKey

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] apache

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] ide

def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 函數

 

該函數用於將RDD[K,V]轉換成RDD[K,C],這裏的V類型和C類型能夠相同也能夠不一樣。 大數據

其中的參數: spa

createCombiner:組合器函數,用於將V類型轉換成C類型,輸入參數爲RDD[K,V]中的V,輸出爲C scala

mergeValue:合併值函數,將一個C類型和一個V類型值合併成一個C類型,輸入參數爲(C,V),輸出爲C code

mergeCombiners:合併組合器函數,用於將兩個C類型值合併成一個C類型,輸入參數爲(C,C),輸出爲C htm

numPartitions:結果RDD分區數,默認保持原有的分區數 get

partitioner:分區函數,默認爲HashPartitioner

mapSideCombine:是否須要在Map端進行combine操做,相似於MapReduce中的combine,默認爲true

看下面例子:

 
  1. scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
  2. rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
  3.  
  4. scala> rdd1.combineByKey(
  5.     |       (v : Int) => v + "_",  
  6.     |       (c : String, v : Int) => c + "@" + v,  
  7.     |       (c1 : String, c2 : String) => c1 + "$" + c2
  8.     |     ).collect
  9. res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))

其中三個映射函數分別爲:
createCombiner: (V) => C
(v : Int) => v + 「_」 //在每個V值後面加上字符_,返回C類型(String)
mergeValue: (C, V) => C
(c : String, v : Int) => c + 「@」 + v //合併C類型和V類型,中間加字符@,返回C(String)
mergeCombiners: (C, C) => C
(c1 : String, c2 : String) => c1 + 「$」 + c2 //合併C類型和C類型,中間加$,返回C(String)
其餘參數爲默認值。

最終,將RDD[String,Int]轉換爲RDD[String,String]。

再看例子:

 
  1. rdd1.combineByKey(
  2.      (v : Int) => List(v),
  3.      (c : List[Int], v : Int) => v :: c,
  4.      (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
  5. ).collect
  6. res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))

最終將RDD[String,Int]轉換爲RDD[String,List[Int]]。

foldByKey

def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]

def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]

 

該函數用於RDD[K,V]根據K將V作摺疊、合併處理,其中的參數zeroValue表示先根據映射函數將zeroValue應用於V,進行初始化V,再將映射函數應用於初始化後的V.

直接看例子:

 
  1. scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
  2. scala> rdd1.foldByKey(0)(_+_).collect
  3. res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
  4. //將rdd1中每一個key對應的V進行累加,注意zeroValue=0,須要先初始化V,映射函數爲+操
  5. //做,好比("A",0), ("A",2),先將zeroValue應用於每一個V,獲得:("A",0+0), ("A",2+0),即:
  6. //("A",0), ("A",2),再將映射函數應用於初始化後的V,最後獲得(A,0+2),即(A,2)
  7.  

再看:

 
  1. scala> rdd1.foldByKey(2)(_+_).collect
  2. res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
  3. //先將zeroValue=2應用於每一個V,獲得:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再將映射函
  4. //數應用於初始化後的V,最後獲得:(A,2+4),即:(A,6)
  5.  

再看乘法操做:

 
  1. scala> rdd1.foldByKey(0)(_*_).collect
  2. res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
  3. //先將zeroValue=0應用於每一個V,注意,此次映射函數爲乘法,獲得:("A",0*0), ("A",2*0),
  4. //即:("A",0), ("A",0),再將映射函//數應用於初始化後的V,最後獲得:(A,0*0),即:(A,0)
  5. //其餘K也同樣,最終都獲得了V=0
  6.  
  7. scala> rdd1.foldByKey(1)(_*_).collect
  8. res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
  9. //映射函數爲乘法時,須要將zeroValue設爲1,才能獲得咱們想要的結果。
  10.  
  11.  

在使用foldByKey算子時候,要特別注意映射函數及zeroValue的取值。

 

更多關於Spark算子的介紹,可參考 Spark算子 :

http://lxw1234.com/archives/tag/spark%E7%AE%97%E5%AD%90

 

 

轉載請註明:lxw的大數據田地 » Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey

相關文章
相關標籤/搜索