Spark Algorithms (Java Edition)

We first set up the Spark project in IDEA; for the details, see the earlier post on submitting a first Spark word-count program against Hadoop HDFS. The only difference is that this time we write the code in Java instead of Scala.

  • Problem statement: secondary sort

Solutions to the secondary sort problem

  1. Have the reducer read and cache all of the values for a given key (for example in a collection) and then sort those values inside the reducer. This approach is not scalable: because the reducer receives every value for a given key, it can run out of memory. On the other hand, if the number of values per key is small enough that an out-of-memory error cannot occur, the approach is perfectly usable.
  2. Use the Spark framework itself to sort the reducer values (this way no in-reducer sort is needed). The idea is to "create a composite key by adding part or all of the value to the natural key" so that the framework does the sorting for us. This approach is scalable (it is not limited by the memory of a commodity server); a sketch of it appears at the end of this section.

Let's start with the first solution.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class SecondSort {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SecondSort");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaRDD<String> rdd = sc.textFile(args[0]);
        // Parse each "name,time,value" line into (name, (time, value)).
        JavaPairRDD<String, Tuple2<Integer, Integer>> pairs = rdd.mapToPair(s -> {
            String[] tokens = s.split(",");
            System.out.println(tokens[0] + "," + tokens[1] + "," + tokens[2]);
            Integer time = Integer.parseInt(tokens[1]);
            Integer value = Integer.parseInt(tokens[2]);
            Tuple2<Integer, Integer> timeValue = new Tuple2<>(time, value);
            return new Tuple2<>(tokens[0], timeValue);
        });
        // Debug output: the parsed pairs, still unsorted.
        List<Tuple2<String, Tuple2<Integer, Integer>>> output = pairs.collect();
        output.forEach(t -> {
            Tuple2<Integer, Integer> timeValue = t._2;
            System.out.println(t._1 + "," + timeValue._1 + "," + timeValue._2);
        });
        // Group all (time, value) pairs under their natural key.
        JavaPairRDD<String, Iterable<Tuple2<Integer, Integer>>> groups = pairs.groupByKey();
        List<Tuple2<String, Iterable<Tuple2<Integer, Integer>>>> output2 = groups.collect();
        output2.forEach(t -> {
            System.out.println(t._1);
            for (Tuple2<Integer, Integer> t2 : t._2) {
                System.out.println(t2._1 + "," + t2._2);
            }
        });
        // Copy each group's values into a list and sort it in memory by time.
        // This per-key in-memory list is exactly why solution 1 does not scale.
        JavaPairRDD<String, List<Tuple2<Integer, Integer>>> sorted = groups.mapValues(s -> {
            List<Tuple2<Integer, Integer>> newList = new ArrayList<>();
            for (Tuple2<Integer, Integer> t : s) {
                newList.add(t);
            }
            Collections.sort(newList, SparkTupleComparator.INSTANCE);
            return newList;
        });
        List<Tuple2<String, List<Tuple2<Integer, Integer>>>> output3 = sorted.collect();
        output3.forEach(t -> {
            System.out.println(t._1);
            t._2.forEach(t2 -> System.out.println(t2._1 + "," + t2._2));
        });
        sc.stop();
    }
}
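With a small made-up input file (the name,time,value format is inferred from the parsing code above), say:

x,2,9
y,2,5
x,1,3
y,1,7
x,3,6

the final loop prints each key followed by its (time, value) pairs sorted by time; the relative order of the keys themselves may vary, since groupByKey makes no ordering guarantee:

x
1,3
2,9
3,6
y
1,7
2,5

The in-memory sort relies on the following comparator: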
import java.io.Serializable;
import java.util.Comparator;

import scala.Tuple2;

// Orders (time, value) tuples by the time component only.
public class SparkTupleComparator implements Comparator<Tuple2<Integer, Integer>>, Serializable {

    public static final SparkTupleComparator INSTANCE = new SparkTupleComparator();

    private SparkTupleComparator() {
    }

    @Override
    public int compare(Tuple2<Integer, Integer> t1, Tuple2<Integer, Integer> t2) {
        return t1._1.compareTo(t2._1);
    }
}
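For completeness, here is a minimal sketch of the second, scalable approach under the same name,time,value input assumption. The time component is pulled into a composite key so that Spark's repartitionAndSortWithinPartitions can sort during the shuffle; the class name, the partition count, and the inlined partitioner and comparator are illustrative choices, not code from the original post.

import org.apache.spark.HashPartitioner;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.io.Serializable;
import java.util.Comparator;

public class SecondarySortByCompositeKey {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("SecondarySortByCompositeKey");
        JavaSparkContext sc = new JavaSparkContext(conf);
        // Composite key (name, time): the value's time component moves into the key
        // so that the framework, not the reducer, does the sorting.
        JavaPairRDD<Tuple2<String, Integer>, Integer> pairs = sc.textFile(args[0]).mapToPair(s -> {
            String[] tokens = s.split(",");
            return new Tuple2<>(new Tuple2<>(tokens[0], Integer.parseInt(tokens[1])),
                    Integer.parseInt(tokens[2]));
        });
        // Partition by the natural key only, so every record of one name lands in
        // the same partition regardless of its time.
        Partitioner byName = new Partitioner() {
            private final HashPartitioner hash = new HashPartitioner(4);
            @Override
            public int numPartitions() { return hash.numPartitions(); }
            @SuppressWarnings("unchecked")
            @Override
            public int getPartition(Object key) {
                return hash.getPartition(((Tuple2<String, Integer>) key)._1);
            }
        };
        // Serializable comparator: order by name first, then by time.
        Comparator<Tuple2<String, Integer>> byNameThenTime =
                (Comparator<Tuple2<String, Integer>> & Serializable) (a, b) -> {
                    int c = a._1.compareTo(b._1);
                    return c != 0 ? c : a._2.compareTo(b._2);
                };
        // The shuffle itself produces partitions sorted by (name, time); no group
        // ever has to be buffered in a reducer's memory.
        JavaPairRDD<Tuple2<String, Integer>, Integer> sorted =
                pairs.repartitionAndSortWithinPartitions(byName, byNameThenTime);
        sorted.collect().forEach(t ->
                System.out.println(t._1._1 + "," + t._1._2 + "," + t._2));
        sc.stop();
    }
}

Because the sorting happens inside the shuffle machinery rather than in a per-key list, this variant is not limited by the memory of a single commodity server, which is exactly the scalability property described above.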