Spark算子彙總-AggregateByKey

AggregateByKey算子操做。java

Github項目上已包含Spark全部操做DEMO。git

Java版本:github

package com.huangyueran.spark.operator;
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

import scala.Tuple2;

/** * @category aggregateByKey函數對PairRDD中相同Key的值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。 * 和aggregate函數相似,aggregateByKey返回值得類型不須要和RDD中value的類型一致。 * 由於aggregateByKey是對相同Key中的值進行聚合操做,因此aggregateByKey函數最終返回的類型仍是Pair * RDD,對應的結果是Key和聚合好的值;而aggregate函數直接返回非RDD的結果。 * 1.zeroValue:表示在每一個分區中第一次拿到key值時,用於建立一個返回類型的函數,這個函數最終會被包裝成先生成一個返回類型, * 而後經過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中。 * 2.seqOp:這個用於把迭代分區中key對應的值添加到zeroValue建立的U類型實例中。 * 3.combOp:這個用於合併每一個分區中聚合過來的兩個U類型的值。 * @author huangyueran * @time 2019-7-21 16:38:20 */
public class AggregateByKey {

	public static void main(String[] args) {
		/** * SparkConf:第一步建立一個SparkConf,在這個對象裏面能夠設置容許模式Local Standalone yarn * AppName(能夠在Web UI中看到) 還能夠設置Spark運行時的資源要求 */
		SparkConf conf = new SparkConf().setAppName("AggregateByKey").setMaster("local");
		// SparkConf conf = new SparkConf().setAppName("JoinOperator");

		/** * 基於SparkConf的對象能夠建立出來一個SparkContext Spark上下文 * SparkContext是通往集羣的惟一通道,SparkContext在建立的時候還會建立任務調度器 */
		JavaSparkContext sc = new JavaSparkContext(conf);

		aggregateByKey(sc);
	}

	private static void aggregateByKey(JavaSparkContext sc) {
		List<Tuple2<Integer, Integer>> datas = new ArrayList<>();
		datas.add(new Tuple2<>(1, 3));
		datas.add(new Tuple2<>(1, 2));
		datas.add(new Tuple2<>(1, 4));
		datas.add(new Tuple2<>(2, 3));

		List<Tuple2<Integer, Integer>> list = sc.parallelizePairs(datas, 2)
				.aggregateByKey(0, new Function2<Integer, Integer, Integer>() {
					@Override
					public Integer call(Integer v1, Integer v2) throws Exception {
						System.out.println("seq: " + v1 + "\t" + v2);
						return v1 + v2;
					}
				}, new Function2<Integer, Integer, Integer>() {
					@Override
					public Integer call(Integer v1, Integer v2) throws Exception {
						System.out.println("comb: " + v1 + "\t" + v2);
						return v1 + v2;
					}
				}).collect();

		List<Tuple2<Integer, Integer>> list2 = sc.parallelizePairs(datas, 2)
				.reduceByKey(new Function2<Integer, Integer, Integer>() {

					@Override
					public Integer call(Integer v1, Integer v2) throws Exception {
						return v1 + v2;
					}
				}).collect();

		for (Tuple2 t : list) {
			System.out.println(t._1 + "=====" + t._2);
		}

		for (Tuple2 t : list2) {
			System.out.println(t._1 + "=====" + t._2);
		}
	}

}

 

Scala版本:apache

package com.hyr.spark.operator

import org.apache.spark.{SparkConf, SparkContext}

/** ***************************************************************************** * @date 2019-08-07 15:46 * @author: <a href=mailto:huangyr>黃躍然</a> * @Description: * aggregateByKey函數對PairRDD中相同Key的值進行聚合操做,在聚合過程當中一樣使用了一箇中立的初始值。 * 和aggregate函數相似,aggregateByKey返回值得類型不須要和RDD中value的類型一致。 * 由於aggregateByKey是對相同Key中的值進行聚合操做,因此aggregateByKey函數最終返回的類型仍是Pair * RDD,對應的結果是Key和聚合好的值;而aggregate函數直接返回非RDD的結果。 * 1.zeroValue:表示在每一個分區中第一次拿到key值時,用於建立一個返回類型的函數,這個函數最終會被包裝成先生成一個返回類型, * 而後經過調用seqOp函數,把第一個key對應的value添加到這個類型U的變量中。 * 2.seqOp:這個用於把迭代分區中key對應的值添加到zeroValue建立的U類型實例中。 * 3.combOp:這個用於合併每一個分區中聚合過來的兩個U類型的值。 * *****************************************************************************/
object AggregateByKey {


  def aggregateByKey(sparkContext: SparkContext): Unit = {
    val datas = List((1, 3), (2, 6), (1, 4), (2, 3))

    val rdd = sparkContext.parallelize(datas, 2)
    val tuples1 = rdd.aggregateByKey(0)((sum: Int, value: Int) => {
      println("seq:" + sum + "\t" + value)
      sum + value
    }, (sum: Int, value: Int) => {
      println("comb:" + sum + "\t" + value)
      sum + value
    }).collect()
    for(t<-tuples1){
      println(t._1+" "+t._2)
    }

    val tuples2 = rdd.reduceByKey((sum: Int, value: Int) => {
      println("sum:" + sum + "\t" + "value:" + value)
      sum + value
    }).collect()
    for(t<-tuples2){
      println(t._1+" "+t._2)
    }

  }

  def main(args: Array[String]): Unit = {
    val sparkConf = new SparkConf setAppName "AggregateByKey" setMaster "local"
    val sparkContext = new SparkContext(sparkConf)

    aggregateByKey(sparkContext)
  }

}

Github地址:https://github.com/huangyueranbbc/SparkDemo api

相關文章
相關標籤/搜索