package sparkcore.java;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import scala.Tuple2;
/**
* transformation操做實戰
*/
public class TransformationOperation {
public static void main(String[] args) {
// map();
// filter();
// flatMap();
// groupByKey();
// reduceByKey();
// sortByKey();
// sortBy();
join();
cogroup();
}
/**
* map算子案例:將集合中每個元素都乘以2
*/
public static void map() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("map").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 使用map算子,將集合中的每一個元素都乘以2
// map算子,是對任何類型的RDD,均可以調用的
// 在java中,map算子接收的參數是Function對象
// 建立的Function對象,必定會讓你設置第二個泛型參數,這個泛型類型,就是返回的新元素的類型
// 同時call()方法的返回類型,也必須與第二個泛型類型相同
// 在call()方法內部,就能夠對原始RDD中的每個元素進行各類處理和計算,並返回一個新的元素
// 全部新的元素就會組成一個新的RDD
JavaRDD<Integer> multipleNumberRDD = numberRDD.map(new Function<Integer, Integer>() {
private static final long serialVersionUID = 1L;
// 傳入call()方法的,就是1,2,3,4,5
// 返回的就是2,4,6,8,10
@Override
public Integer call(Integer v1) throws Exception {
return v1 * 2;
}
});
// 打印新的RDD
multipleNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
// 輸出結果:
// 2
// 4
// 6
// 8
// 10
});
// 關閉JavaSparkContext
sc.close();
}
/**
* filter算子案例:過濾集合中的偶數
*/
public static void filter() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("filter").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
// 並行化集合,建立初始RDD
JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
// 對初始RDD執行filter算子,過濾出其中的偶數
// filter算子,傳入的也是Function,其餘的使用注意點,實際上和map是同樣的
// 可是,惟一的不一樣,就是call()方法的返回類型是Boolean
// 每個初始RDD中的元素,都會傳入call()方法,此時你能夠執行各類自定義的計算邏輯
// 來判斷這個元素是不是你想要的
// 若是你想在新的RDD中保留這個元素,那麼就返回true;不然,不想保留這個元素,返回false
JavaRDD<Integer> evenNumberRDD = numberRDD.filter(new Function<Integer, Boolean>() {
private static final long serialVersionUID = 1L;
// 在這裏,1到10,都會傳入進來
// 可是根據咱們的邏輯,只有2,4,6,8,10這幾個偶數,會返回true
// 因此,只有偶數會保留下來,放在新的RDD中
@Override
public Boolean call(Integer v1) throws Exception {
return v1 % 2 == 0;
}
});
// 打印新的RDD
evenNumberRDD.foreach(new VoidFunction<Integer>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Integer t) throws Exception {
System.out.println(t);
}
// 輸出結果:
// 2
// 4
// 6
// 8
// 10
});
// 關閉JavaSparkContext
sc.close();
}
/**
* flatMap案例:將文本行拆分爲多個單詞
*/
public static void flatMap() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("flatMap").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 構造集合
List<String> lineList = Arrays.asList("hello you", "hello me", "hello world");
// 並行化集合,建立RDD
JavaRDD<String> lines = sc.parallelize(lineList);
// 對RDD執行flatMap算子,將每一行文本,拆分爲多個單詞
// flatMap算子,在java中,接收的參數是FlatMapFunction
// 咱們須要本身定義FlatMapFunction的第二個泛型類型,即,表明了返回的新元素的類型
// call()方法,返回的類型,不是U,而是Iterator<U>,這裏的U也與第二個泛型類型相同
// flatMap其實就是,接收原始RDD中的每一個元素,並進行各類邏輯的計算和處理,返回能夠返回多個元素
// 多個元素,即封裝在Iterator集合中,可使用ArrayList等集合
// 新的RDD中,即封裝了全部的新元素;也就是說,新的RDD的大小必定是 >= 原始RDD的大小
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
private static final long serialVersionUID = 1L;
// 在這裏會,好比,傳入第一行,hello you
// 返回的是一個Iterator<String>(hello, you)
@Override
public Iterator<String> call(String t) throws Exception {
return Arrays.asList(t.split(" ")).iterator();
}
});
// 打印新的RDD
words.foreach(new VoidFunction<String>() {
private static final long serialVersionUID = 1L;
@Override
public void call(String t) throws Exception {
System.out.println(t);
}
// 輸出結果:
// hello
// you
// hello
// me
// hello
// world
});
// 關閉JavaSparkContext
sc.close();
}
/**
* groupByKey案例:按照班級對成績進行分組
*/
public static void groupByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("groupByKey").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行groupByKey算子,對每一個班級的成績進行分組
// groupByKey算子,返回的仍是JavaPairRDD
// 可是,JavaPairRDD的第一個泛型類型不變,第二個泛型類型變成Iterable這種集合類型
// 也就是說,按照了key進行分組,那麼每一個key可能都會有多個value,此時多個value聚合成了Iterable
// 那麼接下來,咱們是否是就能夠經過groupedScores這種JavaPairRDD,很方便地處理每一個分組內的數據
JavaPairRDD<String, Iterable<Integer>> groupedScores = scores.groupByKey();
// 打印groupedScores RDD
groupedScores.foreach(new VoidFunction<Tuple2<String, Iterable<Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Iterable<Integer>> t) throws Exception {
System.out.println("class: " + t._1);
Iterator<Integer> ite = t._2.iterator();
while (ite.hasNext()) {
System.out.println(ite.next());
}
System.out.println("==============================");
}
// 輸出結果:
// class: class1
// 80
// 90
// ==============================
// class: class2
// 75
// 65
// ==============================
});
// 關閉JavaSparkContext
sc.close();
}
/**
* reduceByKey案例:統計每一個班級的總分
*/
public static void reduceByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("reduceByKey").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("class1", 80),
new Tuple2<String, Integer>("class2", 75), new Tuple2<String, Integer>("class1", 90),
new Tuple2<String, Integer>("class2", 65));
// 並行化集合,建立JavaPairRDD
JavaPairRDD<String, Integer> scores = sc.parallelizePairs(scoreList);
// 針對scores RDD,執行reduceByKey算子
// reduceByKey,接收的參數是Function2類型,它有三個泛型參數,實際上表明瞭三個值
// 第一個泛型類型和第二個泛型類型,表明了原始RDD中的元素的value的類型
// 所以對每一個key進行reduce,都會依次將第一個、第二個value傳入,將值再與第三個value傳入
// 所以此處,會自動定義兩個泛型類型,表明call()方法的兩個傳入參數的類型
// 第三個泛型類型,表明了每次reduce操做返回的值的類型,默認也是與原始RDD的value類型相同的
// reduceByKey算法返回的RDD,仍是JavaPairRDD<key, value>
JavaPairRDD<String, Integer> totalScores = scores.reduceByKey(new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
// 對每一個key,都會將其value,依次傳入call方法
// 從而聚合出每一個key對應的一個value
// 而後,將每一個key對應的一個value,組合成一個Tuple2,做爲新RDD的元素
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
});
// 打印totalScores RDD
totalScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
// 輸出結果:
// class1: 170
// class2: 140
});
// 關閉JavaSparkContext
sc.close();
}
/**
* sortByKey案例:按照學生分數進行排序,分數爲Key
*/
public static void sortByKey() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> scoreList = Arrays.asList(new Tuple2<Integer, String>(65, "leo"),
new Tuple2<Integer, String>(50, "tom"), new Tuple2<Integer, String>(100, "marry"), new Tuple2<Integer, String>(80, "jack"));
// 並行化集合,建立RDD
JavaPairRDD<Integer, String> scores = sc.parallelizePairs(scoreList);
// 對scores RDD執行sortByKey算子
// sortByKey其實就是根據key進行排序,能夠手動指定升序,或者降序(false時)
// 返回的,仍是JavaPairRDD,其中的元素內容,都是和原始的RDD如出一轍的
// 可是就是RDD中的元素的順序,不一樣了
JavaPairRDD<Integer, String> sortedScores = scores.sortByKey(false);
// 打印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<Integer, String>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, String> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
// 輸出結果:
// 100: marry
// 80: jack
// 65: leo
// 50: tom
});
// 關閉JavaSparkContext
sc.close();
}
/**
* sortByKey案例:按照學生分數進行排序,分數爲value
*/
public static void sortBy() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("sortByKey").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<String, Integer>> scoreList = Arrays.asList(new Tuple2<String, Integer>("leo", 65),
new Tuple2<String, Integer>("tom", 50), new Tuple2<String, Integer>("marry", 100), new Tuple2<String, Integer>("jack", 80));
// 注:只有JavaRDD纔有sortBy方法,而JavaPairRDD是沒有的
JavaRDD<Tuple2<String, Integer>> scores = sc.parallelize(scoreList);
// 根據value值進行降序排序
JavaRDD<Tuple2<String, Integer>> sortedScores = scores.sortBy(new Function<Tuple2<String, Integer>, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Tuple2<String, Integer> v1) throws Exception {
return v1._2;// 返回待排序的值,這裏根據value進行排序,而非key
}
}, false, 1);
// 打印sortedScored RDD
sortedScores.foreach(new VoidFunction<Tuple2<String, Integer>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<String, Integer> t) throws Exception {
System.out.println(t._1 + ": " + t._2);
}
// 輸出結果:
// marry: 100
// jack: 80
// leo: 65
// tom: 50
});
// 關閉JavaSparkContext
sc.close();
}
/**
* join案例:打印學生成績
*/
public static void join() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("join").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80),
new Tuple2<Integer, Integer>(3, 50));
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// 使用join算子關聯兩個RDD
// join會根據key進行join,並返回JavaPairRDD
// 該JavaPairRDD的第一個泛型類型爲兩個原始JavaPairRDD的key的類型(兩個Key類型相同)
// 第二個泛型類型,是Tuple2<v1, v2>的類型,Tuple2的兩個泛型分別爲兩個原始RDD的value的類型
// 什麼意思呢?好比有(1, 1) (1, 2) (1, 3)的一個RDD
// 還有一個(1, 4) (2, 1) (2, 2)的一個RDD
// join之後,實際上會獲得(1 (1, 4)) (1, (2, 4)) (1, (3, 4))
JavaPairRDD<Integer, Tuple2<String, Integer>> studentScores = students.join(scores);
// 打印studnetScores RDD
studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<String, Integer>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, Tuple2<String, Integer>> t) throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
// 輸出結果:
// student id: 1
// student name: leo
// student score: 100
// ===============================
// student id: 1
// student name: leo
// student score: 70
// ===============================
// student id: 3
// student name: tom
// student score: 60
// ===============================
// student id: 3
// student name: tom
// student score: 50
// ===============================
// student id: 2
// student name: jack
// student score: 90
// ===============================
// student id: 2
// student name: jack
// student score: 80
// ===============================
});
// 關閉JavaSparkContext
sc.close();
}
/**
* cogroup案例:打印學生成績
*/
public static void cogroup() {
// 建立SparkConf
SparkConf conf = new SparkConf().setAppName("cogroup").setMaster("local");
// 建立JavaSparkContext
JavaSparkContext sc = new JavaSparkContext(conf);
// 模擬集合
List<Tuple2<Integer, String>> studentList = Arrays.asList(new Tuple2<Integer, String>(1, "leo"),
new Tuple2<Integer, String>(2, "jack"), new Tuple2<Integer, String>(3, "tom"));
List<Tuple2<Integer, Integer>> scoreList = Arrays.asList(new Tuple2<Integer, Integer>(1, 100), new Tuple2<Integer, Integer>(2, 90),
new Tuple2<Integer, Integer>(3, 60), new Tuple2<Integer, Integer>(1, 70), new Tuple2<Integer, Integer>(2, 80),
new Tuple2<Integer, Integer>(3, 50));
// 並行化兩個RDD
JavaPairRDD<Integer, String> students = sc.parallelizePairs(studentList);
JavaPairRDD<Integer, Integer> scores = sc.parallelizePairs(scoreList);
// cogroup與join不一樣
// 至關因而,一個key join上的全部value,都給放到一個Iterable裏面去了
// cogroup,不太好講解,但願你們經過動手編寫咱們的案例,仔細體會其中的奧妙
JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> studentScores = students.cogroup(scores);
// 打印studnetScores RDD
studentScores.foreach(new VoidFunction<Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>>>() {
private static final long serialVersionUID = 1L;
@Override
public void call(Tuple2<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> t) throws Exception {
System.out.println("student id: " + t._1);
System.out.println("student name: " + t._2._1);
System.out.println("student score: " + t._2._2);
System.out.println("===============================");
}
// 輸出結果:
// student id: 1
// student name: [leo]
// student score: [100, 70]
// ===============================
// student id: 3
// student name: [tom]
// student score: [60, 50]
// ===============================
// student id: 2
// student name: [jack]
// student score: [90, 80]
// ===============================
});
// 關閉JavaSparkContext
sc.close();
}
}