Spark Streaming 概述+DStream工做原理+與Storm對比+實時WordCount

Spark Streaming簡介

Spark Streaming,其實就是一種Spark提供的,對於大數據,進行實時計算的一種框架。它的底層,其實,也是基於咱們以前講解的Spark Core的。基本的計算模型,仍是基於內存的大數據實時計算模型。並且,它的底層的組件或者叫作概念,其實仍是最核心的RDD。java

只很少,針對實時計算的特色,在RDD之上,進行了一層封裝,叫作DStream。其實,學過了Spark SQL以後,你理解這種封裝就容易了。以前學習Spark SQL是否是也是發現,它針對數據查詢這種應用,提供了一種基於RDD之上的全新概念,DataFrame,可是,其底層仍是基於RDD的。因此,RDD是整個Spark技術生態中的核心。要學好Spark在交互式查詢、實時計算上的應用技術和框架,首先必須學好Spark核心編程,也就是Spark Core。redis

Spark Streaming 相似於 Apache Storm,用於流式數據的處理。根據其官方文檔介紹,Spark Streaming 有高吞吐量和容錯能力強等特色。Spark Streaming 支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ 和簡單的 TCP 套接字等等。數據輸入後能夠用 Spark 的高度抽象,如:map、reduce、join、window 等進行運算。而結果也能保存在不少地方,如 HDFS,數據庫等。另外 Spark Streaming 也能和 MLlib(機器學習)以及 Graphx 完美融合。shell

 


  和 Spark 基於 RDD 的概念很類似,Spark Streaming 使用離散化流(discretized stream)做爲抽象表示,叫做 DStream。DStream 是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而 DStream 是由這些 RDD 所組成的序列(所以得名「離散化」)。
數據庫

 


  DStream 能夠從各類輸入源建立,好比 Flume、Kafka 或者 HDFS。建立出來的 DStream 支持兩種操做,一種是轉化操做(transformation),會生成一個新的 DStream,另外一種是輸出操做(output operation),能夠把數據寫入外部系統中。DStream 提供了許多與 RDD 所支持的操做相相似的操做支持,還增長了與時間相關的新操做,好比滑動窗口。
apache

 

 

  Spark Streaming 的關鍵抽象


  DStream:Discretized Stream 離散化流
 
編程

 

 基於Spark Streaming的大數據實時計算框架流程

 

 

Spark Streaming基本工做原理

 

DStream簡介

Spark Streaming提供了一種高級的抽象,叫作DStream,英文全稱爲Discretized Stream,中文翻譯爲「離散流」,它表明了一個持續不斷的數據流。DStream能夠經過輸入數據源來建立,好比Kafka、Flume和Kinesis;也能夠經過對其餘DStream應用高階函數來建立,好比map、reduce、join、window。api

DStream的內部,其實一系列持續不斷產生的RDD。RDD是Spark Core的核心抽象,即,不可變的,分佈式的數據集。DStream中的每一個RDD都包含了一個時間段內的數據。緩存

 


對DStream應用的算子,好比map,其實在底層會被翻譯爲對DStream中每一個RDD的操做。好比對一個DStream執行一個map操做,會產生一個新的DStream。可是,在底層,其實其原理爲,對輸入DStream中每一個時間段的RDD,都應用一遍map操做,而後生成的新的RDD,即做爲新的DStream中的那個時間段的一個RDD。底層的RDD的transformation操做,其實,仍是由Spark Core的計算引擎來實現的。Spark Streaming對Spark Core進行了一層封裝,隱藏了細節,而後對開發人員提供了方便易用的高層次的API。網絡

 

 

 

 Spark 與 Storm 的對比

 

 

實時wordcount程序開發:基於Socket

public class WordCount { public static void main(String[] args) throws Exception { // 建立SparkConf對象 // 可是這裏有一點不一樣,咱們是要給它設置一個Master屬性,可是咱們測試的時候使用local模式 // local後面必須跟一個方括號,裏面填寫一個數字,數字表明瞭,咱們用幾個線程來執行咱們的 // Spark Streaming程序 SparkConf conf=new SparkConf().setMaster("local[2]").setAppName("WordCount"); // 建立JavaStreamingContext對象 // 該對象,就相似於Spark Core中的JavaSparkContext,就相似於Spark SQL中的SQLContext // 該對象除了接收SparkConf對象對象以外 // 還必須接收一個batch interval參數,就是說,每收集多長時間的數據,劃分爲一個batch,進行處理 // 這裏設置一秒 JavaStreamingContext jsc=new JavaStreamingContext(conf,Duration.apply(1000)); // 首先,建立輸入DStream,表明了一個從數據源(好比kafka、socket)來的持續不斷的實時數據流 // 調用JavaStreamingContext的socketTextStream()方法,能夠建立一個數據源爲Socket網絡端口的 // 數據流,JavaReceiverInputStream,表明了一個輸入的DStream // socketTextStream()方法接收兩個基本參數,第一個是監聽哪一個主機上的端口,第二個是監聽哪一個端口 JavaReceiverInputDStream lines=jsc.socketTextStream("localhost",9999); // 到這裏爲止,你能夠理解爲JavaReceiverInputDStream中的,每隔一秒,會有一個RDD,其中封裝了 // 這一秒發送過來的數據 // RDD的元素類型爲String,即一行一行的文本 // 因此,這裏JavaReceiverInputStream的泛型類型<String>,其實就表明了它底層的RDD的泛型類型 // 開始對接收到的數據,執行計算,使用Spark Core提供的算子,執行應用在DStream中便可 // 在底層,其實是會對DStream中的一個一個的RDD,執行咱們應用在DStream上的算子 // 產生的新RDD,會做爲新DStream中的RDD JavaDStream<String> words=lines.flatMap(new FlatMapFunction<String, String>() { @Override public Iterator call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); // 這個時候,每秒的數據,一行一行的文本,就會被拆分爲多個單詞,words DStream中的RDD的元素類型 // 即爲一個一個的單詞 // 接着,開始進行flatMap、reduceByKey操做 JavaPairDStream<String,Integer> pairs=words.mapToPair(new PairFunction<String, String, Integer>() { @Override public Tuple2<String, Integer> call(String s) throws Exception { return new Tuple2<String,Integer>(s,1); } }); // 這裏,正好說明一下,其實你們能夠看到,用Spark Streaming開發程序,和Spark Core很相像 // 惟一不一樣的是Spark Core中的JavaRDD、JavaPairRDD,都變成了JavaDStream、JavaPairDStream JavaPairDStream<String,Integer> wordCounts=pairs.reduceByKey(new Function2<Integer, Integer, Integer>() { @Override public Integer call(Integer integer, Integer integer2) throws Exception { return integer+integer2; } }); // 到此爲止,咱們就實現了實時的wordcount程序了 // 你們總結一下思路,加深一下印象 // 每秒中發送到指定socket端口上的數據,都會被lines DStream接收到 // 而後lines DStream會把每秒的數據,也就是一行一行的文本,諸如hell world,封裝爲一個RDD // 而後呢,就會對每秒中對應的RDD,執行後續的一系列的算子操做 // 好比,對lins RDD執行了flatMap以後,獲得一個words RDD,做爲words DStream中的一個RDD // 以此類推,直到生成最後一個,wordCounts RDD,做爲wordCounts DStream中的一個RDD // 此時,就獲得了,每秒鐘發送過來的數據的單詞統計 // 可是,必定要注意,Spark Streaming的計算模型,就決定了,咱們必須本身來進行中間緩存的控制 // 好比寫入redis等緩存 // 它的計算模型跟Storm是徹底不一樣的,storm是本身編寫的一個一個的程序,運行在節點上,至關於一個 // 一個的對象,能夠本身在對象中控制緩存 // 可是Spark自己是函數式編程的計算模型,因此,好比在words或pairs DStream中,無法在實例變量中 // 進行緩存 // 此時就只能將最後計算出的wordCounts中的一個一個的RDD,寫入外部的緩存,或者持久化DB // 最後,每次計算完,都打印一下這一秒鐘的單詞計數狀況 // 並休眠5秒鐘,以便於咱們測試和觀察 Thread.sleep(5000); wordCounts.print(); // 首先對JavaSteamingContext進行一下後續處理 // 必須調用JavaStreamingContext的start()方法,整個Spark Streaming Application纔會啓動執行 // 不然是不會執行的  jsc.start(); jsc.awaitTermination(); jsc.close(); } }

 

 運行的shell腳本以下

 


~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/bin/spark-submit \
--class com.hzk.sparkStreaming.WordCount \
--driver-java-options "-Dspark.testing.memory=471859200" \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 512m \
--executor-cores 3 \
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/study/SparkStudy-1.0-SNAPSHOT.jar \app


 若是executor-memory不夠大的話,有可能會報錯:Spark-submit:System memory 466092032 must be at least 471859200

運行shell腳本,且啓動netcat監聽:nc -lk 9999

 

 實時計算結果以下

 

實時wordcount程序開發:基於HDFS

 

package com.hzk.sparkStreaming; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import scala.Tuple2; import java.util.Arrays; import java.util.Iterator; public class HDFSWordCount { public static void main(String[] args) throws InterruptedException { SparkConf conf = new SparkConf() .setMaster("local[2]") .setAppName("HDFSWordCount"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5)); // 首先,使用JavaStreamingContext的textFileStream()方法,針對HDFS目錄建立輸入數據流
        JavaDStream<String> lines = jssc.textFileStream("hdfs://hadoop-001:9000/datas"); // 執行wordcount操做
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() { private static final long serialVersionUID = 1L; @Override public Iterator<String> call(String line) throws Exception { return Arrays.asList(line.split(" ")).iterator(); } }); JavaPairDStream<String, Integer> pairs = words.mapToPair( new PairFunction<String, String, Integer>() { private static final long serialVersionUID = 1L; @Override public Tuple2<String, Integer> call(String word) throws Exception { return new Tuple2<String, Integer>(word, 1); } }); JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey( new Function2<Integer, Integer, Integer>() { private static final long serialVersionUID = 1L; @Override public Integer call(Integer v1, Integer v2) throws Exception { return v1 + v2; } }); wordCounts.print(); jssc.start(); jssc.awaitTermination(); jssc.close(); } }

 

 

 

運行shell腳本以下

 

 


~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/bin/spark-submit \

--class com.hzk.sparkStreaming.HDFSWordCount \
--driver-java-options "-Dspark.testing.memory=471859200" \
--num-executors 3 \
--driver-memory 100m \
--executor-memory 512m \
--executor-cores 3 \
~/bigdatasoftware/spark-2.1.3-bin-hadoop2.7/study/SparkStudy-1.0-SNAPSHOT.jar \


 

運行shell腳本後,將文本put進hdfs


hadoop fs -put ./wc.txt /datas


 

結果以下

相關文章
相關標籤/搜索