1. Definitions
【aggregate】
/**
* Aggregate the elements of each partition, and then the results for all the partitions, using
* given combine functions and a neutral "zero value". This function can return a different result
* type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
* and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
* allowed to modify and return their first argument instead of creating a new U to avoid memory
* allocation.
*/
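In other words:
aggregate takes three arguments (the initial value zeroValue and the functions seqOp and combOp), and its return type U is the same as the type of zeroValue.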
Processing steps:
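1. seqOp is applied to the elements of each RDD partition, starting from zeroValue, and each partition returns its result (of type U).
2. The partition results are sent back to the driver and reduced there: combOp is applied to the partition results, again starting from zeroValue, and the final result (of type U) is returned.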
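The two steps can be sketched locally in plain Scala, without Spark. This is a minimal model under my own assumptions: an RDD is represented as a `Seq` of partitions, and `localAggregate` is a hypothetical helper that only mirrors the semantics described above (in Spark, step 1 runs as tasks on the executors). The input split matches the example in section 2.

```scala
object AggregateSketch {
  // Hypothetical stand-in for RDD.aggregate: each inner Seq plays the role of one partition.
  def localAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U): U = {
    // Step 1: fold every partition with seqOp, starting from zeroValue.
    val partitionResults = partitions.map(_.foldLeft(zeroValue)(seqOp))
    // Step 2: the "driver" merges the partition results with combOp,
    // again starting from zeroValue.
    partitionResults.foldLeft(zeroValue)(combOp)
  }

  def main(args: Array[String]): Unit = {
    // Same split that sc.parallelize(List(1,2,4,5,8,9), 3) produces.
    val partitions = Seq(Seq(1, 2), Seq(4, 5), Seq(8, 9))
    val result = localAggregate(partitions, 3)((a, b) => math.min(a, b), _ + _)
    println(result) // 3 + 1 + 3 + 3 = 10
  }
}
```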
Signature:
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
【treeAggregate】
/**
* Aggregates the elements of this RDD in a multi-level tree pattern.
* @param depth suggested depth of the tree (default: 2)
* @see [[org.apache.spark.rdd.RDD#aggregate]]
*/
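In other words, treeAggregate is used just like aggregate, with one extra parameter, depth, which defaults to 2 and can be omitted.
Apart from that, the parameters, return value and usage of treeAggregate and aggregate are exactly the same. Only the processing, and how the final result is combined, differ slightly; the details are explained below.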
Signature:
def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U, depth: Int = 2): U
2. Usage examples
【aggregate】
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int
scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int
val z =sc.parallelize(List(1,2,4,5,8,9),3)
scala> z.aggregate(3)(seq,comb)
seq:3:4
seq:3:1
seq:1:2
seq:3:8
seq:3:5
seq:3:9
comb:3:1
comb:4:3
comb:7:3
res0: Int = 10
【treeAggregate】
scala> def seq(a:Int,b:Int):Int={
| println("seq:"+a+":"+b)
| math.min(a,b)}
seq: (a: Int, b: Int)Int
scala> def comb(a:Int,b:Int):Int={
| println("comb:"+a+":"+b)
| a+b}
comb: (a: Int, b: Int)Int
val z =sc.parallelize(List(1,2,4,5,8,9),3)
scala> z.treeAggregate(3)(seq,comb)
seq:3:4 // -> 3, partition 2 (4, 5)
seq:3:1 // -> 1, partition 1 (1, 2)
seq:1:2 // -> 1, partition 1
seq:3:8 // -> 3, partition 3 (8, 9)
seq:3:5 // -> 3, partition 2
seq:3:9 // -> 3, partition 3
comb:1:3
comb:4:3
res1: Int = 7
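As shown above, the two are used in exactly the same way; the only difference is that aggregate uses the initial value one more time than treeAggregate in the final reduce. The partition results are 1 (for 1, 2), 3 (for 4, 5) and 3 (for 8, 9), so aggregate returns 3 + 1 + 3 + 3 = 10 while treeAggregate returns 1 + 3 + 3 = 7.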
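The extra use of the initial value only changes the result when zeroValue is not neutral for combOp, as with 3 and `+` above. Here is a plain-Scala check with illustrative values of my own: an average computed as a (sum, count) pair, where (0, 0) is neutral for combOp, gives the same pair whether or not zeroValue joins the final merge. The two final steps are modeled with `foldLeft` and `reduce`; this is a local sketch, not Spark code.

```scala
object NeutralZeroSketch {
  type SumCount = (Int, Int)

  val zero: SumCount = (0, 0)
  // seqOp: add one element to the running (sum, count)
  val seqOp: (SumCount, Int) => SumCount = (acc, x) => (acc._1 + x, acc._2 + 1)
  // combOp: merge two partial (sum, count) pairs
  val combOp: (SumCount, SumCount) => SumCount = (a, b) => (a._1 + b._1, a._2 + b._2)

  val partitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(4, 5), Seq(8, 9))
  val partitionResults: Seq[SumCount] = partitions.map(_.foldLeft(zero)(seqOp))

  // aggregate-style final step: zeroValue takes part in the merge once more
  val withExtraZero: SumCount = partitionResults.foldLeft(zero)(combOp)
  // treeAggregate-style final step (as in the example above): plain reduce
  val withoutExtraZero: SumCount = partitionResults.reduce(combOp)

  def main(args: Array[String]): Unit = {
    println(withExtraZero)    // (29,6)
    println(withoutExtraZero) // (29,6)
    // average of the six elements
    println(withExtraZero._1.toDouble / withExtraZero._2)
  }
}
```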
3. Differences
Reading the implementations of aggregate and treeAggregate confirms what the example above shows. To summarize:
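(1) In the final result, aggregate performs one more combOp with the initial value than treeAggregate does. However, as the parameter name zeroValue suggests, the initial value is normally a neutral value such as 0 or an empty collection, and then the extra combOp does not change the result.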
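(2) aggregate brings the per-partition results straight to the driver and reduces them there. treeAggregate first combines the partition results on the executors with reduceByKey, and only then brings the remaining results to the driver for the final reduce. The number of reduceByKey levels is derived from depth: there are at most about depth - 1 of them, so together with the final reduce on the driver the combine forms a tree of roughly depth levels, which is where the name treeAggregate comes from. A level is only added when it actually shrinks the work left for the driver, so with just 3 partitions, as in the example above, there is no reduceByKey level at all and the three partition results go straight to the driver.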
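The number of levels can be reproduced with a few lines of plain Scala. This follows my reading of the Spark source (the fan-in `scale` and the loop condition) and may differ between Spark versions; `levelSizes` is a hypothetical helper. It returns the partition count at each level, and the last entry is how many results the driver finally merges.

```scala
import scala.collection.mutable.ListBuffer

object TreeLevelsSketch {
  // Hypothetical helper: partition counts at each level of treeAggregate.
  def levelSizes(numInputPartitions: Int, depth: Int = 2): List[Int] = {
    require(depth >= 1, s"depth must be >= 1 but got $depth")
    // Fan-in per level: roughly the depth-th root of the partition count, at least 2.
    val scale = math.max(math.ceil(math.pow(numInputPartitions, 1.0 / depth)).toInt, 2)
    var numPartitions = numInputPartitions
    val sizes = ListBuffer(numPartitions)
    // Add a level only while it is still expected to save wall-clock time.
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      sizes += numPartitions
    }
    sizes.toList
  }

  def main(args: Array[String]): Unit = {
    println(levelSizes(3))       // List(3): no extra level, as in the example above
    println(levelSizes(200))     // List(200, 13)
    println(levelSizes(1000, 3)) // List(1000, 100, 10)
  }
}
```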
4. Source code
The source follows the analysis above and is fairly simple, so I won't go through it line by line.
Figure borrowed from http://blog.csdn.net/lookqlp/article/details/52121057
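For readers who prefer code to a diagram, the data flow can also be simulated locally. This is a sketch based on my reading of the source, not a copy of it: `localTreeAggregate` is a hypothetical name, grouping partial results by `index % cur` stands in for Spark's reduceByKey with a HashPartitioner, and the final `reduce` plays the driver, which does not use zeroValue again.

```scala
object TreeAggregateSketch {
  // Hypothetical local model of treeAggregate: inner Seqs play the role of partitions.
  def localTreeAggregate[T, U](partitions: Seq[Seq[T]], zeroValue: U)(
      seqOp: (U, T) => U,
      combOp: (U, U) => U,
      depth: Int = 2): U = {
    require(depth >= 1, s"depth must be >= 1 but got $depth")
    require(partitions.nonEmpty, "this sketch assumes at least one partition")
    // Level 0: fold each partition with seqOp, starting from zeroValue.
    var partial: Seq[U] = partitions.map(_.foldLeft(zeroValue)(seqOp))
    var numPartitions = partial.length
    val scale = math.max(math.ceil(math.pow(numPartitions, 1.0 / depth)).toInt, 2)
    while (numPartitions > scale + math.ceil(numPartitions.toDouble / scale)) {
      numPartitions /= scale
      val cur = numPartitions
      // Key each partial result by (its index % cur) and merge values with equal keys,
      // standing in for reduceByKey over cur partitions.
      partial = partial.zipWithIndex
        .groupBy { case (_, i) => i % cur }
        .toSeq
        .sortBy(_._1)
        .map { case (_, group) => group.map(_._1).reduce(combOp) }
    }
    // Final step on the "driver": plain reduce, no extra zeroValue.
    partial.reduce(combOp)
  }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(Seq(1, 2), Seq(4, 5), Seq(8, 9))
    // Same seq/comb as the REPL example: 1 + 3 + 3 = 7
    println(localTreeAggregate(partitions, 3)((a, b) => math.min(a, b), _ + _))
    // 200 single-element partitions, depth 2: one intermediate level of 13 partial sums
    val many = (1 to 200).map(Seq(_))
    println(localTreeAggregate(many, 0)(_ + _, _ + _)) // 20100
  }
}
```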
5. Pros and cons
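(1) In the combine step, aggregate merges all n partition results one by one on the driver, which is O(n) sequential work there. treeAggregate does most of the merging in parallel on the executors, so the driver merges only about n^(1/depth) results; if depth is chosen to grow like lg n, the sequential merge path shrinks to O(lg n). Here n is the number of partitions.
(2) aggregate sends every partition's result to the driver, so when U is large (for example a long vector) and there are many partitions, the driver risks running out of memory. With many partitions, treeAggregate sends far fewer results to the driver, which greatly reduces that risk. So I think you may as well just use treeAggregate. If anything here is wrong, please point it out in the comments.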