1. Install the JDK (installation details omitted).
2. Install Scala (installation details omitted).
3. Install IntelliJ IDEA:
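a. After IDEA starts, the first dialog appears; click OK.
b. Click "Next: Default plugins" to go to the plugin page.
c. Click Install next to Scala (make sure you are online) and wait for the installation to finish.
d. Click Create New Project to open the main window, and create a project named WordCount (this should be familiar: File-->New-->Scala Project). Once it is created, go to File-->Project Structure-->Libraries, click the + button, locate your Spark assembly JAR (spark-assembly-1.0.0-hadoop1.0.4.jar), and click OK. (This step is important.)
e. All dependencies are now imported. Create a new Scala class (set its kind to Object, because the main method lives in a Scala object) and you can start developing Spark programs.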
4. Write the program
Below is a fully commented WordCount program:
a. To run Spark locally:
```scala
package com.df.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversions (e.g. reduceByKey on pair RDDs), required in Spark 1.0
import org.apache.spark.rdd.RDD

/**
 * Spark WordCount program written in Scala, run locally
 * @author liuzhongfeng
 */
object WordCount {
  def main(args: Array[String]): Unit = {
    /**
     * Step 1: create the SparkConf object, which holds the runtime configuration of the Spark program.
     * For example, setMaster sets the URL of the Master of the Spark cluster to connect to; if it is set to "local",
     * the program runs locally, which suits beginners with modest hardware.
     */
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("My First Spark App!") // application name, shown in the monitoring UI while the program runs
    conf.setMaster("local") // run locally (a single worker thread), not on a cluster

    /**
     * Step 2: create the SparkContext object.
     * SparkContext is the single entry point to all Spark functionality; every Spark program, whether written
     * in Scala, Java, Python or R, must have one.
     * Its core job is to initialize the components the application needs, including DAGScheduler,
     * TaskScheduler and SchedulerBackend; it also registers the program with the Master.
     * SparkContext is the most important object in the whole application.
     */
    val sc = new SparkContext(conf) // the SparkConf instance customizes the runtime parameters and configuration

    /**
     * Step 3: create an RDD through SparkContext from the actual data source (HDFS, HBase, local FS, S3).
     * There are three basic ways to create an RDD: from an external data source (e.g. HDFS),
     * from a Scala collection, or by transforming another RDD.
     * The RDD splits the data into a series of partitions; the data in each partition is processed by one task.
     */
    // Read a local file as a single partition (adjust the path to your own machine)
    val lines: RDD[String] =
      sc.textFile("H://下載//linux軟件包//linux-spark的文件//spark//spark-1.0.0-bin-hadoop1//README.md", 1)

    /**
     * Step 4: apply transformations to the initial RDD (higher-order functions such as map and filter)
     * to perform the actual computation.
     * Step 4.1: split each line into individual words.
     */
    val words = lines.flatMap { line => line.split(" ") } // split every line, then flatten all results into one collection of words

    /** Step 4.2: count each word occurrence as 1, i.e. word => (word, 1). */
    val pairs = words.map { word => (word, 1) }

    /** Step 4.3: from those 1s, compute the total number of occurrences of each word in the file. */
    val wordCounts = pairs.reduceByKey(_ + _) // sum the values per key (reduced both locally on the map side and at the reducer)

    // collect() brings the results back to the driver, where they are printed
    wordCounts.collect().foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
    sc.stop()
  }
}
```
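Steps 4.1 to 4.3 can be tried without Spark at all. The following sketch, assuming only the Scala standard library, replays the same pipeline on an ordinary Seq. The object name LocalPipelineSketch is hypothetical, and groupBy plus sum merely stands in for reduceByKey, which exists only on pair RDDs.

```scala
// Plain Scala collections, no Spark: mirrors lines.flatMap(...).map(...).reduceByKey(_ + _)
object LocalPipelineSketch {
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val words = lines.flatMap(line => line.split(" ")) // Step 4.1: lines -> words
    val pairs = words.map(word => (word, 1))            // Step 4.2: word -> (word, 1)
    // Step 4.3: groupBy + sum plays the role of reduceByKey(_ + _)
    pairs.groupBy(_._1).map { case (word, ones) => (word, ones.map(_._2).sum) }
  }

  def main(args: Array[String]): Unit = {
    val counts = wordCount(Seq("to be or", "not to be"))
    // Sort by word so the printout order is deterministic
    counts.toSeq.sortBy(_._1).foreach { case (word, n) => println(word + " : " + n) }
  }
}
```

Unlike reduceByKey, groupBy gathers every (word, 1) pair for a key before summing; that is harmless for a local Seq but is exactly what Spark's reduceByKey avoids on large data.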
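Right-click the file and choose Run As-->Scala Application (in IntelliJ IDEA the equivalent context-menu entry is Run 'WordCount'); the output looks like this: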
```text
16/01/27 16:55:27 INFO SecurityManager: Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
16/01/27 16:55:27 INFO SecurityManager: Changing view acls to: liuzhongfeng
16/01/27 16:55:27 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(liuzhongfeng)
16/01/27 16:55:28 INFO Slf4jLogger: Slf4jLogger started
16/01/27 16:55:28 INFO Remoting: Starting remoting
16/01/27 16:55:28 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://spark@Frank:38059]
16/01/27 16:55:28 INFO Remoting: Remoting now listens on addresses: [akka.tcp://spark@Frank:38059]
16/01/27 16:55:28 INFO SparkEnv: Registering MapOutputTracker
16/01/27 16:55:28 INFO SparkEnv: Registering BlockManagerMaster
16/01/27 16:55:28 INFO DiskBlockManager: Created local directory at C:\Users\LIUZHO~1\AppData\Local\Temp\spark-local-20160127165528-81e4
16/01/27 16:55:28 INFO MemoryStore: MemoryStore started with capacity 1068.9 MB.
16/01/27 16:55:28 INFO ConnectionManager: Bound socket to port 38062 with id = ConnectionManagerId(Frank,38062)
16/01/27 16:55:28 INFO BlockManagerMaster: Trying to register BlockManager
16/01/27 16:55:28 INFO BlockManagerInfo: Registering block manager Frank:38062 with 1068.9 MB RAM
16/01/27 16:55:28 INFO BlockManagerMaster: Registered BlockManager
16/01/27 16:55:28 INFO HttpServer: Starting HTTP Server
16/01/27 16:55:28 INFO HttpBroadcast: Broadcast server started at http://192.168.1.107:38063
16/01/27 16:55:28 INFO HttpFileServer: HTTP File server directory is C:\Users\LIUZHO~1\AppData\Local\Temp\spark-59ecde39-31f6-4f84-ac49-e86194415dec
16/01/27 16:55:28 INFO HttpServer: Starting HTTP Server
16/01/27 16:55:28 INFO SparkUI: Started SparkUI at http://Frank:4040
16/01/27 16:55:29 INFO MemoryStore: ensureFreeSpace(32816) called with curMem=0, maxMem=1120822886
16/01/27 16:55:29 INFO MemoryStore: Block broadcast_0 stored as values to memory (estimated size 32.0 KB, free 1068.9 MB)
16/01/27 16:55:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
16/01/27 16:55:29 WARN LoadSnappy: Snappy native library not loaded
16/01/27 16:55:29 INFO FileInputFormat: Total input paths to process : 1
16/01/27 16:55:29 INFO SparkContext: Starting job: foreach at WordCount.scala:53
16/01/27 16:55:29 INFO DAGScheduler: Registering RDD 4 (reduceByKey at WordCount.scala:52)
16/01/27 16:55:29 INFO DAGScheduler: Got job 0 (foreach at WordCount.scala:53) with 1 output partitions (allowLocal=false)
16/01/27 16:55:29 INFO DAGScheduler: Final stage: Stage 0(foreach at WordCount.scala:53)
16/01/27 16:55:29 INFO DAGScheduler: Parents of final stage: List(Stage 1)
16/01/27 16:55:29 INFO DAGScheduler: Missing parents: List(Stage 1)
16/01/27 16:55:29 INFO DAGScheduler: Submitting Stage 1 (MapPartitionsRDD[4] at reduceByKey at WordCount.scala:52), which has no missing parents
16/01/27 16:55:29 INFO DAGScheduler: Submitting 1 missing tasks from Stage 1 (MapPartitionsRDD[4] at reduceByKey at WordCount.scala:52)
16/01/27 16:55:29 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
16/01/27 16:55:29 INFO TaskSetManager: Starting task 1.0:0 as TID 0 on executor localhost: localhost (PROCESS_LOCAL)
16/01/27 16:55:29 INFO TaskSetManager: Serialized task 1.0:0 as 2172 bytes in 2 ms
16/01/27 16:55:29 INFO Executor: Running task ID 0
16/01/27 16:55:29 INFO BlockManager: Found block broadcast_0 locally
16/01/27 16:55:29 INFO HadoopRDD: Input split: file:/H:/下載/linux軟件包/linux-spark的文件/spark/spark-1.0.0-bin-hadoop1/README.md:0+4221
16/01/27 16:55:29 INFO Executor: Serialized size of result for 0 is 775
16/01/27 16:55:29 INFO Executor: Sending result for 0 directly to driver
16/01/27 16:55:29 INFO Executor: Finished task ID 0
16/01/27 16:55:29 INFO TaskSetManager: Finished TID 0 in 231 ms on localhost (progress: 1/1)
16/01/27 16:55:29 INFO DAGScheduler: Completed ShuffleMapTask(1, 0)
16/01/27 16:55:29 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
16/01/27 16:55:29 INFO DAGScheduler: Stage 1 (reduceByKey at WordCount.scala:52) finished in 0.240 s
16/01/27 16:55:29 INFO DAGScheduler: looking for newly runnable stages
16/01/27 16:55:29 INFO DAGScheduler: running: Set()
16/01/27 16:55:29 INFO DAGScheduler: waiting: Set(Stage 0)
16/01/27 16:55:29 INFO DAGScheduler: failed: Set()
16/01/27 16:55:29 INFO DAGScheduler: Missing parents for Stage 0: List()
16/01/27 16:55:29 INFO DAGScheduler: Submitting Stage 0 (MapPartitionsRDD[6] at reduceByKey at WordCount.scala:52), which is now runnable
16/01/27 16:55:29 INFO DAGScheduler: Submitting 1 missing tasks from Stage 0 (MapPartitionsRDD[6] at reduceByKey at WordCount.scala:52)
16/01/27 16:55:29 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
16/01/27 16:55:29 INFO TaskSetManager: Starting task 0.0:0 as TID 1 on executor localhost: localhost (PROCESS_LOCAL)
16/01/27 16:55:29 INFO TaskSetManager: Serialized task 0.0:0 as 2003 bytes in 1 ms
16/01/27 16:55:29 INFO Executor: Running task ID 1
16/01/27 16:55:29 INFO BlockManager: Found block broadcast_0 locally
16/01/27 16:55:29 INFO BlockFetcherIterator$BasicBlockFetcherIterator: maxBytesInFlight: 50331648, targetRequestSize: 10066329
16/01/27 16:55:29 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
16/01/27 16:55:29 INFO BlockFetcherIterator$BasicBlockFetcherIterator: Started 0 remote fetches in 6 ms
For : 5
Programs : 1
gladly : 1
Because : 1
The : 1
agree : 1
cluster. : 1
webpage : 1
its : 1
under : 2
legal : 1
1.x, : 1
have : 2
Try : 1
MRv1, : 1
add : 2
through : 1
several : 1
This : 2
Whether : 1
"yarn-cluster" : 1
% : 2
storage : 1
To : 2
setting : 1
any : 2
Once : 1
application : 1
explicitly, : 1
use: : 1
prefer : 1
SparkPi : 2
version : 3
file : 1
documentation, : 1
Along : 1
the : 28
entry : 1
author. : 1
are : 2
systems. : 1
params : 1
not : 2
different : 1
refer : 1
Interactive : 2
given. : 1
if : 5
file's : 1
build : 3
when : 2
be : 2
Tests : 1
Apache : 6
./bin/run-example : 2
programs, : 1
including : 1
<http://spark.apache.org/documentation.html>. : 1
Spark. : 2
2.0.5-alpha : 1
package. : 1
1000).count() : 1
project's : 3
Versions : 1
HDFS : 1
license : 3
email, : 1
<artifactId>hadoop-client</artifactId> : 1
>>> : 1
"org.apache.hadoop" : 1
<version>1.2.1</version> : 1
programming : 1
Testing : 1
run: : 1
environment : 2
pull : 3
1000: : 2
v2 : 1
<groupId>org.apache.hadoop</groupId> : 1
Please : 1
is : 6
run : 7
URL, : 1
SPARK_HADOOP_VERSION=2.2.0 : 1
threads. : 1
same : 1
MASTER=spark://host:7077 : 1
on : 4
built : 2
against : 1
tests : 1
examples : 2
at : 1
usage : 1
using : 3
Maven, : 1
talk : 1
submitting : 1
Shell : 2
class : 2
adding : 1
abbreviated : 1
directory. : 1
README : 1
overview : 1
dependencies. : 1
`examples` : 2
example: : 1
## : 9
N : 1
set : 2
use : 3
Hadoop-supported : 1
running : 1
find : 1
via : 2
contains : 1
project : 3
SPARK_HADOOP_VERSION=2.0.5-alpha : 1
Pi : 1
need : 1
request, : 1
or : 5
</dependency> : 1
<class> : 1
uses : 1
"hadoop-client" : 2
Hadoop, : 1
(You : 1
requires : 1
Contributions : 1
SPARK_HADOOP_VERSION=1.2.1 : 1
Documentation : 1
of : 3
cluster : 1
using: : 1
accepted : 1
must : 1
"1.2.1" : 1
1.2.1 : 2
built, : 1
Hadoop : 11
means : 1
Spark : 12
this : 4
Python : 2
original : 2
YARN, : 3
2.1.X, : 1
pre-built : 1
[Configuration : 1
locally. : 1
./bin/pyspark : 1
A : 1
locally : 2
# : 6
sc.parallelize(1 : 1
only : 1
library : 1
Configuration : 1
basic : 1
MapReduce : 2
documentation : 1
first : 1
which : 2
following : 2
changed : 1
also : 4
Cloudera : 4
without : 1
should : 2
for : 1
"yarn-client" : 1
[params]`. : 1
`SPARK_YARN=true`: : 1
setup : 1
mesos:// : 1
<http://spark.apache.org/> : 1
GitHub : 1
requests : 1
latest : 1
your : 6
test : 1
MASTER : 1
example : 3
authority : 1
SPARK_YARN=true : 3
scala> : 1
guide](http://spark.apache.org/docs/latest/configuration.html) : 1
configure : 1
artifact : 1
can : 7
About : 1
you're : 1
instructions. : 1
do : 3
2.0.X, : 1
easiest : 1
no : 1
When : 1
how : 1
newer : 1
`./bin/run-example : 1
source : 2
copyrighted : 1
material : 2
Note : 1
2.10. : 1
by : 3
please : 1
Lightning-Fast : 1
spark:// : 1
so. : 1
Scala : 3
Alternatively, : 1
If : 1
Cluster : 1
variable : 1
submit : 1
an : 2
thread, : 1
them, : 1
2.2.X : 1
And : 1
application, : 1
return : 2
developing : 1
./bin/spark-shell : 1
`<dependencies>` : 1
warrant : 1
"local" : 1
start : 1
You : 4
<dependency> : 1
Spark](#building-spark). : 1
one : 2
help : 1
with : 8
print : 1
CDH : 4
2.2.X, : 1
$ : 5
SPARK_HADOOP_VERSION=2.0.0-mr1-cdh4.2.0 : 1
in : 4
Contributing : 1
downloaded : 1
versions : 4
online : 1
`libraryDependencies`: : 1
- : 1
section: : 1
4.2.0 : 2
comes : 1
[building : 1
Python, : 1
0.23.x, : 1
`SPARK_HADOOP_VERSION` : 1
Many : 1
other : 4
Running : 1
sbt/sbt : 5
building : 1
way : 1
SBT, : 1
Online : 1
change : 1
MRv2, : 1
contribution : 1
from : 1
Example : 1
POM : 1
open : 2
sc.parallelize(range(1000)).count() : 1
you : 8
runs. : 1
Building : 1
protocols : 1
that : 4
a : 5
their : 1
guide, : 1
name : 1
example, : 1
state : 2
work : 2
will : 1
instance: : 1
to : 19
v1 : 1
core : 1
 : 149
license. : 1
"local[N]" : 1
programs : 2
package.) : 1
shell: : 2
./sbt/sbt : 2
assembly : 6
specify : 1
and : 9
Computing : 1
command, : 2
SPARK_HADOOP_VERSION=2.0.0-cdh4.2.0 : 1
sample : 1
requests, : 1
16/01/27 16:55:29 INFO Executor: Serialized size of result for 1 is 825
16/01/27 16:55:29 INFO Executor: Sending result for 1 directly to driver
16/01/27 16:55:29 INFO Executor: Finished task ID 1
16/01/27 16:55:29 INFO DAGScheduler: Completed ResultTask(0, 0)
16/01/27 16:55:29 INFO DAGScheduler: Stage 0 (foreach at WordCount.scala:53) finished in 0.126 s
16/01/27 16:55:29 INFO TaskSetManager: Finished TID 1 in 123 ms on localhost (progress: 1/1)
16/01/27 16:55:29 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
16/01/27 16:55:29 INFO SparkContext: Job finished: foreach at WordCount.scala:53, took 0.521885349 s
16/01/27 16:55:29 INFO SparkUI: Stopped Spark web UI at http://Frank:4040
16/01/27 16:55:29 INFO DAGScheduler: Stopping DAGScheduler
16/01/27 16:55:31 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
16/01/27 16:55:31 INFO ConnectionManager: Selector thread was interrupted!
16/01/27 16:55:31 INFO ConnectionManager: ConnectionManager stopped
16/01/27 16:55:31 INFO MemoryStore: MemoryStore cleared
16/01/27 16:55:31 INFO BlockManager: BlockManager stopped
16/01/27 16:55:31 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
16/01/27 16:55:31 INFO BlockManagerMaster: BlockManagerMaster stopped
16/01/27 16:55:31 INFO SparkContext: Successfully stopped SparkContext
16/01/27 16:55:31 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
16/01/27 16:55:31 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
```
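The line ` : 149` in the output is the count of the empty string. split(" ") yields an empty token for every blank line, for leading spaces, and for each extra space in a run of spaces, which README.md evidently contains. A minimal sketch of the problem and one possible fix in plain Scala; the object and method names are hypothetical, and splitting on runs of whitespace is a choice made here, not something the original program does.

```scala
object EmptyTokenSketch {
  // What the original tokenizer does: split on a single space character.
  def naiveTokens(line: String): Seq[String] = line.split(" ").toSeq

  // One possible fix: split on runs of whitespace and drop any empty tokens.
  def cleanTokens(line: String): Seq[String] = line.split("\\s+").filter(_.nonEmpty).toSeq

  def main(args: Array[String]): Unit = {
    val sample = Seq("Apache Spark", "", "  two  spaces")
    // Empty tokens produced by the naive split: one for the blank line, three for the extra spaces
    println(sample.flatMap(naiveTokens).count(_.isEmpty))
    // Tokens left after the fix
    println(sample.flatMap(cleanTokens))
  }
}
```

The same function body can be passed to RDD.flatMap, e.g. `lines.flatMap(line => line.split("\\s+").filter(_.nonEmpty))`.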
b. To run Spark on a cluster:
```scala
package com.df.spark

import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._ // implicit conversions (e.g. reduceByKey on pair RDDs), required in Spark 1.0
import org.apache.spark.rdd.RDD

/**
 * Spark WordCount program written in Scala, run on a cluster
 * @author liuzhongfeng
 */
object WordCount_Cluster {
  def main(args: Array[String]): Unit = {
    /**
     * Step 1: create the SparkConf object, which holds the runtime configuration of the Spark program.
     * For example, setMaster sets the URL of the Master of the Spark cluster to connect to; if it is set to "local",
     * the program runs locally, which suits beginners with modest hardware.
     */
    val conf = new SparkConf() // create the SparkConf object
    conf.setAppName("My First Spark App!") // application name, shown in the monitoring UI while the program runs
    conf.setMaster("spark://cMaster-spark:7077") // the program now runs on the Spark cluster

    /**
     * Step 2: create the SparkContext object.
     * SparkContext is the single entry point to all Spark functionality; every Spark program, whether written
     * in Scala, Java, Python or R, must have one.
     * Its core job is to initialize the components the application needs, including DAGScheduler,
     * TaskScheduler and SchedulerBackend; it also registers the program with the Master.
     * SparkContext is the most important object in the whole application.
     */
    val sc = new SparkContext(conf) // the SparkConf instance customizes the runtime parameters and configuration

    /**
     * Step 3: create an RDD through SparkContext from the actual data source (HDFS, HBase, local FS, S3).
     * There are three basic ways to create an RDD: from an external data source (e.g. HDFS),
     * from a Scala collection, or by transforming another RDD.
     * The RDD splits the data into a series of partitions; the data in each partition is processed by one task.
     */
    // Path of your input file on HDFS (resolved against the default filesystem in the Hadoop configuration)
    val lines: RDD[String] = sc.textFile("/in", 1)

    /**
     * Step 4: apply transformations to the initial RDD (higher-order functions such as map and filter)
     * to perform the actual computation.
     * Step 4.1: split each line into individual words.
     */
    val words = lines.flatMap { line => line.split(" ") } // split every line, then flatten all results into one collection of words

    /** Step 4.2: count each word occurrence as 1, i.e. word => (word, 1). */
    val pairs = words.map { word => (word, 1) }

    /** Step 4.3: from those 1s, compute the total number of occurrences of each word in the file. */
    val wordCounts = pairs.reduceByKey(_ + _) // sum the values per key (reduced both locally on the map side and at the reducer)

    // collect() brings the results back to the driver, so the println output appears on the driver's console
    wordCounts.collect().foreach(wordNumberPair => println(wordNumberPair._1 + " : " + wordNumberPair._2))
    sc.stop()
  }
}
```
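The comment on reduceByKey says values are reduced both locally and at the reducer. To make that concrete, here is a simulation with plain Scala collections, assuming two hand-made partitions; it only illustrates the idea and is not Spark's implementation. Each partition first sums its own (word, 1) pairs, so fewer records need to be shuffled, and the partial sums are then merged per key. MapSideCombineSketch and its methods are hypothetical names.

```scala
object MapSideCombineSketch {
  // "Local" reduce: sum the counts for each word inside a single partition.
  def combineLocally(partition: Seq[(String, Int)]): Map[String, Int] =
    partition.groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }

  // "Reducer-side" reduce: merge the per-partition partial sums key by key.
  def mergePartials(partials: Seq[Map[String, Int]]): Map[String, Int] =
    partials.foldLeft(Map.empty[String, Int]) { (acc, partial) =>
      partial.foldLeft(acc) { case (a, (word, n)) => a.updated(word, a.getOrElse(word, 0) + n) }
    }

  def main(args: Array[String]): Unit = {
    val partitions = Seq(
      Seq(("spark", 1), ("hadoop", 1), ("spark", 1)),
      Seq(("spark", 1), ("scala", 1))
    )
    val partials = partitions.map(p => combineLocally(p))
    // Records that would cross the network without and with the local combine
    println("records without local combine: " + partitions.map(_.size).sum)
    println("records with local combine: " + partials.map(_.size).sum)
    println(mergePartials(partials))
  }
}
```

With word counts the saving grows with repetition: a frequent word such as "the" is sent once per partition instead of once per occurrence.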
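(1) Package your program as a JAR, copy it to your Linux machine, and run it on the Spark cluster. In IDEA: File-->Project Structure-->Artifacts-->+-->JAR-->From modules with dependencies. At this point, remove the Spark- and Scala-related JARs from the artifact, since the cluster's Spark installation already provides them.
(2) Then click OK, and build the JAR with Build-->Build Artifacts.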
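Note: why not submit the job to the Spark cluster directly from IDEA?
1) Memory and core limits: by default (client deploy mode) the Spark driver runs on the machine that submits the application, so that machine needs substantial resources.
2) The driver directs the workers. In production, Spark applications are almost always submitted through automated shell scripts, and if the driver (here, the IDEA machine) is not on the same network as the cluster, problems such as lost tasks can occur.
3) Security is weaker.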
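Since production jobs are launched from scripts, one option is to stop hardcoding the master URL and input path (the two things that differ between the local and cluster versions above) and read them from the program arguments instead. A minimal argument-parsing sketch in plain Scala; RunConfigSketch, RunConfig and the argument order are hypothetical. In the Spark program you would then call setMaster only when a master was given, e.g. `cfg.master.foreach(m => conf.setMaster(m))`, so that a launcher such as spark-submit's `--master` flag can supply it; note that settings made directly on SparkConf take precedence over spark-submit flags.

```scala
object RunConfigSketch {
  // Hypothetical holder for the settings that differ between local and cluster runs.
  final case class RunConfig(master: Option[String], input: String)

  // args(0) = input path (required); args(1) = master URL (optional).
  def parse(args: Array[String]): Either[String, RunConfig] =
    args.toList match {
      case input :: Nil           => Right(RunConfig(None, input))
      case input :: master :: Nil => Right(RunConfig(Some(master), input))
      case _                      => Left("usage: WordCount <input> [master]")
    }

  def main(args: Array[String]): Unit =
    parse(args) match {
      case Right(cfg) => println(cfg)
      case Left(msg)  => System.err.println(msg)
    }
}
```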
(3) After starting Spark, run