Spark Streaming概述html
做者:尹正傑算法
版權聲明:原創做品,謝絕轉載!不然將追究法律責任。數據庫
一.Spark Streaming概覽apache
1>.什麼是Spark Streaming架構
Spark Streaming是核心Spark API的擴展,可實現實時數據流的可伸縮,高吞吐量,容錯流處理。
以下圖所示,Spark Streaming支持的數據輸入源不少,例如:Kafka、Flume、Twitter、ZeroMQ,Kinesis,HDFS,簡單的TCP套接字,甚至你還能夠自定義數據源等等。
數據輸入後能夠用Spark的高度抽象原語如:map、reduce、join、window等進行運算。
最後,能夠將處理後的數據推送到文件系統,數據庫和實時儀表板。
實際上,您能夠在數據流上應用Spark的機器學習和圖形處理算法。
在內部,它的工做方式以下。Spark Streaming接收實時輸入數據流,並將數據分爲批次,而後由Spark引擎進行處理,以生成批次的最終結果流。
和Spark基於RDD的概念很類似,Spark Streaming使用離散化流(discretized stream)做爲抽象表示,叫做DStream。DStream是隨時間推移而收到的數據的序列。在內部,每一個時間區間收到的數據都做爲 RDD 存在,而DStream是由這些RDD所組成的序列(所以得名「離散化」)。 Spark Streaming提供了稱爲離散流或DStream的高級抽象,它表示連續的數據流。DStreams能夠根據來自Kafka和Kinesis等來源的輸入數據流來建立,也能夠經過對其餘DStreams應用高級操做來建立。在內部,DStream表示爲RDD序列。 博主推薦閱讀: http://spark.apache.org/docs/latest/streaming-programming-guide.html
2>.Spark Streaming的特色機器學習
易用:
Spark Streaming將Apache Spark的 語言集成API 引入流處理,使您能夠像編寫批處理做業同樣編寫流做業。它支持Java,Scala和Python。
容錯:
Spark Streaming能夠當即恢復丟失的工做和操做員狀態(例如,滑動窗口),而無需任何額外的代碼。
易整合到Spark體系:
經過在Spark上運行,Spark Streaming可以讓您將相同的代碼重用於批處理,針對歷史數據加入流或對流狀態運行臨時查詢。構建功能強大的交互式應用程序,而不單單是分析。
3>.Spark Streaming架構socket
二.DStream入門案例(wordcount)ide
需求說明:
使用netcat工具向8888端口不斷的發送數據,經過SparkStreaming讀取端口數據並統計不一樣單詞出現的次數。
1>.添加依賴關係工具
<dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming_2.11</artifactId> <version>2.1.1</version> </dependency>
2>.安裝netcat工具並監聽相應端口oop
[root@hadoop101.yinzhengjie.org.cn ~]# yum -y install nc Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile * base: mirrors.aliyun.com * extras: mirrors.aliyun.com * updates: mirror.bit.edu.cn Resolving Dependencies --> Running transaction check ---> Package nmap-ncat.x86_64 2:6.40-19.el7 will be installed --> Finished Dependency Resolution Dependencies Resolved ============================================================================================================================================================================================================================================================================== Package Arch Version Repository Size ============================================================================================================================================================================================================================================================================== Installing: nmap-ncat x86_64 2:6.40-19.el7 base 206 k Transaction Summary ============================================================================================================================================================================================================================================================================== Install 1 Package Total download size: 206 k Installed size: 423 k Downloading packages: nmap-ncat-6.40-19.el7.x86_64.rpm | 206 kB 00:00:00 Running transaction check Running transaction test Transaction test succeeded Running transaction Installing : 2:nmap-ncat-6.40-19.el7.x86_64 1/1 Verifying : 2:nmap-ncat-6.40-19.el7.x86_64 1/1 Installed: nmap-ncat.x86_64 2:6.40-19.el7 Complete! [root@hadoop101.yinzhengjie.org.cn ~]#
[root@hadoop101.yinzhengjie.org.cn ~]# nc -lk 8888 #監聽端口
3>.編寫wordcount代碼
package com.yinzhengjie.bigdata.spark.streaming import org.apache.spark.SparkConf import org.apache.spark.streaming.dstream.{DStream, ReceiverInputDStream} import org.apache.spark.streaming.{Seconds, StreamingContext} object WordCount { def main(args: Array[String]): Unit = { /** * 1>.初始化Spark配置信息 */ val sparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStreamingWordCount") /** * 2>.初始化SparkStreamingContext(實時數據分析環境對象) * * 自定義採集週期: * 以指定的時間爲週期採集實時數據。我這裏指定採集週期是5秒.生產環境中咱們能夠將這個值改小,好比每秒採集一次. */ val ssc = new StreamingContext(sparkConf, Seconds(5)) /** * 3>.經過監控端口建立DStream,讀進來的數據爲一行行(即從指定端口中採集數據) */ val socketLineDStream:ReceiverInputDStream[String] = ssc.socketTextStream("hadoop101.yinzhengjie.org.cn", 8888) /** * 4>.將採集的數據進行扁平化操做(即將每一行數據作切分,造成一個個單詞) */ val wordDStreams:DStream[String] = socketLineDStream.flatMap(_.split(" ")) /** * 5>.將數據進行結構的轉換方便統計分析(即將單詞映射成元組(word,1)) */ val wordAndOneStreams:DStream[(String,Int)] = wordDStreams.map((_, 1)) /** * 6>.將相同的單詞次數作統計 */ val wordToCountDStream:DStream[(String,Int)] = wordAndOneStreams.reduceByKey(_+_) /** * 7>.將結果打印出來 */ wordToCountDStream.print() /** * 8>.啓動(SparkStreamingContext)採集器 */ ssc.start() /** * 9>.Driver等待採集器的執行(即禁止main線程主動退出) */ ssc.awaitTermination() /** * 舒適提示: * 我們的程序是實時處理數據的,所以生產環境中不能中止採集程序,所以不建議使用喲~ */ // ssc.stop() } }
三.博主推薦閱讀
Spark Streaming-DStream實戰案例: https://www.cnblogs.com/yinzhengjie2020/p/13233192.html