Spark (10): Spark Streaming

1. Spark Streaming Overview

1.1    A Brief Introduction to Spark Streaming

Spark Streaming is an extension of the core Spark API that provides scalable, high-throughput, fault-tolerant processing of live data streams.

Data can be ingested from many sources, such as Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets.

It can then be processed with complex algorithms expressed through high-level functions such as map, reduce, join, and window.

Finally, processed data can be pushed out to file systems and databases. In fact, Spark's machine learning and graph processing algorithms can also be applied to data streams. (Figure: data sources feeding Spark Streaming, with results written out to storage systems.)

Internally, it works as follows: Spark Streaming receives live input data streams and divides the data into batches, which are then processed by the Spark engine to generate the final stream of results in batches. (Figure: an input data stream split into batches and processed by the Spark engine.)

In addition, Spark Streaming provides a high-level abstraction called a discretized stream, or DStream, which represents a continuous stream of data. DStreams can be created either from input data streams from sources such as Kafka, Flume, and Kinesis, or by applying high-level operations to other DStreams.

Internally, a DStream is represented as a sequence of RDDs.
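
To make that last point concrete, the sketch below uses foreachRDD, which exposes the single RDD generated for each batch interval. This is a minimal illustration, not part of the original article; the class name is made up, while the host and port match the socket example that follows.

package com.berg.spark.test5.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

public class DStreamAsRddsDemo {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DStreamAsRdds");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

		// Same socket source as the word-count example below.
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);

		// foreachRDD hands over the one RDD generated for each batch interval,
		// which is exactly the "sequence of RDDs" that a DStream represents internally.
		lines.foreachRDD(new VoidFunction<JavaRDD<String>>() {
			private static final long serialVersionUID = 1L;

			public void call(JavaRDD<String> rdd) {
				System.out.println("Records in this batch: " + rdd.count());
			}
		});

		jssc.start();
		jssc.awaitTermination();
	}
}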

First, add the Maven dependency for spark-streaming_2.10:

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.10</artifactId>
    <version>1.6.1</version>
</dependency>

1.2    Example: counting the words in text data received over a TCP socket from a data server

package com.berg.spark.test5.streaming;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SparkStreamingDemo1 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// Create a DStream that will connect to hostname:port, e.g. master:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
		System.out.println("lines : " + lines);

		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID = 1L;

			public Iterable<String> call(String x) {
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to
		// the console
		wordCounts.print();

		jssc.start();  // Start the computation
		jssc.awaitTermination();  // Wait for the computation to terminate
	}
}

To run the code, first start a simple data server in a Linux terminal: $ nc -lk 9999

Then run the program in Eclipse.

Type a line of words separated by spaces, for example:

hadoop@master:~$ nc -lk 9999
berg hello world berg hello

The following result appears in the Eclipse console:

-------------------------------------------
Time: 1465386060000 ms
-------------------------------------------
(hello,2)
(berg,2)
(world,1)

1.3    Using the contents of files under an HDFS directory as the input data stream

package com.berg.spark.test5.streaming;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.dstream.DStream;
import scala.Tuple2;

public class SparkStreamingDemo2 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// Create a DStream that reads new files from an HDFS directory as the data source
		JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/txt/sparkstreaming/");

		System.out.println("lines : " + lines);

		// Split each line into words
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			public Iterable<String> call(String x) {
				System.out.println(Arrays.asList(x.split(" ")).get(0));
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		System.out.println(pairs);
		
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to the console
		wordCounts.print();
		
		JavaDStream<Long> count = wordCounts.count();
		count.print(); // Print the number of (word, count) pairs in each batch
		
		DStream<Tuple2<String, Integer>>  dstream = wordCounts.dstream();
		dstream.saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		//wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		
		jssc.start();  
		jssc.awaitTermination();   // Wait for the computation to terminate
	}
}

The code above continuously monitors the HDFS directory hdfs://master:9000/txt/sparkstreaming/ for newly added files and counts the words they contain.

After starting the program, manually add a file to that directory; the word counts for the file's contents will appear in the console.

Note the meaning of the parameter:


public JavaDStream<java.lang.String> textFileStream(java.lang.String directory)

    Create an input stream that monitors a Hadoop-compatible filesystem for new files
    and reads them as text files (using key as LongWritable, value as Text and input
    format as TextInputFormat). Files must be written to the monitored directory by
    "moving" them from another location within the same file system. File names
    starting with . are ignored.

    Parameters:
        directory - HDFS directory to monitor for new file
    Returns:
        (undocumented)
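
As the Javadoc points out, new files must be "moved" into the monitored directory from another location on the same filesystem so that each file becomes visible in its entirety. The following is a minimal sketch using the Hadoop FileSystem API; the staging directory, local path, and file name are made up for illustration, while the HDFS address and monitored directory match the example above.

package com.berg.spark.test5.streaming;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class MoveFileIntoStreamDir {

	public static void main(String[] args) throws Exception {

		Configuration conf = new Configuration();
		FileSystem fs = FileSystem.get(URI.create("hdfs://master:9000"), conf);

		// 1. Upload the file to a staging directory that is NOT monitored.
		Path staging = new Path("/txt/staging/words.txt");
		fs.copyFromLocalFile(new Path("/home/hadoop/words.txt"), staging);

		// 2. "Move" it into the monitored directory. The rename stays within the
		//    same filesystem, so Spark Streaming picks up the complete file as one new file.
		fs.rename(staging, new Path("/txt/sparkstreaming/words.txt"));

		fs.close();
	}
}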
 

For further study, refer to the official documentation and API:

http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html
