和 Hadoop 版本的目的相同:給定一批數據,然後取 TopN。數據如下:
取出數據中排名前十的記錄。
代碼如下:
package com.test.book;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Spark job that prints the names of the N highest-scoring records of a
 * {@code name,score} text file.
 *
 * <p>Each partition is first reduced to its own top N records, the per-partition
 * candidates are collected on the driver, and the driver reduces them to the
 * overall top N. Names are printed one per line in ascending score order.
 *
 * <p>Records are kept as individual {@code (name, score)} tuples rather than in
 * a map keyed by score, so records that share a score do not overwrite each other.
 */
public class SparkTon {

    /** Input file used when no path is given on the command line. */
    private static final String DEFAULT_INPUT_PATH = "/Users/mac/Desktop/TopN2.txt";

    /** Number of records kept when no count is given on the command line. */
    private static final int DEFAULT_TOP_N = 10;

    /**
     * Runs the job.
     *
     * @param args optional; {@code args[0]} is the input path and {@code args[1]}
     *             is N. Missing arguments fall back to the defaults above.
     * @throws IllegalArgumentException if N is not positive
     * @throws NumberFormatException    if {@code args[1]} is not an integer
     */
    public static void main(String[] args) {
        final String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT_PATH;
        final int topN = args.length > 1 ? Integer.parseInt(args[1].trim()) : DEFAULT_TOP_N;
        if (topN <= 0) {
            throw new IllegalArgumentException("N must be positive but was " + topN);
        }

        SparkConf conf = new SparkConf().setAppName("SparkTon").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile(inputPath);

            // Parse every "name,score" line into a (name, score) tuple.
            JavaPairRDD<String, Integer> pairRDD = lines
                    .mapToPair(new PairFunction<String, String, Integer>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Tuple2<String, Integer> call(String line) throws Exception {
                            // Split once and reuse the pieces.
                            String[] fields = line.split(",");
                            if (fields.length < 2) {
                                throw new IllegalArgumentException(
                                        "Expected \"name,score\" but got: " + line);
                            }
                            return new Tuple2<String, Integer>(
                                    fields[0], Integer.valueOf(fields[1].trim()));
                        }
                    });

            // Reduce each partition to its own top N. All state is local to call(),
            // so nothing is carried on the function object between invocations.
            // NOTE: this is the Spark 1.x signature (call returns Iterable).
            JavaRDD<Tuple2<String, Integer>> partitionTops = pairRDD.mapPartitions(
                    new FlatMapFunction<Iterator<Tuple2<String, Integer>>, Tuple2<String, Integer>>() {
                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<Tuple2<String, Integer>> call(
                                Iterator<Tuple2<String, Integer>> records) throws Exception {
                            return selectTop(records, topN);
                        }
                    });

            // Bring the per-partition candidates to the driver and reduce them
            // to the overall top N with the same selection routine.
            List<Tuple2<String, Integer>> candidates = partitionTops.collect();
            List<Tuple2<String, Integer>> overallTop = selectTop(candidates.iterator(), topN);

            // Print the names only, lowest of the selected scores first.
            for (Tuple2<String, Integer> record : overallTop) {
                System.out.println(record._1());
            }
        } finally {
            // Release the Spark context even if the job fails.
            sc.stop();
        }
    }

    /**
     * Selects the {@code limit} largest records from {@code records}.
     *
     * @param records records to scan; consumed completely
     * @param limit   maximum number of records to keep; must be positive
     * @return at most {@code limit} records sorted by ascending score, ties
     *         ordered by name
     */
    private static List<Tuple2<String, Integer>> selectTop(
            Iterator<Tuple2<String, Integer>> records, int limit) {
        Comparator<Tuple2<String, Integer>> order = new ScoreThenNameOrder();
        // Min-heap: the head is always the weakest record kept so far, so
        // evicting the head keeps the strongest `limit` records.
        PriorityQueue<Tuple2<String, Integer>> heap =
                new PriorityQueue<Tuple2<String, Integer>>(11, order);
        while (records.hasNext()) {
            heap.add(records.next());
            if (heap.size() > limit) {
                heap.poll();
            }
        }
        List<Tuple2<String, Integer>> kept = new ArrayList<Tuple2<String, Integer>>(heap);
        // Copying a heap does not yield sorted order; sort explicitly.
        Collections.sort(kept, order);
        return kept;
    }

    /** Orders records by ascending score, breaking ties by name so results are deterministic. */
    private static final class ScoreThenNameOrder implements Comparator<Tuple2<String, Integer>> {
        @Override
        public int compare(Tuple2<String, Integer> left, Tuple2<String, Integer> right) {
            int byScore = left._2().compareTo(right._2());
            if (byScore != 0) {
                return byScore;
            }
            return left._1().compareTo(right._1());
        }
    }
}
結果: