Spark Streaming初步使用以及工做原理詳解

在大數據的各類框架中,hadoop無疑是大數據的主流,可是隨着電商企業的發展,hadoop只適用於一些離線數據的處理,沒法應對一些實時數據的處理分析,咱們須要一些實時計算框架來分析數據。所以出現了不少流式實時計算框架,好比Storm,Spark Streaming,Samaz等框架,本文主要講解Spark Streaming的工做原理以及如何使用。html

1、流式計算

1.什麼是流?前端

Streaming:是一種數據傳送技術,它把客戶機收到的數據變成一個穩定連續的
流,源源不斷地送出,使用戶聽到的聲音或看到的圖象十分平穩,並且用戶在
整個文件送完以前就能夠開始在屏幕上瀏覽文件。node

2.常見的流式計算框架算法

  • Apache Storm
  • Spark Streaming
  • Apache Samza

上述三種實時計算系統都是開源的分佈式系統,具備低延遲、可擴展和容錯性
諸多優勢,它們的共同特點在於:容許你在運行數據流代碼時,將任務分配到
一系列具備容錯能力的計算機上並行運行。此外,它們都提供了簡單的API來
簡化底層實現的複雜程度。shell

對於上面的三種流使計算框架的比較能夠參考這篇文章流式大數據處理的三種框架:Storm,Spark和Samza數據庫

2、Spark Streaming

1.Spark Streaming介紹express

Spark Streaming是Spark生態系統當中一個重要的框架,它創建在Spark Core之上,下面這幅圖也能夠看出Sparking Streaming在Spark生態系統中地位。
這裏寫圖片描述apache

官方對於Spark Streaming的解釋以下:編程

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Data can be ingested from many sources like Kafka, Flume, Twitter, ZeroMQ, Kinesis or TCP sockets can be processed using complex algorithms expressed with high-level functions like map, reduce, join and window. Finally, processed data can be pushed out to filesystems, databases, and live dashboards. In fact, you can apply Spark’s machine learning and graph processing algorithms on data streams.markdown

這裏寫圖片描述

Spark Streaming是Spark Core的擴展應用,它具備可擴展,高吞吐量,對於流數據的可容錯性等特色。能夠監控來自Kafka,Flumn,HDFS。Kinesis,Twitter,ZeroMQ或者Scoket套接字的數據經過複雜的算法以及一系列的計算分析數據,而且能夠將分析結果存入到HDFS文件系統,數據庫以及前端頁面中。

這裏寫圖片描述
Spark Streaming有如下特色

  • 高可擴展性,能夠運行在上百臺機器上(Scales to hundreds of nodes)
  • 低延遲,能夠在秒級別上對數據進行處理(Achieves low latency)
  • 高可容錯性(Efficiently recover from failures)
  • 可以集成並行計算程序,好比Spark Core(Integrates with batch and interactive processing)

2.Spark Streaming工做原理
對於Spark Core它的核心就是RDD,對於Spark Streaming來講,它的核心是DStream,DStream相似於RDD,它實質上一系列的RDD的集合,DStream能夠按照秒數將數據流進行批量的劃分。首先從接收到流數據以後,將其劃分爲多個batch,而後提交給Spark集羣進行計算,最後將結果批量輸出到HDFS或者數據庫以及前端頁面展現等等。能夠參考下面這幅圖來幫助理解:
這裏寫圖片描述

對於DStream如何理解呢?它是一系列連續的RDD,它是創建在Spark之上的不可變的,分佈式數據集,在DStream中的每個RDD包含着必定時間間隔的數據,以下圖所示:
這裏寫圖片描述
這裏寫圖片描述

那麼,Spark Streaming的工做原理是什麼呢?它是怎麼運行在集羣上的呢?其原理架構圖以下所示:
這裏寫圖片描述

咱們都知道Spark Core在初始化時會生成一個SparkContext對象來對數據進行後續的處理,相對應的Spark Streaming會建立一個Streaming Context,它的底層是SparkContext,也就是說它會將任務提交給SparkContext來執行,這也很好的解釋了DStream是一系列的RDD。當啓動Spark Streaming應用的時候,首先會在一個節點的Executor上啓動一個Receiver接受者,而後當從數據源寫入數據的時候會被Receiver接收,接收到數據以後Receiver會將數據Split成不少個block,而後備份到各個節點(Replicate Blocks 容災恢復),而後Receiver向StreamingContext進行塊報告,說明數據在那幾個節點的Executor上,接着在必定間隔時間內StreamingContext會將數據處理爲RDD而且交給SparkContext劃分到各個節點進行並行計算。

3.Spark Streaming Demo

介紹完Spark Streaming的基本原理以後,下面來看看如何運行Spark Streaming,官方給出了一個例子,從Socket源端監控收集數據運行wordcount的案例,案例很簡單,這裏再也不說明,讀者可參考官方文檔【http://spark.apache.org/docs/1.3.0/streaming-programming-guide.html】

對於Spark Streaming的編程模型有兩種方式

第一種:經過SparkConf來建立SparkStreaming

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 
val conf=new SparkConf().setAppName("SparkStreamingDemo").setMaster("master")
val scc=new StreamingContext(conf,Seconds(1)) //每一個1秒鐘檢測一次數據
  • 1
  • 2
  • 3
  • 4
  • 5

第二種:經過SparkContext來建立,也就是在Spark-Shell命令行運行:

import org.apache.spark.streaming._
val scc=new StreamingContext(sc,Seconds(1))
  • 1
  • 2

固然,咱們也能夠收集來自HDFS文件系統中數據,查閱Spark的源碼,能夠發現以下方法:
這裏寫圖片描述
這個方法會監控指定HDFS文件目錄下的數據,不過忽略以「.」開頭的文件,也就是不會收集以「.」開頭的文件進行數據的處理。

下面介紹一下如何從HDFS文件系統上監控數據運行wordcount案例統計單詞數而且將結果打印出來的案例:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ 

val ssc = new StreamingContext(sc, Seconds(5))

// read data
val lines = ssc.textFileStream("hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/spark/streaming/")

// process
val words = lines.flatMap(_.split(" "))
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
wordCounts.print()
 
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

上面程序會每一個5秒鐘檢測一下HDFS文件系統下的hdfs://hadoop-senior.shinelon.com:8020/user/shinelon/spark/streaming/目錄是否有新的數據,若是有就進行統計,而後將結果打印在控制檯。運行上面代碼有兩種方式,能夠運行Spark-shell客戶端後將上述命令一條條粘貼到命令行執行,顯然這樣很麻煩;第二種就是將上面的程序寫入到一個腳本文件中加載到Spark-shell命令行中執行,這裏採用第二種方式:
在一個目錄下建立SparkStreamingDemo.scala文件,內容如上面的代碼所示。而後啓動Spark-shell客戶端。

$ bin/spark-shell --master local[2]
  • 1

而後加載Spark Streaming應用:

scala>:load /opt/cdh-5.3.6/spark-1.3.0-bin-2.5.0-cdh5.3.6/SparkStreamingDemo.scala
  • 1

而後上傳數據到上述HDFS文件目錄下:

$ bin/hdfs dfs -put /opt/datas/wc.input /user/shinelon/spark/streaming/input7
  • 1

該文件內容以下所示:

hadoop hive
hadoop hbase
hadoop yarn
hadoop hdfs
hdfs spark
  • 1
  • 2
  • 3
  • 4
  • 5

運行結果以下所示:
這裏寫圖片描述

一般對於一個Spark Streaming的應用程序的編寫分下面幾步:

  1. 定義一個輸入流源,好比獲取socket端的數據,HDFS,kafka中數據等等
  2. 定義一系列的處理轉換操做,好比上面的map,reduce操做等等,Spark Streaming也有相似於SparkCore的transformation操做
  3. 啓動程序收集數據(start())
  4. 等待程序中止(遇到錯誤終止或者手動中止awaitTermination())
  5. 手動終止應用程序(stop())

可使用saveAsTextFiles()方法將結果輸出到HDFS文件系統上,讀者能夠自行試驗將結果存入HDFS文件系統中。

最後,介紹一下Spark Streaming應用程序開發的幾種常見方式:

  1. Spark Shell Code:開發、測試(上面提到過,將代碼一條條粘貼到命令行執行,這種方式只適用於測試)
  2. Spark Shell Load Scripts:開發、測試(編寫scala腳本到spark-shell中執行)
  3. IDE Develop App:開發、測試、打包JAR(生產環境),spark-submit提交應用程序
相關文章
相關標籤/搜索