用java寫的一個簡單的spark程序,經過本地運行和集羣運行例子。java
1 在eclipse下建一個maven工程sql
配置pom.xmlapache
配置文件參考下面:api
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>cn.spark</groupId> <artifactId>SparkTest</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>SparkTest</name> <url>http://maven.apache.org</url> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <dependency> <groupId>junit</groupId> <artifactId>junit</artifactId> <version>3.8.1</version> <scope>test</scope> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-core_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-sql_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-hive_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.10</artifactId> <version>1.3.0</version> </dependency> <dependency> <groupId>org.apache.hadoop</groupId> <artifactId>hadoop-client</artifactId> <version>2.4.1</version> </dependency> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka_2.10</artifactId> <version>1.3.0</version> </dependency> </dependencies> <build> <sourceDirectory>src/main/java</sourceDirectory> <testSourceDirectory>src/main/test</testSourceDirectory> <plugins> <plugin> <artifactId>maven-assembly-plugin</artifactId> <configuration> <descriptorRefs> <descriptorRef>jar-with-dependencies</descriptorRef> </descriptorRefs> <archive> <manifest> <mainClass></mainClass> </manifest> </archive> </configuration> <executions> <execution> <id>make-assembly</id> <phase>package</phase> <goals> <goal>single</goal> </goals> </execution> </executions> </plugin> <plugin> <groupId>org.codehaus.mojo</groupId> <artifactId>exec-maven-plugin</artifactId> <version>1.2.1</version> <executions> <execution> <goals> <goal>exec</goal> </goals> </execution> </executions> <configuration> <executable>java</executable> <includeProjectDependencies>true</includeProjectDependencies> <includePluginDependencies>false</includePluginDependencies> <classpathScope>compile</classpathScope> <mainClass>cn.spark.sparktest.App</mainClass> </configuration> </plugin> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>1.6</source> <target>1.6</target> </configuration> </plugin> </plugins> </build> </project>
配置好後eclipse會自動從遠端資源庫中進行下載app
2 編寫spark程序eclipse
程序詳細以下:maven
package org.spark.study.core; import java.util.Arrays; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.api.java.function.VoidFunction; import scala.Tuple2; /** * 用java語言開發spark程序 * 第一個學習程序 wordcount * @author 18521 * */ public class wordCountLocal { public static void main(String[] args) { // TODO Auto-generated method stub // 1 建立一個sparkconf 對象並配置 // 使用setMaster 能夠設置spark集羣能夠連接集羣的URL,若是設置local 表明在本地運行而不是在集羣運行 SparkConf conf = new SparkConf() .setAppName("wordCountLocal") .setMaster("local"); // 2 建立javasparkContext對象 // sparkcontext 是一個入口,主要做用就是初始化spark應用程序所需的一些核心組件,例如調度器,task, // 還會註冊spark,sparkMaster結點上註冊。反正就是spake應用中最重要的對象 JavaSparkContext sc = new JavaSparkContext(conf); // 3 對輸入源建立一個出事RDD // 元素就是輸入源文件中的一行 JavaRDD<String> lines = sc.textFile("D://worksoft//testdata//spark.txt"); // 4 把輸入源拆分紅一個一個的單詞 // 引用一個RDD 都會建立一個function 類(比較簡單的話就是一個匿名內部類) // FlatMapFunction 有連個參數輸入和輸出 JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; public Iterable<String> call(String arg0) throws Exception { // TODO Auto-generated method stub return Arrays.asList(arg0.split(" ")); } }); // 5 須要將每個單詞映射爲(單詞,1) 後面才能夠更具單詞key 對後面value 1 進行累加從而達到計數的功能 JavaPairRDD<String, Integer> parirs = words.mapToPair(new PairFunction<String, String, Integer>() { /** * 每個單詞都映射成(單詞,1) */ private static final long serialVersionUID = 1L; public Tuple2<String, Integer> call(String arg0) throws Exception { // TODO Auto-generated method stub return new Tuple2<String, Integer>(arg0, 1); } }); // 6 以單詞作爲key 統計單詞出現的次數,用reducebykey 算子,對每個key對於的value進行操做 JavaPairRDD<String,Integer> wordcount = parirs.reduceByKey(new Function2<Integer, Integer, Integer>() { public Integer call(Integer arg0, Integer arg1) throws Exception { // TODO Auto-generated method stub return arg0+arg1; } }); // 7 已經經過spark 的幾個算子 flatMap,mapToPair,reduceByKey 已經統計出每個結點中的單詞出現的次數 // 這中操做叫作transformation,可是在一開始的RDD是把文件拆分打散到不一樣的結點中的,因此後面還須要操做action 進行集合 // 9 action 操做經過foreach 來遍歷全部最後一個RDD生成的元素 wordcount.foreach(new VoidFunction<Tuple2<String,Integer>>() { @Override public void call(Tuple2<String, Integer> arg0) throws Exception { // TODO Auto-generated method stub System.out.println(arg0._1+" 出現了:"+arg0._2+"次"); } }); sc.close(); } }
3 本地測試ide
4 集羣運行oop
4.1 spark程序修改學習
4.2 測試文件上傳到hdfs
[root@spark1 opt]# hadoop fs -put spark.txt /spark.txt
[root@spark1 opt]# hadoop fs -ls / 17/05/27 11:51:29 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Found 2 items -rw-r--r-- 3 root supergroup 171073915 2017-05-27 10:32 /spark.txt drwxr-xr-x - root supergroup 0 2017-05-23 15:40 /user
4.3 程序打包
4.4 上傳打包程序並寫啓動腳本
編寫啓動腳本
[root@spark1 java]# cat wordcount.sh /opt/spark/bin/spark-submit \ # 用這個命令啓動 --class org.spark.study.core.wordCountSpark \ # 配置類名 --num-executors 3 \ # 配置在三個結點上運行 --driver-memory 100m \ # drive內存 --executor-memory 100m \ # 配置execute內存 --executor-cores 3 \ # 內核運行單元數 /opt/spark-study/java/study-0.0.1-SNAPSHOT-jar-with-dependencies.jar \ # 運行的jar包
4.5 運行啓動腳本進行測試
[root@spark1 java]# ./wordcount.sh >> spark.log [root@spark1 java]# cat spark.log integration 出現了:89100次 Hadoop��s 出現了:89100次 general 出現了:89100次 have 出現了:267300次 Million 出現了:89100次 here 出現了:89100次 big 出現了:89100次 stack. 出現了:89100次 modification 出現了:89100次 meili 出現了:267300次 conference. 出現了:89100次 we 出現了:178200次 requiring 出現了:89100次 conv 出現了:297次 simple 出現了:89100次 This 出現了:89100次 Joel 出現了:89118次 send 出現了:89118次 (HDFS) 出現了:89100次 without 出現了:178200次 ……