package com.xp.cn import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} /** * Created by xupan on 2017/12/15. * Spark操做—aggregate:action * 將每一個分區裏面的元素進行聚合,而後用combine函數將每一個分區的結果和初始值(zeroValue)進行combine操做。 * 這個函數最終返回的類型不須要和RDD中元素類型一致。 */ object AggerateDemo { def main(args: Array[String]) { Logger.getLogger("org.apache.spark").setLevel(Level.ERROR) //SparkContext:spark執行入口 val sc: SparkContext = new SparkContext( new SparkConf() .setAppName("WordCount") .setMaster("local[4]") ) /** * 先在每一個分區中迭代執行 (x : Int,y : Int) => x + y 而且使用zeroValue的值1 * 即:part_0中 zeroValue+5+4+3+2+1 = 1+5+4+3+2+1 = 16 * part_1中 zeroValue+10+9+8+7+6 = 1+10+9+8+7+6 = 41 * 再將兩個分區的結果合併(a : Int,b : Int) => a + b ,而且使用zeroValue的值1 * 即:zeroValue+part_0+part_1 = 1 + 16 + 41 = 58 **/ val kk = sc.parallelize(Array(5, 4, 3, 2, 1, 10, 9, 8, 7, 6), 2) val oo = kk.aggregate(1)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a + b }) println(oo + "oo") //58 /** * ##此次zeroValue=2 ##part_0中 zeroValue+5+4+3+2+1 = 2+5+4+3+2+1 = 17 ##part_1中 zeroValue+10+9+8+7+6 = 2+10+9+8+7+6 = 42 ##最後:zeroValue*part_0*part_1 = 2 * 17 * 42 = 1428 */ val res = kk.aggregate(2)({ (x: Int, y: Int) => x + y }, { (a: Int, b: Int) => a * b }) println("res :" + res) /** 1. 0+1, 0+1 2. 1+2, 1+1 3. 3+3, 2+1 4. 6+4, 3+1 5. 10+5, 4+1 ...... 實際Spark執行中是分佈式計算,可能會把List分紅多個分區, 假如3個,p1(1,2,3,4),p2(5,6,7,8),p3(9), 通過計算各分區的的結果(10,4),(26,4),(9,1), 這樣,執行(par1,par2) =>(par1._1 + par2._1, par1._2 + par2._2) 就是(10+26+9,4+4+1)即(45,9),再求平均值就簡單了。 */ val ll = kk.aggregate((0, 0))( (acc, number) => (acc._1 + number, acc._2 + 1), (par1, par2) => (par1._1 + par2._1, par1._2 + par2._2) ) println("ll : " + (ll._1.toDouble / ll._2)) //聚合求,指定2個分區,每一個分區對應一個task val rdd1 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) //aggregate:聚合,有兩個括號的方法,柯立化方法,初始值爲0,傳遞了兩個函數 //第一個:每一個分區上作聚合 + 1 //第二個:將每一個分區的結果再次聚合 + 1 val aggRDD = rdd1.aggregate(1)(_ + _, _ + _) println(" aggRDD : " + aggRDD) //求每一個分區的最大值,再將結果聚合 val result = rdd1.aggregate(1)(math.max(_, _), _ + _) println(" result : " + result) //14 val rdd2 = sc.parallelize(Array(1, 2, 3, 4, 5, 6, 7, 8, 9), 2) val result2 = rdd2.aggregate(0)(math.max(_, _), _ + _) println(" result2 : " + result2) //18 //第一個參數2也要參與計算,只是最後參加一次 val result3 = rdd2.aggregate(2)(math.max(_, _), _ + _) println("result3 : " + result3) //result3 : 20 println("===============================") val rddS = sc.parallelize(Array("a", "b", "c", "d", "e"), 2) val resultS = rddS.aggregate("")(_ + _, _ + _) //結果在變化 resultS : cdeab resultS : abcde //變化的緣由是由於並不肯定哪一個分區先執行完成 println(" resultS : " + resultS) println("================================") //計算每一個分區最大的字符串長度,並轉換爲字符串相加 //rdds2Result : 65 rdds2Result : 56 //結果在變化 val rdds2 = sc.parallelize(Array("sda", "bsdfs", "c", "d", "esdfsd"), 2); val rdds2Result = rdds2.aggregate("")((x, y) => math.max(x.length, y.length).toString, (x, y) => x + y) println(" rdds2Result : " + rdds2Result) println("================================") val rddw = sc.parallelize(List("4444", "11113"), 2) val rddws = rddw.aggregate("")((x, y) => math.min(x.length, y.length).toString, (x, y) => x + y) println(" rddws :" + rddws) //關閉資源 sc.stop() } }