spark aggregate

package com.xp.cn

import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Created by xupan on 2017/12/15.
 * Spark操做—aggregate:action
 * 將每一個分區裏面的元素進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。
 * 這個函數最終返回的類型不須要和RDD中元素類型一致。
 */
object AggerateDemo {

  def main(args: Array[String]) {


    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)

    //SparkContext:spark執行入口
    val sc: SparkContext = new SparkContext(
      new SparkConf()
        .setAppName("WordCount")
        .setMaster("local[4]")
    )

    /**
     * 先在每一個分區中迭代執行 (x : Int,y : Int) => x + y 而且使用zeroValue的值1
     * 即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16
     * part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41
     * 再將兩個分區的結果合併(a : Int,b : Int) => a + b ,而且使用zeroValue的值1
     * 即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58
     **/
    val kk = sc.parallelize(Array(5, 4, 3, 2, 1, 10, 9, 8, 7, 6), 2)
    val oo = kk.aggregate(1)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a + b })
    println(oo + "oo") //58


    /**
     * ##此次zeroValue=2
      ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17
      ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42
      ##最後:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428
     */
    val res = kk.aggregate(2)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a * b })
    println("res :" + res)

    /**
    1.  0+1,  0+1
    2.  1+2,  1+1
    3.  3+3,  2+1
    4.  6+4,  3+1
    5.  10+5,  4+1
      ......
    實際Spark執行中是分佈式計算,可能會把List分紅多個分區,
    假如3個,p1(1,2,3,4),p2(5,6,7,8),p3(9),
    通過計算各分區的的結果(10,4),(26,4),(9,1),
    這樣,執行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2)
    就是(10+26+9,4+4+1)即(45,9),再求平均值就簡單了。
      */
    val ll = kk.aggregate((0, 0))(
      (acc, number) => (acc._1 + number, acc._2 + 1),
      (par1, par2) => (par1._1 + par2._1, par1._2 + par2._2)
    )
    println("ll : " + (ll._1.toDouble / ll._2))


    //聚合求,指定2個分區,每一個分區對應一個task
    val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    //aggregate:聚合,有兩個括號的方法,柯立化方法,初始值爲0,傳遞了兩個函數
    //第一個:每一個分區上作聚合 + 1
    //第二個:將每一個分區的結果再次聚合 + 1
    val aggRDD = rdd1.aggregate(1)(_ + _, _ + _)
    println(" aggRDD :  " + aggRDD)


    //求每一個分區的最大值,再將結果聚合
    val result = rdd1.aggregate(1)(math.max(_, _), _ + _)
    println(" result : " + result) //14

    val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)
    val result2 = rdd2.aggregate(0)(math.max(_, _), _ + _)
    println(" result2 : " + result2) //18


    //第一個參數2也要參與計算,只是最後參加一次
    val result3 = rdd2.aggregate(2)(math.max(_, _), _ + _)
    println("result3 : " + result3) //result3 : 20


    println("===============================")


    val rddS = sc.parallelize(Array("a", "b", "c", "d", "e"), 2)
    val resultS = rddS.aggregate("")(_ + _, _ + _)
    //結果在變化 resultS : cdeab  resultS : abcde
    //變化的緣由是由於並不肯定哪一個分區先執行完成
    println(" resultS : " + resultS)



    println("================================")

    //計算每一個分區最大的字符串長度,並轉換爲字符串相加
    //rdds2Result : 65   rdds2Result : 56
    //結果在變化
    val rdds2 = sc.parallelize(Array("sda", "bsdfs", "c", "d", "esdfsd"), 2);
    val rdds2Result = rdds2.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y)
    println(" rdds2Result : " + rdds2Result)


    println("================================")
    val rddw = sc.parallelize(List("4444", "11113"), 2)
    val rddws = rddw.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y)
    println(" rddws :" + rddws)

    //關閉資源
    sc.stop()
  }

}
相關文章
相關標籤/搜索