spark第六篇:Spark Streaming Programming Guide

預覽html

Spark Streaming是Spark核心API的擴展,支持高擴展,高吞吐量,實時數據流的容錯流處理。數據能夠從Kafka,Flume或TCP socket等許多來源獲取,而且可使用複雜的算法進行處理(好比map,reduce,join,window等高級函數)。最終,處理的結果數據能夠推送到文件系統,數據庫或實時儀表盤上。          java

在內部,它的工做原理以下圖。Spark Streaming接收實時輸入數據流並將數據分紅批,而後由Spark引擎處理,進而批量生成最終結果流。node

Spark Streaming提供了一個高層次的抽象,稱爲DStream(離散流),它表明連續數據流。DStream能夠經過Kafka,Flume等來源的輸入數據流建立,也能夠經過在其餘DStream上應用高級函數來建立。在內部,一個DStream被表示爲一系列RDD。git

本指南將向你介紹如何使用DStream編寫Spark Streaming程序。github

一個快速例子算法

假如咱們想統計從監聽TCP套接字的數據服務器接收到的文本數據中的字數。sql

java代碼示例:數據庫

    public static void main(String[] args) throws Exception {
        System.setProperty("hadoop.home.dir", "D:/Users/KOUSHENGRUI976/winutils-master/winutils-master/hadoop-2.6.4");
        SparkConf conf = new SparkConf().setAppName("heihei").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999);
        JavaDStream<String> words = lines.flatMap(p -> Arrays.asList(p.split(" ")).iterator());
        JavaPairDStream<String, Integer> pairs = words.mapToPair(p -> new Tuple2<>(p, 1));
        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);
        wordCounts.print();
        jssc.start();
        jssc.awaitTermination();
    }

首先,咱們建立一個JavaStreamingContext對象,這是全部Spark Streaming程序的入口。使用這個StreamingContext對象建立一個DStream來表示來自TCP源的流數據,指定主機(好比127.0.0.1)和端口(好比9999)。以上lines表示從數據服務器接收的數據流,流中每條記錄都是一行文本。以後,咱們把lines用空格分割成words。flatMap是一個轉換操做,它經過從源DStream中的每條記錄生成多個新記錄來建立一個新的DStream。在這種狀況下,每一行被分紅多個單詞,單詞流被表示爲words DStream。flatMap()方法的參數是一個FlatMapFunction對象(接收一個入參,返回一個Iterator對象)。接下來,咱們要統計這些單詞:words DStream經過mapToPair()方法,進一步映射成pair DStream,而後調用reduceByKey()方法,求每批數據中每一個單詞的出現頻率。注意,執行以上行時,Spark Streaming只會設置它在啓動後執行的計算,並未實際處理。最終咱們調用StreamingContext 的start()方法,在全部轉換完成以後開始處理。完整的代碼能夠參閱JavaNetworkWordCountapache

若是你已經下載而且構建了Spark,你能夠如下面方式運行這個例子。你首先須要運行Netcat(若是提示nc命令找不到的話,就yum install nc 安裝便可),來做爲數據服務器:編程

$ nc -lk 9999

而後,在一個不一樣的終端,你能夠經過以下命令啓動這個例子:

$ ./bin/run-example streaming.JavaNetworkWordCount localhost 9999

run-example文件在SPAKR_HOME/bin目錄中。

觀察現象http://www.jianshu.com/p/59733597d448

基本概念

引入jar包

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

若是數據來源是Kafka或者Flume的話,還須要引入整合jar包。Kafka整合jar包版本在Spark Streaming + Kafka Integration Guide有討論:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-8 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
    <version>2.2.1</version>
    <scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-flume_2.11</artifactId>
    <version>2.2.1</version>
</dependency>

初始化StreamingContext

開發Spark Streaming應用時,StreamingContext對象(java語言中是JavaStreamingContext對象)是全部Spark Streaming程序的入口。JavaStreamingContext對象能夠經過SparkConf對象建立。這會在內部建立一個JavaSparkContext對象,能夠經過jssc.sparkContext()來獲取。批處理間隔必須根據應用程序和可用集羣資源的延遲要求來設置。

JavaStreamingContext對象也可經過現有的JavaSparkConext對象來建立:

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

定義JavaStreamingContext對象後,你必須執行如下操做:

1.經過建立input DStream來定義輸入源。

2.經過對DStream應用轉換和輸出操做定義流式計算。

3.調用JavaStreamingContext對象的start()方法開始接收數據並處理。

4.調用JavaStreamingContext對象的awaitTermination()方法來等待處理中止(人爲結束或者由於錯誤結束)。

5.能夠經過調用JavaStreamingContext對象的stop()方法來人爲中止處理。

須要注意的幾點:

1.一旦一個JavaStreamingContext對象啓動(調用start()方法)後,就不能創建或者添加新的流式計算了。

2.一旦一個JavaStreamingContext對象中止後,就不能再從新啓動了。

3.在一個JVM中,最多隻有一個StreamingContext能夠處於活躍狀態。

4.stop()方法會在中止JavaStreamingConext的同時中止SparkContext,若是隻想中止JavaStreamingContext,則須要調用有參的stop()方法,參數值爲false。

5.可使用一個SparkContext對象來建立多個JavaStreamingContext對象,只要先前的JavaStreamingContext對象在下一個JavaStreamingContext對象被建立以前中止(不中止SparkContext)就能夠。

離散流(DStream)

離散流(DStream)是Spark Streaming中提供的基本抽象。它表明了一個持續的數據流,或者是從源接收的輸入數據流,或者是經過轉換輸入流而生成的處理過的數據流。在內部,DStream由連續的RDD表示,每一個RDD都包含必定時間間隔的數據,以下圖所示:

在DStream上應用的任何操做都會轉換爲對RDD的操做。例如,在前面將lines DStream轉換成words DStream的例子中,flatMap操做是應用到lines DStream中的每個RDD的,進而生成words DStream中的RDD。以下圖所示:

RDD的這些轉換由Spark引擎處理。DStream操做隱藏了大部分細節,併爲開發人員提供了一個更高級別的API。這些操做將在後面的章節中詳細討論。

輸入DStream和接收器

輸入DStream表明從流數據源接收的DStream。在上面例子中,lines就是一個輸入DStream,它表明了從netcat服務器接收到的流數據。每個輸入DStream(除了文件流)都與一個接收器(Receiver)對象相關聯,該對象從一個源接收數據並將數據存儲在Spark的內存中以供處理。

Spark Streaming提供了兩類內置的流數據源:

1.基本數據源:好比文件系統、socket鏈接。

2.高級數據源:像Kafka,Flume等其餘中間件,客戶端編程時須要引入額外的jar依賴。

若是但願在流處理程序中同時接受多個數據流,則須要建立多個輸入DStream。這將建立多個接收器,同時接受多個數據流。

須要注意的是,每一個接收器都會佔用一個核(或者是線程,若是是以本地模式運行的話)。所以,當在本地運行Spark Streaming程序時,不要使用"local"或者"local[1]"做爲master URL,這兩個都意味着只有一個線程運行任務。若是你使用基於接收器的輸入DStream,則單線程將用於運行接收器,而不會處理接收到的數據。一樣的,以集羣模式運行時,分配給Spark Streaming程序的核心數也必須大於接收器數量,不然也只是接收數據而不作處理。

基本數據源

在上面例子中,咱們已經看了jssc.socketTextStream("127.0.0.1", 9999),它經過TCP socket接收文本數據並建立一個輸入DStream。除了socket外,StreamingContext API還提供了從文件建立輸入DStream的方法。

文件流:爲了從與HDFS API兼容的任何文件系統上(如HDFS,S3,NFS等)的文件中讀取數據,能夠經過如下方式建立輸入DStream:

jssc.fileStream(String directory, Class<K> kClass, Class<V> vClass, Class<F> fClass);

Spark Streaming將監視directory目錄,並處理在該目錄中建立的全部文件(不包括子目錄中的文件)。注意:

1.這些文件必須具備相同的數據格式。

2.這些文件必須經過經過原子移動或者重命名來建立。

3.一旦移動,文件不得更改。因此即便文件被連續追加,新的數據將不會被讀取。

對於簡單的文本文件,有一個更簡單的方法:

jssc.textFileStream(String directory);

文件流不須要運行接收器,因此不須要分配內核。

高級數據源

這類數據源須要整合其餘第三方中間件,好比Kafka和Flume。在編程方面,須要引入一些與第三方中間件整合使用的依賴jar,從這些源建立DStream的功能被移到了jar包中。

其中一些高級數據源以下:

Kafka:Spark Streaming2.2.1與Kafka broker0.8.2.1及更高版本兼容。有關更多詳細信息,請參閱Spark Streaming + Kafka Integration Guide

Flume:Spark Streaming2.2.1與Flume1.6.0及更高版本兼容。有關更多詳細信息,請參閱Spark Streaming + Flume Integration Guide

接收器可靠性

數據源根據其可靠性能夠分爲兩類。像Kafka和Flume這種數據源,容許傳輸的數據被確認。若是從這些可靠的數據源接收數據的系統正確地確認接收到的數據,則能夠確保沒有數據因爲任何故障而丟失。這致使兩種接收器:

1.可靠的接收器,一個可靠的接收器在收到數據並將數據存儲在Spark中時,能夠正確地向可靠的數據源發送確認。

2.不可靠的接收器,不可靠的接收器不會向數據源發送確認。這能夠用於不支持確認的數據源,或者不但願或者不須要確認的數據源。

DStream上的轉換操做

Similar to that of RDDs, transformations allow the data from the input DStream to be modified. DStreams support many of the transformations available on normal Spark RDD’s. Some of the common ones are as follows:

map(Function func):

flatMap(FlatMapFunction func)

filter(Function<T, Boolean> func)

repartition(int numPartitions)

union(JavaDStream otherDStream)  union(JavaPairDStream otherPairDStream)

count()  獲得一個JavaDStream<Long>實例

reduce(Function2 func)

countByValue()

reduceByKey(Function2 func)  僅可由JavaPairDStream實例調用,返回一個新的JavaPairDStream實例

join(JavaPairDStream otherPairDStream)  僅可由JavaPairDStream實例調用,返回一個新的JavaPairDStream實例

cogroup(JavaPairDStream otherPairDStream)  僅可由JavaPairDStream實例調用,返回一個新的JavaPairDStream實例

transform(Function func):在源DStream中的每一個RDD上應用任意的RDD-RDD函數,返回一個新的DStream。

updateStateByKey(Function2 updateFunc)  僅可由JavaPairDStream實例調用,返回一個新的JavaPairDStream實例

下面針對一些轉換操做作更詳細的解釋。

UpdateStateByKey操做

The updateStateByKey operation allows you to maintain arbitrary state while continuously updating it with new information。使用這個轉換操做須要兩個步驟:

1.定義狀態。狀態能夠是任意的數據類型。

2.定義狀態更新函數。用函數指定如何使用以前的狀態和來自輸入流的新值來更新狀態

在每一批數據中,Spark會對全部現有的key應用狀態更新函數,無論批數據是否有新數據。若是狀態更新函數返回none,則鍵值對將被消除。

咱們來舉個例子說明一下。假設你想保持在文本數據流中出現的每一個單詞出現的次數。在這裏,出現的次數是狀態,它是一個整數。咱們將更新函數定義爲:

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
  (values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
  };

這適用於包含單詞的DStream,例如,上面例子中的包含(word, 1)對的pairs DStream。

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

上例中的Optional類不是jdk8自帶的那個,而是org.apache.spark.api.java.Optional,在spark-core.jar中。Function2是個函數式接口,全類名是org.apache.spark.api.java.function.Function2,也在spark-core.jar中,相似的函數式接口還有Function、Function0、Function三、Function4。

The updateFunction will be called for each word, with newValues having a sequence of 1’s (from the (word, 1) pairs) and the runningCount having the previous count。須要注意的是,使用updateStateByKey必須配置checkpoint目錄,將在下面的checkpointing章節詳細討論。完整的Java代碼,請參閱JavaStatefulNetworkWordCount.java

Transform操做

transform操做及其變形(如transformWith、transformToPair、transformWithToPair),容許任意RDD-RDD函數應用於DStream。它能夠用於應用那些還沒有在DStream API中公開的RDD操做。例如,將數據流中的每一個批次與其餘數據集鏈接起來的功能沒有直接暴露在DStream API中。可是,你可使用transform操做完成這個功能。實際應用場景:經過將輸入數據流與預先計算的垃圾信息進行實時數據清理,而後基於此進行過濾。

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
  rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
  ...
});

Note that the supplied function gets called in every batch interval。這容許你執行隨時間變化的RDD操做,便可以在批次之間更改RDD操做,更改分區數量或者廣播變量等等。

窗口操做

Spark Streaming還提供了窗口化的計算,容許你在滑動的數據窗口上應用轉換操做。下圖闡釋了滑動窗口:

如上圖所示,每當窗口滑過源DStream時,窗口內的源RDD被組合並操做以產生窗口DStream的RDD。上圖中,操做被應用在最後3個時間單位的數據上,而且滑動2個時間單位。這代表任何窗口操做都須要指定兩個參數:

1.窗口長度。窗口的持續時間(圖中是3個時間單位)

2.滑動間隔。執行窗口操做的間隔(圖中是2個時間單位)

這兩個參數必須是源DStream的批次間隔的整數倍。

下面以一個例子來講明窗口操做。咱們擴展下前面的例子,如今要統計前30s內出現的單詞的個數,每10s統計一次。To do this, we have to apply the reduceByKey operation on the pairs DStream of (word, 1) pairs over the last 30 seconds of data. This is done using the operation reduceByKeyAndWindow。

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

reduceByKeyAndWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration),返回一個JavaPairDStream實例。

一些經常使用的窗口操做以下,它們都須要以上所說的兩個參數:

window(Duration windowDuration, Duration slideDuration)  

countByWindow(Duration windowDuration, Duration slideDuration)

countByValueAndWindow(Duration windowDuration, Duration slideDuration)

reduceByWindow(Function2 reduceFunc, Duration windowDuration, Duration slideDuration)

reduceByKeyAndWindow(Function2 reduceFunc, [Function2 invReduceFunc,] Duration windowDuration, Duration slideDuration)

Join操做

最後,值得強調的是,能夠輕鬆地在Spark Streaming中執行不一樣類型的鏈接。

Stream-Stream joins(僅JavaPairDStream有join相關方法,JavaDStream沒有)

JavaPairDStream<String, String> stream1 = ...
JavaPairDStream<String, String> stream2 = ...
JavaPairDStream<String, Tuple2<String, String>> joinedStream = stream1.join(stream2);

在這裏,在每一個批間隔中,由stream1生成的RDD將與由stream2生成的RDD鏈接。除了join(),還有leftOuterJoin()、rightOuterJoin()。

此外,在windowed stream上進行鏈接也是很經常使用的:

JavaPairDStream<String, String> windowedStream1 = stream1.window(Durations.seconds(20));
JavaPairDStream<String, String> windowedStream2 = stream2.window(Durations.minutes(1));
JavaPairDStream<String, Tuple2<String, String>> joinedStream = windowedStream1.join(windowedStream2);

Stream-dataset joins

這在DStream的transform操做章節已經講過了。如今咱們用一個windowed stream鏈接一個dataset:

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

這裏解釋下,DStream與RDD鏈接不是DStream.join(RDD),DStream與RDD維度不同,DSteam是一連串的RDD,因此DSteam與RDD鏈接,其實指的是DStream中的RDD與RDD鏈接。

DStream上的輸出操做

輸出操做容許將DStream的數據推送到外部系統,如數據庫或者文件系統。像RDD的行爲操做和轉換操做的關係同樣,DStream輸出操做會觸發轉換操做的實際執行。

有如下常見輸出操做:

print():在driver node上打印DStream每一批數據的前10個元素,僅用於開發和調試。

saveAsHadoopFiles(String prefix, String suffix)

saveAsNewAPIHadoopFiles(String prefix, String suffix)

foreachRDD(VoidFunction func)、foreachRDD(VoidFunction2 func):對DStream/PairDStream的每一個RDD都應用func函數。

正確使用foreachRDD

foreachRDD()功能很強大,容許將數據發送到外部系統。可是,正確且高效使用該方法卻不簡單,一些常見的錯誤以下:

一般,將數據寫入外部系統須要建立鏈接對象,並使用它來將數據發送到遠程系統。爲此,開發人員可能會在Spark driver中建立鏈接對象,而後在Spark worker上使用它來將RDD的數據發送出去。以下代碼:

dstream.foreachRDD(rdd -> {
  Connection connection = createNewConnection(); // executed at the driver
  rdd.foreach(record -> {
    connection.send(record); // executed at the worker
  });
});

這是不正確的,由於這須要將鏈接對象序列化並從driver發送到worker。這種鏈接對象是很難跨機器傳輸的,這種錯誤可能表現爲序列化錯誤(鏈接對象不可序列化)或初始化錯誤(鏈接對象須要在worker初始化)等。正確作法是在worker建立鏈接對象。然而,這將會致使另外一個常見錯誤,即爲每個記錄建立一個鏈接對象。代碼以下:

dstream.foreachRDD(rdd -> {
  rdd.foreach(record -> {
    Connection connection = createNewConnection();
    connection.send(record);
    connection.close();
  });
});

一般狀況下,建立一個鏈接對象須要時間和資源的開銷。因此,爲每條記錄建立和銷燬鏈接對象可能會產生沒必要要的高開銷,而且會顯著下降系統的吞吐量。

更好的解決方案是使用rdd.foreachPartition(),爲每一個分區建立一個鏈接對象,並使用該鏈接發送分區中的全部記錄。代碼以下:

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    Connection connection = createNewConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    connection.close();
  });
});

最後,經過跨批次重用鏈接對象,能夠進一步優化。咱們能夠維護一個靜態的鏈接對象池,裏面的鏈接對象能夠被重用。代碼以下:

dstream.foreachRDD(rdd -> {
  rdd.foreachPartition(partitionOfRecords -> {
    // ConnectionPool is a static, lazily initialized pool of connections
    Connection connection = ConnectionPool.getConnection();
    while (partitionOfRecords.hasNext()) {
      connection.send(partitionOfRecords.next());
    }
    ConnectionPool.returnConnection(connection); // return to the pool for future reuse
  });
});

至此,就是將數據發送到外部系統的最有效的解決方案。咱們再回顧一下這個方案,首先對DStream執行foreachRDD()方法,去操做每一個RDD的數據,而後,對每一個RDD執行foreachPartition()方法,去操做每一個RDD的每個分區的數據。此外,操做數據的鏈接要從鏈接池中拿,而且是懶初始化和有過時時間的。

其餘須要注意的幾點:

1.若是應用程序沒有對DStream執行輸出操做的話,那麼根本不會執行任何操做。系統只會簡單地接收數據並丟棄它。

2.默認狀況下,輸出操做是一次一個執行的,並且他們是按照在應用程序中定義的順序執行的。

DataFrame和SQL操做

能夠輕鬆使用DataFrame和SQL操做流數據。你必須使用StreamingContext正在使用的SparkContext來建立一個SparkSession對象。此外,必須這樣作才能在驅動程序故障時重啓。這是經過懶加載建立一個SparkSession單例實現的。下面的例子展現了這一點,它修改了以前的單詞計數示例,使用DataFrame和SQL生成字數。每一個RDD都轉換爲一個DataFrame,註冊爲一個臨時表,而後使用SQL進行查詢。

/** Java Bean class */
public class JavaRow implements java.io.Serializable {
  private String word;
  public String getWord() {
    return word;
  }
  public JavaRow(String word) {
    this.word = word;
  }
}

/** DataFrame operations inside your streaming program */
JavaDStream<String> words = ... 
words.foreachRDD((rdd, time) -> {
  SparkSession spark = SparkSession.builder().config(conf).getOrCreate();
  JavaRDD<JavaRow> rowRDD = rdd.map(word -> new JavaRow(word));
  Dataset<Row> wordsDF = spark.createDataFrame(rowRDD, JavaRow.class);
  wordsDF.createOrReplaceTempView("words");
  Dataset<Row> wordCountsDF = spark.sql("select word, count(*) as total from words group by word");
  wordCountsDF.show();
});

查看完整的源代碼。JavaSqlNetworkWordCount.java

MLib操做

用的時候再學。

緩存/持久化 Caching/Persistence

同RDD相似,DStream也容許開發人員將流數據保存在內存中。也就是說,調用DStream的persist()方法會自動將該DStream中的每一個RDD都保存在內存中。若是DStream中的數據被屢次計算的話,這將很是有用。對於像reduceByWindow()和reduceByKeyAndWindow()這樣的基於窗口的操做以及像updateStateByKey()這樣的基於狀態的操做,這是隱含的。所以,基於窗口的操做生成的DStream會自動持久化到內存中,開發人員無需顯式調用persist()方法。

對於經過網絡接收數據的輸入流,例如Kafka,Flume,sockets等,默認持久化級別是將數據複製到兩個節點以實現容錯。

請注意,與RDD不一樣,DStream默認持久化級別使數據在內存中保持序列化。這在Performance Tuning進一步討論。

檢查點 Checkpointing

流應用必須全天候運行,因此必須對與應用程序邏輯無關的故障(例如,系統故障,JVM崩潰等)具備恢復能力。爲了作到這一點,Spark Streaming須要存儲足夠的信息到容錯的存儲系統,以便從故障中恢復。有兩種類型的檢查點數據:

1.元數據檢查點。將定義流式計算的信息保存到HDFS等容錯系統中。This is used to recover from failure of the node running the driver of the streaming application (discussed in detail later)。元數據包括:

①配置--用於建立流應用程序的配置

②DStream操做--定義流應用程序的操做集合

③incomplete batches--Batches whose jobs are queued but have not completed yet

2.數據檢查點。將生成的RDD保存到可靠的存儲系統中。這在將多個批次的數據組合在一塊兒的有狀態轉換中是必需的。在這樣的轉換中,生成的RDD依賴於以前批次的RDD,致使依賴鏈的長度隨着時間的推移不斷的增長。爲了不恢復時間的這種無限增加(與依賴鏈成比例),有狀態轉換的中間RDD被週期性地保存到可靠存儲系統中(例如HDFS),以切斷依賴鏈。

什麼時候啓用檢查點

必須爲具備如下任何要求的應用程序啓用檢查點:

1.程序中有狀態轉換操做--若是在程序中使用了updateStateByKey()或者reduceByKeyAndWindow(with inverse function),那麼就必須提供檢查點目錄以容許週期性地保存中間RDD。

2.從運行應用程序的驅動程序的故障中恢復--Metadata checkpoints are used to recover with progress information。

請注意,沒有應用上述狀態轉換操做的簡單的流應用程序能夠不啓用檢查點。在這種狀況下,從驅動程序故障中恢復也將是部分的(一些接收到但未處理的數據可能會丟失)。這一般是能夠接受的,許多都以這種方式運行Spark Streaming應用程序。預計對非Hadoop環境的支持將來將獲得改善。

如何配置檢查點

檢查點能夠經過設置一個在容錯的、可靠的文件系統(例如HDFS、S3)中的目錄來啓用,檢查點信息將被保存到該文件系統中。這是經過調用StreamingContext實例的checkpoint(String directory)方法來實現的。這將容許你使用上述的狀態轉換操做。此外,若是你想讓應用程序從驅動程序故障中恢復,則應該重寫你的流應用程序以具備如下行爲:

當程序第一次啓動時,它會建立一個新的StreamingContext實例,設置全部的流,而後調用start()方法。

程序在失敗後重啓時,將會根據檢查點目錄中的檢查點數據從新建立一個StreamingContext實例。

這種行爲能夠經過JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法很簡單地實現:

String checkpointDirectory = "hdfs://192.168.100.100:9000/checkpoint/application1";
Function0<JavaStreamingContext> creatingFunc = () -> {
    SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("SparkStreamingText2");
    JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
    JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
    // set checkpoint directory
    jssc.checkpoint(checkpointDirectory);
    return jssc;
};

// Get JavaStreamingContext from checkpoint data or create a new one
JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, creatingFunc);

// Do additional setup on context that needs to be done, irrespective of whether it is being started or restarted
//        context. ...

// Start the context
context.start();
try {
    context.awaitTermination();
} catch (InterruptedException e) {
    e.printStackTrace();
}

JavaStreamingContext.getOrCreate(String checkpointPath, Function0<JavaStreamingContext> creatingFunc)方法,若是checkpointPath存在,則將根據檢查點數據從新建立JavaStreamingContext實例。若是checkpointPath不存在(即第一次運行),那麼creatingFunc函數將被調用,來建立一個新的JavaStreamingContext實例並設置輸入DStream。

除了使用getOrCreate()方法外,還須要確保驅動程序進程在失敗時自動從新啓動。這隻能經過運行應用程序的部署基礎結構來完成,在下面的部署應用章節有詳細的討論。

請注意,RDD的檢查點致使了存儲到可靠存儲系統的成本。這可能會致使RDD檢查點的批處理時間變長。所以,檢查點須要當心設置。在批間隔很小的狀況下(例如1s),檢查點可能會顯著下降吞吐量。並且,若是檢查點過於頻繁,則會致使譜系和任務規模增加,這可能有不利影響。

對於須要RDD檢查點的有狀態轉換操做,默認的間隔是批間隔的整數倍。它能夠經過調用DStream的checkpoint(Duration checkpointInterval)方法來設置。一般狀況下,檢查點間隔設置爲滑動間隔(若是有的話)的5-10倍是一個很好的嘗試。

累加器,廣播變量和檢查點

累加器和廣播變量是不能從檢查點恢復的。若是你啓用了檢查點,並使用了累加器和廣播變量,那麼你必須建立一個懶實例化的累加器和廣播變量單例,以便在驅動程序從新啓動時從新實例化這些實例。

見下面例子:

class JavaWordBlacklist {

  private static volatile Broadcast<List<String>> instance = null;

  public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaWordBlacklist.class) {
        if (instance == null) {
          List<String> wordBlacklist = Arrays.asList("a", "b", "c");
          instance = jsc.broadcast(wordBlacklist);
        }
      }
    }
    return instance;
  }
}

class JavaDroppedWordsCounter {

  private static volatile LongAccumulator instance = null;

  public static LongAccumulator getInstance(JavaSparkContext jsc) {
    if (instance == null) {
      synchronized (JavaDroppedWordsCounter.class) {
        if (instance == null) {
          instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
        }
      }
    }
    return instance;
  }
}

wordCounts.foreachRDD((rdd, time) -> {
  // Get or register the blacklist Broadcast
  Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
  // Get or register the droppedWordsCounter Accumulator
  LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
  // Use blacklist to drop words and use droppedWordsCounter to count them
  String counts = rdd.filter(wordCount -> {
    if (blacklist.value().contains(wordCount._1())) {
      droppedWordsCounter.add(wordCount._2());
      return false;
    } else {
      return true;
    }
  }).collect().toString();
  String output = "Counts at time " + time + " " + counts;
}

所有代碼請參閱JavaRecoverableNetworkWordCount.

部署應用

本節討論部署Spark Streaming應用程序的步驟。

要求

要運行一個Spark Streaming應用程序,你須要如下:

1.具備集羣管理器的集羣--這是任何Spark應用程序的通常要求。

2.打包應用程序jar--你必須將你的流應用程序編譯成jar包。

3.爲executor節點配置足夠的內存--因爲接收到的數據必須存儲在內存中,executor節點必須配置足夠的內存以保存接收到的數據。請注意,若是你正在進行10分鐘的窗口操做,則系統必須在內存中保留至少10分鐘的數據。因此應用程序須要的內存取決於在其中使用的操做。

4.配置檢查點 - 若是流應用程序須要配置檢查點,那麼必須將Hadoop API兼容的容錯存儲系統(HDFS)中的一個目錄設置爲檢查點目錄,使得流應用能以檢查點信息來進行故障恢復。

5.配置驅動程序的自動重啓 - 爲了能自動從驅動程序故障中恢復,運行流應用程序的部署基礎架構必須監視驅動程序進程,並在驅動程序失敗時從新啓動驅動程序。不一樣的集羣管理器有不一樣的工具來實現:

①Spark Standalone - Spark驅動應用程序能夠提交到Spark Standalone集羣中運行。也就是說,驅動應用程序自己在其中一個工做節點上運行。此外,Standalone集羣管理器能夠被指示監督驅動應用程序,並在驅動程序因爲非零退出代碼或者因爲所在的節點故障而失敗時從新啓動它。查看Spark Standalone來獲取更多詳情。

②YARN - YARN支持相似的機制來重啓應用程序。

6.配置預寫入日誌 - Spark能夠啓用預寫入日誌來實現強大的容錯保證。啓用以後,從接收器接收到的全部數據將被寫入到檢查點目錄中的預寫入日誌中。這能夠避免驅動程序恢復時數據丟失,從而確保數據零丟失(在下面容錯語義部分詳細討論)。啓用的方式是設置配置參數spark.streaming.receiver.writeAheadLog.enable爲true。然而,這些更強語義多是以單個接收器的接收吞吐量爲代價的。這能夠經過並行運行更多的接收器來增長總吞吐量來糾正。此外,建議在啓用預寫入日誌時,Spark接收數據不要存儲副本,由於至關於一個副本的日誌已存儲在容錯存儲系統中。這能夠經過將輸入流的存儲級別設置爲StorageLevel.MEMORY_AND_DISK_SER來實現。查看Spark Streaming Configuration獲取更多詳情。請注意,啓用I/O加密時,Spark不會加密寫入預寫入日誌的數據。若是須要對預寫入日誌的數據進行加密,則應將其存儲在原生支持加密的文件系統中。

7,.設置最大接收速率 - 若是集羣資源不足以使流式應用程序處理數據的速度與接收速度同樣快,從1.5開始,Spark提供了一個叫作backpressure的特性,啓用此特性後,Spark Streaming會自動調整速率限制。啓用方式是設置配置參數spark.streaming.backpressure.enabled爲true。

升級應用程序代碼

若是正在運行的Spark Streaming應用程序須要使用新的應用程序代碼升級,則有兩種機制可供選擇:

1.新的Spark Streaming應用程序啓動並與舊應用程序並行運行。一旦新應用程序(接收的數據與舊應用程序相同)被預熱好,舊應用程序就能夠中止。請注意,在數據源支持發送數據到兩個目的地的狀況下,這種機制纔可使用。

2.舊應用程序平滑關閉(調用JavaStreamingContext的stop()方法),以確保已接收到的數據在關閉完被處理。而後啓用新應用程序,該應用程序從舊應用程序中止的同一點開始處理。請注意,只有使用支持源端緩衝的數據源(如Kafka和Flume)才能完成此操做,由於數據須要在舊應用程序關閉、新應用程序啓動前進行緩衝。同時,還須要使用不一樣的檢查點目錄來啓動新應用程序,或者刪除以前的檢查點目錄。由於檢查點信息本質上包含序列化的Scala/Java/Python對象,試圖用新應用程序的類來反序列化這些對象可能會出錯。

監控應用程序

除了Spark自己的監控功能外,Spark Streaming還提供了一些其餘特定功能。當使用StreamingContext時,Spark Web UI會顯示一個額外的Streaming選項卡,它顯示有關正在運行的接收器(接收器是否處於活動狀態,接收到的記錄數量,接收器錯誤等)和已完成的批次(批處理時間,排隊延遲等),這可用來監控流應用程序的進度。

Web UI中的如下兩個指標尤其重要:

1.處理時間 - 處理每批數據的時間。

2.Scheduling Delay - 批次在隊列中等待處理先前批次的時間(the time a batch waits in a queue for the processing of previous batches to finish)。

若是批處理時間一直比批處理間隔大,或者scheduling delay持續增大,則表示系統處理數據的速度比不上接收數據的速度。在這種狀況下,考慮如何減小批處理時間。

Spark Streaming程序的進度也可使用StreamingListener接口進行監控,該接口容許你獲取接收器狀態和處理時間等信息。

性能調整

使集羣上的Spark Streaming應用程序表現出最佳的性能須要進行一些調整。本節介紹可調整的參數和配置,以提升應用程序的性能。從較高層次考慮,你須要考慮兩件事情:

1.經過有效利用集羣資源減小批處理時間。

2.設置正確的批次大小,以使得批處理的速度能跟得上接收數據的數據。

減小批處理時間

Spark有不少優化可使每一個批次的處理時間最少。這些在Tuning Guide有詳細討論。本節介紹一些最重要的內容:

數據接收的並行度

經過網絡接收數據(如Kafka,Flume,socket等)須要將數據反序列化並存儲到Spark中。若是數據接收成爲系統中的瓶頸,則能夠考慮並行化數據接收。請注意,每一個輸入DStream都會建立一個接收器。所以能夠經過建立多個輸入DStream並配置它們接收流數據源的不一樣分區。例如,單個接收兩個主題數據的輸入DStream能夠被切分爲兩個輸入DStream,每一個輸入DStream接收一個主題的數據。這將建立兩個接收器,並行接收數據,從而提升總體吞吐量。多個DStream能夠經過union操做鏈接成一個DStream。以下;

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

另外一個應該考慮的參數是接收器的塊間隔,它由配置參數spark.streaming.blockInterval決定。對於大多數接收器來講,接收到的數據在存儲在Spark的內存以前會被合併成塊。每一個批次的塊數決定了將用於處理數據的任務數量。塊數=batch interval / block interval。例如,批間隔設置爲2s,塊間隔設置爲200ms,那麼會建立10個任務。若是任務數太少(少於每臺機器的內核數),那麼效率將會很低,由於不是全部的內核都用於處理數據。要增長任務數量,就須要減小塊間隔。可是,建議的塊間隔最少爲50ms,低於此值,任務啓動開銷可能會成爲問題。

用多個輸入DStream、多個接收器去接收數據的另外一種方法是顯示從新分配輸入DStream的分區(調用inputStream.repartition(int numPartitions))。

數據處理的並行度

若是在計算的某些階段使用的並行任務數量不夠高,則集羣資源可能未被充分利用。例如,對於像reduceByKey和reduceByKeyAndWindow這樣的分佈式reduce操做,並行任務數量的默認值由spark.default.parallelism配置屬性控制。你能夠將並行級別看成一個參數傳遞,或者設置上述屬性以更改默認值。

數據序列化

數據序列化的開銷能夠經過調整序列化格式來減小。在Spark Streaming中,有兩種類型的數據被序列化;

1.輸入數據。默認狀況下,經過接收器接收到的輸入數據會以StorageLevel.MEMORY_AND_DISK_SER_2級別存儲在executor的內存中。useDisk,useMemory,not useOffHeap,not deserialized,2 replication。也就是說,將數據序列化爲字節以減小GC開銷,並有2個副本用於容錯。此外,數據首先保存在內存中,而且只有當內存不足以保存流式計算所需的全部輸入數據時纔會溢出到磁盤。這個序列化顯然有CPU開銷 - 接收器必須反序列化接收的數據,並使用Spark的序列化格式從新序列化它。

2.持久化經過流操做生成的RDD。由流計算生成的RDD可能會持久化到內存中。例如,窗口操做會將數據存儲到內存中,由於它們將被屢次處理。可是,與Spark Core默認的StorageLevel.MEMORY_ONLY級別不一樣,流式計算生成的RDD持久化級別是StorageLevel.MEMORY_ONLY_SER,以最大限度地減小GC開銷。

在以上兩種狀況下,使用Kryo序列化既能夠減小CPU又能夠減小內存開銷。有關更多詳細信息,查看Spark Tuning Guide。對於Kryo,請考慮註冊自定義類,並禁用對象引用跟蹤。查看Spark Configuration

若是須要爲流應用程序保存的數據量不大的話,能夠將數據保存爲反序列化對象而不會致使過分的GC開銷。例如,若是使用幾秒鐘的批間隔而且沒有窗口操做,那麼你能夠經過顯式設置存儲級別來禁用持久化數據的序列化。這能夠減小因爲序列化形成的CPU開銷,並可能在不增長太多GC開銷的狀況下提升性能。綜上,序列化會形成很較大的CPU開銷,可是減小GC開銷。

任務啓動開銷

若是每秒啓動的任務不少(好比說50或者更多),那麼向slaves發送任務的開銷就會很大,這使得很難達到亞秒級的延遲。能夠經過如下更改來減小開銷:

Execution mode:以Spark Standalone模式運行Spark或者以粗粒度的Mesos模式運行Spark會比以細粒度的Mesos模式運行Spark有更好的任務啓動時間。查看Running Spark on Mesos獲取更多詳情。

這種更改可能會使批處理時間減小100ms,從而容許亞秒批大小可行。亞秒:沒有達到秒,也就是說不到一秒。

設置合適的批間隔

爲了讓Spark Streaming應用程序穩定地運行在集羣上,系統應該有能力像接收數據同樣快速處理數據。換句話說,數據處理應該像數據生成同樣快。經過Web UI,咱們能夠看到批處理時間是否比批間隔小,進而判斷程序是否穩定。

爲你的應用程序找出合適的批大小的一個好方法是用一個保守的批間隔(比方說,5-10s)和較低的數據速率進行測試。要驗證系統是否可以跟上數據速率,能夠檢查每一個處理過的批次所經歷的端到端延遲的值(能夠在Spark驅動程序log4j日誌中查找"Total delay",或者使用StremingListener接口)。若是延遲保持與批大小至關,那麼系統是穩定的。不然,若是延遲不斷增長,則意味着系統沒法跟上,不穩定。一旦你有了一個穩定的配置,你能夠嘗試提升數據速率或者減少批大小。注意,只要延遲小於批大小,因爲數據速率增長而引發的瞬時延遲增長是正常的。

內存調整

Tune Spark中詳細討論了調整Spark應用程序的內存使用狀況和GC行爲,必須閱讀。在本節中,咱們將專門討論Spark Streaming應用程序中的一些調優參數。

Spark Streaming應用程序須要的內存大小很大程度上取決於所使用的轉換操做。例如,若是你想在最後10分鐘的數據上使用窗口操做,那麼你的集羣應該有足夠的內存來存儲10分鐘的數據。或者你想在有不少key的狀況下使用updateStateByKey操做,那麼就須要不少內存。相反,若是你想作一個簡單的map-filter-store操做,那麼就不須要不少內存。

一般,由於經過接收器接收的數據是以StorageLevel.MEMORY_AND_DISK_SER_2級別存儲的,內存中放不下的數據會溢出到磁盤,這可能會下降流應用程序的性能,所以建議根據流應用程序的須要提供足夠的內存。最好嘗試一下小規模的內存使用狀況並作相應的評估。

內存調整的另外一個方面是GC(垃圾回收)。對於須要低延遲的流應用程序,不但願由於JVM垃圾回收而形成大量暫停。有幾個參數能夠幫你調整內存使用狀況和GC開銷:

1.DStream的持久化級別:如在上面數據序列化部分所說,輸入數據和RDD默認以序列化的字節被持久化。與反序列化的持久化相比,這同時減小了內存使用和GC開銷。啓用Kryo序列化進一步減小了序列化的大小和內存使用。進一步減小內存使用還能夠經過壓縮來實現(查看Spark的配置參數spark.rdd.compress),代價是cpu時間。

2.清理舊數據:默認狀況下,全部的輸入數據和持久化的RDD是自動清理的。Spark Streaming根據使用的轉換操做來決定什麼時候清理數據。例如,你正在使用10分鐘的窗口操做,則Spark Streaming將保留最近10分鐘的數據,並主動丟棄舊數據。經過設置streamingcontext.remember,數據能夠保留更長的時間。

3.CMS垃圾回收器:強烈建議使用CMS(concurrent mark-and sweep,併發標記掃描)GC,以使GC相關的暫停持續低水平。儘管併發GC會下降系統的總體吞吐量,但仍推薦使用並行GC來實現更加一致的批處理時間。確保在驅動程序上(在spark-submit中使用 --driver-java-options )和executor上(使用Spark配置參數spark.executor.extraJavaOptions)設置了CMS GC。

4.其餘技巧:爲了進一步減小GC開銷,這裏有更多的技巧值得嘗試。

①用StorageLevel.OFF_HEAP存儲級別持久化RDD。(useDisk,useMemory,useOffHeap,not deserialized,1 replication)

②使用更多的executor和較小的堆大小。這會下降每一個JVM 堆內的GC壓力。

須要記住的幾點:

1.每一個DStream都與一個接收器相關聯(除了文件流)。爲了並行讀取數據,須要建立多個接收器,即多個DStream。接收器在executor上運行,它佔據一個內核。確保除去接收器所佔內核以後,還有足夠的內核來處理數據。spark.cores.max應該把接收器的內核也算在內。接收器以循環的方式分配給executor。

2.當從數據源接收數據時,接收器建立數據塊。每隔塊間隔(blockInterval)都會生成一個新的數據塊。在批間隔中會生成N個數據塊,N=批間隔/塊間隔。這些塊由當前executor的塊管理器分發給其餘executor的塊管理器。以後,驅動程序上運行的的網絡輸入跟蹤器(Network Input Tracker)將被通知塊的位置以供進一步處理。

3.An RDD is created on the driver for the blocks created during the batchInterval. The blocks generated during the batchInterval are partitions of the RDD. Each partition is a task in spark. blockInterval== batchinterval would mean that a single partition is created and probably it is processed locally.

4.The map tasks on the blocks are processed in the executors (one that received the block, and another where the block was replicated) that has the blocks irrespective of block interval, unless non-local scheduling kicks in. Having bigger blockinterval means bigger blocks. A high value of spark.locality.wait increases the chance of processing a block on the local node. A balance needs to be found out between these two parameters to ensure that the bigger blocks are processed locally.

5.Instead of relying on batchInterval and blockInterval, you can define the number of partitions by calling inputDstream.repartition(n). This reshuffles the data in RDD randomly to create n number of partitions. Yes, for greater parallelism. Though comes at the cost of a shuffle. An RDD’s processing is scheduled by driver’s jobscheduler as a job. At a given point of time only one job is active. So, if one job is executing the other jobs are queued.

6.If you have two dstreams there will be two RDDs formed and there will be two jobs created which will be scheduled one after the another. To avoid this, you can union two dstreams. This will ensure that a single unionRDD is formed for the two RDDs of the dstreams. This unionRDD is then considered as a single job. However the partitioning of the RDDs is not impacted.

容錯語義

在本節中,咱們將討論Spark Streaming應用程序在發生故障時的行爲。

背景:

爲了理解Spark Streaming提供的語義,讓咱們記住Spark RDD的基本容錯語義:

1.RDD是一個不可變的,能夠從新計算的,分佈式的數據集。Each RDD remembers the lineage of deterministic operations that were used on a fault-tolerant input dataset to create it.

2.若是RDD的任何分區因爲工做節點故障而丟失,那麼這個分區能夠由原始數據集經過操做譜系從新計算出來。

3.假定RDD的全部轉換操做都是肯定的,最終轉換的RDD中的數據老是相同的,而無論Spark集羣中的故障如何。

Spark操做容錯文件系統(好比說HDFS)中的數據。因此,全部由容錯數據生成的RDD也應該是容錯的。然而,對於Spark Streaming,狀況並不是如此,由於大多數狀況下是經過網絡接收的數據(除了文件流)。

要爲全部生成的RDD實現相同的容錯屬性,須要爲接收的數據在集羣工做節點的多個executor上建立副本(默認是2個副本)。這致使系統中有兩種數據在發生故障時須要恢復:

1.接收到以及建立了副本的數據。這些數據在單個工做節點故障時仍然存在,由於它在另外一個工做節點上有副本。

2.Data received but buffered for replication。由於這些數據沒有副本,恢復這些數據惟一的方法是從數據源再接收一次。

此外,有兩種失敗咱們應該關注:

1.工做節點失敗。運行executor的任意一個工做節點均可能會失敗,此時這些節點上全部的內存中的數據都會丟失。若是在失敗節點上有接收器運行,那麼它們緩衝的數據將會丟失。

2.驅動程序節點失敗。若是運行Spark Streaming應用的驅動程序節點失敗,那麼顯然SparkContext會丟失,全部executor及其內存中的數據都會丟失。

定義

關於每一個記錄可能會被系統處理多少次,有三種可能:

1.At most once。每一個記錄將被處理一次或者根本不處理。

2.At least once。每一個記錄將被處理一到屢次。這比最多一次要強一些,由於它確保不會丟失任何數據,可是可能有重複。

3.Exactly once。每一個記錄將被處理一次,沒有數據丟失,也沒有數據被重複處理。這是三個最好的。

基本語義

在任何流處理系統中,廣義來說,處理數據有三個步驟。

1.接收數據:數據經過接收器或其餘方式從源接收數據。

2.轉換數據:使用DStream和RDD的轉換操做來轉換接收到的數據。

3.輸出數據:最終轉換的數據被輸出到外部系統,好比文件系統,數據庫或者儀表盤等。

若是一個流應用程序必須實現端到端的exactly-once保證,那麼每一個步驟都必須提供exactly-once保證。也就是說,每一個記錄只能被接收一次,只能被轉換一次,並被輸出到下游系統一次。讓咱們在Spark Streaming的上下文中理解這些步驟的語義。

1.接收數據:不一樣的輸入源提供不一樣的保證。在下面會討論。

2.轉換數據:因爲RDD提供的保證,全部接收的數據都會被正好處理一次。即便有故障,只要接收到的輸入數據是可訪問的,最終轉換的RDD將老是具備相同的內容。

3.輸出數據:輸出操做默認保證至少一次,由於它依賴於輸出操做的類型(是否冪等性idempotent)以及下游系統(是否支持事務)。可是用戶能夠實現本身的事務機制來實現exactly-once。在下面會討論。

接收數據的語義

不一樣的輸入源提供不一樣的保證,從最少一次到恰好一次。

文件流

若是全部輸入數據都在容錯系統中,如HDFS,那麼Spark Streaming能夠從任意故障中恢復並處理全部數據。這就是恰好一次,意味着全部的數據都會被恰好處理一次,無論什麼失敗。

基於接收器的源

對於基於接收器的輸入源,容錯語義取決於故障狀況和接收器的類型。正如咱們以前討論過的,有兩種類型的接收器;

1.可靠接收器。這些接收器僅在接收到的數據建立完副本後纔去確認通知可靠的數據源。若是這樣的接收器失敗了,那麼數據源不會收到緩衝(未複製)數據的確認通知,因此,若是接收器重啓的話,數據源將從新發送數據,而且沒有數據會因爲失敗而丟失。

2.不可靠接收器。這些接收器不會發送確認通知,因此在工做節點或者驅動程序故障時可能會丟失數據。

若是一個工做節點失敗,可靠的接收器不會丟失數據。對於不可靠的接收器,接收可是沒有建立副本的數據會丟失。若是驅動程序故障,那麼無論是可靠接收器仍是不可靠接收器,除了上面這些丟失外,全部在內存中接收和複製的數據都會丟失,這會影響有狀態轉換操做的結果。 If the driver node fails, then besides these losses, all of the past data that was received and replicated in memory will be lost. This will affect the results of the stateful transformations.???

爲了不以前接收到的數據丟失,從1.2版本開始,Spark引入了預寫入日誌,將接收到的數據保存到容錯存儲系統中。預寫入日誌的啓用和可靠的接收器能夠確保數據零丟失(即便是驅動程序故障狀況下)。就語義而言,它提供了至少一次的保證。

Kafka直接API

從1.3版本開始,咱們引入了一個新的Kafka Direct API,它能夠確保全部的Kafka數據只被Spark Streaming接收一次。除此以外,若是你執行的是exactly-once的輸入操做,那麼能夠實現端到端的exactly-once。參閱Kafka Integration Guide查看詳情。

輸出操做的語義

輸出操做(好比foreachRDD)有至少一次的語義。也就是說,轉換後的數據可能在工做節點失敗的狀況下不止一次地被寫入外部實體。雖然這對於使用saveAs*HadoopFiles操做保存到文件系統是能夠接受的(由於文件將被相同的數據簡單地覆蓋),可是可要額外的努力來實現exactly-once。有兩種方法:

1.冪等更新(Idempotent updates):屢次嘗試老是寫相同的數據。例如,saveAs*HadoopFiles老是將相同的數據寫到生成的文件中。

2.事務更新(Transctional updates):全部的更新都以事務方式進行,所以更新只能以原子方式進行一次。下面是一個具體實現方法:

①使用批次時間(在foreachRDD中可用)和RDD的分區索引建立一個標識符,該標識符惟一標識流應用程序中的blob data。

②使用這個標識符事務性地更新外部系統。Update external system with this blob transactionally (that is, exactly once, atomically) using the identifier. 也就是說,若是這個標識符還沒提交,就以原子方式提交分區數據和這個標識符。不然,若是該標識符已經提交的話,就跳過更新。

dstream.foreachRDD { (rdd, time) =>
  rdd.foreachPartition { partitionIterator =>
    val partitionId = TaskContext.get.partitionId()
    val uniqueId = generateUniqueId(time.milliseconds, partitionId)
    // use this uniqueId to transactionally commit the data in partitionIterator
  }
}
相關文章
相關標籤/搜索