01 Spark 的 TopN 問題

與 Hadoop 版本的目標相同:給定一份數據,然後取出 TopN。數據如下:

取出數據中排名前十的記錄。

代碼如下:

package com.test.book;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.PairFunction;

import scala.Tuple2;

/**
 * Spark driver that reads {@code name,score} lines from a text file and prints
 * the names of the {@value #TOP_N} records with the highest scores.
 *
 * <p>The job works in two phases: every partition reduces its records to a
 * local top-N, then the driver collects those small per-partition results and
 * merges them into the final top-N.
 *
 * <p>Records that share the same score are all retained (up to the limit).
 * Scores are grouped in a sorted map of {@code score -> names}, so a tie never
 * overwrites an earlier record. When a tie straddles the cut-off, the records
 * encountered first are the ones kept.
 *
 * <p>Names are printed one per line in ascending score order.
 */
public class SparkTon {

    /** Number of highest-scoring records to keep. */
    private static final int TOP_N = 10;

    /**
     * Runs the Top-N job against the hard-coded input file and prints the result.
     *
     * @param args ignored
     */
    public static void main(String[] args) {

        SparkConf conf = new SparkConf().setAppName("SparkTon").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            JavaRDD<String> lines = sc.textFile("/Users/mac/Desktop/TopN2.txt");

            // Parse each "name,score" line into a (name, score) pair.
            JavaPairRDD<String, Integer> pairRDD = lines.mapToPair(new PairFunction<String, String, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String t) throws Exception {
                    // Split once; trim the numeric field so "name, 42" parses too.
                    String[] fields = t.split(",");
                    return new Tuple2<String, Integer>(fields[0], Integer.valueOf(fields[1].trim()));
                }
            });

            // Reduce every partition to its own local top-N.
            JavaRDD<SortedMap<Integer, List<String>>> pairspart = pairRDD.mapPartitions(
                    new FlatMapFunction<Iterator<Tuple2<String, Integer>>, SortedMap<Integer, List<String>>>() {

                        private static final long serialVersionUID = 1L;

                        @Override
                        public Iterable<SortedMap<Integer, List<String>>> call(Iterator<Tuple2<String, Integer>> t)
                                throws Exception {
                            // The accumulator is local to this call, so no state is
                            // carried on the function instance between invocations.
                            SortedMap<Integer, List<String>> localTop = new TreeMap<Integer, List<String>>();
                            while (t.hasNext()) {
                                Tuple2<String, Integer> tuple2 = t.next();
                                addBounded(localTop, tuple2._2, tuple2._1, TOP_N);
                            }
                            return Collections.singleton(localTop);
                        }
                    });

            // Bring the per-partition results to the driver and merge them.
            List<SortedMap<Integer, List<String>>> allTop = pairspart.collect();
            SortedMap<Integer, List<String>> finalmap = new TreeMap<Integer, List<String>>();
            for (SortedMap<Integer, List<String>> localTop : allTop) {
                for (Map.Entry<Integer, List<String>> entry : localTop.entrySet()) {
                    for (String name : entry.getValue()) {
                        addBounded(finalmap, entry.getKey(), name, TOP_N);
                    }
                }
            }

            // Print the surviving names, lowest score first.
            for (List<String> names : finalmap.values()) {
                for (String name : names) {
                    System.out.println(name);
                }
            }
        } finally {
            // Always release the Spark context, even if the job fails.
            sc.stop();
        }
    }

    /**
     * Adds one record to a bounded top-N accumulator, evicting a lowest-scoring
     * record if the accumulator would otherwise exceed {@code limit} records.
     *
     * @param top   accumulator mapping score to the names holding that score
     * @param score score of the record being added
     * @param name  name of the record being added
     * @param limit maximum number of records (not distinct scores) to retain
     */
    private static void addBounded(SortedMap<Integer, List<String>> top, Integer score, String name, int limit) {
        List<String> names = top.get(score);
        if (names == null) {
            names = new ArrayList<String>();
            top.put(score, names);
        }
        names.add(name);

        if (countRecords(top) > limit) {
            Integer lowest = top.firstKey();
            List<String> lowestNames = top.get(lowest);
            // Drop the most recently added name at the lowest score so that
            // earlier records win ties at the cut-off.
            lowestNames.remove(lowestNames.size() - 1);
            if (lowestNames.isEmpty()) {
                top.remove(lowest);
            }
        }
    }

    /**
     * Counts the records held in the accumulator across all scores.
     *
     * @param top accumulator mapping score to the names holding that score
     * @return total number of names stored
     */
    private static int countRecords(SortedMap<Integer, List<String>> top) {
        int total = 0;
        for (List<String> names : top.values()) {
            total += names.size();
        }
        return total;
    }

}

 

結果:

相關文章
相關標籤/搜索