Spark Streaming概述

             Spark Streaming概述html

                                     做者:尹正傑算法

版權聲明:原創做品,謝絕轉載!不然將追究法律責任。數據庫

 

 

 

一.Spark Streaming概覽apache

1>.什麼是Spark Streaming架構

  Spark Streaming是核心Spark API的擴展,可實現實時數據流的可伸縮,高吞吐量,容錯流處理。

  以下圖所示,Spark Streaming支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ,Kinesis,HDFS,簡單的TCP套接字,甚至你還能夠自定義數據源等等。

  數據輸入後能夠用Spark的高度抽象原語如:map、reduce、join、window等進行運算。

  最後,能夠將處理後的數據推送到文件系統,數據庫和實時儀表板。

  實際上,您能夠在數據流上應用Spark的機器學習和圖形處理算法。

  在內部,它的工做方式以下。Spark Streaming接收實時輸入數據流,並將數據分爲批次,而後由Spark引擎進行處理,以生成批次的最終結果流。

  和Spark基於RDD的概念很類似,Spark Streaming使用離散化流(discretized stream)做爲抽象表示,叫做DStream。DStream是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而DStream是由這些RDD所組成的序列(所以得名「離散化」)。

  Spark Streaming提供了稱爲離散流或DStream的高級抽象,它表示連續的數據流。DStreams能夠根據來自Kafka和Kinesis等來源的輸入數據流來建立,也能夠經過對其餘DStreams應用高級操做來建立。在內部,DStream表示爲RDD序列。

  博主推薦閱讀:
    http://spark.apache.org/docs/latest/streaming-programming-guide.html

2>.Spark Streaming的特色機器學習

  易用:
    Spark Streaming將Apache Spark的 語言集成API 引入流處理,使您能夠像編寫批處理做業同樣編寫流做業。它支持Java,Scala和Python。

  容錯:
    Spark Streaming能夠當即恢復丟失的工做和操做員狀態(例如,滑動窗口),而無需任何額外的代碼。

  易整合到Spark體系:
    經過在Spark上運行,Spark Streaming可以讓您將相同的代碼重用於批處理,針對歷史數據加入流或對流狀態運行臨時查詢。構建功能強大的交互式應用程序,而不單單是分析。

3>.Spark Streaming架構socket

 

 

二.DStream入門案例(wordcount)ide

  需求說明:
    使用netcat工具向8888端口不斷的發送數據,經過SparkStreaming讀取端口數據並統計不一樣單詞出現的次數。

1>.添加依賴關係工具

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.1.1</version>
</dependency>

2>.安裝netcat工具並監聽相應端口oop

[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc
Loaded plugins: fastestmirror
Loading mirror speeds from cached hostfile
 * base: mirrors.aliyun.com
 * extras: mirrors.aliyun.com
 * updates: mirror.bit.edu.cn
Resolving Dependencies
--> Running transaction check
---> Package nmap-ncat.x86_64 2:6.40-19.el7 will be installed
--> Finished Dependency Resolution

Dependencies Resolved

==============================================================================================================================================================================================================================================================================
 Package                                                           Arch                                                           Version                                                                  Repository                                                    Size
==============================================================================================================================================================================================================================================================================
Installing:
 nmap-ncat                                                         x86_64                                                         2:6.40-19.el7                                                            base                                                         206 k

Transaction Summary
==============================================================================================================================================================================================================================================================================
Install  1 Package

Total download size: 206 k
Installed size: 423 k
Downloading packages:
nmap-ncat-6.40-19.el7.x86_64.rpm                                                                                                                                                                                                                       | 206 kB  00:00:00     
Running transaction check
Running transaction test
Transaction test succeeded
Running transaction
  Installing : 2:nmap-ncat-6.40-19.el7.x86_64                                                                                                                                                                                                                             1/1 
  Verifying  : 2:nmap-ncat-6.40-19.el7.x86_64                                                                                                                                                                                                                             1/1 

Installed:
  nmap-ncat.x86_64 2:6.40-19.el7                                                                                                                                                                                                                                              

Complete!
[root@hadoop101.yinzhengjie.org.cn ~]# 
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc  
[root@hadoop101.yinzhengjie.org.cn ~]# nc -lk 8888          #監聽端口

3>.編寫wordcount代碼

package com.yinzhengjie.bigdata.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object WordCount {
  def main(args: Array[String]): Unit = {

    /**
      *   1>.初始化Spark配置信息
      */
    val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount")

    /**
      *   2>.初始化SparkStreamingContext(實時數據分析環境對象)
      *
      *   自定義採集週期:
      *     以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次.
      */
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    /**
      *   3>.經過監控端口建立DStream,讀進來的數據爲一行行(即從指定端口中採集數據)
      */
    val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888)

    /**
      *   4>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞)
      */
    val wordDStreams:DStream[String] = socketLineDStream.flatMap(_.split(" "))

    /**
      *   5>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1))
      */
    val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1))

    /**
      *   6>.將相同的單詞次數作統計
      */
    val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_)

    /**
      *   7>.將結果打印出來
      */
    wordToCountDStream.print()

    /**
      *   8>.啓動(SparkStreamingContext)採集器
      */
    ssc.start()

    /**
      *   9>.Driver等待採集器的執行(即禁止main線程主動退出)
      */
    ssc.awaitTermination()

    /**
      *   舒適提示:
      *       我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~
      */
    //    ssc.stop()

  }
}

 

三.博主推薦閱讀

  Spark Streaming-DStream實戰案例:
    https://www.cnblogs.com/yinzhengjie2020/p/13233192.html
相關文章
相關標籤/搜索