Getting Started with Spark (7) -- Spark's intersection, subtract, union and distinct

Spark's intersection

As its name suggests, intersection finds the overlap: when two RDDs are intersected, only the elements they have in common are kept. Hence RDD1.intersection(RDD2) and RDD2.intersection(RDD1) produce the same result.

For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, two RDDs holding these lists yield result = {3,4,5} after an intersection.
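The Spark implementations appear later in this article; as a quick mental model without Spark, `intersection` behaves like a set intersection, returning each common element once regardless of operand order. A minimal plain-Python sketch (the `rdd_intersection` helper is illustrative, not a Spark API):

```python
def rdd_intersection(a, b):
    # Spark's RDD.intersection returns the distinct common elements;
    # swapping the operands does not change the result.
    return set(a) & set(b)

list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

assert rdd_intersection(list1, list2) == {3, 4, 5}
assert rdd_intersection(list1, list2) == rdd_intersection(list2, list1)
```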

Spark's subtract

Unlike intersection, subtract finds the content the two RDDs do not share.

For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, the result of a subtract depends on the order of the operands.

list1.subtract(list2) 

The result is:

1 2

While for:

list2.subtract(list1) 

the result is:

6 7
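The same Spark-free mental model applies here: `subtract` keeps the elements of the left operand that do not appear in the right one, which is why operand order matters. A plain-Python sketch (the `rdd_subtract` helper is illustrative, not a Spark API):

```python
def rdd_subtract(a, b):
    # Spark's RDD.subtract(other) keeps the elements of `a` that
    # do not appear in `b`; swapping the operands changes the result.
    removed = set(b)
    return [x for x in a if x not in removed]

list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

assert rdd_subtract(list1, list2) == [1, 2]
assert rdd_subtract(list2, list1) == [6, 7]
```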

Spark's union

union is the easiest to understand: it merges two RDDs without considering duplicates. For example, given List1 = {1,2,3,4,5} and List2 = {3,4,5,6,7}, a union yields the same elements regardless of operand order. The result is:

result = {1,2,3,4,5,3,4,5,6,7}
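As a plain-Python mental model (not Spark code), `union` is a simple concatenation that keeps duplicates:

```python
def rdd_union(a, b):
    # Spark's RDD.union simply concatenates the two data sets;
    # duplicate elements are kept, not merged.
    return list(a) + list(b)

list1 = [1, 2, 3, 4, 5]
list2 = [3, 4, 5, 6, 7]

result = rdd_union(list1, list2)
assert result == [1, 2, 3, 4, 5, 3, 4, 5, 6, 7]
```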

Spark's distinct

distinct removes duplicate elements from an RDD. Note that it does not discard every element that has a duplicate; rather, it keeps exactly one copy of each. This is easy to see: given result = {1,2,3,4,5,3,4,5,6,7}, a distinct yields {1,2,3,4,5,6,7}.
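A plain-Python mental model (not Spark code) of `distinct`, continuing from the union result above; note Spark makes no ordering guarantee, so the sketch sorts only to get a stable comparison:

```python
def rdd_distinct(a):
    # Spark's RDD.distinct keeps one copy of each element.
    return list(set(a))

merged = [1, 2, 3, 4, 5, 3, 4, 5, 6, 7]
assert sorted(rdd_distinct(merged)) == [1, 2, 3, 4, 5, 6, 7]
```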

A comprehensive example

Since intersection, subtract, union and distinct are frequently used, and a single case study can illustrate all of them well, this time the data sets describe two courses, lesson1 and lesson2. lesson1 has ten students, each with many ability estimates, where each estimate is an Int; lesson2 is structured the same way. The two data sets are stored in the files lesson1 and lesson2 respectively. Both the data sets and the code below can be found and downloaded on GitHub.

Data set analysis

lesson1 contains many students, and each student has many ability estimates. Computing each student's average score was already covered in Spark入門(六)--Spark的combineByKey、sortBykey, so it is not repeated here.

With these two data sets we solve the following problems:

  • 0. Compute each student's total ability estimate in lesson1 and in lesson2
  • 1. Find all the students in lesson1 (no duplicates)
  • 2. Find all the students in lesson2 (no duplicates)
  • 3. Find the students who took both courses
  • 4. Find the students who are in lesson1 but not in lesson2
  • 5. Find the students who are in lesson2 but not in lesson1

A partial view of the data

Problem 0 is numbered 0 because it does not use this article's operators. To get each student's total ability estimate per course, first split each line on spaces; this produces a collection of (name, score) tuples, whose scores are then accumulated by name.
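The split-and-accumulate step can be sketched in plain Python without Spark. The sample lines below are hypothetical (the real data sets are on GitHub), and the dict stands in for `reduceByKey`:

```python
# Hypothetical lines in the "name score" format of the lesson files.
lines = ["Amy 80", "Bob 70", "Amy 90"]

# Split each line into a (name, score) tuple, as the Spark jobs
# below do with map/mapToPair...
pairs = [(line.split(" ")[0], int(line.split(" ")[1])) for line in lines]

# ...then accumulate the scores per name, as reduceByKey(_+_) does.
totals = {}
for name, score in pairs:
    totals[name] = totals.get(name, 0) + score

assert totals == {"Amy": 170, "Bob": 70}
```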

  • For problem 1, once each student's total estimate is known, dropping the scores leaves exactly the students in lesson1.

  • Problem 2 is analogous.

  • For problem 3, finding the students who took both courses is an intersection of the two student sets. (A union followed by a distinct, also shown in the code, instead yields all students across the two courses without duplicates.)

  • For problem 4, to find the students who are in lesson1 but not in lesson2, simply subtract the lesson2 students from the lesson1 students.

  • Problem 5 is analogous.

Scala implementation

import org.apache.spark.{SparkConf, SparkContext}

object SparkIntersectionAndSubtract {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")

    val sc = new SparkContext(conf)

    // Data from lesson 1
    val lesson1Data = sc.textFile("./lesson1").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))

    // Sum each student's scores in lesson 1
    val lesson1Grade = lesson1Data.reduceByKey(_+_)

    val lesson1Student = lesson1Grade.map(x=>x._1)

    // Process the data from lesson 2
    val lesson2Data = sc.textFile("./lesson2").map(line => (line.split(" ")(0),line.split(" ")(1).toInt))

    // Sum each student's scores in lesson 2
    val lesson2Grade = lesson2Data.reduceByKey((x,y)=>x+y)

    val lesson2Student = lesson2Grade.map(x=>x._1)

    // Students in both lesson 1 and lesson 2
    println("Students On Lesson1 And On Lesson2")
    lesson1Student.intersection(lesson2Student).foreach(println)

    // Students in both lesson 2 and lesson 1; identical to the result above
    println("Students On Lesson1 And On Lesson2")
    lesson2Student.intersection(lesson1Student).foreach(println)

    // Students in lesson 1 but not in lesson 2
    println("Students Only In Lesson1")
    val onlyInLesson1 = lesson1Student.subtract(lesson2Student)
    onlyInLesson1.foreach(println)

    // Students in lesson 2 but not in lesson 1
    println("Students Only In Lesson2")
    val onlyInLesson2 = lesson2Student.subtract(lesson1Student)
    onlyInLesson2.foreach(println)


    // Students who chose only one lesson
    println("Students Only Choose One Lesson")
    onlyInLesson1.union(onlyInLesson2).foreach(println)

    // All students across both lessons (no duplicates)
    println("All the students")
    lesson1Student.union(lesson2Student).distinct().foreach(println)


  }

}

Java implementation

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;

public class SparkIntersectionAndSubtractJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtractJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        // Java 7 style
        intersectionAndSubtractJava(sc);

        // Java 8 style
        intersectionAndSubtractJava8(sc);
    }


    public static void intersectionAndSubtractJava(JavaSparkContext sc){

        JavaRDD<String> lesson1Data = sc.textFile("./lesson1");

        JavaRDD<String> lesson2Data = sc.textFile("./lesson2");

        JavaPairRDD<String,Integer> lesson1InfoData = lesson1Data.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
            }
        });

        JavaPairRDD<String,Integer> lesson2InfoData = lesson2Data.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s.split(" ")[0],Integer.parseInt(s.split(" ")[1]));
            }
        });

        JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        JavaRDD<String> lesson1Students = lesson1Grades.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2._1;
            }
        });

        JavaRDD<String> lesson2Students = lesson2Grades.map(new Function<Tuple2<String, Integer>, String>() {
            @Override
            public String call(Tuple2<String, Integer> stringIntegerTuple2) throws Exception {
                return stringIntegerTuple2._1;
            }
        });

        // Students in both lesson1 and lesson2
        System.out.println("Students On Lesson1 And On Lesson2");
        lesson1Students.intersection(lesson2Students).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // Students in both lesson2 and lesson1; identical to the result above
        System.out.println("Students On Lesson1 And On Lesson2");
        lesson2Students.intersection(lesson1Students).foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // Students in lesson1 but not in lesson2
        JavaRDD<String> studentsOnlyInLesson1 = lesson1Students.subtract(lesson2Students);
        System.out.println("Students Only In Lesson1");
        studentsOnlyInLesson1.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // Students in lesson2 but not in lesson1
        JavaRDD<String> studentsOnlyInLesson2 = lesson2Students.subtract(lesson1Students);
        System.out.println("Students Only In Lesson2");
        studentsOnlyInLesson2.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // Students who chose only one lesson
        JavaRDD<String> onlyOneLesson = studentsOnlyInLesson1.union(studentsOnlyInLesson2);
        System.out.println("Students Only Choose One Lesson");
        onlyOneLesson.foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

        // All students across both lessons (no duplicates)
        System.out.println("All the students");
        lesson1Students.union(lesson2Students).distinct().foreach(new VoidFunction<String>() {
            @Override
            public void call(String s) throws Exception {
                System.out.println(s);
            }
        });

    }

    public static void intersectionAndSubtractJava8(JavaSparkContext sc){

        JavaRDD<String> lesson1Data = sc.textFile("./lesson1");

        JavaRDD<String> lesson2Data = sc.textFile("./lesson2");


        JavaPairRDD<String,Integer> lesson1InfoData =
        lesson1Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));


        JavaPairRDD<String,Integer> lesson2InfoData =
        lesson2Data.mapToPair(line -> new Tuple2<>(line.split(" ")[0],Integer.parseInt(line.split(" ")[1])));


        JavaPairRDD<String,Integer> lesson1Grades = lesson1InfoData.reduceByKey((x,y) -> x+y);

        JavaPairRDD<String,Integer> lesson2Grades = lesson2InfoData.reduceByKey((x,y) -> x+y);


        JavaRDD<String> studentsInLesson1 = lesson1Grades.map(x->x._1);

        JavaRDD<String> studentsInLesson2 = lesson2Grades.map(x->x._1);

        // Students in both lesson1 and lesson2
        studentsInLesson1.intersection(studentsInLesson2).foreach(name -> System.out.println(name));

        // Students in both lesson2 and lesson1; identical to the result above
        studentsInLesson2.intersection(studentsInLesson1).foreach(name -> System.out.println(name));

        // Students only in lesson1
        JavaRDD<String> studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2);
        studentsOnlyInLesson1.foreach(name -> System.out.println(name));

        // Students only in lesson2
        JavaRDD<String> studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1);
        studentsOnlyInLesson2.foreach(name -> System.out.println(name));


        // Students who chose only one lesson
        JavaRDD<String> studentsOnlyOneLesson = studentsOnlyInLesson1.union(studentsOnlyInLesson2);
        studentsOnlyOneLesson.foreach(name -> System.out.println(name));


        // All students across both lessons (no duplicates)
        studentsInLesson1.union(studentsInLesson2).distinct().foreach(name -> System.out.println(name));


    }

}

Python implementation

from pyspark import SparkConf,SparkContext

conf = SparkConf().setMaster("local").setAppName("SparkIntersectionAndSubtract")

sc = SparkContext(conf=conf)

# lesson1 data
lesson1Data = sc.textFile("./lesson1").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))

# lesson2 data
lesson2Data = sc.textFile("./lesson2").map(lambda x:(x.split(" ")[0],int(x.split(" ")[1])))

# Each student's total score in lesson1
lesson1InfoData = lesson1Data.reduceByKey(lambda x,y:x+y)

# Each student's total score in lesson2
lesson2InfoData = lesson2Data.reduceByKey(lambda x,y:x+y)

# Students in lesson1
studentsInLesson1 = lesson1InfoData.map(lambda x:x[0])

# Students in lesson2
studentsInLesson2 = lesson2InfoData.map(lambda x:x[0])

# Students in both lesson1 and lesson2
print("Students On Lesson1 And On Lesson2")
studentsInLesson1.intersection(studentsInLesson2).foreach(print)

# Students in both lesson2 and lesson1; identical to the result above
print("Students On Lesson1 And On Lesson2")
studentsInLesson2.intersection(studentsInLesson1).foreach(print)

# Students only in lesson1
print("Students Only In Lesson1")
studentsOnlyInLesson1 = studentsInLesson1.subtract(studentsInLesson2)
studentsOnlyInLesson1.foreach(print)


# Students only in lesson2
print("Students Only In Lesson2")
studentsOnlyInLesson2 = studentsInLesson2.subtract(studentsInLesson1)
studentsOnlyInLesson2.foreach(print)


# Students who chose only one lesson
print("Students Only Choose One Lesson")
studentsOnlyInLesson1.union(studentsOnlyInLesson2).foreach(print)

# All students across both lessons (no duplicates)
print("All the students")
studentsInLesson1.union(studentsInLesson2).distinct().foreach(print)



Running the code produces:

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Bob
Coco

Students On Lesson1 And On Lesson2
Vicky
Amy
Lili
Coco
Bob

Students Only In Lesson1
Bill
David
Mike
Nancy
Lucy

Students Only In Lesson2
White
Jimmy
Jason
John
Frank

Students Only Choose One Lesson
Bill
David
Mike
Nancy
Lucy
White
Jimmy
Jason
John
Frank

All the students
Vicky
Bill
Amy
White
Jimmy
Jason
Lili
David
Bob
Mike
Coco
Nancy
Lucy
John
Frank

The example above applies intersection, subtract, union and distinct concretely to solve specific problems. Used well, these methods make set-style operations between data sets very quick. In fact, calling them directly is usually better than hand-rolling equivalents, because Spark optimizes these methods internally.

Both the data sets and the code can be found and downloaded on GitHub.
