我們首先用 IDEA 來搭建 Spark 項目,具體可以參考《提交第一個 Spark 統計文件單詞數程序》,配合 Hadoop HDFS,只不過我們現在用 Java 語言來編寫,而不是 Scala。
二次排序問題的解決方案
先用第一種方案來處理
/**
 * Secondary-sort demo ("scheme one"): reads CSV lines of the form
 * {@code name,time,value} from the file given as {@code args[0]}, groups the
 * (time, value) pairs by name, then sorts each group in the driver via
 * {@link SparkTupleComparator} (ascending by time) and prints every stage.
 */
public class SecondSort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SecondSort");
        JavaSparkContext scc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = scc.textFile(args[0]);

        // Parse each "name,time,value" line into (name, (time, value)).
        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = rdd.mapToPair(s -> {
            String[] tokens = s.split(",");
            System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Integer time = Integer.parseInt(tokens[1]);
            Integer value = Integer.parseInt(tokens[2]);
            Tuple2<Integer, Integer> timeValue = new Tuple2<>(time, value);
            return new Tuple2<>(tokens[0], timeValue);
        });

        // Dump the raw pairs.
        List<Tuple2<String, Tuple2<Integer, Integer>>> outPut = pairs.collect();
        outPut.forEach(t -> {
            Tuple2<Integer, Integer> timeValue = t._2;
            // BUG FIX: the original printed timeValue._1 twice; the third field
            // must be the value (timeValue._2), not the time again.
            System.out.println(t._1 + "," + timeValue._1 + "," + timeValue._2);
        });

        // Group the (time, value) tuples by name and dump the groups.
        JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();
        List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> outPut2 = groups.collect();
        outPut2.forEach(t -> {
            System.out.println(t._1);
            // BUG FIX: the original called t._2.iterator() inside the while
            // condition, obtaining a FRESH iterator on every check -> infinite
            // loop for any non-empty group. Iterate the Iterable once instead.
            for (Tuple2<Integer, Integer> t2 : t._2) {
                System.out.println(t2._1 + "," + t2._2);
            }
        });

        // Sort each group's tuples by time in the driver-side comparator.
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> sorted = groups.mapValues(s -> {
            List<Tuple2<Integer, Integer>> newList = new ArrayList<>();
            // BUG FIX: same fresh-iterator infinite loop as above; copy the
            // Iterable into a list with a single enhanced-for pass.
            for (Tuple2<Integer, Integer> tuple : s) {
                newList.add(tuple);
            }
            newList.sort(SparkTupleComparator.INSTANCE);
            return newList;
        });

        // Dump the sorted groups.
        List<Tuple2<String, List<Tuple2<Integer, Integer>>>> outPut3 = sorted.collect();
        outPut3.forEach(t -> {
            System.out.println(t._1);
            t._2.forEach(t2 -> System.out.println(t2._1 + "," + t2._2));
        });

        // FIX: release the context; the original leaked it.
        scc.stop();
    }
}
/**
 * Serializable singleton comparator that orders (time, value) tuples by their
 * first element (the time) in ascending order; the second element is ignored.
 */
public class SparkTupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {

    /** Sole shared instance — the constructor is private. */
    public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();

    private SparkTupleComparator() {
        // singleton: instantiation only through INSTANCE
    }

    @Override
    public int compare(Tuple2<Integer, Integer> first, Tuple2<Integer, Integer> second) {
        return Integer.compare(first._1, second._1);
    }
}