Spark Streaming,其實就是一種Spark提供的,對於大數據,進行實時計算的一種框架。它的底層,其實,也是基於咱們以前講解的Spark Core的。基本的計算模型,仍是基於內存的大數據實時計算模型。並且,它的底層的組件或者叫作概念,其實仍是最核心的RDD。java
只很少,針對實時計算的特色,在RDD之上,進行了一層封裝,叫作DStream。其實,學過了Spark SQL以後,你理解這種封裝就容易了。以前學習Spark SQL是否是也是發現,它針對數據查詢這種應用,提供了一種基於RDD之上的全新概念,DataFrame,可是,其底層仍是基於RDD的。因此,RDD是整個Spark技術生態中的核心。要學好Spark在交互式查詢、實時計算上的應用技術和框架,首先必須學好Spark核心編程,也就是Spark Core。redis
Spark Streaming 相似於 Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming 有高吞吐量和容錯能力強等特色。Spark Streaming 支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後能夠用 Spark 的高度抽象,如:map、reduce、join、window 等進行運算。而結果也能保存在不少地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。shell
和 Spark 基於 RDD 的概念很類似,Spark Streaming 使用離散化流(discretized stream)做爲抽象表示,叫做 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(所以得名「離散化」)。數據庫
DStream 能夠從各類輸入源建立,好比 Flume、Kafka 或者 HDFS。建立出來的 DStream 支持兩種操做,一種是轉化操做(transformation),會生成一個新的 DStream,另外一種是輸出操做(output operation),能夠把數據寫入外部系統中。DStream 提供了許多與 RDD 所支持的操做相相似的操做支持,還增長了與時間相關的新操做,好比滑動窗口。apache
DStream:Discretized Stream 離散化流
編程
Spark Streaming提供了一種高級的抽象,叫作DStream,英文全稱爲Discretized Stream,中文翻譯爲「離散流」,它表明了一個持續不斷的數據流。DStream能夠經過輸入數據源來建立,好比Kafka、Flume和Kinesis;也能夠經過對其餘DStream應用高階函數來建立,好比map、reduce、join、window。api
DStream的內部,其實一系列持續不斷產生的RDD。RDD是Spark Core的核心抽象,即,不可變的,分佈式的數據集。DStream中的每一個RDD都包含了一個時間段內的數據。緩存
對DStream應用的算子,好比map,其實在底層會被翻譯爲對DStream中每一個RDD的操做。好比對一個DStream執行一個map操做,會產生一個新的DStream。可是,在底層,其實其原理爲,對輸入DStream中每一個時間段的RDD,都應用一遍map操做,而後生成的新的RDD,即做爲新的DStream中的那個時間段的一個RDD。底層的RDD的transformation操做,其實,仍是由Spark Core的計算引擎來實現的。Spark Streaming對Spark Core進行了一層封裝,隱藏了細節,而後對開發人員提供了方便易用的高層次的API。網絡
public class WordCount { public static void main(String[] args) throws Exception { // 建立SparkConf對象 // 可是這裏有一點不一樣,咱們是要給它設置一個Master屬性,可是咱們測試的時候使用local模式 // local後面必須跟一個方括號,裏面填寫一個數字,數字表明瞭,咱們用幾個線程來執行咱們的 // Spark Streaming程序 SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("WordCount"); // 建立JavaStreamingContext對象 // 該對象,就相似於Spark Core中的JavaSparkContext,就相似於Spark SQL中的SQLContext // 該對象除了接收SparkConf對象對象以外 // 還必須接收一個batch interval參數,就是說,每收集多長時間的數據,劃分爲一個batch,進行處理 // 這裏設置一秒 JavaStreamingContext jsc=new JavaStreamingContext(conf,Duration.apply(1000)); // 首先,建立輸入DStream,表明了一個從數據源(好比kafka、socket)來的持續不斷的實時數據流 // 調用JavaStreamingContext的socketTextStream()方法,能夠建立一個數據源爲Socket網絡端口的 // 數據流,JavaReceiverInputStream,表明了一個輸入的DStream // socketTextStream()方法接收兩個基本參數,第一個是監聽哪一個主機上的端口,第二個是監聽哪一個端口 JavaReceiverInputDStream lines=jsc.socketTextStream("localhost",9999); // 到這裏爲止,你能夠理解爲JavaReceiverInputDStream中的,每隔一秒,會有一個RDD,其中封裝了 // 這一秒發送過來的數據 // RDD的元素類型爲String,即一行一行的文本 // 因此,這裏JavaReceiverInputStream的泛型類型<String>,其實就表明了它底層的RDD的泛型類型 // 開始對接收到的數據,執行計算,使用Spark Core提供的算子,執行應用在DStream中便可 // 在底層,其實是會對DStream中的一個一個的RDD,執行咱們應用在DStream上的算子 // 產生的新RDD,會做爲新DStream中的RDD JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); // 這個時候,每秒的數據,一行一行的文本,就會被拆分爲多個單詞,words DStream中的RDD的元素類型 // 即爲一個一個的單詞 // 接着,開始進行flatMap、reduceByKey操做 JavaPairDStream<String,Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }); // 這裏,正好說明一下,其實你們能夠看到,用Spark Streaming開發程序,和Spark Core很相像 // 惟一不一樣的是Spark Core中的JavaRDD、JavaPairRDD,都變成了JavaDStream、JavaPairDStream JavaPairDStream<String,Integer> wordCounts=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); // 到此爲止,咱們就實現了實時的wordcount程序了 // 你們總結一下思路,加深一下印象 // 每秒中發送到指定socket端口上的數據,都會被lines DStream接收到 // 而後lines DStream會把每秒的數據,也就是一行一行的文本,諸如hell world,封裝爲一個RDD // 而後呢,就會對每秒中對應的RDD,執行後續的一系列的算子操做 // 好比,對lins RDD執行了flatMap以後,獲得一個words RDD,做爲words DStream中的一個RDD // 以此類推,直到生成最後一個,wordCounts RDD,做爲wordCounts DStream中的一個RDD // 此時,就獲得了,每秒鐘發送過來的數據的單詞統計 // 可是,必定要注意,Spark Streaming的計算模型,就決定了,咱們必須本身來進行中間緩存的控制 // 好比寫入redis等緩存 // 它的計算模型跟Storm是徹底不一樣的,storm是本身編寫的一個一個的程序,運行在節點上,至關於一個 // 一個的對象,能夠本身在對象中控制緩存 // 可是Spark自己是函數式編程的計算模型,因此,好比在words或pairs DStream中,無法在實例變量中 // 進行緩存 // 此時就只能將最後計算出的wordCounts中的一個一個的RDD,寫入外部的緩存,或者持久化DB // 最後,每次計算完,都打印一下這一秒鐘的單詞計數狀況 // 並休眠5秒鐘,以便於咱們測試和觀察 Thread.sleep(5000); wordCounts.print(); // 首先對JavaSteamingContext進行一下後續處理 // 必須調用JavaStreamingContext的start()方法,整個Spark Streaming Application纔會啓動執行 // 不然是不會執行的 jsc.start(); jsc.awaitTermination(); jsc.close(); } }
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/bin/spark-submit \
--class com.hzk.sparkStreaming.WordCount \
--driver-java-options "-Dspark.testing.memory=471859200" \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 512m \
--executor-cores 3 \
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/study/SparkStudy-1.0-SNAPSHOT.jar \app
若是executor-memory不夠大的話,有可能會報錯:Spark-submit:System memory 466092032 must be at least 471859200
運行shell腳本,且啓動netcat監聽:nc -lk 9999
實時計算結果以下
package com.hzk.sparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class HDFSWordCount { public static void main(String[] args) throws InterruptedException { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("HDFSWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 首先,使用JavaStreamingContext的textFileStream()方法,針對HDFS目錄建立輸入數據流 JavaDStream<String> lines = jssc.textFileStream("hdfs://hadoop-001:9000/datas"); // 執行wordcount操做 JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }
運行shell腳本以下
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/bin/spark-submit \
--class com.hzk.sparkStreaming.HDFSWordCount \
--driver-java-options "-Dspark.testing.memory=471859200" \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 512m \
--executor-cores 3 \
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/study/SparkStudy-1.0-SNAPSHOT.jar \
運行shell腳本後,將文本put進hdfs
hadoop fs -put ./wc.txt /datas
結果以下