aggregate
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): Uapache
aggregate用戶聚合RDD中的元素,先使用seqOp將RDD中每一個分區中的T類型元素聚合成U類型,再使用combOp將以前每一個分區聚合後的U類型聚合成U類型,特別注意seqOp和combOp都會使用zeroValue的值,zeroValue的類型爲U。app
- var rdd1 = sc.makeRDD(1 to 10,2)
- rdd1.mapPartitionsWithIndex{
- (partIdx,iter) => {
- var part_map = scala.collection.mutable.Map[String,List[Int]]()
- while(iter.hasNext){
- var part_name = "part_" + partIdx;
- var elem = iter.next()
- if(part_map.contains(part_name)) {
- var elems = part_map(part_name)
- elems ::= elem
- part_map(part_name) = elems
- } else {
- part_map(part_name) = List[Int]{elem}
- }
- }
- part_map.iterator
-
- }
- }.collect
- res16: Array[(String, List[Int])] = Array((part_0,List(5, 4, 3, 2, 1)), (part_1,List(10, 9, 8, 7, 6)))
-
##第一個分區中包含5,4,3,2,1函數
##第二個分區中包含10,9,8,7,6spa
- scala> rdd1.aggregate(1)(
- | {(x : Int,y : Int) => x + y},
- | {(a : Int,b : Int) => a + b}
- | )
- res17: Int = 58
-
結果爲何是58,看下面的計算過程:scala
##先在每一個分區中迭代執行 (x : Int,y : Int) => x + y 而且使用zeroValue的值1ci
##即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16it
## part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41spark
##再將兩個分區的結果合併(a : Int,b : Int) => a + b ,而且使用zeroValue的值1io
##即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58table
再好比:
- scala> rdd1.aggregate(2)(
- | {(x : Int,y : Int) => x + y},
- | {(a : Int,b : Int) => a * b}
- | )
- res18: Int = 1428
-
##此次zeroValue=2
##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
##最後:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
所以,zeroValue即肯定了U的類型,也會對結果產生相當重要的影響,使用時候要特別注意。
fold
def fold(zeroValue: T)(op: (T, T) ⇒ T): T
fold是aggregate的簡化,將aggregate中的seqOp和combOp使用同一個函數op。
- scala> rdd1.fold(1)(
- | (x,y) => x + y
- | )
- res19: Int = 58
-
- ##結果同上面使用aggregate的第一個例子同樣,即:
- scala> rdd1.aggregate(1)(
- | {(x,y) => x + y},
- | {(a,b) => a + b}
- | )
- res20: Int = 58
-
lookup
def lookup(key: K): Seq[V]
lookup用於(K,V)類型的RDD,指定K值,返回RDD中該K對應的全部V值。
- scala> var rdd1 = sc.makeRDD(Array(("A",0),("A",2),("B",1),("B",2),("C",1)))
- rdd1: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at makeRDD at :21
-
- scala> rdd1.lookup("A")
- res0: Seq[Int] = WrappedArray(0, 2)
-
- scala> rdd1.lookup("B")
- res1: Seq[Int] = WrappedArray(1, 2)