準備兩臺以上Linux服務器,安裝好JDKhtml
提早到官網下載相應版本並上傳spark-安裝包到Linux上java
解壓安裝包到指定位置node
tar -zxvf spark-2.1.0-bin-hadoop2.6.tgz -C /usr/local
進入到Spark安裝目錄算法
cd /usr/local/spark-2.1.0-bin-hadoop2.6
進入conf目錄並重命名並修改spark-env.sh.template文件shell
cd conf/ mv spark-env.sh.template spark-env.sh vi spark-env.sh
在該配置文件中添加以下配置apache
export JAVA_HOME=/usr/java/jdk1.8.0_111 #export SPARK_MASTER_IP=node1.edu360.cn #export SPARK_MASTER_PORT=7077
保存退出編程
重命名並修改slaves.template文件服務器
mv slaves.template slaves vi slaves
在該文件中添加子節點所在的位置(Worker節點)maven
node2.edu360.cn
node3.edu360.cn
node4.edu360.cn
保存退出分佈式
將配置好的Spark拷貝到其餘節點上
scp -r spark-2.1.0-bin-hadoop2.6/ node2.edu360.cn:/usr/local/ scp -r spark-2.1.0-bin-hadoop2.6/ node3.edu360.cn:/usr/local/ scp -r spark-2.1.0-bin-hadoop2.6/ node4.edu360.cn:/usr/local/
Spark集羣配置完畢,目前是1個Master,3個Work,在node1.edu360.cn上啓動Spark集羣
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
啓動後執行jps命令,主節點上有Master進程,其餘子節點上有Work進行,登陸Spark管理界面查看集羣狀態(主節點):http://node1.edu360.cn:8080/
到此爲止,Spark集羣安裝完畢,可是有一個很大的問題,那就是Master節點存在單點故障,要解決此問題,就要藉助zookeeper,而且啓動至少兩個Master節點來實現高可靠,配置方式比較簡單:
Spark集羣規劃:node1,node2是Master;node3,node4,node5是Worker
安裝配置zk集羣,並啓動zk集羣
中止spark全部服務,修改配置文件spark-env.sh,在該配置文件中刪掉SPARK_MASTER_IP並添加以下配置
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=zk1,zk2,zk3 -Dspark.deploy.zookeeper.dir=/spark"
1.在node1節點上修改slaves配置文件內容指定worker節點
2.在node1上執行sbin/start-all.sh腳本,而後在node2上執行sbin/start-master.sh啓動第二個Master
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://node1.edu360.cn:7077 --executor-memory 1G --total-executor-cores 2 /usr/local/spark-2.1.0-bin-hadoop2.6/lib/spark-examples-2.1.0-hadoop2.6.0.jar 100
該算法是利用蒙特·卡羅算法求PI
spark-shell是Spark自帶的交互式Shell程序,方便用戶進行交互式編程,用戶能夠在該命令行下用scala編寫spark程序。
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-shell --master spark://node1.edu360.cn:7077 --executor-memory 2g --total-executor-cores 2
參數說明:
--master spark://node1.edu360.cn:7077 指定Master的地址 --executor-memory 2g 指定每一個worker可用內存爲2G --total-executor-cores 2 指定整個集羣使用的cup核數爲2個
注意:若是啓動spark shell時沒有指定master地址,可是也能夠正常啓動spark shell和執行spark shell中的程序,實際上是啓動了spark的local模式,該模式僅在本機啓動一個進程,沒有與集羣創建聯繫。Spark Shell中已經默認將SparkContext類初始化爲對象sc。用戶代碼若是須要用到,則直接應用sc便可.
1.首先啓動hdfs
2.向hdfs上傳一個文件到hdfs://node1.edu360.cn:9000/words.txt
3.在spark shell中用scala語言編寫spark程序
sc.textFile("hdfs://node1.edu360.cn:9000/words.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile("hdfs://node1.edu360.cn:9000/out")
4.使用hdfs命令查看結果
hdfs dfs -ls hdfs://node1.edu360.cn:9000/out/p*
說明:
sc是SparkContext對象,該對象時提交spark程序的入口
textFile(hdfs://node1.edu360.cn:9000/words.txt)是hdfs中讀取數據
flatMap(_.split(" "))先map在壓平
map((_,1))將單詞和1構成元組
reduceByKey(_+_)按照key進行reduce,並將value累加
saveAsTextFile("hdfs://node1.edu360.cn:9000/out")將結果寫入到hdfs中
spark shell僅在測試和驗證咱們的程序時使用的較多,在生產環境中,一般會在IDE中編制程序,而後打成jar包,而後提交到集羣,最經常使用的是建立一個Maven項目,利用Maven來管理jar包的依賴。
1.建立一個項目
2.選擇Maven項目,而後點擊next
3.填寫maven的GAV,而後點擊next
4.填寫項目名稱,而後點擊finish
5.建立好maven項目後,點擊Enable Auto-Import
6.配置Maven的pom.xml
1 <?xml version="1.0" encoding="UTF-8"?> 2 <project xmlns="http://maven.apache.org/POM/4.0.0" 3 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 4 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 5 <modelVersion>4.0.0</modelVersion> 6 7 <groupId>learn.spark</groupId> 8 <artifactId>SparkDemo</artifactId> 9 <version>1.0-SNAPSHOT</version> 10 11 <properties> 12 <maven.compiler.source>1.8</maven.compiler.source> 13 <maven.compiler.target>1.8</maven.compiler.target> 14 <scala.version>2.11.8</scala.version> 15 <spark.version>2.2.0</spark.version> 16 <hadoop.version>2.6.5</hadoop.version> 17 <encoding>UTF-8</encoding> 18 </properties> 19 20 <dependencies> 21 <!-- 導入scala的依賴 --> 22 <dependency> 23 <groupId>org.scala-lang</groupId> 24 <artifactId>scala-library</artifactId> 25 <version>${scala.version}</version> 26 </dependency> 27 28 <!-- 導入spark的依賴 --> 29 <dependency> 30 <groupId>org.apache.spark</groupId> 31 <artifactId>spark-core_2.11</artifactId> 32 <version>${spark.version}</version> 33 </dependency> 34 35 <!-- 指定hadoop-client API的版本 --> 36 <dependency> 37 <groupId>org.apache.hadoop</groupId> 38 <artifactId>hadoop-client</artifactId> 39 <version>${hadoop.version}</version> 40 </dependency> 41 42 </dependencies> 43 44 <build> 45 <pluginManagement> 46 <plugins> 47 <!-- 編譯scala的插件 --> 48 <plugin> 49 <groupId>net.alchim31.maven</groupId> 50 <artifactId>scala-maven-plugin</artifactId> 51 <version>3.2.2</version> 52 </plugin> 53 <!-- 編譯java的插件 --> 54 <plugin> 55 <groupId>org.apache.maven.plugins</groupId> 56 <artifactId>maven-compiler-plugin</artifactId> 57 <version>3.5.1</version> 58 </plugin> 59 </plugins> 60 </pluginManagement> 61 <plugins> 62 <plugin> 63 <groupId>net.alchim31.maven</groupId> 64 <artifactId>scala-maven-plugin</artifactId> 65 <executions> 66 <execution> 67 <id>scala-compile-first</id> 68 <phase>process-resources</phase> 69 <goals> 70 <goal>add-source</goal> 71 <goal>compile</goal> 72 </goals> 73 </execution> 74 <execution> 75 <id>scala-test-compile</id> 76 <phase>process-test-resources</phase> 77 <goals> 78 <goal>testCompile</goal> 79 </goals> 80 </execution> 81 </executions> 82 </plugin> 83 84 <plugin> 85 <groupId>org.apache.maven.plugins</groupId> 86 <artifactId>maven-compiler-plugin</artifactId> 87 <executions> 88 <execution> 89 <phase>compile</phase> 90 <goals> 91 <goal>compile</goal> 92 </goals> 93 </execution> 94 </executions> 95 </plugin> 96 97 98 <!-- 打jar插件 --> 99 <plugin> 100 <groupId>org.apache.maven.plugins</groupId> 101 <artifactId>maven-shade-plugin</artifactId> 102 <version>2.4.3</version> 103 <executions> 104 <execution> 105 <phase>package</phase> 106 <goals> 107 <goal>shade</goal> 108 </goals> 109 <configuration> 110 <filters> 111 <filter> 112 <artifact>*:*</artifact> 113 <excludes> 114 <exclude>META-INF/*.SF</exclude> 115 <exclude>META-INF/*.DSA</exclude> 116 <exclude>META-INF/*.RSA</exclude> 117 </excludes> 118 </filter> 119 </filters> 120 </configuration> 121 </execution> 122 </executions> 123 </plugin> 124 </plugins> 125 </build> 126 127 </project>
7.新建一個scala class,類型爲Object
8.編寫spark程序
1 package spark.scala 2 3 import org.apache.spark.rdd.RDD 4 import org.apache.spark.{SparkConf, SparkContext} 5 6 object ScalaWordCount { 7 8 def main(args: Array[String]): Unit = { 9 // 建立spark配置,設置應用程序名字 10 //val conf = new SparkConf().setAppName("ScalaWordCount") 11 val conf = new SparkConf().setAppName("ScalaWordCount").setMaster("local[*]") 12 // 建立spark執行入口 13 val sc = new SparkContext(conf) 14 // 指定之後從哪兒讀取數據建立RDD(彈性分佈式數據集) 15 // sc.textFile(args(0)).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_+_).sortBy(_._2, false).saveAsTextFile(args(1)) 16 17 val line: RDD[String] = sc.textFile(args(0)) 18 // 切分壓平 19 val words: RDD[String] = line.flatMap(_.split(" ")) 20 // 將單詞和1組合 21 val wordAndOne: RDD[(String, Int)] = words.map((_, 1)) 22 // 按照key進行聚合 23 val reduced: RDD[(String, Int)] = wordAndOne.reduceByKey(_+_) 24 // 排序 25 val sorted: RDD[(String, Int)] = reduced.sortBy(_._2, false) 26 // 將結果保存到HDFS中 27 sorted.saveAsTextFile(args(1)) 28 // 釋放資源 29 sc.stop() 30 } 31 }
9.使用Maven打包:首先修改pom.xml中的main class
點擊idea右側的Maven Project選項;點擊Lifecycle,選擇clean和package,而後點擊Run Maven Build
10.選擇編譯成功的jar包,並將該jar上傳到Spark集羣中的某個節點上
11.首先啓動hdfs和Spark集羣
啓動hdfs
/usr/local/hadoop-2.6.5/sbin/start-dfs.sh
啓動spark
/usr/local/spark-2.1.0-bin-hadoop2.6/sbin/start-all.sh
12.使用spark-submit命令提交Spark應用(注意參數的順序)
/usr/local/spark-2.1.0-bin-hadoop2.6/bin/spark-submit --class cn.itcast.spark.WordCount --master spark://node1.edu360.cn:7077 --executor-memory 2G --total-executor-cores 4 /root/spark-mvn-1.0-SNAPSHOT.jar hdfs://node1.edu360.cn:9000/words.txt hdfs://node1.edu360.cn:9000/out
查看程序執行結果
hdfs dfs -cat hdfs://node1.edu360.cn:9000/out/part-00000