Scala version
package com.xp.cn

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

/**
  * Created by xupan on 2017/12/15.
  */
object WordCount {
  def main(args: Array[String]) {
    // SparkContext: the entry point of a Spark application
    val sc: SparkContext = new SparkContext(
      new SparkConf()
        .setAppName("WordCount")
        .setMaster("local")
    )

    // textFile: the path of the data to read
    // textFile(path: String, minPartitions: Int = defaultMinPartitions)
    // minPartitions: if not specified, the default is tied to the HDFS block layout
    val textRdd: RDD[String] = sc.textFile("data/wordcount/wordcount.txt")

    // count the words, then sort by count in descending order
    val wordCount: RDD[(String, Int)] = textRdd
      .flatMap(_.split(" "))
      .map(word => (word, 1))
      .reduceByKey(_ + _)
      .sortBy(_._2, false)

    // print the first 10 results
    wordCount.collect().take(10).foreach(println)

    // save to HDFS
    wordCount.saveAsTextFile("hdfs://xupan001:8020/user/root/spark/output/wordcount")

    // release resources
    sc.stop()
  }
}
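The minPartitions comment above can be made concrete: when the argument is omitted, Spark derives the split count from the underlying Hadoop InputFormat (for HDFS input, roughly one split per block). A minimal sketch of passing it explicitly against the same input file; note that minPartitions is a lower bound, not an exact count:

// Hedged sketch: request at least 4 partitions when reading. minPartitions is a
// lower bound -- the Hadoop input format may still create more splits than requested.
val textRdd4: RDD[String] = sc.textFile("data/wordcount/wordcount.txt", minPartitions = 4)
println(textRdd4.partitions.length) // typically prints 4 or more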
Java version (anonymous inner classes)
package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.*;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

/**
 * Created by xupan on 2017/12/15.
 */
public class WordCountJava {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("WordCountJava")
                .setMaster("local")
        );

        JavaRDD<String> lineJavaRDD = sc.textFile("data/wordcount/wordcount.txt");

        // split each line into words
        JavaRDD<String> wordJavaRDD = lineJavaRDD.flatMap(new FlatMapFunction<String, String>() {
            public Iterator<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" ")).iterator();
            }
        });

        // pair each word with a count of 1
        JavaPairRDD<String, Integer> wordPairRDD = wordJavaRDD.mapToPair(new PairFunction<String, String, Integer>() {
            public Tuple2<String, Integer> call(String word) throws Exception {
                return new Tuple2<String, Integer>(word, 1);
            }
        });

        // aggregate counts per word
        JavaPairRDD<String, Integer> reducePairRDD = wordPairRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // swap (word, count) to (count, word) so the pair RDD can be sorted by key
        JavaPairRDD<Integer, String> reverseRDD = reducePairRDD.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
            public Tuple2<Integer, String> call(Tuple2<String, Integer> t) throws Exception {
                return new Tuple2<Integer, String>(t._2(), t._1());
            }
        });

        // sort by count, descending
        JavaPairRDD<Integer, String> sortRDD = reverseRDD.sortByKey(false);

        // swap back to (word, count)
        JavaPairRDD<String, Integer> reReverseRDD = sortRDD.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
            public Tuple2<String, Integer> call(Tuple2<Integer, String> t) throws Exception {
                return t.swap();
            }
        });

        // print the results
        reReverseRDD.foreach(new VoidFunction<Tuple2<String, Integer>>() {
            public void call(Tuple2<String, Integer> pairs) throws Exception {
                System.out.println(pairs._1() + " : " + pairs._2());
            }
        });

        // release resources
        sc.close();
    }
}
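The swap/sortByKey/swap dance above is needed because JavaPairRDD exposes sortByKey but no direct sort-by-value. When only the most frequent words are wanted, the full shuffle sort can be skipped entirely with RDD.top and a custom Ordering. A minimal sketch in Scala, where `reduced` is a hypothetical name for the RDD[(String, Int)] produced by reduceByKey in the first example:

// Hedged sketch: fetch the 10 highest counts without a global sort.
// `reduced` is assumed to be the RDD[(String, Int)] after reduceByKey.
val top10: Array[(String, Int)] = reduced.top(10)(Ordering.by(_._2))
top10.foreach(println)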
Java 8 lambda API version
package com.xp.cn;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by xupan on 2017/12/15.
 */
public class WordJavaLamb {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext(
            new SparkConf().setAppName("WordCountJava")
                .setMaster("local")
        );

        JavaRDD<String> javaRDD = sc.textFile("data/wordcount/wordcount.txt");

        // split each line into words
        JavaRDD<String> lineRDD = javaRDD.flatMap(line -> Arrays.asList(line.split(" ")).iterator());

        JavaPairRDD<String, Integer> result = lineRDD
            .mapToPair(word -> new Tuple2<String, Integer>(word, 1))
            .reduceByKey((m, n) -> m + n)   // aggregate counts per word
            .mapToPair(tp -> tp.swap())     // swap to (count, word)
            .sortByKey(false)               // sort by count, descending
            .mapToPair(tp -> tp.swap());    // swap back to (word, count)

        // print the results
        result.foreach(wordcount -> System.out.println(wordcount._1() + ":" + wordcount._2()));

        // release resources
        sc.close();
    }
}
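One operational caveat for all three versions: saveAsTextFile throws an exception if the output directory already exists, so a rerun needs the previous output removed first. A minimal sketch using the Hadoop FileSystem API, assuming the Hadoop client configuration is available on the classpath; the path reuses the one from the Scala example:

import java.net.URI
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}

// Hedged sketch: remove a previous run's output before calling saveAsTextFile.
val output = "hdfs://xupan001:8020/user/root/spark/output/wordcount"
val fs = FileSystem.get(new URI(output), new Configuration())
if (fs.exists(new Path(output))) {
  fs.delete(new Path(output), true) // true = recursive delete
}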