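Without further ado, let's start with an example to get an intuitive feel for it before the detailed introduction.
This example comes from the examples bundled with Spark. The basic steps are as follows: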
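(1) Use the following command to start a netcat server that will feed messages into the stream:
$ nc -lk 9999
(2) In a new terminal, run NetworkWordCount, which counts the words typed above and prints the result:
$ bin/run-example streaming.NetworkWordCount localhost 9999
(3) Type some text into the netcat session started in step (1); the counts appear in the terminal from step (2). For example:
Input in the first terminal:
hello world again
Output in the second terminal: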
-------------------------------------------
Time: 1436758706000 ms
-------------------------------------------
(again,1)
(hello,1)
(world,1)
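Each block of output above is one batch: a dashed header, the batch time in milliseconds, another dashed line, then the (word,count) pairs. `DStream.print()` prints the first ten elements of each batch by default. The plain-Java sketch below re-creates that layout purely as an illustration; it is not Spark code, and the class `BatchPrintSketch` and its `num` parameter are hypothetical.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical helper that re-creates the per-batch layout printed by the word count example.
final class BatchPrintSketch {
    private static final String RULE = "-------------------------------------------";

    // Dashed line, "Time: <ms> ms", dashed line, then at most `num` records as (key,value).
    // "..." is appended when the batch holds more than `num` records; a blank line ends the batch.
    static String formatBatch(long batchTimeMs, List<? extends Map.Entry<String, Integer>> records, int num) {
        StringBuilder sb = new StringBuilder();
        sb.append(RULE).append('\n')
          .append("Time: ").append(batchTimeMs).append(" ms\n")
          .append(RULE).append('\n');
        for (int i = 0; i < Math.min(num, records.size()); i++) {
            Map.Entry<String, Integer> e = records.get(i);
            sb.append('(').append(e.getKey()).append(',').append(e.getValue()).append(")\n");
        }
        if (records.size() > num) {
            sb.append("...\n");
        }
        return sb.append('\n').toString();
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> batch = new ArrayList<>();
        batch.add(new AbstractMap.SimpleEntry<String, Integer>("again", 1));
        batch.add(new AbstractMap.SimpleEntry<String, Integer>("hello", 1));
        batch.add(new AbstractMap.SimpleEntry<String, Integer>("world", 1));
        // Mirrors the sample output above (the default limit of ten records is passed explicitly)
        System.out.print(formatBatch(1436758706000L, batch, 10));
    }
}
```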
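In short, the example above takes text typed in by hand, passes it to Spark Streaming, counts the words in each one-second batch, and prints the result.
Here is the code: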
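On the receiving side, the example connects to the netcat server as a TCP client and reads UTF-8 text delimited by '\n', as the Scala doc comment below describes. As a rough, Spark-free illustration of that input format, the sketch below reads lines with a plain `java.net.Socket`. The class `LineSocketSketch` and its `maxLines` cap are hypothetical, and Spark's actual receiver does considerably more (storage, fault tolerance) than this.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: connects to a line-oriented TCP server (such as `nc -lk 9999`)
// and returns up to maxLines UTF-8 lines.
final class LineSocketSketch {
    static List<String> readLines(String host, int port, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            // readLine() strips the line terminator and returns null once the server closes the connection
            while (lines.size() < maxLines && (line = in.readLine()) != null) {
                lines.add(line);
            }
        }
        return lines;
    }

    public static void main(String[] args) throws IOException {
        // Usage: java LineSocketSketch <host> <port>, e.g. localhost 9999
        for (String line : readLines(args[0], Integer.parseInt(args[1]), 3)) {
            System.out.println(line);
        }
    }
}
```

Run it with `java LineSocketSketch localhost 9999` while `nc -lk 9999` is running, then type three lines into the netcat terminal.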
package org.apache.spark.examples.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.storage.StorageLevel

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network every second.
 *
 * Usage: NetworkWordCount <hostname> <port>
 * <hostname> and <port> describe the TCP server that Spark Streaming would connect to receive data.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example org.apache.spark.examples.streaming.NetworkWordCount localhost 9999`
 */
object NetworkWordCount {
  def main(args: Array[String]) {
    if (args.length < 2) {
      System.err.println("Usage: NetworkWordCount <hostname> <port>")
      System.exit(1)
    }

    // Helper from the Spark examples package that quiets the log output
    StreamingExamples.setStreamingLogLevels()

    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create a socket stream on target ip:port and count the
    // words in input stream of \n delimited text (eg. generated by 'nc')
    // Note that no duplication in storage level only for running locally.
    // Replication necessary in distributed scenario for fault tolerance.
    val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
    val words = lines.flatMap(_.split(" "))
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print()
    ssc.start()
    ssc.awaitTermination()
  }
}
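`Seconds(1)` sets the batch interval: the input stream is cut into one-second batches, and each `Time: ... ms` header in the output labels one of them. The sketch below models that grouping in plain Java under a deliberately simplified assumption: a record received at time t goes into the batch labelled by the smallest multiple of the interval that is greater than or equal to t. This ignores Spark's receiver block interval and scheduling delays, and the class `BatchingSketch` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Simplified, hypothetical model of fixed-interval batching (no Spark involved).
final class BatchingSketch {
    // Batch label for a record received at receivedAtMs: the smallest multiple of
    // intervalMs that is >= receivedAtMs, so batch T covers the window (T - intervalMs, T].
    // Assumes non-negative timestamps.
    static long batchTimeFor(long receivedAtMs, long intervalMs) {
        return ((receivedAtMs + intervalMs - 1) / intervalMs) * intervalMs;
    }

    // Groups lines (given with their receive times) into batches, ordered by batch time.
    static Map<Long, List<String>> groupIntoBatches(long[] receivedAtMs, String[] lines, long intervalMs) {
        Map<Long, List<String>> batches = new TreeMap<>();
        for (int i = 0; i < lines.length; i++) {
            long batch = batchTimeFor(receivedAtMs[i], intervalMs);
            batches.computeIfAbsent(batch, k -> new ArrayList<>()).add(lines[i]);
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] times = {1436758705100L, 1436758705900L, 1436758706200L};
        String[] lines = {"hello world", "again", "hello"};
        // 1000 ms corresponds to Seconds(1) in the Scala example
        System.out.println(groupIntoBatches(times, lines, 1000));
        // prints {1436758706000=[hello world, again], 1436758707000=[hello]}
    }
}
```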
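This example uses Java and Maven to build a word count program that reads its input from Kafka.
1. Create the project and add the following dependencies to pom.xml: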
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.0</version>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.10</artifactId>
<version>0.8.2.1</version>
</dependency>
2. Write the code. This part reuses the official Spark example (JavaKafkaWordCount):
package com.netease.gdc.kafkaStreaming;

import java.util.Map;
import java.util.HashMap;
import java.util.regex.Pattern;

import scala.Tuple2;

import com.google.common.collect.Lists;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;

/**
 * Consumes messages from one or more topics in Kafka and does wordcount.
 *
 * Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>
 *   <zkQuorum> is a list of one or more zookeeper servers that make quorum
 *   <group> is the name of kafka consumer group
 *   <topics> is a list of one or more kafka topics to consume from
 *   <numThreads> is the number of threads the kafka consumer should use
 *
 * To run this example:
 *   `$ bin/run-example org.apache.spark.examples.streaming.JavaKafkaWordCount zoo01,zoo02, \
 *    zoo03 my-consumer-group topic1,topic2 1`
 */
public final class JavaKafkaWordCount {
  private static final Pattern SPACE = Pattern.compile(" ");

  private JavaKafkaWordCount() {
  }

  public static void main(String[] args) {
    if (args.length < 4) {
      System.err.println("Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
      System.exit(1);
    }

    SparkConf sparkConf = new SparkConf().setAppName("JavaKafkaWordCount");
    // Create the context with a 2 second batch size
    JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, new Duration(2000));

    // Every topic gets the same number of consumer threads
    int numThreads = Integer.parseInt(args[3]);
    Map<String, Integer> topicMap = new HashMap<String, Integer>();
    String[] topics = args[2].split(",");
    for (String topic: topics) {
      topicMap.put(topic, numThreads);
    }

    // Each Kafka message arrives as a (key, message) pair
    JavaPairReceiverInputDStream<String, String> messages =
        KafkaUtils.createStream(jssc, args[0], args[1], topicMap);

    // Keep only the message text
    JavaDStream<String> lines = messages.map(new Function<Tuple2<String, String>, String>() {
      @Override
      public String call(Tuple2<String, String> tuple2) {
        return tuple2._2();
      }
    });

    // Split each line into words
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(SPACE.split(x));
      }
    });

    // Pair each word with 1, then sum the counts per word within each batch
    JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) {
          return i1 + i2;
        }
      });

    wordCounts.print();
    jssc.start();
    jssc.awaitTermination();
  }
}
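The anonymous `Function`, `FlatMapFunction`, `PairFunction` and `Function2` classes above form a per-batch pipeline: take the message text out of each Kafka (key, message) pair, split it on spaces, pair every word with 1, and sum by word. For a single batch held in memory, the same steps can be sketched with Java 8 streams as below. This runs without Spark; `KafkaBatchWordCountSketch` is a hypothetical name, and the `TreeMap` is used only to make the output order stable (Spark does not sort the result).

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Plain-Java sketch (no Spark) of what one batch goes through in JavaKafkaWordCount.
final class KafkaBatchWordCountSketch {
    private static final Pattern SPACE = Pattern.compile(" ");

    static Map<String, Integer> countWords(List<Map.Entry<String, String>> messages) {
        return messages.stream()
                .map(entry -> entry.getValue())                    // like tuple2._2(): keep the message text
                .flatMap(line -> Arrays.stream(SPACE.split(line))) // like the FlatMapFunction: split into words
                .collect(Collectors.toMap(
                        word -> word,                              // like the PairFunction: (word, 1)
                        word -> 1,
                        Integer::sum,                              // like the Function2 in reduceByKey
                        TreeMap::new));                            // sorted only for stable output
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> batch = new ArrayList<>();
        // Kafka messages may have no key, so null keys are allowed here
        batch.add(new AbstractMap.SimpleEntry<String, String>(null, "hi there"));
        batch.add(new AbstractMap.SimpleEntry<String, String>(null, "hi"));
        System.out.println(countWords(batch)); // prints {hi=2, there=1}
    }
}
```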
3. Upload the project to the server and build it:
mvn clean package
4. Submit the job to Spark:
/home/hadoop/spark/bin/spark-submit --jars ../mylib/metrics-core-2.2.0.jar,../mylib/zkclient-0.3.jar,../mylib/spark-streaming-kafka_2.10-1.4.0.jar,../mylib/kafka-clients-0.8.2.1.jar,../mylib/kafka_2.10-0.8.2.1.jar --class com.netease.gdc.kafkaStreaming.JavaKafkaWordCount --master spark://192.168.16.102:7077 target/kafkaStreaming-0.0.1-SNAPSHOT.jar 192.168.172.111:2181/kafka my-consumer-group test 3
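Of course, this assumes the Kafka cluster is already running and that a topic named test exists. The last four arguments reach the program as args[0] to args[3]: the ZooKeeper quorum (192.168.172.111:2181/kafka), the consumer group (my-consumer-group), the topic list (test) and the number of consumer threads per topic (3).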
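The argument handling of `JavaKafkaWordCount` can be isolated into a small plain-Java sketch, including the loop that gives every listed topic the same number of consumer threads in `topicMap`. It is illustrative only: `KafkaArgsSketch` is hypothetical, and it throws an exception where the original prints the usage line and calls `System.exit(1)`.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of how JavaKafkaWordCount interprets its four arguments.
final class KafkaArgsSketch {
    final String zkQuorum;
    final String group;
    final Map<String, Integer> topicMap;

    KafkaArgsSketch(String[] args) {
        if (args.length < 4) {
            throw new IllegalArgumentException(
                    "Usage: JavaKafkaWordCount <zkQuorum> <group> <topics> <numThreads>");
        }
        zkQuorum = args[0];
        group = args[1];
        int numThreads = Integer.parseInt(args[3]);
        // Same loop as the example: each comma-separated topic -> numThreads
        topicMap = new HashMap<>();
        for (String topic : args[2].split(",")) {
            topicMap.put(topic, numThreads);
        }
    }

    public static void main(String[] args) {
        KafkaArgsSketch parsed = new KafkaArgsSketch(
                new String[] {"192.168.172.111:2181/kafka", "my-consumer-group", "test", "3"});
        // prints 192.168.172.111:2181/kafka my-consumer-group {test=3}
        System.out.println(parsed.zkQuorum + " " + parsed.group + " " + parsed.topicMap);
    }
}
```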
5. Verify
Open a Kafka console producer, type some text into the test topic, and watch the word counts printed by the job.
The output looks like this:
(hi,1)
This part describes the basic steps for creating a Spark Streaming application.
1. Set up the dependencies. Using Maven as an example, add the following to pom.xml:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.10</artifactId>
<version>1.4.0</version>
</dependency>
If you need another data source, add its dependency to pom.xml as well.
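For example, to use Kafka as the data source:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka_2.10</artifactId>
<version>1.4.0</version>
</dependency>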
Of course, the Spark core package must also be included:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>1.4.0</version>
</dependency>