Spark Streaming之一:總體介紹

提到Spark Streaming,咱們不得不說一下BDAS(Berkeley Data Analytics Stack),這個伯克利大學提出的關於數據分析的軟件棧。從它的視角來看,目前的大數據處理能夠分爲如如下三個類型。 html

  • 複雜的批量數據處理(batch data processing),一般的時間跨度在數十分鐘到數小時之間。
  • 基於歷史數據的交互式查詢(interactive query),一般的時間跨度在數十秒到數分鐘之間。
  • 基於實時數據流的數據處理(streaming data processing),一般的時間跨度在數百毫秒到數秒之間。 

目前已有不少相對成熟的開源軟件來處理以上三種情景,咱們能夠利用MapReduce來進行批量數據處理,能夠用Impala來進行交互式查詢,對於流式數據處理,咱們能夠採用Storm。對於大多數互聯網公司來講,通常都會同時遇到以上三種情景,那麼在使用的過程當中這些公司可能會遇到以下的不便。 node

  • 三種情景的輸入輸出數據沒法無縫共享,須要進行格式相互轉換。
  • 每個開源軟件都須要一個開發和維護團隊,提升了成本。
  • 在同一個集羣中對各個系統協調資源分配比較困難。 

BDAS就是以Spark爲基礎的一套軟件棧,利用基於內存的通用計算模型將以上三種情景一網打盡,同時支持Batch、Interactive、Streaming的處理,且兼容支持HDFS和S3等分佈式文件系統,能夠部署在YARN和Mesos等流行的集羣資源管理器之上。算法

一、Spark Streaming簡介

1.1 概述

Spark Streaming 是Spark核心API的一個擴展,能夠實現高吞吐量的、具有容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數據源獲取數據以後,可使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最後還能夠將處理結果存儲到文件系統,數據庫和現場儀表盤。在「One Stack rule them all」的基礎上,還可使用Spark的其餘子框架,如集羣學習、圖計算等,對流數據進行處理。shell

Spark Streaming處理的數據流圖:數據庫

 

Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Engine處理這些批數據,最終獲得處理後的一批批結果數據。apache

對應的批數據,在Spark內核對應一個RDD實例,所以,對應流數據的DStream能夠當作是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分紅一批一批後,經過一個先進先出的隊列,而後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,而後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。編程

1.2 術語定義

l離散流(discretized stream)或DStream:這是Spark Streaming對內部持續的實時數據流的抽象描述,即咱們處理的一個實時數據流,在Spark Streaming中對應於一個DStream 實例。緩存

l批數據(batch data):這是化整爲零的第一步,將實時流數據以時間片爲單位進行分批,將流處理轉化爲時間片數據的批處理。隨着持續時間的推移,這些處理結果就造成了對應的結果數據流了。服務器

l時間片或批處理時間間隔( batch interval):這是人爲地對流數據進行定量的標準,以時間片做爲咱們拆分流數據的依據。一個時間片的數據對應一個RDD實例。網絡

l窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,

l滑動時間間隔:前一個窗口到後一個窗口所通過的時間長度。必須是批處理時間間隔的倍數

lInput DStream :一個input DStream是一個特殊的DStream,將Spark Streaming鏈接到一個外部數據源來讀取數據。

1.3 Storm與Spark Streming比較

l處理模型以及延遲

雖然兩框架都提供了可擴展性(scalability)和可容錯性(fault tolerance),可是它們的處理模型從根本上說是不同的。Storm能夠實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming能夠在一個短暫的時間窗口裏面處理多條(batches)Event。因此說Storm能夠實現亞秒級時延的處理,而Spark Streaming則有必定的時延。

l容錯和數據保證

然而二者的代價都是容錯時候的數據保證,Spark Streaming的容錯爲有狀態的計算提供了更好的支持。在Storm中,每條記錄在系統的移動過程當中都須要被標記跟蹤,因此Storm只能保證每條記錄最少被處理一次,可是容許從錯誤狀態恢復時被處理屢次。這就意味着可變動的狀態可能被更新兩次從而致使結果不正確。

任一方面,Spark Streaming僅僅須要在批處理級別對記錄進行追蹤,因此他能保證每一個批處理記錄僅僅被處理一次,即便是node節點掛掉。雖說Storm的 Trident library能夠保證一條記錄被處理一次,可是它依賴於事務更新狀態,而這個過程是很慢的,而且須要由用戶去實現。

l實現和編程API

Storm主要是由Clojure語言實現,Spark Streaming是由Scala實現。若是你想看看這兩個框架是如何實現的或者你想自定義一些東西你就得記住這一點。Storm是由BackType和 Twitter開發,而Spark Streaming是在UC Berkeley開發的。

Storm提供了Java API,同時也支持其餘語言的API。 Spark Streaming支持Scala和Java語言(其實也支持Python)。

l批處理框架集成

Spark Streaming的一個很棒的特性就是它是在Spark框架上運行的。這樣你就能夠想使用其餘批處理代碼同樣來寫Spark Streaming程序,或者是在Spark中交互查詢。這就減小了單獨編寫流批量處理程序和歷史數據處理程序。

l生產支持

Storm已經出現好多年了,並且自從2011年開始就在Twitter內部生產環境中使用,還有其餘一些公司。而Spark Streaming是一個新的項目,而且在2013年僅僅被Sharethrough使用(據做者瞭解)。

Storm是 Hortonworks Hadoop數據平臺中流處理的解決方案,而Spark Streaming出如今 MapR的分佈式平臺和Cloudera的企業數據平臺中。除此以外,Databricks是爲Spark提供技術支持的公司,包括了Spark Streaming。

雖說二者均可以在各自的集羣框架中運行,可是Storm能夠在Mesos上運行, 而Spark Streaming能夠在YARN和Mesos上運行。

二、運行原理

2.1 Streaming架構

SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,能夠對多種數據源(如Kdfka、Flume、Twitter、Zero和TCP 套接字)進行相似Map、Reduce和Join等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。

l計算流程:Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。

 

圖Spark Streaming構架

l容錯性:對於流式計算來講,容錯性相當重要。首先咱們要明確一下Spark中RDD的容錯機制。每個RDD都是一個不可變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。  

對於Spark Streaming來講,其RDD的傳承關係以下圖所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size所產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性,因此RDD中任意的Partition出錯,均可以並行地在其餘機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

 

Spark Streaming中RDD的lineage關係圖

l實時性:對於實時性的討論,會牽涉到流式處理框架的應用場景。Spark Streaming將流式計算分解成多個Spark Job,對於每一段數據的處理都會通過Spark DAG圖分解以及Spark的任務集的調度過程。對於目前版本的Spark Streaming而言,其最小的Batch Size的選取在0.5~2秒鐘之間(Storm目前最小的延遲是100ms左右),因此Spark Streaming可以知足除對實時性要求很是高(如高頻實時交易)以外的全部流式準實時計算場景。

l擴展性與吞吐量:Spark目前在EC2上已可以線性擴展到100個節點(每一個節點4Core),能夠以數秒的延遲處理6GB/s的數據量(60M records/s),其吞吐量也比流行的Storm高2~5倍,圖4是Berkeley利用WordCount和Grep兩個用例所作的測試,在Grep這個測試中,Spark Streaming中的每一個節點的吞吐量是670k records/s,而Storm是115k records/s。

 

Spark Streaming與Storm吞吐量比較圖

2.2 編程模型

DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。如圖7-3所示。

 

 

       圖7-4

對DStream中數據的各類操做也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操做能夠經過RDD的transformation生成新的DStream。這裏的執行引擎是Spark。

2.2.1 如何使用Spark Streaming

做爲構建於Spark之上的應用框架,Spark Streaming承襲了Spark的編程風格,對於已經瞭解Spark的用戶來講可以快速地上手。接下來以Spark Streaming官方提供的WordCount代碼爲例來介紹Spark Streaming的使用方式。

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
// Create a local StreamingContext with two working thread and batch interval of 1 second.
// The master requires 2 cores to prevent from a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
import org.apache.spark.streaming.StreamingContext._
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()              // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

1.建立StreamingContext對象 同Spark初始化須要建立SparkContext對象同樣,使用Spark Streaming就須要建立StreamingContext對象。建立StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。須要注意的是參數Seconds(1),Spark Streaming須要指定處理數據的時間間隔,如上例所示的1s,那麼Spark Streaming會以1s爲時間窗口進行數據處理。此參數須要根據用戶的需求和集羣的處理能力進行適當的設置;

2.建立InputDStream如同Storm的Spout,Spark Streaming須要指明數據源。如上例所示的socketTextStream,Spark Streaming以socket鏈接做爲數據源讀取數據。固然Spark Streaming支持多種不一樣的數據源,包括Kafka、Flume、HDFS/S三、Kinesis和Twitter等數據源;

3.操做DStream對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做,如上例所示的操做就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源獲得的數據首先進行分割,而後利用Map和ReduceByKey方法進行計算,固然最後還有使用print()方法輸出結果;

4.啓動Spark Streaming以前所做的全部步驟只是建立了執行流程,程序沒有真正鏈接上數據源,也沒有對數據進行任何操做,只是設定好了全部的執行計劃,當ssc.start()啓動後程序才真正進行全部預期的操做。

至此對於Spark Streaming的如何使用有了一個大概的印象,在後面的章節咱們會經過源代碼深刻探究一下Spark Streaming的執行流程。

2.2.2 DStream的輸入源

在Spark Streaming中全部的操做都是基於流的,而輸入源是這一系列操做的起點。輸入 DStreams 和 DStreams 接收的流都表明輸入數據流的來源,在Spark Streaming 提供兩種內置數據流來源:

l  基礎來源 在 StreamingContext API 中直接可用的來源。例如:文件系統、Socket(套接字)鏈接和 Akka actors;

l  高級來源 如 Kafka、Flume、Kinesis、Twitter 等,能夠經過額外的實用工具類建立。

2.2.2.1 基礎來源

在前面分析怎樣使用Spark Streaming的例子中咱們已看到ssc.socketTextStream()方法,能夠經過 TCP 套接字鏈接,從從文本數據中建立了一個 DStream。除了套接字,StreamingContext 的API還提供了方法從文件和 Akka actors 中建立DStreams做爲輸入源。

Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法能夠從任何文件系統(如:HDFS、S三、NFS等)的文件中讀取數據,而後建立一個DStream。Spark Streaming 監控 dataDirectory 目錄和在該目錄下任何文件被建立處理(不支持在嵌套目錄下寫文件)。須要注意的是:讀取的必須是具備相同的數據格式的文件;建立的文件必須在 dataDirectory目錄下,並經過自動移動或重命名成數據目錄;文件一旦移動就不能被改變,若是文件被不斷追加,新的數據將不會被閱讀。對於簡單的文本文,可使用一個簡單的方法streamingContext.textFileStream(dataDirectory)來讀取數據。

Spark Streaming也能夠基於自定義 Actors 的流建立DStream ,經過 Akka actors 接受數據流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用streamingContext.queueStream(queueOfRDDs)方法能夠建立基於 RDD 隊列的DStream,每一個RDD 隊列將被視爲DStream 中一塊數據流進行加工處理。

2.2.2.2 高級來源

這一類的來源須要外部 non-Spark 庫的接口,其中一些有複雜的依賴關係(如 Kafka、Flume)。所以經過這些來源建立DStreams 須要明確其依賴。例如,若是想建立一個使用 Twitter tweets 的數據的DStream 流,必須按如下步驟來作:

1)在 SBT 或 Maven工程裏添加 spark-streaming-twitter_2.10 依賴。

2)開發:導入 TwitterUtils 包,經過 TwitterUtils.createStream 方法建立一個DStream。

3)部署:添加全部依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),而後部署應用程序。

須要注意的是,這些高級的來源通常在Spark Shell中不可用,所以基於這些高級來源的應用不能在Spark Shell中進行測試。若是你必須在Spark shell中使用它們,你須要下載相應的Maven工程的Jar依賴並添加到類路徑中。

其中一些高級來源以下:

lTwitter Spark Streaming的TwitterUtils工具類使用Twitter4j,Twitter4J 庫支持經過任何方法提供身份驗證信息,你能夠獲得公衆的流,或獲得基於關鍵詞過濾流。

lFlume Spark Streaming能夠從Flume中接受數據。

lKafka Spark Streaming能夠從Kafka中接受數據。

lKinesis Spark Streaming能夠從Kinesis中接受數據。

須要重申的一點是在開始編寫本身的 SparkStreaming 程序以前,必定要將高級來源依賴的Jar添加到SBT 或 Maven 項目相應的artifact中。常見的輸入源和其對應的Jar包以下圖所示。

另外,輸入DStream也能夠建立自定義的數據源,須要作的就是實現一個用戶定義的接收器。

2.2.3 DStream的操做

與RDD相似,DStream也提供了本身的一系列操做方法,這些操做能夠分紅四類:

  • Transformations 普通的轉換操做
  • Window Operations 窗口轉換操做
  • Join Operations 合併操做
  • Output Operations 輸出操做

2.2.3.1 普通的轉換操做

普通的轉換操做以下表所示:

轉換

描述

map(func)

源 DStream的每一個元素經過函數func返回一個新的DStream。

flatMap(func)

相似與map操做,不一樣的是每一個輸入元素能夠被映射出0或者更多的輸出元素。

filter(func)

在源DSTREAM上選擇Func函數返回僅爲true的元素,最終返回一個新的DSTREAM 。

repartition(numPartitions)

經過輸入的參數numPartitions的值來改變DStream的分區大小。

union(otherStream)

返回一個包含源DStream與其餘 DStream的元素合併後的新DSTREAM。

count()

對源DStream內部的所含有的RDD的元素數量進行計數,返回一個內部的RDD只包含一個元素的DStreaam。

reduce(func)

使用函數func(有兩個參數並返回一個結果)將源DStream 中每一個RDD的元素進行聚 合操做,返回一個內部所包含的RDD只有一個元素的新DStream。

countByValue()

計算DStream中每一個RDD內的元素出現的頻次並返回新的DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素出現的頻次。

reduceByKey(func, [numTasks])

當一個類型爲(K,V)鍵值對的DStream被調用的時候,返回類型爲類型爲(K,V)鍵值對的新 DStream,其中每一個鍵的值V都是使用聚合函數func彙總。注意:默認狀況下,使用 Spark的默認並行度提交任務(本地模式下並行度爲2,集羣模式下位8),能夠經過配置numTasks設置不一樣的並行任務數。

join(otherStream, [numTasks])

當被調用類型分別爲(K,V)和(K,W)鍵值對的2個DStream 時,返回類型爲(K,(V,W))鍵值對的一個新DSTREAM。

cogroup(otherStream, [numTasks])

當被調用的兩個DStream分別含有(K, V) 和(K, W)鍵值對時,返回一個(K, Seq[V], Seq[W])類型的新的DStream。

transform(func)

經過對源DStream的每RDD應用RDD-to-RDD函數返回一個新的DStream,這能夠用來在DStream作任意RDD操做。

updateStateByKey(func)

返回一個新狀態的DStream,其中每一個鍵的狀態是根據鍵的前一個狀態和鍵的新值應用給定函數func後的更新。這個方法能夠被用來維持每一個鍵的任何狀態數據。

在上面列出的這些操做中,transform()方法和updateStateByKey()方法值得咱們深刻的探討一下:

l  transform(func)操做

該transform操做(轉換操做)連同其其相似的 transformWith操做容許DStream 上應用任意RDD-to-RDD函數。它能夠被應用於未在 DStream API 中暴露任何的RDD操做。例如,在每批次的數據流與另外一數據集的鏈接功能不直接暴露在DStream API 中,但能夠輕鬆地使用transform操做來作到這一點,這使得DStream的功能很是強大。例如,你能夠經過鏈接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),而後基於此作實時數據清理的篩選,以下面官方提供的僞代碼所示。事實上,也能夠在transform方法中使用機器學習和圖形計算的算法。

l  updateStateByKey操做

該 updateStateByKey 操做可讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :

(1)  定義狀態 - 狀態能夠是任意的數據類型。

(2)  定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。

讓咱們用一個例子來講明,假設你要進行文本數據流中單詞計數。在這裏,正在運行的計數是狀態並且它是一個整數。咱們定義了更新功能以下:

 

此函數應用於含有鍵值對的DStream中(如前面的示例中,在DStream中含有(word,1)鍵值對)。它會針對裏面的每一個元素(如wordCount中的word)調用一下更新函數,newValues是最新的值,runningCount是以前的值。

 

詳細見:Spark Streaming之六:Transformations 普通的轉換操做

2.2.3.2 窗口轉換操做

Spark Streaming 還提供了窗口的計算,它容許你經過滑動窗口對數據進行轉換,窗口轉換操做以下:

轉換

描述

window(windowLengthslideInterval)

返回一個基於源DStream的窗口批次計算後獲得新的DStream。

countByWindow(windowLength,slideInterval)

返回基於滑動窗口的DStream中的元素的數量。

reduceByWindow(funcwindowLength,slideInterval)

基於滑動窗口對源DStream中的元素進行聚合操做,獲得一個新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操做,獲得一個新的DStream。

reduceByKeyAndWindow(funcinvFunc,windowLength,slideInterval, [numTasks])

一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最先的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼咱們能夠將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法能夠複用中間三秒的統計量,提升統計的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基於滑動窗口計算源DStream中每一個RDD內每一個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue同樣,reduce任務的數量能夠經過一個可選參數進行配置。

 

 

批處理間隔示意圖

在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的,所以在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據彙總起來成爲一批數據交給系統去處理。

對於窗口操做而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操做中,只有窗口的長度知足了纔會觸發批數據的處理。除了窗口的長度,窗口操做還有另外一個重要的參數就是滑動間隔(slide duration),它指的是通過多長時間窗口滑動一次造成新的窗口,滑動窗口默認狀況下和批次間隔的相同,而窗口間隔通常設置的要比它們兩個大。在這裏必須注意的一點是滑動間隔和窗口間隔的大小必定得設置爲批處理間隔的整數倍。

如批處理間隔示意圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口間隔知足了才觸發數據的處理。這裏須要注意的一點是,初始的窗口有可能流入的數據沒有撐滿,可是隨着時間的推動,窗口最終會被撐滿。當每一個2個時間單位,窗口滑動一次後,會有新的數據流入窗口,這時窗口會移去最先的兩個時間單位的數據,而與最新的兩個時間單位的數據進行彙總造成新的窗口(time3-time5)。

對於窗口操做,批處理間隔、窗口間隔和滑動間隔是很是重要的三個時間概念,是理解窗口操做的關鍵所在。

Spark Streaming之五:Window窗體相關操做

2.2.3.3 Join Operations

Join主要可分爲兩種,

一、DStream對象之間的Join
  這種join通常應用於窗口函數造成的DStream對象之間,具體能夠參考第一部分中的join操做,除了簡單的join以外,還有leftOuterJoin, rightOuterJoin和fullOuterJoin。

二、DStream和dataset之間的join
  這一種join,能夠參考前面transform操做中的示例。

2.2.3.4 輸出操做

Spark Streaming容許DStream的數據被輸出到外部系統,如數據庫或文件系統。因爲輸出操做實際上使transformation操做後的數據能夠經過外部系統被使用,同時輸出操做觸發全部DStream的transformation操做的實際執行(相似於RDD操做)。如下表列出了目前主要的輸出操做:

轉換

描述

print()

在Driver中打印出DStream中數據的前10個元素。

saveAsTextFiles(prefix, [suffix])

將DStream中的內容以文本的形式保存爲文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

將DStream中的內容按對象序列化而且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

將DStream中的內容以文本的形式保存爲Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的輸出操做,將func函數應用於DStream中的RDD上,這個操做會輸出數據到外部系統,好比保存RDD到文件或者網絡數據庫等。須要注意的是func函數是在運行該streaming應用的Driver進程裏執行的。

一、print()
  print操做會將DStream每個batch中的前10個元素在driver節點打印出來。
  看下面這個示例,一行輸入超過10個單詞,而後將這行語句分割成單個單詞的DStream。
val words = lines.flatMap(_.split(" "))
words.print()

 
二、saveAsTextFiles(prefix, [suffix])
  這個操做能夠將DStream中的內容保存爲text文件,每一個batch的數據單獨保存爲一個文夾,文件夾名前綴參數必須傳入,文件夾名後綴參數可選,最終文件夾名稱的完整形式爲 prefix-TIME_IN_MS[.suffix]

  好比下面這一行代碼
lines.saveAsTextFiles("satf", ".txt")

看一下執行結果,在當前項目路徑下,每秒鐘生成一個文件夾,打開的兩個窗口中的內容分別是nc窗口中的輸入。

另外,若是前綴中包含文件完整路徑,則該text文件夾會建在指定路徑下,以下圖所示

三、saveAsObjectFiles(prefix, [suffix])
  這個操做和前面一個相似,只不過這裏將DStream中的內容保存爲SequenceFile文件類型,這個文件中保存的數據都是通過序列化後的Java對象。 
  實驗略過,可參考前面一個操做。 
  
四、saveAsHadoopFiles(prefix, [suffix])
  這個操做和前兩個相似,將DStream每一batch中的內容保存到HDFS上,一樣能夠指定文件的前綴和後綴。 
  
五、foreachRDD(func)

dstream.foreachRDD是一個很是強大的輸出操做,它允將許數據輸出到外部系統。可是 ,如何正確高效地使用這個操做是很重要的,下面展現瞭如何去避免一些常見的錯誤。

一般將數據寫入到外部系統須要建立一個鏈接對象(如 TCP鏈接到遠程服務器),並用它來發送數據到遠程系統。出於這個目的,開發者可能在不經意間在Spark driver端建立了鏈接對象,並嘗試使用它保存RDD中的記錄到Spark worker上,以下面代碼:

 

這是不正確的,這須要鏈接對象進行序列化並從Driver端發送到Worker上。鏈接對象不多在不一樣機器間進行這種操做,此錯誤可能表現爲序列化錯誤(鏈接對不可序列化),初始化錯誤(鏈接對象在須要在Worker 上進行須要初始化) 等等,正確的解決辦法是在 worker上建立的鏈接對象。

一般狀況下,建立一個鏈接對象有時間和資源開銷。所以,建立和銷燬的每條記錄的鏈接對象可能招致沒必要要的資源開銷,並顯著下降系統總體的吞吐量 。一個更好的解決方案是使用rdd.foreachPartition方法建立一個單獨的鏈接對象,而後使用該鏈接對象輸出的全部RDD分區中的數據到外部系統。

 這緩解了建立多條記錄鏈接的開銷。最後,還能夠進一步經過在多個RDDs/ batches上重用鏈接對象進行優化。一個保持鏈接對象的靜態池能夠重用在多個批處理的RDD上將其輸出到外部系統,從而進一步下降了開銷。

 須要注意的是,在靜態池中的鏈接應該按需延遲建立,這樣能夠更有效地把數據發送到外部系統。另外須要要注意的是:DStreams延遲執行的,就像RDD的操做是由actions觸發同樣。默認狀況下,輸出操做會按照它們在Streaming應用程序中定義的順序一個個執行。

 

2.3  容錯、持久化和性能調優

2.3.1 容錯

DStream基於RDD組成,RDD的容錯性依舊有效,咱們首先回憶一下SparkRDD的基本特性。

lRDD是一個不可變的、肯定性的可重複計算的分佈式數據集。RDD的某些partition丟失了,能夠經過血統(lineage)信息從新計算恢復;

l若是RDD任何分區因worker節點故障而丟失,那麼這個分區能夠從原來依賴的容錯數據集中恢復;

l因爲Spark中全部的數據的轉換操做都是基於RDD的,即便集羣出現故障,只要輸入數據集存在,全部的中間結果都是能夠被計算的。

Spark Streaming是能夠從HDFS和S3這樣的文件系統讀取數據的,這種狀況下全部的數據均可以被從新計算,不用擔憂數據的丟失。可是在大多數狀況下,Spark Streaming是基於網絡來接受數據的,此時爲了實現相同的容錯處理,在接受網絡的數據時會在集羣的多個Worker節點間進行數據的複製(默認的複製數是2),這致使產生在出現故障時被處理的兩種類型的數據:

1)Data received and replicated :一旦一個Worker節點失效,系統會從另外一份還存在的數據中從新計算。

2)Data received but buffered for replication :一旦數據丟失,能夠經過RDD之間的依賴關係,從HDFS這樣的外部文件系統讀取數據。

此外,有兩種故障,咱們應該關心:

(1)Worker節點失效:經過上面的講解咱們知道,這時系統會根據出現故障的數據的類型,選擇是從另外一個有複製過數據的工做節點上從新計算,仍是直接從從外部文件系統讀取數據。

(2)Driver(驅動節點)失效 :若是運行 Spark Streaming應用時驅動節點出現故障,那麼很明顯的StreamingContext已經丟失,同時在內存中的數據所有丟失。對於這種狀況,Spark Streaming應用程序在計算上有一個內在的結構——在每段micro-batch數據週期性地執行一樣的Spark計算。這種結構容許把應用的狀態(亦稱checkpoint)週期性地保存到可靠的存儲空間中,並在driver從新啓動時恢復該狀態。具體作法是在ssc.checkpoint(<checkpoint directory>)函數中進行設置,Spark Streaming就會按期把DStream的元信息寫入到HDFS中,一旦驅動節點失效,丟失的StreamingContext會經過已經保存的檢查點信息進行恢復。

最後咱們談一下Spark Stream的容錯在Spark 1.2版本的一些改進:

實時流處理系統必需要能在24/7時間內工做,所以它須要具有從各類系統故障中恢復過來的能力。最開始,SparkStreaming就支持從driver和worker故障恢復的能力。然而有些數據源的輸入可能在故障恢復之後丟失數據。在Spark1.2版本中,Spark已經在SparkStreaming中對預寫日誌(也被稱爲journaling)做了初步支持,改進了恢復機制,並使更多數據源的零數據丟失有了可靠。

對於文件這樣的源數據,driver恢復機制足以作到零數據丟失,由於全部的數據都保存在了像HDFS或S3這樣的容錯文件系統中了。但對於像Kafka和Flume等其它數據源,有些接收到的數據還只緩存在內存中,還沒有被處理,它們就有可能會丟失。這是因爲Spark應用的分佈操做方式引發的。當driver進程失敗時,全部在standalone/yarn/mesos集羣運行的executor,連同它們在內存中的全部數據,也同時被終止。對於Spark Streaming來講,從諸如Kafka和Flume的數據源接收到的全部數據,在它們處理完成以前,一直都緩存在executor的內存中。縱然driver從新啓動,這些緩存的數據也不能被恢復。爲了不這種數據損失,在Spark1.2發佈版本中引進了預寫日誌(WriteAheadLogs)功能。

預寫日誌功能的流程是:1)一個SparkStreaming應用開始時(也就是driver開始時),相關的StreamingContext使用SparkContext啓動接收器成爲長駐運行任務。這些接收器接收並保存流數據到Spark內存中以供處理。2)接收器通知driver。3)接收塊中的元數據(metadata)被髮送到driver的StreamingContext。這個元數據包括:(a)定位其在executor內存中數據的塊referenceid,(b)塊數據在日誌中的偏移信息(若是啓用了)。

用戶傳送數據的生命週期以下圖所示。

 

相似Kafka這樣的系統能夠經過複製數據保持可靠性。容許預寫日誌兩次高效地複製一樣的數據:一次由Kafka,而另外一次由SparkStreaming。Spark將來版本將包含Kafka容錯機制的原生支持,從而避免第二個日誌。

2.3.2 持久化

與RDD同樣,DStream一樣也能經過persist()方法將數據流存放在內存中,默認的持久化方式是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣作的好處是遇到須要屢次迭代計算的程序時,速度優點十分的明顯。而對於一些基於窗口的操做,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操做,如updateStateBykey,其默認的持久化策略就是保存在內存中。

對於來自網絡的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩臺機器上,這也是爲了容錯性而設計的。

另外,對於窗口和有狀態的操做必須checkpoint,經過StreamingContext的checkpoint來指定目錄,經過 Dtream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。

2.3.3 性能調優

1.  優化運行時間

增長並行度 確保使用整個集羣的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操做,增長其並行度以確保更爲充分地使用集羣資源;

減小數據序列化,反序列化的負擔 Spark Streaming默認將接受到的數據序列化後存儲,以減小內存的使用。可是序列化和反序列話須要更多的CPU時間,所以更加高效的序列化方式(Kryo)和自定義的系列化接口能夠更高效地使用CPU;

設置合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的做業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就沒法按時提交,這樣就會進一步拖延接下來的Job,形成後續Job的阻塞。所以設置一個合理的批處理間隔以確保做業可以在這個批處理間隔內結束時必須的;

l  減小因任務提交和分發所帶來的負擔 一般狀況下,Akka框架可以高效地確保任務及時分發,可是當批處理間隔很是小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式一般會比使用Fine-grained Mesos模式有更小的延遲。

2.  優化內存使用

l控制batch size(批處理間隔內的數據量) Spark Streaming會把批處理間隔內接收到的全部數據存放在Spark內部的可用內存區域中,所以必須確保當前節點Spark的可用內存中少能容納這個批處理時間間隔內的全部數據,不然必須增長新的資源以提升集羣的處理能力;

l及時清理再也不使用的數據 前面講到Spark Streaming會將接受的數據所有存儲到內部可用內存區域中,所以對於處理過的再也不須要的數據應及時清理,以確保Spark Streaming有富餘的可用內存空間。經過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數須要當心設置以避免後續操做中所須要的數據被超時錯誤處理;

l觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引發一系列不可預料的問題。觀察GC的運行狀況,採用不一樣的GC策略以進一步減少內存回收對Job運行的影響。

 

參考資料:

(1)《Spark Streaming》 http://blog.debugo.com/spark-streaming/

轉自:http://www.cnblogs.com/shishanyuan/p/4747749.html

相關文章
相關標籤/搜索