多種語言開發Spark-以WordCount爲例

Spark是目前最火爆的大數據計算框架,有趕超Hadoop MapReduce的趨勢。所以,趁着如今還有大多數人不懂得Spark開發的,趕忙好好學習吧,爲了使不一樣的開發人員可以很好的利用Spark,Spark官方提供了不一樣開發語言的API,本文以大數據經典入門案例WordCount爲例,開發多個版本的Spark應用程序,以知足不一樣的開發人員需求。編程

1、Scala:框架

  

    val conf: SparkConf = new SparkConf().setMaster("local")
    val sc: SparkContext = new SparkContext(conf)
    sc.textFile("test")
      .flatMap(line => {
        line.split("\t")
      })
      .mapPartitions(iter => {
        val list: List[(String, Int)] = List[(String, Int)]()
        iter.foreach(word => {
          list.::((word,1))
        })
        list.iterator
      })
      .reduceByKey(_ + _)
      .saveAsTextFile("result")

2、JDK1.7及如下版本:ide

  

SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("test")
        .flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String t) throws Exception {
                return Arrays.asList(t.split("\t"));
            }
        }).mapToPair(new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String t) throws Exception {
                return new Tuple2<String, Integer>(t, 1);
            }
            
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {

            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1+v2;
            }
        }).saveAsTextFile("result");

3、JDK1.8:函數式編程

  因爲JDK1.8加入了新特性——函數式編程,所以,能夠利用JDK1.8的新特性簡化Java開發Spark的語句。函數

SparkConf conf = new SparkConf().setAppName("JavaSparkTest").setMaster("local");
JavaSparkContext sc = new JavaSparkContext(conf);
sc.textFile("test")
        .flatMap(line -> {
            return Arrays.asList(line.split("\t"));
        }).mapToPair(word -> {
            return new Tuple2<String, Integer>(word, 1);
        }).reduceByKey((x, y) -> {
            return x + y;
        }).saveAsTextFile("result");

  是否是以爲比上述的Scala還簡潔呢?實際上是這樣的,Scala中使用了mapPartitions是對map函數的優化,即對每個RDD的分區進行map操做,這樣就減小了對象的建立,從而加速了計算。而Java中,經過個人測試,不能使用mapPartitions方法進行上述優化,只能使用map方法(不知道爲啥),這樣也可使用,可是在大數據集面前,其性能就遜色於mapPartitions了。oop

 4、Python:性能

from pyspark import SparkContext
from pyspark import SparkConf as conf
conf.setAppName("WordCount").setMaster("local")
sc = SparkContext(conf)

text_file = sc.textFile("test")\
    .flatMap(lambda line: line.split("\t"))\
    .map(lambda word: (word, 1))\
    .reduceByKey(lambda x, y: x + y)\
    .saveAsTextFile("test")
相關文章
相關標籤/搜索