Spark 學習 (十二) Spark Streaming詳解

一,簡介node

  1.1 概述算法

  1.2 術語定義shell

  1.3 Storm和Spark Streaming比較數據庫

二,運行原理apache

  2.1 Streaming架構編程

  2.2 容錯,持久化和性能調優服務器

    2.2.1 容錯網絡

    2.2.2 持久化架構

    2.2.3 性能調優框架

三,編程模型

  3.1 如何使用Spark Streaming

  3.2 DStream的輸入源

    3.2.1 基礎來源

    3.2.2 高級來源

  3.3 DStream的操做

    3.3.1 普通的轉換操做

    3.3.2 窗口轉換操做

    3.3.3 輸出操做

 

 

 

 

  

正文

一,簡介

  1.1 概述

  是一個基於Spark Core之上的實時計算框架,能夠從不少數據源消費數據並對數據進行處理.Spark Streaming 是Spark核心API的一個擴展,能夠實現高吞吐量的、具有容錯機制的實時流數據的處理。支持從多種數據源獲取數據,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis 以及TCP sockets,從數據源獲取數據以後,可使用諸如map、reduce、join和window等高級函數進行復雜算法的處理。最後還能夠將處理結果存儲到文件系統,數據庫和現場儀表盤。在「One Stack rule them all」的基礎上,還可使用Spark的其餘子框架,如集羣學習、圖計算等,對流數據進行處理。

  Spark Streaming處理的數據流圖:

  

  Spark的各個子框架,都是基於核心Spark的,Spark Streaming在內部的處理機制是,接收實時流的數據,並根據必定的時間間隔拆分紅一批批的數據,而後經過Spark Engine處理這些批數據,最終獲得處理後的一批批結果數據。

  對應的批數據,在Spark內核對應一個RDD實例,所以,對應流數據的DStream能夠當作是一組RDDs,即RDD的一個序列。通俗點理解的話,在流數據分紅一批一批後,經過一個先進先出的隊列,而後 Spark Engine從該隊列中依次取出一個個批數據,把批數據封裝成一個RDD,而後進行處理,這是一個典型的生產者消費者模型,對應的就有生產者消費者模型的問題,即如何協調生產速率和消費速率。

  1.2 術語定義

  離散流(discretized stream)或DStream

  本質上就是一系列連續的RDD,DStream其實就是對RDD的封裝DStream能夠任務是一個RDD的工廠,該DStream裏面生產都是相同業務邏輯的RDD,只不過是RDD裏面要讀取數據的不相同

  他是sparkStreaming中的一個最基本的抽象,表明了一下列連續的數據流,本質上是一系列連續的RDD,你對DStream進行操做,就是對RDD進行操做

  DStream每隔一段時間生成一個RDD,你對DStream進行操做,本質上是對裏面的對應時間的RDD進行操做

  DSteam和DStream之間存在依賴關係,在一個固定的時間點,對個存在依賴關係的DSrteam對應的RDD也存在依賴關係,
每一個一個固定的時間,其實生產了一個小的DAG,週期性的將生成的小DAG提交到集羣中運行

  DStream圖例:

  

 

  批數據(batch data):這是化整爲零的第一步,將實時流數據以時間片爲單位進行分批,將流處理轉化爲時間片數據的批處理。隨着持續時間的推移,這些處理結果就造成了對應的結果數據流了。

  時間片或批處理時間間隔( batch interval):這是人爲地對流數據進行定量的標準,以時間片做爲咱們拆分流數據的依據。一個時間片的數據對應一個RDD實例。

  窗口長度(window length):一個窗口覆蓋的流數據的時間長度。必須是批處理時間間隔的倍數,

  滑動時間間隔:前一個窗口到後一個窗口所通過的時間長度。必須是批處理時間間隔的倍數

  Input DStream :一個input DStream是一個特殊的DStream,將Spark Streaming鏈接到一個外部數據源來讀取數據。

  1.3 Storm和Spark Streaming比較

  處理模型以及延遲

  雖然兩框架都提供了可擴展性(scalability)和可容錯性(fault tolerance),可是它們的處理模型從根本上說是不同的。Storm能夠實現亞秒級時延的處理,而每次只處理一條event,而Spark Streaming能夠在一個短暫的時間窗口裏面處理多條(batches)Event。因此說Storm能夠實現亞秒級時延的處理,而Spark Streaming則有必定的時延。

  容錯和數據保證

  然而二者的代價都是容錯時候的數據保證,Spark Streaming的容錯爲有狀態的計算提供了更好的支持。在Storm中,每條記錄在系統的移動過程當中都須要被標記跟蹤,因此Storm只能保證每條記錄最少被處理一次,可是容許從錯誤狀態恢復時被處理屢次。這就意味着可變動的狀態可能被更新兩次從而致使結果不正確。

  任一方面,Spark Streaming僅僅須要在批處理級別對記錄進行追蹤,因此他能保證每一個批處理記錄僅僅被處理一次,即便是node節點掛掉。雖說Storm的 Trident library能夠保證一條記錄被處理一次,可是它依賴於事務更新狀態,而這個過程是很慢的,而且須要由用戶去實現。

  實現和編程API

  Storm主要是由Clojure語言實現,Spark Streaming是由Scala實現。若是你想看看這兩個框架是如何實現的或者你想自定義一些東西你就得記住這一點。Storm是由BackType和Twitter開發,而Spark Streaming是在UC Berkeley開發的。

  Storm提供了Java API,同時也支持其餘語言的API。 Spark Streaming支持Scala和Java語言(其實也支持Python)。

  批處理框架集成

  Spark Streaming的一個很棒的特性就是它是在Spark框架上運行的。這樣你就能夠想使用其餘批處理代碼同樣來寫Spark Streaming程序,或者是在Spark中交互查詢。這就減小了單獨編寫流批量處理程序和歷史數據處理程序。

  生產支持

  Storm已經出現好多年了,並且自從2011年開始就在Twitter內部生產環境中使用,還有其餘一些公司。而Spark Streaming是一個新的項目,而且在2013年僅僅被Sharethrough使用(據做者瞭解)。

  Storm是 Hortonworks Hadoop數據平臺中流處理的解決方案,而Spark Streaming出如今 MapR的分佈式平臺和Cloudera的企業數據平臺中。除此以外,Databricks是爲Spark提供技術支持的公司,包括了Spark Streaming。

  雖說二者均可以在各自的集羣框架中運行,可是Storm能夠在Mesos上運行, 而Spark Streaming能夠在YARN和Mesos上運行。

  

二,運行原理

  2.1 Streaming架構

  SparkStreaming是一個對實時數據流進行高通量、容錯處理的流式處理系統,能夠對多種數據源(如Kafka、Flume、Twitter、Zero和TCP 套接字)進行相似Map、Reduce和Join等複雜操做,並將結果保存到外部文件系統、數據庫或應用到實時儀表盤。

  計算流程Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch size(如1秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備。下圖顯示了Spark Streaming的整個流程。

  

  2.2 容錯,持久化和性能調優

    2.2.1 容錯

    對於流式計算來講,容錯性相當重要。首先咱們要明確一下Spark中RDD的容錯機制。每個RDD都是一個不可變的分佈式可重算的數據集,其記錄着肯定性的操做繼承關係(lineage),因此只要輸入數據是可容錯的,那麼任意一個RDD的分區(Partition)出錯或不可用,都是能夠利用原始輸入數據經過轉換操做而從新算出的。  

    對於Spark Streaming來講,其RDD的傳承關係以下圖所示,圖中的每個橢圓形表示一個RDD,橢圓形中的每一個圓形表明一個RDD中的一個Partition,圖中的每一列的多個RDD表示一個DStream(圖中有三個DStream),而每一行最後一個RDD則表示每個Batch Size所產生的中間結果RDD。咱們能夠看到圖中的每個RDD都是經過lineage相鏈接的,因爲Spark Streaming輸入數據能夠來自於磁盤,例如HDFS(多份拷貝)或是來自於網絡的數據流(Spark Streaming會將網絡輸入數據的每個數據流拷貝兩份到其餘的機器)都能保證容錯性,因此RDD中任意的Partition出錯,均可以並行地在其餘機器上將缺失的Partition計算出來。這個容錯恢復方式比連續計算模型(如Storm)的效率更高。

    以下圖:Spark Streaming中RDD的lineage關係圖

    

    2.2.2 持久化

  與RDD同樣,DStream一樣也能經過persist()方法將數據流存放在內存中,默認的持久化方式是MEMORY_ONLY_SER,也就是在內存中存放數據同時序列化的方式,這樣作的好處是遇到須要屢次迭代計算的程序時,速度優點十分的明顯。而對於一些基於窗口的操做,如reduceByWindow、reduceByKeyAndWindow,以及基於狀態的操做,如updateStateBykey,其默認的持久化策略就是保存在內存中。

  對於來自網絡的數據源(Kafka、Flume、sockets等),默認的持久化策略是將數據保存在兩臺機器上,這也是爲了容錯性而設計的。

另外,對於窗口和有狀態的操做必須checkpoint,經過StreamingContext的checkpoint來指定目錄,經過 Dtream的checkpoint指定間隔時間,間隔必須是滑動間隔(slide interval)的倍數。

    2.2.3 性能調優

  1.  優化運行時間

    增長並行度 確保使用整個集羣的資源,而不是把任務集中在幾個特定的節點上。對於包含shuffle的操做,增長其並行度以確保更爲充分地使用集羣資源;

    減小數據序列化,反序列化的負擔 Spark Streaming默認將接受到的數據序列化後存儲,以減小內存的使用。可是序列化和反序列話須要更多的CPU時間,所以更加高效的序列化方式(Kryo)和自定義的系列化接口能夠更高效地使用CPU;

    設置合理的batch duration(批處理時間間) 在Spark Streaming中,Job之間有可能存在依賴關係,後面的Job必須確保前面的做業執行結束後才能提交。若前面的Job執行的時間超出了批處理時間間隔,那麼後面的Job就沒法按時提交,這樣就會進一步拖延接下來的Job,形成後續Job的阻塞。所以設置一個合理的批處理間隔以確保做業可以在這個批處理間隔內結束時必須的;

    減小因任務提交和分發所帶來的負擔 一般狀況下,Akka框架可以高效地確保任務及時分發,可是當批處理間隔很是小(500ms)時,提交和分發任務的延遲就變得不可接受了。使用Standalone和Coarse-grained Mesos模式一般會比使用Fine-grained Mesos模式有更小的延遲。

  2.  優化內存使用

    控制batch size(批處理間隔內的數據量) Spark Streaming會把批處理間隔內接收到的全部數據存放在Spark內部的可用內存區域中,所以必須確保當前節點Spark的可用內存中少能容納這個批處理時間間隔內的全部數據,不然必須增長新的資源以提升集羣的處理能力;

    及時清理再也不使用的數據 前面講到Spark Streaming會將接受的數據所有存儲到內部可用內存區域中,所以對於處理過的再也不須要的數據應及時清理,以確保Spark Streaming有富餘的可用內存空間。經過設置合理的spark.cleaner.ttl時長來及時清理超時的無用數據,這個參數須要當心設置以避免後續操做中所須要的數據被超時錯誤處理;

    觀察及適當調整GC策略 GC會影響Job的正常運行,可能延長Job的執行時間,引發一系列不可預料的問題。觀察GC的運行狀況,採用不一樣的GC策略以進一步減少內存回收對Job運行的影響。

三,編程模型

  3.1 如何使用Spark Streaming

  DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。如圖7-3所示。

  

   DStream中在時間軸下生成離散的RDD序列:

  

  對DStream中數據的各類操做也是映射到內部的RDD上來進行的,如圖7-4所示,對Dtream的操做能夠經過RDD的transformation生成新的DStream。這裏的執行引擎是Spark。

  做爲構建於Spark之上的應用框架,Spark Streaming承襲了Spark的編程風格,對於已經瞭解Spark的用戶來講可以快速地上手。接下來以Spark Streaming官方提供的WordCount代碼爲例來介紹Spark Streaming的使用方式。

  

import org.apache.spark._

import org.apache.spark.streaming._

import org.apache.spark.streaming.StreamingContext._


// Create a local StreamingContext with two working thread and batch interval of 1 second.

// The master requires 2 cores to prevent from a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")

val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999

val lines = ssc.socketTextStream("localhost", 9999)

// Split each line into words

val words = lines.flatMap(_.split(" "))

import org.apache.spark.streaming.StreamingContext._

// Count each word in each batch

val pairs = words.map(word => (word, 1))

val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console

wordCounts.print()

ssc.start()              // Start the computation

ssc.awaitTermination()  // Wait for the computation to terminate

  1.建立StreamingContext對象 同Spark初始化須要建立SparkContext對象同樣,使用Spark Streaming就須要建立StreamingContext對象。建立StreamingContext對象所需的參數與SparkContext基本一致,包括指明Master,設定名稱(如NetworkWordCount)。須要注意的是參數Seconds(1),Spark Streaming須要指定處理數據的時間間隔,如上例所示的1s,那麼Spark Streaming會以1s爲時間窗口進行數據處理。此參數須要根據用戶的需求和集羣的處理能力進行適當的設置;

  2.建立InputDStream如同Storm的Spout,Spark Streaming須要指明數據源。如上例所示的socketTextStream,Spark Streaming以socket鏈接做爲數據源讀取數據。固然Spark Streaming支持多種不一樣的數據源,包括Kafka、 Flume、HDFS/S三、Kinesis和Twitter等數據源;

  3.操做DStream對於從數據源獲得的DStream,用戶能夠在其基礎上進行各類操做,如上例所示的操做就是一個典型的WordCount執行流程:對於當前時間窗口內從數據源獲得的數據首先進行分割,而後利用Map和ReduceByKey方法進行計算,固然最後還有使用print()方法輸出結果;

  4.啓動Spark Streaming以前所做的全部步驟只是建立了執行流程,程序沒有真正鏈接上數據源,也沒有對數據進行任何操做,只是設定好了全部的執行計劃,當ssc.start()啓動後程序才真正進行全部預期的操做。

  至此對於Spark Streaming的如何使用有了一個大概的印象,在後面的章節咱們會經過源代碼深刻探究一下Spark Streaming的執行流程。

  3.2 DStream的輸入源

   在Spark Streaming中全部的操做都是基於流的,而輸入源是這一系列操做的起點。輸入 DStreams 和 DStreams 接收的流都表明輸入數據流的來源,在Spark Streaming 提供兩種內置數據流來源:

  基礎來源 在 StreamingContext API 中直接可用的來源。例如:文件系統、Socket(套接字)鏈接和 Akka actors;

  高級來源 如 Kafka、Flume、Kinesis、Twitter 等,能夠經過額外的實用工具類建立。

    3.2.1 基礎來源

  在前面分析怎樣使用Spark Streaming的例子中咱們已看到ssc.socketTextStream()方法,能夠經過 TCP 套接字鏈接,從從文本數據中建立了一個 DStream。除了套接字,StreamingContext 的API還提供了方法從文件和 Akka actors 中建立 DStreams做爲輸入源。

  Spark Streaming提供了streamingContext.fileStream(dataDirectory)方法能夠從任何文件系統(如:HDFS、S三、NFS 等)的文件中讀取數據,而後建立一個DStream。Spark Streaming 監控 dataDirectory 目錄和在該目錄下任何文件被建立處理(不支持在嵌套目錄下寫文件)。須要注意的是:讀取的必須是具備相同的數據格式的文件;建立的文件必須在dataDirectory 目錄下,並經過自動移動或重命名成數據目錄;文件一旦移動就不能被改變,若是文件被不斷追加,新的數據將不會被閱讀。對於簡單的文本文,可使用一個簡單的方法streamingContext.textFileStream(dataDirectory)來讀取數據。

  Spark Streaming也能夠基於自定義 Actors 的流建立DStream ,經過 Akka actors 接受數據流,使用方法streamingContext.actorStream(actorProps, actor-name)。Spark Streaming使用 streamingContext.queueStream(queueOfRDDs)方法能夠建立基於 RDD 隊列的DStream,每一個RDD 隊列將被視爲 DStream 中一塊數據流進行加工處理。

    3.2.2 高級來源

  這一類的來源須要外部 non-Spark 庫的接口,其中一些有複雜的依賴關係(如 Kafka、Flume)。所以經過這些來源建立 DStreams 須要明確其依賴。例如,若是想建立一個使用Twitter tweets 的數據的DStream 流,必須按如下步驟來作:

  1)在 SBT 或 Maven工程裏添加 spark-streaming-twitter_2.10 依賴。

  2)開發:導入 TwitterUtils 包,經過 TwitterUtils.createStream 方法建立一個DStream。

  3)部署:添加全部依賴的 jar 包(包括依賴的spark-streaming-twitter_2.10 及其依賴),而後部署應用程序。

  須要注意的是,這些高級的來源通常在Spark Shell中不可用,所以基於這些高級來源的應用不能在Spark Shell中進行測試。若是你必須在Spark shell中使用它們,你須要下載相應的Maven工程的Jar依賴並添加到類路徑中。

  其中一些高級來源以下:

  Twitter Spark Streaming的TwitterUtils工具類使用Twitter4j,Twitter4J 庫支持經過任何方法提供身份驗證信息,你能夠獲得公衆的流,或獲得基於關鍵詞過濾流。

  Flume Spark Streaming能夠從Flume中接受數據。

  Kafka Spark Streaming能夠從Kafka中接受數據。

  Kinesis Spark Streaming能夠從Kinesis中接受數據。

  須要重申的一點是在開始編寫本身的 SparkStreaming 程序以前,必定要將高級來源依賴的Jar添加到SBT 或 Maven 項目相應的artifact中。常見的輸入源和其對應的Jar包以下圖所示。

  

  另外,輸入DStream也能夠建立自定義的數據源,須要作的就是實現一個用戶定義的接收器。

  3.3 DStream的操做

    與RDD相似,DStream也提供了本身的一系列操做方法,這些操做能夠分紅三類:普通的轉換操做、窗口轉換操做和輸出操做。

    3.3.1 普通的轉換操做

    普通的轉換操做以下表所示:

  

    在上面列出的這些操做中,transform()方法和updateStateByKey()方法值得咱們深刻的探討一下:

   transform(func)操做

  該transform操做(轉換操做)連同其其相似的 transformWith操做容許DStream 上應用任意RDD-to-RDD函數。它能夠被應用於未在 DStream API 中暴露任何的RDD操做。例如,在每批次的數據流與另外一數據集的鏈接功能不直接暴露在DStream API 中,但能夠輕鬆地使用transform操做來作到這一點,這使得DStream的功能很是強大。例如,你能夠經過鏈接預先計算的垃圾郵件信息的輸入數據流(可能也有Spark生成的),而後基於此作實時數據清理的篩選,以下面官方提供的僞代碼所示。事實上,也能夠在transform方法中使用機器學習和圖形計算的算法。

  updateStateByKey操做

  該 updateStateByKey 操做可讓你保持任意狀態,同時不斷有新的信息進行更新。要使用此功能,必須進行兩個步驟 :

  (1)  定義狀態 - 狀態能夠是任意的數據類型。

  (2)  定義狀態更新函數 - 用一個函數指定如何使用先前的狀態和從輸入流中獲取的新值 更新狀態。

  讓咱們用一個例子來講明,假設你要進行文本數據流中單詞計數。在這裏,正在運行的計數是狀態並且它是一個整數。咱們定義了更新功能以下:

    3.3.2 窗口轉換操做

  Spark Streaming 還提供了窗口的計算,它容許你經過滑動窗口對數據進行轉換,窗口轉換操做以下:

轉換

描述

window(windowLengthslideInterval)

返回一個基於源DStream的窗口批次計算後獲得新的DStream。

countByWindow(windowLength,slideInterval)

返回基於滑動窗口的DStream中的元素的數量。

reduceByWindow(funcwindowLength,slideInterval)

基於滑動窗口對源DStream中的元素進行聚合操做,獲得一個新的DStream。

reduceByKeyAndWindow(func,windowLength,slideInterval, [numTasks])

基於滑動窗口對(K,V)鍵值對類型的DStream中的值按K使用聚合函數func進行聚合操做,獲得一個新的DStream。

reduceByKeyAndWindow(func,invFunc,windowLengthslideInterval, [numTasks])

一個更高效的reduceByKkeyAndWindow()的實現版本,先對滑動窗口中新的時間間隔內數據增量聚合並移去最先的與新增數據量的時間間隔內的數據統計量。例如,計算t+4秒這個時刻過去5秒窗口的WordCount,那麼咱們能夠將t+3時刻過去5秒的統計量加上[t+3,t+4]的統計量,在減去[t-2,t-1]的統計量,這種方法能夠複用中間三秒的統計量,提升統計的效率。

countByValueAndWindow(windowLength,slideInterval, [numTasks])

基於滑動窗口計算源DStream中每一個RDD內每一個元素出現的頻次並返回DStream[(K,Long)],其中K是RDD中元素的類型,Long是元素頻次。與countByValue同樣,reduce任務的數量能夠經過一個可選參數進行配置。

      

  

  在Spark Streaming中,數據處理是按批進行的,而數據採集是逐條進行的,所以在Spark Streaming中會先設置好批處理間隔(batch duration),當超過批處理間隔的時候就會把採集到的數據彙總起來成爲一批數據交給系統去處理。

  對於窗口操做而言,在其窗口內部會有N個批處理數據,批處理數據的大小由窗口間隔(window duration)決定,而窗口間隔指的就是窗口的持續時間,在窗口操做中,只有窗口的長度知足了纔會觸發批數據的處理。除了窗口的長度,窗口操做還有另外一個重要的參數就是滑動間隔(slide duration),它指的是通過多長時間窗口滑動一次造成新的窗口,滑動窗口默認狀況下和批次間隔的相同,而窗口間隔通常設置的要比它們兩個大。在這裏必須注意的一點是滑動間隔和窗口間隔的大小必定得設置爲批處理間隔的整數倍。

  如批處理間隔示意圖所示,批處理間隔是1個時間單位,窗口間隔是3個時間單位,滑動間隔是2個時間單位。對於初始的窗口time 1-time 3,只有窗口間隔知足了才觸發數據的處理。這裏須要注意的一點是,初始的窗口有可能流入的數據沒有撐滿,可是隨着時間的推動,窗口最終會被撐滿。當每一個2個時間單位,窗口滑動一次後,會有新的數據流入窗口,這時窗口會移去最先的兩個時間單位的數據,而與最新的兩個時間單位的數據進行彙總造成新的窗口(time3-time5)。

  對於窗口操做,批處理間隔、窗口間隔和滑動間隔是很是重要的三個時間概念,是理解窗口操做的關鍵所在。

    3.3.3 輸出操做

  Spark Streaming容許DStream的數據被輸出到外部系統,如數據庫或文件系統。因爲輸出操做實際上使transformation操做後的數據能夠經過外部系統被使用,同時輸出操做觸發全部DStream的transformation操做的實際執行(相似於RDD操做)。如下表列出了目前主要的輸出操做:

  

轉換

描述

print()

在Driver中打印出DStream中數據的前10個元素。

saveAsTextFiles(prefix, [suffix])

將DStream中的內容以文本的形式保存爲文本文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsObjectFiles(prefix, [suffix])

將DStream中的內容按對象序列化而且以SequenceFile的格式保存。其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

saveAsHadoopFiles(prefix, [suffix])

將DStream中的內容以文本的形式保存爲Hadoop文件,其中每次批處理間隔內產生的文件以prefix-TIME_IN_MS[.suffix]的方式命名。

foreachRDD(func)

最基本的輸出操做,將func函數應用於DStream中的RDD上,這個操做會輸出數據到外部系統,好比保存RDD到文件或者網絡數據庫等。須要注意的是func函數是在運行該streaming應用的Driver進程裏執行的。

  

  dstream.foreachRDD是一個很是強大的輸出操做,它允將許數據輸出到外部系統。可是 ,如何正確高效地使用這個操做是很重要的,下面展現瞭如何去避免一些常見的錯誤。

  一般將數據寫入到外部系統須要建立一個鏈接對象(如 TCP鏈接到遠程服務器),並用它來發送數據到遠程系統。出於這個目的,開發者可能在不經意間在Spark driver端建立了鏈接對象,並嘗試使用它保存RDD中的記錄到Spark worker上,以下面代碼:

  

  這是不正確的,這須要鏈接對象進行序列化並從Driver端發送到Worker上。鏈接對象不多在不一樣機器間進行這種操做,此錯誤可能表現爲序列化錯誤(鏈接對不可序列化),初始化錯誤(鏈接對象在須要在Worker 上進行須要初始化) 等等,正確的解決辦法是在 worker上建立的鏈接對象。

  一般狀況下,建立一個鏈接對象有時間和資源開銷。所以,建立和銷燬的每條記錄的鏈接對象可能招致沒必要要的資源開銷,並顯著下降系統總體的吞吐量 。一個更好的解決方案是使用rdd.foreachPartition方法建立一個單獨的鏈接對象,而後使用該鏈接對象輸出的全部RDD分區中的數據到外部系統。

   這緩解了建立多條記錄鏈接的開銷。最後,還能夠進一步經過在多個RDDs/ batches上重用鏈接對象進行優化。一個保持鏈接對象的靜態池能夠重用在多個批處理的RDD上將其輸出到外部系統,從而進一步下降了開銷。

   須要注意的是,在靜態池中的鏈接應該按需延遲建立,這樣能夠更有效地把數據發送到外部系統。另外須要要注意的是:DStreams延遲執行的,就像RDD的操做是由actions觸發同樣。默認狀況下,輸出操做會按照它們在Streaming應用程序中定義的順序一個個執行。

相關文章
相關標籤/搜索