AggregateByKey算子操做。java
Github項目上已包含Spark全部操做DEMO。git
Java版本:github
package com.huangyueran.spark.operator; import java.util.ArrayList; import java.util.List; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.Function2; import scala.Tuple2; /** * @category aggregateByKey函數對PairRDD中相同Key的值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。 * 和aggregate函數相似,aggregateByKey返回值得類型不須要和RDD中value的類型一致。 * 由於aggregateByKey是對相同Key中的值進行聚合操做,因此aggregateByKey函數最終返回的類型仍是Pair * RDD,對應的結果是Key和聚合好的值;而aggregate函數直接返回非RDD的結果。 * 1.zeroValue:表示在每一個分區中第一次拿到key值時,用於建立一個返回類型的函數,這個函數最終會被包裝成先生成一個返回類型, * 而後經過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中。 * 2.seqOp:這個用於把迭代分區中key對應的值添加到zeroValue建立的U類型實例中。 * 3.combOp:這個用於合併每一個分區中聚合過來的兩個U類型的值。 * @author huangyueran * @time 2019-7-21 16:38:20 */ public class AggregateByKey { public static void main(String[] args) { /** * SparkConf:第一步建立一個SparkConf,在這個對象裏面能夠設置容許模式Local Standalone yarn * AppName(能夠在Web UI中看到) 還能夠設置Spark運行時的資源要求 */ SparkConf conf = new SparkConf().setAppName("AggregateByKey").setMaster("local"); // SparkConf conf = new SparkConf().setAppName("JoinOperator"); /** * 基於SparkConf的對象能夠建立出來一個SparkContext Spark上下文 * SparkContext是通往集羣的惟一通道,SparkContext在建立的時候還會建立任務調度器 */ JavaSparkContext sc = new JavaSparkContext(conf); aggregateByKey(sc); } private static void aggregateByKey(JavaSparkContext sc) { List<Tuple2<Integer, Integer>> datas = new ArrayList<>(); datas.add(new Tuple2<>(1, 3)); datas.add(new Tuple2<>(1, 2)); datas.add(new Tuple2<>(1, 4)); datas.add(new Tuple2<>(2, 3)); List<Tuple2<Integer, Integer>> list = sc.parallelizePairs(datas, 2) .aggregateByKey(0, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("seq: " + v1 + "\t" + v2); return v1 + v2; } }, new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { System.out.println("comb: " + v1 + "\t" + v2); return v1 + v2; } }).collect(); List<Tuple2<Integer, Integer>> list2 = sc.parallelizePairs(datas, 2) .reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }).collect(); for (Tuple2 t : list) { System.out.println(t._1 + "=====" + t._2); } for (Tuple2 t : list2) { System.out.println(t._1 + "=====" + t._2); } } }
Scala版本:apache
package com.hyr.spark.operator import org.apache.spark.{SparkConf, SparkContext} /** ***************************************************************************** * @date 2019-08-07 15:46 * @author: <a href=mailto:huangyr>黃躍然</a> * @Description: * aggregateByKey函數對PairRDD中相同Key的值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。 * 和aggregate函數相似,aggregateByKey返回值得類型不須要和RDD中value的類型一致。 * 由於aggregateByKey是對相同Key中的值進行聚合操做,因此aggregateByKey函數最終返回的類型仍是Pair * RDD,對應的結果是Key和聚合好的值;而aggregate函數直接返回非RDD的結果。 * 1.zeroValue:表示在每一個分區中第一次拿到key值時,用於建立一個返回類型的函數,這個函數最終會被包裝成先生成一個返回類型, * 而後經過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中。 * 2.seqOp:這個用於把迭代分區中key對應的值添加到zeroValue建立的U類型實例中。 * 3.combOp:這個用於合併每一個分區中聚合過來的兩個U類型的值。 * *****************************************************************************/ object AggregateByKey { def aggregateByKey(sparkContext: SparkContext): Unit = { val datas = List((1, 3), (2, 6), (1, 4), (2, 3)) val rdd = sparkContext.parallelize(datas, 2) val tuples1 = rdd.aggregateByKey(0)((sum: Int, value: Int) => { println("seq:" + sum + "\t" + value) sum + value }, (sum: Int, value: Int) => { println("comb:" + sum + "\t" + value) sum + value }).collect() for(t<-tuples1){ println(t._1+" "+t._2) } val tuples2 = rdd.reduceByKey((sum: Int, value: Int) => { println("sum:" + sum + "\t" + "value:" + value) sum + value }).collect() for(t<-tuples2){ println(t._1+" "+t._2) } } def main(args: Array[String]): Unit = { val sparkConf = new SparkConf setAppName "AggregateByKey" setMaster "local" val sparkContext = new SparkContext(sparkConf) aggregateByKey(sparkContext) } }
Github地址:https://github.com/huangyueranbbc/SparkDemo api