mac下Spark的安裝與使用

每次接觸一個新的知識以前我都抱有恐懼之心,由於總認爲本身沒有接觸到的知識都很高大上,好比上篇介紹到的Hadoop的安裝與使用與本篇要介紹的Spark,其實在本身真正琢磨之後才發現本覺得高大上的知識其實也不過如此。html

因爲Spark是最新火起來的處理大數據的框架,國內教程資源少之甚少,因此本篇文章是本人在看了Spark官網的快速入門教程後總結下來的經驗,因爲Spark同Hadoop同樣能夠運行在多種模式下,而本人又比較窮只有一臺電腦,因此本篇文章爲你們介紹如何在mac系統的本地模式下安裝Spark以及安裝後如何用Spark來進行交互式分析。java

本文結構:前部分介紹Spark的一點點(詳情介紹請自行google)基礎概念以及安裝過程,後部分經過一個demo讓你們快速學會使用Spark基本Api。python

1.Spark的運行模式

在正式安裝Spark以前,先給你們介紹下Spark能夠在哪幾種模式下運行。同上篇Hadoop的安裝與使用中介紹的Hadoop能夠運行在其3種模式中的任意一種模式之上,Spark也能夠運行在多種模式之上,主要有如下4種運行模式:es6

  • 1.local: 本地單進程模式,用於本地開發測試Spark代碼。
  • 2.standalone:分佈式集羣模式,Master-Worker架構,Master負責調度,Worker負責具體Task的執行。
  • 3.on yarn/mesos:運行在yarn/mesos等資源管理框架之上,yarn/mesos提供資源管理,spark提供計算調度,並可與其餘計算框架(如MapReduce/MPI/Storm)共同運行在同一個集羣之上。
  • 4.on cloud(EC2): 運行在AWS的EC2之上

因爲博主比較窮,因此下面爲你們介紹本地模式下Spark的安裝與使用。shell

2.Spark的安裝

2.1準備工做

第一步:安裝Java JDK 1.7及以上版本,並配置好環境變量。本電腦安裝的jdk是1.7.0_79版本的。apache

第二步:安裝Hadoop。本電腦安裝的Hadoop是2.7.3版本的。編程

疑惑:上篇文章說到能夠不學Hadoop直接學習Spark,那爲何還要安裝Hadoop?親,個人意思是不用學習Hadoop的相關知識例如它的API啥的,可是沒說不用先搭建Hadoop的環境呀!
合理解釋:Spark會用到HDFS與YARN,所以請先安裝Hadoop,關於Hadoop的安裝請參考個人上篇博文mac下Hadoop的安裝與使用,在此就再也不復述。vim

第三步:安裝Scala 2.9.3以上版本。這裏介紹下Scala在mac下的安裝與環境變量的配置。點擊連接進入scala官方網站的下載頁,下載2.11.8版本(第一次操做的時候我下載了最新版2.12.1,後來測試spark-shell命令時發現最新版本的scala與1.7版本的jdk不兼容,因此後來換成了2.11.8版本)的Scala:
bash

點擊圖上下載連接會自動將scala下載到Dowmloads目錄下,文件名爲:scala-2.12.1.tgz,還有一種下載方法就是直接在命令行使用homebrew命令(做爲一個Linux開發人員我建議使用這種方式)進行下載:brew install scala,該命令會自動幫你把scala下載到/usr/local目錄下。使用在官網點擊連接下載的方式的話,咱們也要將該scala文件加到/usr/local目錄下,你能夠直接拷貝過去,固然做爲一個Linux開發人員你能夠直接使用一條命令完成將該壓縮包進行解壓移動/usr/local/目錄下:sudo tar -zxf ~/downloads/scala-2.12.1.tgz -C /usr/local/,而後使用命令cd /usr/local進入到該目錄下,因爲解壓後的文件名爲:scala-2.12.1,因此爲了以後配置的方便咱們使用命令:sudo mv ./scala-2.12.1 ./scala將文件名修改成scala。由於該目錄屬於管理員級別的目錄因此若是當前用戶不是管理員的話應該在命令前面使用sudo關鍵字表示使用管理員權限。閉包

這樣scala的安裝便完成,可是還要配置scala的環境變量,使用命令:sudo vim ./etc/profile打開系統中配置環境變量的文件,在裏面添加以下內容:

1
2
export SCALA_HOME=/usr/local/scala
export PATH=$PATH:$SCALA_HOME/bin

 

而後:wq!保存並退出該文件,輸入命令使該文件的內容馬上生效:source /etc/profile,接下來在(根目錄下)命令行輸入:scala並敲擊回車,看到控制檯打印以下信息說明咱們的scala安裝併成功配置了環境變量:

圖中信息即代表咱們使用的scala版本爲2.11.8,jdk版本爲1.7.0_79。

使用命令行快捷鍵control+c或者:quit退出scala shell環境(網上教程有說使用exit命令能夠退出scala shell環境,我試了但貌似不行)。

疑惑:既然Spark提供了Scala、Python、Java三種程序設計語言的API,那麼我直接用java不就行了,爲何還要下載Scala呢?是由於等會咱們會使用Spark shell鏈接到Spark引擎進行交互式數據分析,而Spark shell只支持Scala和Python兩種語言。Java不支持交互式的Shell,所以這一功能暫未在Java語言中實現(固然你也能夠不使用shell編程,直接在IDE中用java編程語言鏈接到Spark引擎進行交互式數據分析也是能夠的)。因此建議你們仍是老老實實在電腦上面下載好scala並配置好環境變量,反正也佔不了多大空間啊,並且萬一哪天用到這東西了呢?因此下面我都是採用的scala支持的shell來配置的Spark,以後我也會使用scala運行spark-shell進行交互式數據分析的一個小示例帶你們快速入門。

準備好如上環境後,接下來就能夠進行Spark的安裝與相關配置操做了。

2.2安裝Spark並配置

接下來纔是正題,進入Apache Spark官方網站進行Spark的下載,看到以下頁面:

第2條你要是選擇的是Hadoop2.7的話,你要保證你以前安裝的Hadoop版本也是2.7版本。選擇第4條的下載連接便可(固然你也能夠直接用Homebrew命令進行下載),系統會將下好的文件放在Dowmloads文件目錄下,文件名爲:spark-2.0.2-bin-hadoop2.7.tgz,同scala的安裝方法同樣,咱們使用命令:sudo tar -zxf ~/Dowmloads/spark-2.0.2-bin-hadoop2.7.tgz -C /usr/local/直接將該壓縮包解壓並移動到/usr/local/目錄下,而後咱們cd /usr/local進入到/usr/local目錄下,使用命令更改該目錄下的spark文件名:sudo mv ./spark-2.0.2-bin-hadoop2.7 ./spark將文件名改成spark

通過上述步驟從官網下載到Spark的文件,這樣咱們便完成了Spark的安裝,可是Spark也是要進行相應的環境變量配置的,因此接下來咱們進行Spark環境變量的配置。

使用命令:sudo vim /etc/profile,在文件中加入Spark的環境變量:

1
2
export SPARK_HOME=/usr/local/spark
export PATH=$PATH:$SPARK_HOME/bin

 

而後咱們進入到Spark目錄的conf配置文件中:cd /usr/local/spark/conf,執行命令:cp spark-env.sh.template spark-env.sh將spark-env.sh.template拷貝一份,而後打開拷貝後的spark-env.sh文件:vim spark-env.sh,在裏面加入以下內容:

1
2
3
4
5
export SCALA_HOME=/usr/local/scala

export SPARK_MASTER_IP=localhost

export SPARK_WORKER_MEMORY=4g

 

這樣咱們便完成了Spark環境變量的配置,接下來測試測試一下Spark,在根目錄(由於咱們配置了spark環境變量,因此能夠直接在根目錄)下輸入命令:spark-shell,看到控制檯輸出以下信息:

恭喜你,盡情享受Spark吧。

3.安裝過程出現的問題分析

1.運行spark-shell命令時控制檯出現:

1
..我忘了是啥報錯了.connection out.中間報錯信息是這個..

 

的錯誤,說明沒有配置SSH,配置SSH請參考我上篇文章中Hadoop安裝的配置過程。

2.運行命令:scala時控制檯出現:

的錯誤信息,表示scala沒有成功安裝,或者安裝的scala與jdk不兼容,因此這裏我建議大家就按本教程的2.11.8scala版本與1.7jdk版原本操做吧。這些坑我都試過了,因此才能爲大家總結經驗。(大哭臉)

3.其餘錯誤,有如下緣由,大家必定要一一進行檢查:

  • 1.關於JDK:JDK版本不對,因此我建議你們用1.7;或者是JDK版本正確可是沒有成功配置它的環境變量,我配置時更改了兩個文件的環境變量:一個是/etc/profile目錄下的,一個是.bash_profile文件中的,配置環境變量信息以下:

    1
    2
    export JAVA_HOME=/Library/Java/JavaVirtualMachines/jdk1.7.0_79.jdk/Contents/Home
    export PATH=$JAVA_HOME/bin:$PATH
  • 2.關於scala:Scala版本不對,因此我建議你們用2.11.8;或者是沒有成功配置scala的環境變量,配置環境變量按照文中介紹的便可。

  • 3.Hadoop版本與Spark版本不兼容:因此你們在Spark官網下載Spark的時候必定要注意下載Spark時選擇的第二條信息的Hadoop版本要與電腦上面已經安裝的Hadoop一致才行。

4.快速入門Spark基礎Api

這裏我介紹兩種使用Spark基礎Api的方式,一種是在spark-shell中進行簡單的測試,一種是在開發工具IDEA中進行代碼的編寫來教你們快速學習Spark基礎API。

4.1使用spark-shell完成單詞統計功能

因爲spark-shell只支持scala和python兩種語言的編寫,不支持Java,因此我在spark-shell中經過scala的語法來進行簡單測試。

在配置好Spark環境變量以後,咱們打開命令行,直接在當前用戶目錄下輸入命令spark-shell進入scala編寫環境(固然前提是你首先使用命令start-all.sh命令開啓了Spark):

咱們從 /usr/local/spark/README.md 文件新建一個 RDD,代碼以下(本文出現的 Spark 交互式命令代碼中,第一行爲代碼及其解釋,第二行及之後是控制檯返回的結果):

1
2
scala> val textFile = sc.textFile("file:///usr/local/spark/README.md")
>textFile: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark/README.md MapPartitionsRDD[1] at textFile at <console>:24

 

代碼中經過 file:// 前綴或者不加 file:// 前綴表示指定讀取本地文件。若是你這裏傳入的路徑寫的是HDFS上的文件路徑,例如hdfs://遠程主機名:Hadoop端口號我/文件名表明你要是讀取的是 HDFS 中的文件,你須要先上傳文件到 HDFS 中(至於如何上傳,後面的demo中咱們會進行講解),不然會有org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://localhost:9000/user/hadoop/README.md的錯誤。這裏咱們就以讀取本地文件進行講解。

RDDs 支持兩種類型的操做:1.actions: 在數據集上運行計算後返回值。2.transformations: 轉換, 從現有數據集上建立一個新的數據集。

使用上述命令建立好的RDD對象,下面咱們就來經過該對象演示 count() 和 first() 操做:

1
2
3
4
5
textFile.count()  // RDD 中的 item 數量,對於文本文件,就是總行數
>res0: Long = 95

textFile.first() // RDD 中的第一個 item,對於文本文件,就是第一行內容
>res1: String = # Apache Spark

 

接着演示 transformation,經過 filter transformation 來返回一個新的 RDD,代碼以下:

1
2
3
4
5
val linesWithSpark = textFile.filter(line => line.contains("Spark"))   // 篩選出包含 Spark 的行
>linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at filter at <console>:26

linesWithSpark.count() // 統計行數
>res4: Long = 17

 

上述咱們完成了RDD的簡單計算,而RDD 的 actions 和 transformations 其實可用在更復雜的計算中,例如經過以下代碼能夠找到包含單詞最多的那一行內容共有幾個單詞:

1
2
textFile.map(line => line.split(" ").size).reduce((a, b) => if (a > b) a else b)
>res1: Int = 22

 

代碼首先將每一行內容 map 爲一個整數,這將建立一個新的 RDD,並在這個 RDD 中執行 reduce 操做,找到最大的數。map()、reduce() 中的參數是 Scala 的函數字面量(function literals,也稱爲閉包 closures),而且可使用語言特徵或 Scala/Java 的庫。例如,經過使用 Math.max() 函數(須要導入 Java 的 Math 庫),可使上述代碼更容易理解:

1
2
3
4
import java.lang.Math //先導入Math函數

textFile.map(line => line.split(" ").size).reduce((a, b) => Math.max(a, b))
>res6: Int = 14

 

Hadoop MapReduce 是常見的數據流模式,在 Spark 中一樣能夠實現(下面這個例子也就是 WordCount):

1
2
3
4
5
val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey((a, b) => a + b)   // 實現單詞統計
>wordCounts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:29

wordCounts.collect() // 輸出單詞統計結果
>res7: Array[(String, Int)] = Array((package,1), (For,2), (Programs,1), (processing.,1), (Because,1), (The,1)...)

 

上述咱們經過spark-shell完成單詞的統計簡單對咱們spark的基礎Api進行了熟悉,採用的是scala語言,因爲我是一個java開發人員,因此接下來就在開發工具IDEA中經過編寫Java代碼來實現HDFS中某個路徑下文件內容中單詞的統計功能。

4.2在IDEA中編寫Java代碼完成HDFS中某個文件中的單詞統計功能

既然要統計HDFS中某個文件中的單詞,那麼咱們首先要將文件上傳到HDFS上吧!如何上傳?聽我慢慢道來。

使用命令:quit退出scala命令環境,首先在本地電腦的當前用戶目錄下建立一個文件,我這裏建立了一個叫hello的txt文件,裏面寫上內容hello world hello you hello hello ,內容能夠隨便打啦,而後輸入hadoop的命令(關於Hadoop的更多命令請自行google):hadoop fs -put ~/hello /,實現將本機目錄下的hello文件推至遠程主機的根目錄下,而後即可以開始編寫咱們的java代碼了。

使用IDEA建立一個Maven項目(便於管理咱們的jar包嘛!),在pom.xml中添加上Spark相應jar包座標:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>cn.codingxiaxw.spark</groupId>
<artifactId>spark-mvn</artifactId>
<packaging>war</packaging>
<version>1.0-SNAPSHOT</version>
<name>spark-mvn Maven Webapp</name>
<url>http://maven.apache.org</url>
<dependencies>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>3.8.1</version>
<scope>test</scope>
</dependency>

<dependency> <!-- Spark dependency -->
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.0.2</version>
</dependency>

</dependencies>


<build>
<finalName>spark-mvn</finalName>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.7</source>
<target>1.7</target>
</configuration>
</plugin>
</plugins>
</build>
</project>

 

而後建立一個WordCount.java文件,代碼以下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
public class Simple
{
private static final Pattern SPACE = Pattern.compile(" ");

public static void main(String[] args) throws Exception {

// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }


//建立一個RDD對象
SparkConf conf=new SparkConf().setAppName("Simple").setMaster("local");

//建立spark上下文對象,是數據的入口
JavaSparkContext spark=new JavaSparkContext(conf);

//獲取數據源
JavaRDD<String> lines = spark.textFile("hdfs://localhost:8020/hello");

/**
* 對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做,
* 對於當前時間窗口內從數據源獲得的數據首先進行分割,
* 而後利用Map和ReduceByKey方法進行計算,固然最後還有使用print()方法輸出結果;
*/
JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
@Override
public Iterator<String> call(String s) {
return Arrays.asList(SPACE.split(s)).iterator();
}
});


//使用RDD的map和reduce方法進行計算
JavaPairRDD<String, Integer> ones = words.mapToPair(
new PairFunction<String, String, Integer>() {
@Override
public Tuple2<String, Integer> call(String s) {
return new Tuple2<>(s, 1);
}
});


JavaPairRDD<String, Integer> counts = ones.reduceByKey(
new Function2<Integer, Integer, Integer>() {
@Override
public Integer call(Integer i1, Integer i2) {
return i1 + i2;
}
});

List<Tuple2<String, Integer>> output = counts.collect();
for (Tuple2<?,?> tuple : output) {
//輸出計算結果
System.out.println(tuple._1() + ": " + tuple._2());
}


spark.stop();
}
}

 

各行代碼意思見代碼旁的註釋,上述代碼都是從官方文檔抄的,可是貌似要註釋掉官方文檔的:

1
2
3
4
// if (args.length < 1) {
// System.err.println("Usage: JavaWordCount <file>");
// System.exit(1);
// }

 

而後運行程序,控制檯輸出結果以下圖:

成功統計出HDFS上文件內容的單詞個數。到此,咱們便簡單熟悉了Spark的相關API,下篇文章我將介紹經過Spark的Streaming Api實現對流式數據的處理爲你們介紹Spark中Streming庫的相關Api操做。

相關文章
相關標籤/搜索