Spark入門(三)--Spark經典的單詞統計

spark經典之單詞統計

準備數據

既然要統計單詞咱們就須要一個包含必定數量的文本,咱們這裏選擇了英文原著《GoneWithTheWind》(《飄》)的文原本作一個數據統計,看看文章中各個單詞出現頻次如何。爲了便於你們下載文本。能夠到GitHub上下載文本以及對應的代碼。我將文本放在項目的目錄下。java

首先咱們要讀取該文件,就要用到SparkContext中的textFile的方法,咱們嘗試先讀取第一行。python

scala實現git

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")

    val sc = new SparkContext(conf)

    println(sc.textFile("./GoneWithTheWind").first())
  }

}
複製代碼

java實現github

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        System.out.println(sc.textFile("./GoneWithTheWind").first());
        
    }

}
複製代碼

python實現apache

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld")

sc = SparkContext(conf=conf)

print(sc.textFile("./GoneWithTheWind").first())
複製代碼

獲得輸出api

Chapter 1
複製代碼

以scala爲例,其他兩種語言也差很少。第一步咱們建立了一個SparkConf數組

val conf = new SparkConf().setMaster("local").setAppName("WordCount")
複製代碼

這裏咱們設置Master爲local,該程序名稱爲WordCount,固然程序名稱能夠任意取,和類名不一樣也無妨。可是這個Master則不能亂寫,當咱們在集羣上運行,用spark-submit的時候,則要注意。咱們如今只討論本地的寫法,所以,這裏只寫local。bash

接着一句咱們建立了一個SparkContext,這是spark的核心,咱們將conf配置傳入初始化app

val sc = new SparkContext(conf)
複製代碼

最後咱們將文本路徑告訴SparkContext,而後輸出第一行內容dom

println(sc.textFile("./GoneWithTheWind").first())
複製代碼

開始統計

接着咱們就能夠開始統計文本的單詞數了,由於單詞是以空格劃分,因此咱們能夠把空格做爲單詞的標記。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")

    val sc = new SparkContext(conf)

    //設置數據路徑
    val text = sc.textFile("./GoneWithTheWind")

    //將文本數據按行處理,每行按空格拆成一個數組
    // flatMap會將各個數組中元素合成一個大的集合
    val textSplit = text.flatMap(line =>line.split(" "))

    //處理合並後的集合中的元素,每一個元素的值爲1,返回一個元組(key,value)
    //其中key爲單詞,value這裏是1,即該單詞出現一次
    val textSplitFlag = textSplit.map(word => (word,1))

    //reduceByKey會將textSplitFlag中的key相同的放在一塊兒處理
    //傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
    val countWord = textSplitFlag.reduceByKey((x,y)=>x+y)

    //將計算後的結果存在項目目錄下的result目錄中
    countWord.saveAsTextFile("./result")
  }

}
複製代碼

java實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        //設置數據的路徑
        JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");


        //將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
        //這裏須要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
        //與Hadoop中的map-reduce很是類似
        JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        //處理合並後的集合中的元素,每一個元素的值爲1,返回一個Tuple2,Tuple2表示兩個元素的元組
        //值得注意的是上面是JavaRDD,這裏是JavaPairRDD,在返回的是元組時須要注意這個區別
        //PairFunction中<String, String, Integer>,第一個String是輸入值類型
        //第二第三個,String, Integer是返回值類型
        //這裏返回的是一個word和一個數值1,表示這個單詞出現一次
        JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<>(s,1);
            }
        });


        //reduceByKey會將splitFlagRDD中的key相同的放在一塊兒處理
        //傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
        JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer integer, Integer integer2) throws Exception {
                return integer+integer2;
            }
        });

        //將計算後的結果存在項目目錄下的result目錄中
        countRDD.saveAsTextFile("./resultJava");



    }

}
複製代碼

python實現

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld")

sc = SparkContext(conf=conf)

# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")

# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))

# 處理合並後的集合中的元素,每一個元素的值爲1,返回一個元組(key,value)
# 其中key爲單詞,value這裏是1,即該單詞出現一次
flagData = splitData.map(lambda word:(word,1))

# reduceByKey會將textSplitFlag中的key相同的放在一塊兒處理
# 傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
countData = flagData.reduceByKey(lambda x,y:x+y)

#輸出文件
countData.saveAsTextFile("./result")
複製代碼

運行後在住目錄下獲得一個名爲result的目錄,該目錄以下圖,SUCCESS表示生成文件成功,文件內容存儲在part-00000中

咱們能夠查看文件的部份內容:

('Chapter', 1)
('1', 1)
('SCARLETT', 1)
('O’HARA', 1)
('was', 74)
('not', 33)
('beautiful,', 1)
('but', 32)
('men', 4)
('seldom', 3)
('realized', 2)
('it', 37)
('when', 19)
('caught', 1)
('by', 20)
('her', 65)
('charmas', 1)
('the', 336)
('Tarleton', 7)
('twins', 16)
('were.', 1)
('In', 1)
('face', 6)
('were', 49)
...
...
...
...
複製代碼

這樣就完成了一個spark的真正HelloWorld程序--單詞計數。對比三個語言版本的程序,發現一個事實那就是,用scala和python寫的代碼很是簡潔並且易懂,而Java實現的則相對複雜,難懂。固然這個易懂和難懂是相對而言的。若是你只會Java不管如何你都應該從中能看懂java的程序,而簡潔的scala和python對你來講根本看不懂。這也無妨,語言只是工具,重點看你怎麼用。何況,咱們使用java8的特性也能夠寫出簡潔的代碼。

java8實現

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;
import java.util.Iterator;

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");

        JavaSparkContext sc = new JavaSparkContext(conf);

        countJava8(sc);

    }


    public static void countJava8(JavaSparkContext sc){


        sc.textFile("./GoneWithTheWind")
          .flatMap(s->Arrays.asList(s.split(" ")).iterator())
          .mapToPair(s->new Tuple2<>(s,1))
          .reduceByKey((x,y)->x+y)
          .saveAsTextFile("./resultJava8");


    }

}
複製代碼

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每一個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。

對程序進行優化

程序是否還能再簡單高效呢?固然是能夠的,咱們能夠用countByValue這個函數,這個函數正是經常使用的計數的方法。

scala實現

import org.apache.spark.{SparkConf, SparkContext}

object WordCount {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local").setAppName("WordCount")

    val sc = new SparkContext(conf)
    

    //設置數據路徑
    val text = sc.textFile("./GoneWithTheWind")

    //將文本數據按行處理,每行按空格拆成一個數組
    // flatMap會將各個數組中元素合成一個大的集合
    val textSplit = text.flatMap(line =>line.split(" "))
    
    println(textSplit.countByValue())
  }
}
複製代碼

運行獲得結果

Map(Heknew -> 1, &emsp;&emsp;「Ashley -> 1, 「Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,.......
複製代碼

java實現

public class WordCountJava {

    public static void main(String[] args){

        SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");

        JavaSparkContext sc = new JavaSparkContext(conf);
        countJava(sc);

    }
    
     public static void countJava(JavaSparkContext sc){
        //設置數據的路徑
        JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");


        //將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
        //這裏須要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
        //與Hadoop中的map-reduce很是類似
        JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterator<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" ")).iterator();
            }
        });

        System.out.println(splitRDD.countByValue());

    }

}
複製代碼

運行獲得結果

{Heknew=1, &emsp;&emsp;「Ashley=1, 「Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1,
複製代碼

python實現

from pyspark import SparkConf,SparkContext


conf = SparkConf().setMaster("local").setAppName("HelloWorld")

sc = SparkContext(conf=conf)

# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")

# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))

print(splitData.countByValue())
複製代碼

運行獲得結果:

defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4, 
複製代碼

spark的優越性在這個小小的程序中已經有所體現,計算一本書的每一個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。若是想要獲取源代碼以及數據內容,能夠前往個人 github下載。
相關文章
相關標籤/搜索