Spark WordCount

Scala version:

package com.xp.cn

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
 * Spark word-count example: reads a text file, counts how often each
 * space-separated token occurs, prints the ten most frequent tokens and
 * writes the full, count-descending result to HDFS.
 *
 * Created by xupan on 2017/12/15.
 */
object WordCount {

  /**
   * Program entry point.
   *
   * @param args command-line arguments (unused; all paths are hard-coded)
   */
  def main(args: Array[String]): Unit = {

    // SparkContext is the entry point for running a Spark job.
    val sc: SparkContext = new SparkContext(
      new SparkConf()
        .setAppName("WordCount")
        .setMaster("local")
    )

    // Stop the context even when one of the jobs below throws, so the
    // application never leaves a SparkContext running.
    try {
      // textFile(path: String, minPartitions: Int = defaultMinPartitions)
      // reads the file as one record per line. minPartitions is left at its
      // default here (original author's note: for HDFS input the default
      // partitioning is closely related to the HDFS block layout).
      val textRdd: RDD[String] = sc.textFile("data/wordcount/wordcount.txt")

      // Split each line on single spaces, pair every token with 1, sum the
      // ones per token, then order by count, highest first.
      val wordCount: RDD[(String, Int)] = textRdd
        .flatMap(_.split(" "))
        .map(word => (word, 1))
        .reduceByKey(_ + _)
        .sortBy(_._2, ascending = false)

      // Print the top ten. RDD.take(10) only brings the rows it needs to the
      // driver, whereas collect().take(10) would first ship the entire
      // result set to the driver.
      wordCount.take(10).foreach(println)

      // Save the complete result to HDFS.
      wordCount.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/wordcount")
    } finally {
      // Release Spark resources.
      sc.stop()
    }
  }

}

 

 

 

Java version (anonymous inner classes):

package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by xupan on 2017/12/15.
 */
public class WordCountJava {

    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
                new SparkConf().setAppName("WordCountJava")
                        .setMaster("local")
        );

        JavaRDD<String> lineJavaRDD = sc.textFile("data/wordcount/wordcount.txt");

        //切分
        JavaRDD<String> wordJavaRDD = lineJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        //計數
        JavaPairRDD<String, Integer> wordPairRDD = wordJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);

            }
        });

        //聚合
        JavaPairRDD<String, Integer> reducePairRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        //反轉
        JavaPairRDD<Integer, String> resverRDD = reducePairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2(), t._1());
            }
        });

        //排序
        JavaPairRDD<Integer, String> sortRDD = resverRDD.sortByKey(false);

        //再次反轉
        JavaPairRDD<String, Integer> reResverRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                //return new Tuple2<String, Integer>(t._2(), t._1());
                return t.swap(); //
            }
        });

        //打印結果
        reResverRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + " : " + pairs._2());
            }
        });

        //關閉資源
        sc.close();


    }

}

 

Java 1.8 API version (lambda expressions):

package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Spark word-count example using Java 8 lambdas: reads a text file, counts
 * each space-separated token, sorts by count in descending order and prints
 * every "word:count" pair.
 *
 * Created by xupan on 2017/12/15.
 */
public class WordJavaLamb {

    /**
     * Program entry point.
     *
     * @param args command-line arguments (unused; the input path is hard-coded)
     */
    public static void main(String[] args) {
        SparkConf conf = new SparkConf()
                .setAppName("WordCountJava")
                .setMaster("local");

        // JavaSparkContext is Closeable, so try-with-resources guarantees the
        // context is closed even when one of the jobs below throws.
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {

            JavaRDD<String> lines = sc.textFile("data/wordcount/wordcount.txt");

            // Split every line into tokens on single spaces.
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

            JavaPairRDD<String, Integer> result = words
                    .mapToPair(word -> new Tuple2<String, Integer>(word, 1)) // (word, 1)
                    .reduceByKey(Integer::sum)      // sum the counts per word
                    .mapToPair(tp -> tp.swap())     // to (count, word): sortByKey orders by key only
                    .sortByKey(false)               // highest count first
                    .mapToPair(tp -> tp.swap());    // back to (word, count)

            // Print the result, one "word:count" pair per line.
            result.foreach(wordcount -> System.out.println(wordcount._1() + ":" + wordcount._2()));
        }
    }
}
相關文章
相關標籤/搜索