Spark 的 cogroup 和 join 算子

cogroup 這個算子使用的頻率很低,join 算子使用頻率較高,二者都是根據兩個 RDD 的 key 進行關聯。具體看下面的代碼,先看下面的 2 個 RDD:java

SparkConf conf = new SparkConf()
                .setAppName("co")
                .setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        List<Tuple2<String, Integer>> words1 = Arrays.asList(
                new Tuple2<>("hello", 3),
                new Tuple2<>("hello", 2),
                new Tuple2<>("world", 7),
                new Tuple2<>("hello", 12),
                new Tuple2<>("you", 9)
        );

        List<Tuple2<String, Integer>> words2 = Arrays.asList(
                new Tuple2<>("hello", 21),
                new Tuple2<>("world", 24),
                new Tuple2<>("hello", 25),
                new Tuple2<>("you", 28)
        );

        JavaPairRDD<String, Integer> words1RDD = sc.parallelizePairs(words1);
        JavaPairRDD<String, Integer> words2RDD = sc.parallelizePairs(words2);

複製代碼

上面的 RDD 中,words1RDD 和 words2RDD 中的 key 都有重複的。而後看看看二者分別用 cogroup 和 join 算子的操做結果,先看 cogroup 的:spa

int count = 1;

        JavaPairRDD<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> cogroupRDD = words1RDD.cogroup(words2RDD);
        List<Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>>> cogroupResult = cogroupRDD.collect();
        for (Tuple2<String, Tuple2<Iterable<Integer>, Iterable<Integer>>> t : cogroupResult){
            String word = t._1;
            Iterable<Integer> word1Counts = t._2._1;
            Iterable<Integer> word2Counts = t._2._2;

            String countInfo = "";
            for (Integer c1 : word1Counts) {
                countInfo += c1 + "(words1RDD),";
            }

            for (Integer c2 : word2Counts) {
                countInfo += c2 + "(words2RDD),";
            }

            System.out.println(String.format("第%s個元素爲:%s -> %s", count, word, countInfo));

            count++;
        }
複製代碼

輸出結果爲:code

1個元素爲:you -> 9(words1RDD),28(words2RDD),
第2個元素爲:hello -> 3(words1RDD),2(words1RDD),12(words1RDD),21(words2RDD),25(words2RDD),
第3個元素爲:world -> 7(words1RDD),24(words2RDD),
複製代碼

再看 join 的:orm

JavaPairRDD<String, Tuple2<Integer, Integer>> joinedRDD = words1RDD.join(words2RDD);
        List<Tuple2<String, Tuple2<Integer, Integer>>> joinedResult = joinedRDD.collect();
        for (Tuple2<String, Tuple2<Integer, Integer>> t : joinedResult) {
            System.out.println(String.format("第%s個元素爲:%s -> %s(words1RDD),%s(words2RDD)", count, t._1, t._2._1, t._2._2));
            count++;
        }
複製代碼

輸出結果爲:string

1個元素爲:you -> 9(words1RDD),28(words2RDD)
第2個元素爲:hello -> 3(words1RDD),21(words2RDD)
第3個元素爲:hello -> 3(words1RDD),25(words2RDD)
第4個元素爲:hello -> 2(words1RDD),21(words2RDD)
第5個元素爲:hello -> 2(words1RDD),25(words2RDD)
第6個元素爲:hello -> 12(words1RDD),21(words2RDD)
第7個元素爲:hello -> 12(words1RDD),25(words2RDD)
第8個元素爲:world -> 7(words1RDD),24(words2RDD)
複製代碼

cogroup 算子計算過程會對相同的 key 作聚合操做,join 則不會。ast

相關文章
相關標籤/搜索