既然要統計單詞咱們就須要一個包含必定數量的文本,咱們這裏選擇了英文原著《GoneWithTheWind》(《飄》)的文原本作一個數據統計,看看文章中各個單詞出現頻次如何。爲了便於你們下載文本。能夠到GitHub上下載文本以及對應的代碼。我將文本放在項目的目錄下。java
首先咱們要讀取該文件,就要用到SparkContext中的textFile的方法,咱們嘗試先讀取第一行。python
scala實現git
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
println(sc.textFile("./GoneWithTheWind").first())
}
}
複製代碼
java實現github
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
System.out.println(sc.textFile("./GoneWithTheWind").first());
}
}
複製代碼
python實現apache
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
print(sc.textFile("./GoneWithTheWind").first())
複製代碼
獲得輸出api
Chapter 1
複製代碼
以scala爲例,其他兩種語言也差很少。第一步咱們建立了一個SparkConf數組
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
複製代碼
這裏咱們設置Master爲local,該程序名稱爲WordCount,固然程序名稱能夠任意取,和類名不一樣也無妨。可是這個Master則不能亂寫,當咱們在集羣上運行,用spark-submit的時候,則要注意。咱們如今只討論本地的寫法,所以,這裏只寫local。bash
接着一句咱們建立了一個SparkContext,這是spark的核心,咱們將conf配置傳入初始化app
val sc = new SparkContext(conf)
複製代碼
最後咱們將文本路徑告訴SparkContext,而後輸出第一行內容dom
println(sc.textFile("./GoneWithTheWind").first())
複製代碼
接着咱們就能夠開始統計文本的單詞數了,由於單詞是以空格劃分,因此咱們能夠把空格做爲單詞的標記。
scala實現
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
//設置數據路徑
val text = sc.textFile("./GoneWithTheWind")
//將文本數據按行處理,每行按空格拆成一個數組
// flatMap會將各個數組中元素合成一個大的集合
val textSplit = text.flatMap(line =>line.split(" "))
//處理合並後的集合中的元素,每一個元素的值爲1,返回一個元組(key,value)
//其中key爲單詞,value這裏是1,即該單詞出現一次
val textSplitFlag = textSplit.map(word => (word,1))
//reduceByKey會將textSplitFlag中的key相同的放在一塊兒處理
//傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
val countWord = textSplitFlag.reduceByKey((x,y)=>x+y)
//將計算後的結果存在項目目錄下的result目錄中
countWord.saveAsTextFile("./result")
}
}
複製代碼
java實現
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
//設置數據的路徑
JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
//將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
//這裏須要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
//與Hadoop中的map-reduce很是類似
JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
//處理合並後的集合中的元素,每一個元素的值爲1,返回一個Tuple2,Tuple2表示兩個元素的元組
//值得注意的是上面是JavaRDD,這裏是JavaPairRDD,在返回的是元組時須要注意這個區別
//PairFunction中<String, String, Integer>,第一個String是輸入值類型
//第二第三個,String, Integer是返回值類型
//這裏返回的是一個word和一個數值1,表示這個單詞出現一次
JavaPairRDD<String, Integer> splitFlagRDD = splitRDD.mapToPair(new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) throws Exception {
return new Tuple2<>(s,1);
}
});
//reduceByKey會將splitFlagRDD中的key相同的放在一塊兒處理
//傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
JavaPairRDD<String, Integer> countRDD = splitFlagRDD.reduceByKey(new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer integer, Integer integer2) throws Exception {
return integer+integer2;
}
});
//將計算後的結果存在項目目錄下的result目錄中
countRDD.saveAsTextFile("./resultJava");
}
}
複製代碼
python實現
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")
# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))
# 處理合並後的集合中的元素,每一個元素的值爲1,返回一個元組(key,value)
# 其中key爲單詞,value這裏是1,即該單詞出現一次
flagData = splitData.map(lambda word:(word,1))
# reduceByKey會將textSplitFlag中的key相同的放在一塊兒處理
# 傳入的(x,y)中,x是上一次統計後的value,y是本次單詞中的value,即每一次是x+1
countData = flagData.reduceByKey(lambda x,y:x+y)
#輸出文件
countData.saveAsTextFile("./result")
複製代碼
運行後在住目錄下獲得一個名爲result的目錄,該目錄以下圖,SUCCESS表示生成文件成功,文件內容存儲在part-00000中
咱們能夠查看文件的部份內容:
('Chapter', 1)
('1', 1)
('SCARLETT', 1)
('O’HARA', 1)
('was', 74)
('not', 33)
('beautiful,', 1)
('but', 32)
('men', 4)
('seldom', 3)
('realized', 2)
('it', 37)
('when', 19)
('caught', 1)
('by', 20)
('her', 65)
('charmas', 1)
('the', 336)
('Tarleton', 7)
('twins', 16)
('were.', 1)
('In', 1)
('face', 6)
('were', 49)
...
...
...
...
複製代碼
這樣就完成了一個spark的真正HelloWorld程序--單詞計數。對比三個語言版本的程序,發現一個事實那就是,用scala和python寫的代碼很是簡潔並且易懂,而Java實現的則相對複雜,難懂。固然這個易懂和難懂是相對而言的。若是你只會Java不管如何你都應該從中能看懂java的程序,而簡潔的scala和python對你來講根本看不懂。這也無妨,語言只是工具,重點看你怎麼用。何況,咱們使用java8的特性也能夠寫出簡潔的代碼。
java8實現
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;
import java.util.Arrays;
import java.util.Iterator;
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
countJava8(sc);
}
public static void countJava8(JavaSparkContext sc){
sc.textFile("./GoneWithTheWind")
.flatMap(s->Arrays.asList(s.split(" ")).iterator())
.mapToPair(s->new Tuple2<>(s,1))
.reduceByKey((x,y)->x+y)
.saveAsTextFile("./resultJava8");
}
}
複製代碼
spark的優越性在這個小小的程序中已經有所體現,計算一本書的每一個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。
程序是否還能再簡單高效呢?固然是能夠的,咱們能夠用countByValue這個函數,這個函數正是經常使用的計數的方法。
scala實現
import org.apache.spark.{SparkConf, SparkContext}
object WordCount {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("WordCount")
val sc = new SparkContext(conf)
//設置數據路徑
val text = sc.textFile("./GoneWithTheWind")
//將文本數據按行處理,每行按空格拆成一個數組
// flatMap會將各個數組中元素合成一個大的集合
val textSplit = text.flatMap(line =>line.split(" "))
println(textSplit.countByValue())
}
}
複製代碼
運行獲得結果
Map(Heknew -> 1,   「Ashley -> 1, 「Let’s -> 1, anarresting -> 1, of. -> 1, pasture -> 1, war’s -> 1, wall. -> 1, looks -> 2, ain’t -> 7,.......
複製代碼
java實現
public class WordCountJava {
public static void main(String[] args){
SparkConf conf = new SparkConf().setMaster("local").setAppName("WordCountJava");
JavaSparkContext sc = new JavaSparkContext(conf);
countJava(sc);
}
public static void countJava(JavaSparkContext sc){
//設置數據的路徑
JavaRDD<String> textRDD = sc.textFile("./GoneWithTheWind");
//將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
//這裏須要注意的是FlatMapFunction中<String, String>,第一個表示輸入,第二個表示輸出
//與Hadoop中的map-reduce很是類似
JavaRDD<String> splitRDD = textRDD.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) throws Exception {
return Arrays.asList(s.split(" ")).iterator();
}
});
System.out.println(splitRDD.countByValue());
}
}
複製代碼
運行獲得結果
{Heknew=1,   「Ashley=1, 「Let’s=1, anarresting=1, of.=1, pasture=1, war’s=1, wall.=1, looks=2, ain’t=7, Clayton=1, approval.=1, ideas=1,
複製代碼
python實現
from pyspark import SparkConf,SparkContext
conf = SparkConf().setMaster("local").setAppName("HelloWorld")
sc = SparkContext(conf=conf)
# 設置數據的路徑
textData = sc.textFile("./GoneWithTheWind")
# 將文本數據按行處理,每行按空格拆成一個數組,flatMap會將各個數組中元素合成一個大的集合
splitData = textData.flatMap(lambda line:line.split(" "))
print(splitData.countByValue())
複製代碼
運行獲得結果:
defaultdict(<class 'int'>, {'Chapter': 1, '1': 1, 'SCARLETT': 1, 'O’HARA': 1, 'was': 74, 'not': 33, 'beautiful,': 1, 'but': 32, 'men': 4,
複製代碼
spark的優越性在這個小小的程序中已經有所體現,計算一本書的每一個單詞出現的次數,spark在單機上運行(讀取文件、生成臨時文件、將結果寫到硬盤),加載-運行-結束只花費了2秒時間。若是想要獲取源代碼以及數據內容,能夠前往個人
github下載。