關鍵字:Spark算子、Spark RDD鍵值轉換、combineByKey、foldByKey es6
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C): RDD[(K, C)] apache
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, numPartitions: Int): RDD[(K, C)] ide
def combineByKey[C](createCombiner: (V) => C, mergeValue: (C, V) => C, mergeCombiners: (C, C) => C, partitioner: Partitioner, mapSideCombine: Boolean = true, serializer: Serializer = null): RDD[(K, C)] 函數
該函數用於將RDD[K,V]轉換成RDD[K,C],這裏的V類型和C類型能夠相同也能夠不一樣。 大數據
其中的參數: spa
createCombiner:組合器函數,用於將V類型轉換成C類型,輸入參數爲RDD[K,V]中的V,輸出爲C scala
mergeValue:合併值函數,將一個C類型和一個V類型值合併成一個C類型,輸入參數爲(C,V),輸出爲C code
mergeCombiners:合併組合器函數,用於將兩個C類型值合併成一個C類型,輸入參數爲(C,C),輸出爲C htm
numPartitions:結果RDD分區數,默認保持原有的分區數 get
partitioner:分區函數,默認爲HashPartitioner
mapSideCombine:是否須要在Map端進行combine操做,相似於MapReduce中的combine,默認爲true
看下面例子:
- scala> var rdd1 = sc.makeRDD(Array(("A",1),("A",2),("B",1),("B",2),("C",1)))
- rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[64] at makeRDD at :21
- scala> rdd1.combineByKey(
- | (v : Int) => v + "_",
- | (c : String, v : Int) => c + "@" + v,
- | (c1 : String, c2 : String) => c1 + "$" + c2
- | ).collect
- res60: Array[(String, String)] = Array((A,2_$1_), (B,1_$2_), (C,1_))
其中三個映射函數分別爲:
createCombiner: (V) => C
(v : Int) => v + 「_」 //在每個V值後面加上字符_,返回C類型(String)
mergeValue: (C, V) => C
(c : String, v : Int) => c + 「@」 + v //合併C類型和V類型,中間加字符@,返回C(String)
mergeCombiners: (C, C) => C
(c1 : String, c2 : String) => c1 + 「$」 + c2 //合併C類型和C類型,中間加$,返回C(String)
其餘參數爲默認值。
最終,將RDD[String,Int]轉換爲RDD[String,String]。
再看例子:
- rdd1.combineByKey(
- (v : Int) => List(v),
- (c : List[Int], v : Int) => v :: c,
- (c1 : List[Int], c2 : List[Int]) => c1 ::: c2
- ).collect
- res65: Array[(String, List[Int])] = Array((A,List(2, 1)), (B,List(2, 1)), (C,List(1)))
最終將RDD[String,Int]轉換爲RDD[String,List[Int]]。
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, numPartitions: Int)(func: (V, V) => V): RDD[(K, V)]
def foldByKey(zeroValue: V, partitioner: Partitioner)(func: (V, V) => V): RDD[(K, V)]
該函數用於RDD[K,V]根據K將V作摺疊、合併處理,其中的參數zeroValue表示先根據映射函數將zeroValue應用於V,進行初始化V,再將映射函數應用於初始化後的V.
直接看例子:
- scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
- scala> rdd1.foldByKey(0)(_+_).collect
- res75: Array[(String, Int)] = Array((A,2), (B,3), (C,1))
- //將rdd1中每一個key對應的V進行累加,注意zeroValue=0,須要先初始化V,映射函數爲+操
- //做,好比("A",0), ("A",2),先將zeroValue應用於每一個V,獲得:("A",0+0), ("A",2+0),即:
- //("A",0), ("A",2),再將映射函數應用於初始化後的V,最後獲得(A,0+2),即(A,2)
再看:
- scala> rdd1.foldByKey(2)(_+_).collect
- res76: Array[(String, Int)] = Array((A,6), (B,7), (C,3))
- //先將zeroValue=2應用於每一個V,獲得:("A",0+2), ("A",2+2),即:("A",2), ("A",4),再將映射函
- //數應用於初始化後的V,最後獲得:(A,2+4),即:(A,6)
再看乘法操做:
- scala> rdd1.foldByKey(0)(_*_).collect
- res77: Array[(String, Int)] = Array((A,0), (B,0), (C,0))
- //先將zeroValue=0應用於每一個V,注意,此次映射函數爲乘法,獲得:("A",0*0), ("A",2*0),
- //即:("A",0), ("A",0),再將映射函//數應用於初始化後的V,最後獲得:(A,0*0),即:(A,0)
- //其餘K也同樣,最終都獲得了V=0
- scala> rdd1.foldByKey(1)(_*_).collect
- res78: Array[(String, Int)] = Array((A,0), (B,2), (C,1))
- //映射函數爲乘法時,須要將zeroValue設爲1,才能獲得咱們想要的結果。
在使用foldByKey算子時候,要特別注意映射函數及zeroValue的取值。
更多關於Spark算子的介紹,可參考 Spark算子 :
http://lxw1234.com/archives/tag/spark%E7%AE%97%E5%AD%90
轉載請註明:lxw的大數據田地 » Spark算子:RDD鍵值轉換操做(2)–combineByKey、foldByKey