0五、transformation操做開發實戰

一、map:將集合中每一個元素乘以2
二、filter:過濾出集合中的偶數
三、flatMap:將行拆分爲單詞
四、groupByKey:將每一個班級的成績進行分組
五、reduceByKey:統計每一個班級的總分
六、sortByKey、sortBy :將學生分數進行排序
七、join:打印每一個學生的成績
八、cogroup:打印每一個學生的成績

 
package sparkcore.java;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
 * transformation操做實戰
 */
public class TransformationOperation {
    public static void main(String[] args) {
        // map();
        // filter();
        // flatMap();
        // groupByKey();
        // reduceByKey();
        // sortByKey();
        // sortBy();
        join();
        cogroup();
    }
    /**
     * map算子案例:將集合中每個元素都乘以2
     */
    public static void map() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 構造集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        // 並行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        // 使用map算子,將集合中的每一個元素都乘以2
        // map算子,是對任何類型的RDD,均可以調用的
        // 在java中,map算子接收的參數是Function對象
        // 建立的Function對象,必定會讓你設置第二個泛型參數,這個泛型類型,就是返回的新元素的類型
        // 同時call()方法的返回類型,也必須與第二個泛型類型相同
        // 在call()方法內部,就能夠對原始RDD中的每個元素進行各類處理和計算,並返回一個新的元素
        // 全部新的元素就會組成一個新的RDD
        JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            // 傳入call()方法的,就是1,2,3,4,5
            // 返回的就是2,4,6,8,10
            @Override
            public Integer call(Integer v1throws Exception {
                return v1 * 2;
            }
        });
        // 打印新的RDD
        multipleNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer tthrows Exception {
                System.out.println(t);
            }
            // 輸出結果:
            // 2
            // 4
            // 6
            // 8
            // 10
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * filter算子案例:過濾集合中的偶數
     */
    public static void filter() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
        // 並行化集合,建立初始RDD
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        // 對初始RDD執行filter算子,過濾出其中的偶數
        // filter算子,傳入的也是Function,其餘的使用注意點,實際上和map是同樣的
        // 可是,惟一的不一樣,就是call()方法的返回類型是Boolean
        // 每個初始RDD中的元素,都會傳入call()方法,此時你能夠執行各類自定義的計算邏輯
        // 來判斷這個元素是不是你想要的
        // 若是你想在新的RDD中保留這個元素,那麼就返回true;不然,不想保留這個元素,返回false
        JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
            private static final long serialVersionUID = 1L;
            // 在這裏,1到10,都會傳入進來
            // 可是根據咱們的邏輯,只有2,4,6,8,10這幾個偶數,會返回true
            // 因此,只有偶數會保留下來,放在新的RDD中
            @Override
            public Boolean call(Integer v1throws Exception {
                return v1 % 2 == 0;
            }
        });
        // 打印新的RDD
        evenNumberRDD.foreach(new VoidFunction<Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Integer tthrows Exception {
                System.out.println(t);
            }
            // 輸出結果:
            // 2
            // 4
            // 6
            // 8
            // 10
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * flatMap案例:將文本行拆分爲多個單詞
     */
    public static void flatMap() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 構造集合
        List<String> lineList = Arrays.asList("hello you""hello me""hello world");
        // 並行化集合,建立RDD
        JavaRDD<String> lines = sc.parallelize(lineList);
        // 對RDD執行flatMap算子,將每一行文本,拆分爲多個單詞
        // flatMap算子,在java中,接收的參數是FlatMapFunction
        // 咱們須要本身定義FlatMapFunction的第二個泛型類型,即,表明了返回的新元素的類型
        // call()方法,返回的類型,不是U,而是Iterator<U>,這裏的U也與第二個泛型類型相同
        // flatMap其實就是,接收原始RDD中的每一個元素,並進行各類邏輯的計算和處理,返回能夠返回多個元素
        // 多個元素,即封裝在Iterator集合中,可使用ArrayList等集合
        // 新的RDD中,即封裝了全部的新元素;也就是說,新的RDD的大小必定是 >= 原始RDD的大小
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            private static final long serialVersionUID = 1L;
            // 在這裏會,好比,傳入第一行,hello you
            // 返回的是一個Iterator<String>(hello, you)
            @Override
            public Iterator<String> call(String tthrows Exception {
                return Arrays.asList(t.split(" ")).iterator();
            }
        });
        // 打印新的RDD
        words.foreach(new VoidFunction<String>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(String tthrows Exception {
                System.out.println(t);
            }
            // 輸出結果:
            // hello
            // you
            // hello
            // me
            // hello
            // world
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * groupByKey案例:按照班級對成績進行分組
     */
    public static void groupByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));
        // 並行化集合,建立JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
        // 針對scores RDD,執行groupByKey算子,對每一個班級的成績進行分組
        // groupByKey算子,返回的仍是JavaPairRDD
        // 可是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
        // 也就是說,按照了key進行分組,那麼每一個key可能都會有多個value,此時多個value聚合成了Iterable
        // 那麼接下來,咱們是否是就能夠經過groupedScores這種JavaPairRDD,很方便地處理每一個分組內的數據
        JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
        // 打印groupedScores RDD
        groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Iterable<Integer>> tthrows Exception {
                System.out.println("class: " + t._1);
                Iterator<Integer> ite = t._2.iterator();
                while (ite.hasNext()) {
                    System.out.println(ite.next());
                }
                System.out.println("==============================");
            }
            // 輸出結果:
            // class: class1
            // 80
            // 90
            // ==============================
            // class: class2
            // 75
            // 65
            // ==============================
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * reduceByKey案例:統計每一個班級的總分
     */
    public static void reduceByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
                new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90),
                new Tuple2<String, Integer>("class2", 65));
        // 並行化集合,建立JavaPairRDD
        JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
        // 針對scores RDD,執行reduceByKey算子
        // reduceByKey,接收的參數是Function2類型,它有三個泛型參數,實際上表明瞭三個值
        // 第一個泛型類型和第二個泛型類型,表明了原始RDD中的元素的value的類型
        // 所以對每一個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入
        // 所以此處,會自動定義兩個泛型類型,表明call()方法的兩個傳入參數的類型
        // 第三個泛型類型,表明了每次reduce操做返回的值的類型,默認也是與原始RDD的value類型相同的
        // reduceByKey算法返回的RDD,仍是JavaPairRDD<key, value>
        JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
            private static final long serialVersionUID = 1L;
            // 對每一個key,都會將其value,依次傳入call方法
            // 從而聚合出每一個key對應的一個value
            // 而後,將每一個key對應的一個value,組合成一個Tuple2,做爲新RDD的元素
            @Override
            public Integer call(Integer v1, Integer v2throws Exception {
                return v1 + v2;
            }
        });
        // 打印totalScores RDD
        totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> tthrows Exception {
                System.out.println(t._1 + ": " + t._2);
            }
            // 輸出結果:
            // class1: 170
            // class2: 140
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * sortByKey案例:按照學生分數進行排序,分數爲Key
     */
    public static void sortByKey() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<Integer, String>> scoreList = Arrays.asList(new Tuple2<Integer, String>(65, "leo"),
                new Tuple2<Integer, String>(50, "tom"), new Tuple2<Integer, String>(100, "marry"), new Tuple2<Integer, String>(80, "jack"));
        // 並行化集合,建立RDD
        JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
        // 對scores RDD執行sortByKey算子
        // sortByKey其實就是根據key進行排序,能夠手動指定升序,或者降序(false時)
        // 返回的,仍是JavaPairRDD,其中的元素內容,都是和原始的RDD如出一轍的
        // 可是就是RDD中的元素的順序,不一樣了
        JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
        // 打印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, String> tthrows Exception {
                System.out.println(t._1 + ": " + t._2);
            }
            // 輸出結果:
            // 100: marry
            // 80: jack
            // 65: leo
            // 50: tom
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * sortByKey案例:按照學生分數進行排序,分數爲value
     */
    public static void sortBy() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("leo", 65),
                new Tuple2<String, Integer>("tom", 50), new Tuple2<String, Integer>("marry", 100), new Tuple2<String, Integer>("jack", 80));
        // 注:只有JavaRDD纔有sortBy方法,而JavaPairRDD是沒有的
        JavaRDD<Tuple2<String, Integer>> scores = sc.parallelize(scoreList);
        // 根據value值進行降序排序
        JavaRDD<Tuple2<String, Integer>> sortedScores = scores.sortBy(new Function<Tuple2<String, Integer>, Integer>() {
            private static final long serialVersionUID = 1L;
            @Override
            public Integer call(Tuple2<String, Integer> v1throws Exception {
                return v1._2;// 返回待排序的值,這裏根據value進行排序,而非key
            }
        }, false, 1);
        // 打印sortedScored RDD
        sortedScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<String, Integer> tthrows Exception {
                System.out.println(t._1 + ": " + t._2);
            }
            // 輸出結果:
            // marry: 100
            // jack: 80
            // leo: 65
            // tom: 50
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * join案例:打印學生成績
     */
    public static void join() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom"));
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 50));
        // 並行化兩個RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        // 使用join算子關聯兩個RDD
        // join會根據key進行join,並返回JavaPairRDD
        // 該JavaPairRDD的第一個泛型類型爲兩個原始JavaPairRDD的key的類型(兩個Key類型相同)
        // 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分別爲兩個原始RDD的value的類型
        // 什麼意思呢?好比有(1, 1) (1, 2) (1, 3)的一個RDD
        // 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
        // join之後,實際上會獲得(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
        JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
        // 打印studnetScores RDD
        studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, Tuple2<String, Integer>> tthrows Exception {
                System.out.println("student id: " + t._1);
                System.out.println("student name: " + t._2._1);
                System.out.println("student score: " + t._2._2);
                System.out.println("===============================");
            }
            // 輸出結果:
            // student id: 1
            // student name: leo
            // student score: 100
            // ===============================
            // student id: 1
            // student name: leo
            // student score: 70
            // ===============================
            // student id: 3
            // student name: tom
            // student score: 60
            // ===============================
            // student id: 3
            // student name: tom
            // student score: 50
            // ===============================
            // student id: 2
            // student name: jack
            // student score: 90
            // ===============================
            // student id: 2
            // student name: jack
            // student score: 80
            // ===============================
        });
        // 關閉JavaSparkContext
        sc.close();
    }
    /**
     * cogroup案例:打印學生成績
     */
    public static void cogroup() {
        // 建立SparkConf
        SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
        // 建立JavaSparkContext
        JavaSparkContext sc = new JavaSparkContext(conf);
        // 模擬集合
        List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),
                new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom"));
        List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90),
                new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80),
                new Tuple2<Integer, Integer>(3, 50));
        // 並行化兩個RDD
        JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
        JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
        // cogroup與join不一樣
        // 至關因而,一個key join上的全部value,都給放到一個Iterable裏面去了
        // cogroup,不太好講解,但願你們經過動手編寫咱們的案例,仔細體會其中的奧妙
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);
        // 打印studnetScores RDD
        studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
            private static final long serialVersionUID = 1L;
            @Override
            public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> tthrows Exception {
                System.out.println("student id: " + t._1);
                System.out.println("student name: " + t._2._1);
                System.out.println("student score: " + t._2._2);
                System.out.println("===============================");
            }
            // 輸出結果:
            // student id: 1
            // student name: [leo]
            // student score: [100, 70]
            // ===============================
            // student id: 3
            // student name: [tom]
            // student score: [60, 50]
            // ===============================
            // student id: 2
            // student name: [jack]
            // student score: [90, 80]
            // ===============================
        });
        // 關閉JavaSparkContext
        sc.close();
    }
}



package sparkcore.scala
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object TransformationOperation {
  def main(args: Array[String]) {
    //    map()
    //    filter()
    //    flatMap()
    //    groupByKey()
    //    reduceByKey()
    //    sortByKey()
    //    sortBy()
    //    join()
    cogroup()
  }
  def map() {
    val conf = new SparkConf()
      .setAppName("map")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(12345)
    val numberRDD = sc.parallelize(numbers1)
    val multipleNumberRDD = numberRDD.map { num => num * 2 }
    multipleNumberRDD.foreach { num => println(num) }
    //    輸出結果:
    //    2
    //    4
    //    6
    //    8
    //    10
  }
  def filter() {
    val conf = new SparkConf()
      .setAppName("filter")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val numbers = Array(12345678910)
    val numberRDD = sc.parallelize(numbers1)
    val evenNumberRDD = numberRDD.filter { num => num % 2 == 0 }
    evenNumberRDD.foreach { num => println(num) }
    //    輸出結果:
    //    2
    //    4
    //    6
    //    8
    //    10
  }
  def flatMap() {
    val conf = new SparkConf()
      .setAppName("flatMap")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val lineArray = Array("hello you""hello me""hello world")
    val lines = sc.parallelize(lineArray1)
    val words = lines.flatMap { line => line.split(" ") }
    words.foreach { word => println(word) }
    //    輸出結果:
    //    hello
    //    you
    //    hello
    //    me
    //    hello
    //    world
  }
  def groupByKey() {
    val conf = new SparkConf()
      .setAppName("groupByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(("class1"80), ("class2"75),
      ("class1"90), ("class2"60))
    val scores = sc.parallelize(scoreList1)
    val groupedScores = scores.groupByKey()
    groupedScores.foreach(score => {
      println(score._1);
      score._2.foreach { singleScore => println(singleScore) };
      println("=============================")
    })
    //    輸出結果:
    //    class1
    //    80
    //    90
    //    =============================
    //    class2
    //    75
    //    60
    //    =============================
  }
  def reduceByKey() {
    val conf = new SparkConf()
      .setAppName("groupByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(("class1"80), ("class2"75),
      ("class1"90), ("class2"60))
    val scores = sc.parallelize(scoreList1)
    val totalScores = scores.reduceByKey(_ + _)
    totalScores.foreach(classScore => println(classScore._1 + ": " + classScore._2))
    //    輸出結果:
    //    class1: 170
    //    class2: 135
  }
  def sortByKey() {
    val conf = new SparkConf()
      .setAppName("sortByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array((65"leo"), (50"tom"),
      (100"marry"), (85"jack"))
    val scores = sc.parallelize(scoreList1)
    val sortedScores = scores.sortByKey(false)
    sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
    //    輸出結果:
    //    100: marry
    //    85: jack
    //    65: leo
    //    50: tom
  }
  def sortBy() {
    val conf = new SparkConf()
      .setAppName("sortByKey")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val scoreList = Array(("leo"65), ("tom"50), ("marry"100), ("jack"80))
    val scores = sc.parallelize(scoreList1)
    val sortedScores = scores.sortBy(_._2false1)
    sortedScores.foreach(studentScore => println(studentScore._1 + ": " + studentScore._2))
    //    輸出結果:
    //    marry: 100
    //    jack: 80
    //    leo: 65
    //    tom: 50
  }
  def join() {
    val conf = new SparkConf()
      .setAppName("join")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val studentList = Array((1"leo"),(2"jack"),(3"tom"));
    val scoreList = Array((1100), (290), (360),(170), (280), (350));
    val students = sc.parallelize(studentList);
    val scores = sc.parallelize(scoreList);
    val studentScores = students.join(scores)
    studentScores.foreach(studentScore => {
      println("student id: " + studentScore._1);
      println("student name: " + studentScore._2._1)
      println("student socre: " + studentScore._2._2)
      println("=======================================")
    })
    //    輸出結果:
    //    student id: 1
    //    student name: leo
    //    student socre: 100
    //    =======================================
    //    student id: 1
    //    student name: leo
    //    student socre: 70
    //    =======================================
    //    student id: 3
    //    student name: tom
    //    student socre: 60
    //    =======================================
    //    student id: 3
    //    student name: tom
    //    student socre: 50
    //    =======================================
    //    student id: 2
    //    student name: jack
    //    student socre: 90
    //    =======================================
    //    student id: 2
    //    student name: jack
    //    student socre: 80
    //    =======================================
  }
  def cogroup() {
    val conf = new SparkConf()
      .setAppName("join")
      .setMaster("local")
    val sc = new SparkContext(conf)
    val studentList = Array((1"leo"),(2"jack"),(3"tom"));
    val scoreList = Array((1100), (290), (360),(170), (280), (350));
    val students = sc.parallelize(studentList);
    val scores = sc.parallelize(scoreList);
    val studentScores = students.cogroup(scores)
    studentScores.foreach(studentScore => {
      println("student id: " + studentScore._1);
      println("student name: " + studentScore._2._1)
      println("student socre: " + studentScore._2._2)
      println("=======================================")
    })
    //    輸出結果:
    //    student id: 1
    //    student name: CompactBuffer(leo)
    //    student socre: CompactBuffer(100, 70)
    //    =======================================
    //    student id: 3
    //    student name: CompactBuffer(tom)
    //    student socre: CompactBuffer(60, 50)
    //    =======================================
    //    student id: 2
    //    student name: CompactBuffer(jack)
    //    student socre: CompactBuffer(90, 80)
    //    =======================================
  }
}
相關文章
相關標籤/搜索