aggregate 和 treeAggregate 的對比

 1.定義

   【aggregate】
      /**
      * Aggregate the elements of each partition, and then the results for all the partitions, using
      * given combine functions and a neutral "zero value". This function can return a different result
      * type, U, than the type of this RDD, T. Thus, we need one operation for merging a T into an U
      * and one operation for merging two U's, as in scala.TraversableOnce. Both of these functions are
      * allowed to modify and return their first argument instead of creating a new U to avoid memory
      * allocation.
      */
      即:
      aggregate須要三個參數(初始值zeroValue,函數seqOp和函數combOp),返回值類型U同初始值zeroValue同樣。
      處理過程:
          1.在rdd的每一個分區上應用seqOp函數(應用初始值zeroValue)並返回分區的結果值(U類型)。
          2.分區的結果值返回到driver端作reduce處理,也就是說在分區的結果集上應用函數combOp(應用初始值zeroValue),
            並返回最終結果值(U類型)。
      函數頭:
         def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U

  【treeAggregate】
     /**
      * Aggregates the elements of this RDD in a multi-level tree pattern.
      * @param depth suggested depth of the tree (default: 2)
      * @see [[org.apache.spark.rdd.RDD#aggregate]]
      */
      即:treeAggregate和aggregate能夠同樣用,只是多了一個參數depth,但此參數默認爲2,能夠不指定。
        treeAggregate和aggregate的參數,返回值及用法徹底同樣。只是處理過程及最終的結果集處理有些微不一樣,下面詳細說明。

      函數頭:
        def treeAggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U,combOp: (U, U) => U,depth: Int = 2): U

 2.用法示例 
    【aggregate】
        scala> def seq(a:Int,b:Int):Int={
             | println("seq:"+a+":"+b)
             | math.min(a,b)}
        seq: (a: Int, b: Int)Int

        scala> def comb(a:Int,b:Int):Int={
             | println("comb:"+a+":"+b)
             | a+b}
        comb: (a: Int, b: Int)Int

        val z =sc.parallelize(List(1,2,4,5,8,9),3)
        scala> z.aggregate(3)(seq,comb)
        seq:3:4
        seq:3:1
        seq:1:2
        seq:3:8
        seq:3:5
        seq:3:9
        comb:3:1
        comb:4:3
        comb:7:3
        res0: Int = 10
  【treeAggregate】
        scala> def seq(a:Int,b:Int):Int={
             | println("seq:"+a+":"+b)
             | math.min(a,b)}
        seq: (a: Int, b: Int)Int

        scala> def comb(a:Int,b:Int):Int={
             | println("comb:"+a+":"+b)
             | a+b}
        comb: (a: Int, b: Int)Int

        val z =sc.parallelize(List(1,2,4,5,8,9),3)
        scala> z.treeAggregate(3)(seq,comb)
        seq:3:4   //3 分區1
        seq:3:1   //1 分區1
        seq:1:2   //1 分區1
        seq:3:8   //3 分區2
        seq:3:5   //3 分區2
        seq:3:9   //3 分區3
        comb:1:3
        comb:4:3
        res1: Int = 7

    由上可見,形式上兩種用法一致,只是aggregate 比 treeAggregate在最後結果的reduce操做時,多使用了一次初始值。

    3.區別

      查看aggregate的代碼和treeAggregate的代碼實現會發現,確實如上現象所反映,整理結果以下:
      (1)最終結果上,aggregate會比treeAggregate多作一次對於初始值的combOp操做。但從參數名字上就能夠看到,
          通常要傳入相似0或者空的集合的zeroValue初始值。
      (2)aggregate會把分區的結果直接拿到driver端作reduce操做。treeAggregate會先把分區結果作reduceByKey,
          最後再把結果拿到driver端作reduce,算出最終結果。reduceByKey須要幾層,由參數depth決定,也就是至關於
          作了depth層的reduceByKey,這也是treeAggregate名字的由來。

    4.源碼解釋
      源碼邏輯如上分析,較簡單,不贅述了。
借鑑圖一張(http://blog.csdn.net/lookqlp/article/details/52121057)
5.優缺點1) aggregate在combine上的操做,複雜度爲O(n). treeAggregate的時間複雜度爲O(lg n)。n爲分區數。
 (2) aggregate把數據所有拿到driver端,存在內存溢出的風險。treeAggregate則不會。             所以,筆者以爲就用treeAggregate好了,若有不對之處,敬請留言指正。
相關文章
相關標籤/搜索