SparkStreaming

SparkStreaming(1) ~ SparkStreaming編程指南

之因此寫這部份內容的緣由是, 不管是網絡上能夠直接找到的資料, 仍是出版的書籍種種, 版本大都在1.6~2.0不等, 且資源零零散散, 須要處處百度, 蒐羅資源.html

但根據我的開發了一段時間的感受來看, 會遇到的絕大多數問題, 均可以在官方文檔中找到答案.java

所以也能夠理解爲這是官方文檔的部分翻譯.node

我的英文水平有限, 若有錯漏歡迎指正.mysql

就目前來看, 主要分爲這樣幾個板塊.git

  1. Spark Streaming Programming Guide 也即SparkStreaming編程指南.github

  2. Submitting Applications Spark部署發佈相關算法

  3. Tuning Spark Spark調優sql

  4. Spark Configuration Spark可用配置, 可選參數.數據庫

目前已經有了Spark Streaming的中文翻譯. 參考:apache

Spark Streaming編程指南

Spark編程指南

內容自己會比較多, 所以會拆開來, 分多篇介紹.

在這裏就不從word count的簡單示例開始了, 而是直接從基礎概念開始.

Maven依賴

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.3</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>2.4.3</version>
    <scope>provided</scope>
</dependency>

而一樣當前版本對應的中間件:

Source  Artifact
Kafka   spark-streaming-kafka-0-10_2.12
Flume   spark-streaming-flume_2.12
Kinesis spark-streaming-kinesis-asl_2.12 [Amazon Software License]

而更完整的, 更新的中間件 Maven 倉庫路徑爲:

Maven repository

若是以爲欠缺什麼, 不妨找找試試.

初始化Streaming Context

爲了初始化一個 Spark Streaming 程序,一個 StreamingContext 對象必需要被建立出來,它是全部的 Spark Streaming 功能的主入口點.

有兩種建立方式:

import org.apache.spark.*;
import org.apache.spark.streaming.api.java.*;

SparkConf conf = new SparkConf().setAppName(appName).setMaster(master);
JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(1000));

其中:

appName 是在Spark UI上展現所使用的名稱.

master 是一個 Spark, Mesos or YARN cluster URL, 不瞭解也不要緊, 這部分會在Spark-submit時介紹到.

master 這個指的是Spark項目運行所在的集羣. 若是想在本地啓動SparkStreaming項目: 可使用一個特殊的 「local[*]」 , 啓動Spark的本地模式, *表示會自動檢測系統的內核數量.

然而在集羣環境下, 通常不採用硬編碼的方式使用spark, 即 setMaster. 咱們有更好的方式 經過 spark-submit 在提交時指定master參數便可.

須要注意到的是, 這句代碼會在內部建立一個 SparkContext, 能夠經過 ssc.sparkContext 訪問使用.

batch interval 也即 new Duration(1000) 在這裏指的是毫秒值, 還能夠採用Durations來建立.

Durations.seconds(5)

這個時間, 必須根據您的應用程序和可用的集羣資源的等待時間要求進行設置.

另外一種建立 SparkStreamingContext的方式爲:

import org.apache.spark.streaming.api.java.*;

JavaSparkContext sc = ...   //existing JavaSparkContext
JavaStreamingContext ssc = new JavaStreamingContext(sc, Durations.seconds(1));

在已經有了Context以後, 咱們須要作的是:

  1. 建立Input DStreams, 如kafka就有相應的方法能夠建立DStream

  2. 對輸入流作 轉換 處理, 也即咱們的功能部分.

  3. 開始接收輸入而且使用 streamingContext.start() 來處理數據.

  4. 使用 streamingContext.awaitTermination() 等待處理被終止(手動或者因爲任何錯誤).

  5. 使用 streamingContext.stop() 來手動的中止處理.

同時, 有幾點須要注意的地方:

  • 一旦一個 context 已經啓動,將不會有新的數據流的計算能夠被建立或者添加到它.

  • 一旦一個 context 已經中止,它不會被從新啓動.

  • 同一時間內在 JVM 中只有一個 StreamingContext 能夠被激活. 也即假設在使用SparkStreaming的同時, 須要依賴 SparkContext 或 SparkSQL等作一些操做, 此時不能從新建立 SparkContext 或是 SparkSQL(由於SparkSQL依然會建立Context.) 須要直接使用ssc.sparkContext.

  • 調用 ssc.stop() 會在中止 SparkStreamingContext的同時 中止 SparkContext. 若是須要僅中止 StreamingContext,須要使用 ssc.stop(false);

  • 一個 SparkContext 就能夠被重用以建立多個 StreamingContexts,只要前一個 StreamingContext 在下一個StreamingContext 被建立以前中止(不中止 SparkContext, 即便用 ssc.stop(false)).

在這裏額外添加一條說明, ssc.stop 能夠接收第二個參數, 是指定是否執行完當前批次的剩餘數據.

Discretized Streams

Discretized Stream or DStream 是 Spark Streaming 提供的基本抽象.

有且僅有兩種方式建立一個DStream, 第一種是經過 Spark的API去建立流, 第二種是從一個流轉換成另外一個流.

在內部,一個 DStream 被表示爲一系列連續的 RDDs,它是 Spark 中一個不可改變的抽象,distributed dataset.在一個 DStream 中的每一個 RDD 包含來自必定的時間間隔的數據,以下圖所示.

應用於 DStream 的任何操做轉化爲對於底層的 RDDs 的操做.例如,在 先前的示例,轉換一個行(lines)流成爲單詞(words)中,flatMap 操做被應用於在行離散流(lines DStream)中的每一個 RDD 來生成單詞離散流(words DStream)的 RDDs.以下所示.

所以對於RDD支持的操做, DStream也基本都支持.

Input DStreams 和 Receivers(接收器)

輸入 DStreams 是表明輸入數據是從流的源數據(streaming sources)接收到的流的 DStream.每個 input DStream(除了 file stream 以外)與一個 Receiver 對象關聯,它從 source(數據源)中獲取數據,而且存儲它到 Spark 的內存中用於處理.

receiver的java代碼以下:

class MyReceiver extends Receiver<String> {

    public MyReceiver(StorageLevel storageLevel) {
        //StorageLevel表示存儲級別
        super(storageLevel);
    }

    public void onStart() {
        //1. 啓動線程, 打開Socket鏈接, 準備開始接收數據
        //2. 啓動一個非阻塞線程去接收數據.
        //3. 調用Store方法將數據存儲到 Spark的內存中, store方法有多種實現,支持將多種多樣的數據進行存儲.
        //4. 在發生錯誤或異常時根據自身的處理策略調用stop, restart, reportError 方法.
    }

    public void onStop() {
        //清理各類線程,未關閉的連接等等
    }
}

Spark Streaming 提供了兩種內置的 streaming source(流的數據源).

  • Basic sources(基礎的數據源):在 StreamingContext API 中直接可使用的數據源.例如:file systems 和 socket connections.

  • Advanced sources(高級的數據源):像 Kafka,Flume,Kinesis,等等這樣的數據源.能夠經過對應的maven repository 找到依賴.

須要注意到的是, 若是你想要在你的流處理程序中並行的接收多個數據流,你能夠建立多個 input DStreams.這將建立同時接收多個數據流的多個 receivers(接收器),然而,一個 Spark 的 worker/executor 是一個長期運行的任務(task),所以它將佔用分配給 Spark Streaming 的應用程序的全部核中的一個核(core).

所以,須要記住,一個 Spark Streaming 應用須要分配足夠的核(core)(或線程(threads),若是本地運行的話)來處理所接收的數據,以及來運行接收器(receiver(s)).

所以相應的就須要在建立master的時候 不要使用local[1] 或 local 僅分配一個線程, 這將會使得receiver獲得一個線程 而對應的程序則沒有線程能夠處理.

在集羣模式下, 則須要分配適當的核心數.

而在使用中, 個人數據源是來自於kafka, 使用的是 kafkaUtils.createDirectStream. 而使用的 core數只有1, 或採用 local[1] 也可以正常運行, 這是否是說明上面的說法是錯誤的呢?

並非, 由KafkaUtils.createDirectStream 建立的是DStream, 而並不是單純的使用 receiver的方式實現.

若是採用了自定義的 receiver, 那麼此時經過 javaSparkContext.receiveData() 的方式建立流, 就至少須要兩個線程, 或兩個核心纔可以正常運行.

Basic Sources

  • ssc.socketTextStream() 經過Socket來讀取數據.

  • 經過文件讀取數據, 須要注意的是,文件系統必須是與 HDFS API 兼容的文件系統中(即,HDFS,S3,NFS 等),一個 DStream 能夠像下面這樣建立:

    聲明: 對於下面所描述的這種方式,我我的並無通過驗證, 因爲我的使用的數據源主要是kafka, mysql, es. 在嘗試過程當中fileStream並不能直接使用, 所以有如下猜測.

    streamingContext.textFileStream(dataDirectory);

    而windows的文件系統是 NTFS,這就要求咱們有對應的文件環境才行.

    參考:Hadoop入門系列(一)Window環境下搭建hadoop和hdfs的基本操做

    Spark Streaming 將監控dataDirectory 目錄 以及 該目錄中任何新建的文件 (寫在嵌套目錄中的文件是不支持的)

    須要注意的幾個地方有:

    • 能夠直接監控路徑, 也能夠經過目錄的通配符來監控,hdfs://namenode:8040/logs/ 或 hdfs://namenode:8040/logs/2017/* 都是能夠的.

    • 文件必須具備相同的數據格式.

    • 在讀取文件的時候,參考的時間是 文件的更新時間,而非建立時間.

    • 一旦被加載,即便文件被更新了 在當前窗口內 也不會被從新讀取, 所以即便文件不停被追加新的內容, 可是並不會讀入.

    • 文件越多,檢索文件變動狀況所須要的時間也就越多,即便大多數文件都沒有變動.

    • 若是使用的是通配符的方式 去識別文件目錄,如: hdfs://namenode:8040/logs/2016-*, 在這種狀況下, 經過重命名文件夾 也能夠將對應文件夾下的文件 加入被監控的文件列表中, 固然須要修改時間在對應的window內.

    • 經過調用 FileSystem.setTimes() (hadoop api) 能夠更改文件的更新時間.

    如下部分是對文件流的一個說明, 如何將文件轉換成DStream的概念?

    參考: org.apache.spark.streaming.dstream.FileInputDStream scala api

    *                      |<----- remember window ----->|
     * ignore threshold --->|                             |<--- current batch time
     *                      |____.____.____.____.____.____|
     *                      |    |    |    |    |    |    |
     * ---------------------|----|----|----|----|----|----|-----------------------> Time
     *                      |____|____|____|____|____|____|
     *                             remembered batches

    依然是按照時間批次來將數據轉換成RDDs,整合成 DStream.

    在每一個時間批次中,檢測 被監控的文件列表 若是修改時間在 current batch範圍內的, 在歸入列表, 轉換成DStream, 在excutor的執行期間 新加入的文件,放入下一批次進行處理.

    而文件的修改時間 在 ignore threshold 以後的,則會被忽略掉.

    要求:

    • 運行Spark的系統時間 要與對應的 文件系統時間保持一致.

    • 文件必須在相同的文件系統下經過 atomically(原子的)moving(移動) 或 renaming(重命名) 到數據目錄.

    而duration的定義則是經過:

    spark.streaming.fileStream.minRememberDuration

    默認是一分鐘, 即讀取修改時間在一分鐘之內的文件.

    更細節的能夠自行解讀代碼實現.

  • Queue of RDDs as a Stream(RDDs 隊列做爲一個流)

    爲了使用測試數據測試 Spark Streaming 應用程序,還可使用 streamingContext.queueStream(queueOfRDDs) 建立一個基於 RDDs 隊列的 DStream,每一個進入隊列的 RDD 都將被視爲 DStream 中的一個批次數據,而且就像一個流進行處理.

Advanced Sources(高級數據源)

Kafka: Spark Streaming 2.4.3 要求 Kafka versions 0.8.2.1 以上版本.

官方參考連接: Kafka Integration Guide

我的參考連接: SparkStreaming-Kafka集成

Custom Sources(自定義數據源)

DStreams 可使用經過自定義的 receiver(接收器)接收到的數據來建立.

Spark Streaming Custom Receivers

receiver的大體建立過程在上面已經提到過了.

案例代碼實現:

JavaCustomReceiver.java.

經過以下方式使用 自定義的 Receiver

// Assuming ssc is the JavaStreamingContext
JavaDStream<String> customReceiverStream = ssc.receiverStream(new JavaCustomReceiver(host, port));
JavaDStream<String> words = customReceiverStream.flatMap(s -> ...);

Receiver Reliability(接收器的可靠性)

有兩種receiver, 可靠性, 不可靠性, 區別就在於對於數據的失敗處理上, 可靠receiver並不會丟失數據,而不可靠receiver則不對 數據安全性 提供任何保障.

從數據源上來講, 自己就存在兩種數據源, 如 kafka flume所提供的可靠性數據源.可以對 下發處理數據的 響應 作出相應的處理. 而不可靠數據源, 只負責下發數據.

若是想要實現一個 可靠的 receiver, 須要注意的是, 即便採用的是可靠數據源, 也不必定就是可靠的receiver.

若是你想實現一個可靠的數據接收器,必須用store方法,這是一個阻塞的方法,在它存入spark內部時纔會返回值.若是接受器用的Storage Level 是複製(也可使用默認),那麼在複製完後纔會獲得返回值.所以,在肯定完數據可靠存儲以後,再適當的發送確認信息.這就避免了若是在存儲中途失敗,也不會致使數據丟失.由於緩衝區的數據沒有被確認,那麼數據源將會從新發送.

若是是不可靠接收器,那麼無須以上邏輯,他只是簡單地接收數據並存儲到Spark內存中而已,但並不是說不可靠的 接收器就毫無優勢:

  • 系統會自動把數據分割爲 大小合適的 塊
  • 若是限制速率已經被指定, 那麼系統會自動控制接收速率
  • 因爲上面提到的優勢, 所以實現起來更爲簡單.

而與之相對的, 可靠的接收器就須要本身實現數據分塊, 以及速率控制, 而實現方式主要取決於數據源.

DStreams 上的 Transformations(轉換)

DStreams 支持不少在 RDD 中可用的 transformation 算子, 至於transformation 和 action 算子的區別, 能夠自行百度瞭解.

參考連接: spark算子

而更詳細的須要查看官方API

參考連接: http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/api/java/JavaRDD.html

其中返回值爲各類RDD的通常都是 transformation算子, 不然爲 action算子.

在茫茫多的 transformation中 選擇幾個比較特別的來詳細說明下:

UpdateStateByKey

updateStateByKey 操做容許你維護任意狀態,同時不斷更新新信息.你須要經過兩步來使用它:

  1. 定義 state - state 能夠是任何的數據類型.
  2. 定義 state update function(狀態更新函數)- 使用函數指定如何使用先前狀態來更新狀態,並從輸入流中指定新值.

在每一個 batch 中,Spark 會使用狀態更新函數爲全部已有的 key 更新狀態,無論在 batch 中是否含有新的數據.若是這個更新函數返回一個 none,這個 key-value pair 也會被消除.

Function2<List<Integer>, Optional<Integer>, Optional<Integer>> updateFunction =
(values, state) -> {
    Integer newSum = ...  // add the new values with the previous running count to get the new count
    return Optional.of(newSum);
};

JavaPairDStream<String, Integer> runningCounts = pairs.updateStateByKey(updateFunction);

請注意,使用 updateStateByKey 須要配置的 checkpoint(檢查點)的目錄.

可是, updateStateByKey 有不可避免的缺點.

參考: [spark streaming] 狀態管理 updateStateByKey&mapWithState

總結來講:

updateStateByKey底層是將preSateRDD和parentRDD進行co-group,而後對全部數據都將通過自定義的mapFun函數進行一次計算,即便當前batch只有一條數據也會進行這麼複雜的計算,大大的下降了性能,而且計算時間會隨着維護的狀態的增長而增長.

mapWithstate底層是建立了一個MapWithStateRDD,存的數據是MapWithStateRDDRecord對象,一個Partition對應一個MapWithStateRDDRecord對象,該對象記錄了對應Partition全部的狀態,每次只會對當前batch有的數據進行跟新,而不會像updateStateByKey同樣對全部數據計算.

所以纔會有 mapWithstate 的性能遠遠高於 updateStateByKey

Transform 算子

這是Spark中 自由度最高的一個算子, Spark官方API提供的算子畢竟是有限的, 可能確實不可以知足你的要求, 所以纔會有了這個 transform算子.

其核心做用是:

容許在 DStream 運行任何 RDD-to-RDD 函數.它可以被用來應用任何沒在 DStream API 中提供的 RDD 操做.例如,鏈接數據流中的每一個批(batch)和另一個數據集的功能並無在 DStream API 中提供,然而你能夠簡單的利用 transform 方法作到.

import org.apache.spark.streaming.api.java.*;
// RDD containing spam information
JavaPairRDD<String, Double> spamInfoRDD = jssc.sparkContext().newAPIHadoopRDD(...);

JavaPairDStream<String, Integer> cleanedDStream = wordCounts.transform(rdd -> {
    rdd.join(spamInfoRDD).filter(...); // join data stream with spam information to do data cleaning
    ...
});

transform方法在每一個批次都會進行調用, 所以能夠根據不一樣時間進行相應的處理.

但須要注意的一點是:

雖然transform是 transformation算子, 可是 並不意味着其中的方法必然是在job分配完, 真實提交以後才執行.

緣由也正是在於這個算子的靈活性至關高. 能夠在其中嵌入任何RDD操做.

參考連接: Spark Streaming 誤用.transform(func)函數致使的問題解析

致使其問題的根本緣由就在於 在 transform中執行的 action操做, 是會在 生成job的時候執行的.

通過測試,還會有這樣一點問題:

首先transform 確確實實是會在 job生成的時候執行相關代碼,若是有action的話, 而且使用的線程也是 job generator線程。 其次, 在transform以前的算子 執行次序是會在transform以前的, 如在transform 以前有過filter, 那麼filter必定是在transform以前執行的。

在這一點上, 我更傾向於 transform中的 action 提早引起了 transform算子以前的 算子 執行運算。而並無等到 後續的 真正的 dstream的action觸發時再執行。

然而這並不意味着咱們能夠省掉後續的action算子。 若是沒有後續的 dstream的action算子, 生成job的舉動也不會有, 所以更不會觸發transform中的action。

Window

Spark Streaming 也支持 windowed computations(窗口計算),它容許你在數據的一個滑動窗口上應用 transformation(轉換).下圖說明了這個滑動窗口.

如上圖顯示,窗口在源 DStream 上 slides(滑動),合併和操做落入窗內的源 RDDs,產生窗口化的 DStream 的 RDDs.在這個具體的例子中,程序在三個時間單元的數據上進行窗口操做,而且每兩個時間單元滑動一次.這說明,任何一個窗口操做都須要指定兩個參數.

window length(窗口長度) - 窗口的持續時間.

sliding interval(滑動間隔) - 執行窗口操做的間隔.

這兩個參數必須是 source DStream 的 batch interval(批間隔)的倍數.

// Reduce last 30 seconds of data, every 10 seconds
JavaPairDStream<String, Integer> windowedWordCounts = pairs.reduceByKeyAndWindow((i1, i2) -> i1 + i2, Durations.seconds(30), Durations.seconds(10));

經常使用的 window操做以下:

Transformation(轉換) Meaning(含義)
window(windowLength, slideInterval) 返回一個新的 DStream,它是基於 source DStream 的窗口 batch 進行計算的.
countByWindow(windowLength, slideInterval) 返回 stream(流)中滑動窗口元素的數
reduceByWindow(func, windowLength, slideInterval) 返回一個新的單元素 stream(流),它經過在一個滑動間隔的 stream 中使用 func 來聚合以建立.該函數應該是 associative(關聯的)且 commutative(可交換的),以便它能夠並行計算
reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]) 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, V) pairs 的 Stream,其中的每一個 key 的 values 是在滑動窗口上的 batch 使用給定的函數 func 來聚合產生的.Note(注意): 默認狀況下,該操做使用 Spark 的默認並行任務數量(local model 是 2,在 cluster mode 中的數量經過 spark.default.parallelism 來肯定)來作 grouping.您能夠經過一個可選的 numTasks 參數來設置一個不一樣的 tasks(任務)數量.
reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]) 上述 reduceByKeyAndWindow() 的更有效的一個版本,其中使用前一窗口的 reduce 值逐漸計算每一個窗口的 reduce值.這是經過減小進入滑動窗口的新數據,以及 「inverse reducing(逆減)」 離開窗口的舊數據來完成的.一個例子是當窗口滑動時」添加」 和 「減」 keys 的數量.然而,它僅適用於 「invertible reduce functions(可逆減小函數)」,即具備相應 「inverse reduce(反向減小)」 函數的 reduce 函數(做爲參數 invFunc </ i>).像在 reduceByKeyAndWindow 中的那樣,reduce 任務的數量能夠經過可選參數進行配置.請注意,針對該操做的使用必須啓用 checkpointing.
countByValueAndWindow(windowLength, slideInterval, [numTasks]) 在一個 (K, V) pairs 的 DStream 上調用時,返回一個新的 (K, Long) pairs 的 DStream,其中每一個 key 的 value 是它在一個滑動窗口以內的頻次.像 code>reduceByKeyAndWindow 中的那樣,reduce 任務的數量能夠經過可選參數進行配置.

若是能夠, 而且通常均可以調用 reduceByKeyAndWindow 的第二個版本, 描述了那麼多,其實旨在說明, 當採用滑動窗口的時候, 有兩種對數據的處理方式, 其一是 每次都去統計最新的 窗口中的 全部數據. 其二則是, 在原有數據基礎上作出必定更新,這就要求 對已經離開窗口的數據作 ‘減量’ 操做, 對新進入窗口的數據作 ‘增量’ 操做.也即須要提供的 參數:

func 對數據作reduce操做, 完成統計更新

invFunc 對數據作‘減量’ 操做 在原有基礎上進行更新.

Join

join 與 Sql中的 Join極爲相似, 在 SQL中 join操做,要以 join on 「on」後面的參數爲準, 而在 Spark中, 則要求 進行join 的兩個 RDD 或 DStream 都必須是 PairRDD 或是 PairDStream.

這樣在進行join操做的時候, SQL中的 「on」 在Spark中就變成了, Tuple2中的第一個參數.

須要注意的是 join操做最終合併成一個流, 所以也會將多個分區的數據進行合併, 是一次窄依賴變換, 所以最終會造成新的分區.同時通常能夠自行制定分區數, 若是不指定, 則使用Spark默認的分區數.

至於Spark默認的分區數: 參考連接:Spark RDD的默認分區數:(spark 2.1.0)

其核心就是:

sc.defaultParallelism = spark.default.parallelism
sc.defaultMinPartitions = min(spark.default.parallelism,2)

而RDD的分區數,則是 若是在從新分區時指定了分區數, 則採用分區數, 不然,就使用默認值.

另外, 分區的默認方式/規則 是 HashPartition

  1. Join

    Join:Join相似於SQL的inner join操做,返回結果是前面和後面集合中配對成功的,過濾掉關聯不上的.

  2. leftOuterJoin

    leftOuterJoin相似於SQL中的左外關聯left outer join,返回結果之前面的RDD爲主,關聯不上的記錄爲空.從返回類型上就可略知一二.

    JavaPairDStream[K, (V, Optional[W])]

    其第二個參數是 Optional, 能夠接收空值.

  3. rightOuterJoin

    rightOuterJoin相似於SQL中的有外關聯right outer join,返回結果以參數也就是右邊的RDD爲主,關聯不上的記錄爲空.

    JavaPairDStream[K, (Optional[V], W)]
  4. fullOuterJoin

    fullOuterJoin相比前幾種來講並不常見,是 左外 右外鏈接的 結合. 最終的結果 是兩個 流的並集. 返回的數據類型是:

    JavaPairDStream[K, (Optional[V], Optional[W])]

對於Join還有一種不錯的用法:

JavaPairRDD<String, String> dataset = ...
JavaPairDStream<String, String> windowedStream = stream.window(Durations.seconds(20));
JavaPairDStream<String, String> joinedStream = windowedStream.transform(rdd -> rdd.join(dataset));

能夠將RDD 與 流數據進行join操做, 進而完成流數據與 固定數據集的合併.

實際上,您也能夠動態更改要加入的 dataset.提供給 transform 的函數是每一個 batch interval(批次間隔)進行評估,所以能夠將 dataset 引用指向當前的 dataset.

偶然間看到的案例:

Spark Streaming 流計算優化記錄(2)-不一樣時間片數據流的Join

主要解決的就是 來自 HDFS的數據 與 流數據合併.

但在這個案例中是否是有一種更合理的處理方式? 即每隔固定時間去 更新dataset, 在transform中 將流的RDD 與 HDFS的數據合併.

若是是不一樣的 batch interval 之間的數據能夠合併嗎?

目前來看並無看到這種可能性, 對於 SparkContext, 同時只可以啓動一個 SparkStreamingContext, 而在SparkStreamingContext啓動時就須要指定 batch interval, 所以好像不太可能出現 多個 batchInterval.

倒不如說是 JavaPairRDD 與 JavaPairDStream 之間的合併.

DStreams 上的輸出操做

輸出操做 解釋
print() 在運行流應用程序的 driver 節點上的DStream中打印每批數據的前十個元素.這對於開發和調試頗有用.
saveAsTextFiles(prefix, [suffix]) 將此 DStream 的內容另存爲文本文件.每一個批處理間隔的文件名是根據 前綴 和 後綴:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
saveAsObjectFiles(prefix, [suffix]) 將此 DStream 的內容另存爲序列化 Java 對象的 SequenceFiles.每一個批處理間隔的文件名是根據 前綴 和 後綴:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
saveAsHadoopFiles(prefix, [suffix]) 將此 DStream 的內容另存爲 Hadoop 文件.每一個批處理間隔的文件名是根據 前綴 和 後綴:"prefix-TIMEIN_MS[.suffix]"_ 生成的.
foreachRDD(func) 對從流中生成的每一個 RDD 應用函數 func 的最通用的輸出運算符.此功能應將每一個 RDD 中的數據推送到外部系統,例如將 RDD 保存到文件,或將其經過網絡寫入數據庫.請注意,函數 func 在運行流應用程序的 driver 進程中執行,一般會在其中具備 RDD 動做,這將強制流式傳輸 RDD 的計算.

須要注意的是: foreachRDD 自己是運行在 Driver節點的, 而一般要對 RDD作相應的 action 操做. 而這部分操做則是在 各自的 work 上執行的.

foreachRDD 設計模式的使用

錯誤操做:

dstream.foreachRDD(rdd -> {
    Connection connection = createNewConnection(); // 在driver節點執行
    rdd.foreach(record -> {
        connection.send(record); // 在 worker節點執行.
    });
});

而Connection每每又沒法被序列化, 所以在 worker節點上依然拿不到鏈接.

dstream.foreachRDD(rdd -> {
    rdd.foreach(record -> {
        Connection connection = createNewConnection();
        connection.send(record);
        connection.close();
    });
});

所以每每須要採用以下這種方式:

dstream.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionOfRecords -> {
        // ConnectionPool is a static, lazily initialized pool of connections
        Connection connection = ConnectionPool.getConnection();
        while (partitionOfRecords.hasNext()) {
        connection.send(partitionOfRecords.next());
        }
        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
    });
});

若是僅僅是想單純的處理數據, 但並不在這一就須要進行foreachRDD 用以完成流的執行計算呢?

僞代碼以下:

dstreamTemp = dstream.foreachRDD(rdd -> {
    rdd.foreachPartition(partitionOfRecords -> {
        Connection connection = ConnectionPool.getConnection();
        while (partitionOfRecords.hasNext()) {
            //進行查詢數據, 填充 補全, 過濾 當前stream中的數據, 用如下一步繼續處理.
        }
        ConnectionPool.returnConnection(connection); // return to the pool for future reuse
    });
});

dstreamTemp.nextAction;

但foreachRDD 是不會返回流的, 所以能夠採用

dstreamTemp = dstream.mapPairtition();
dstreamTemp = dstream.mapToPairPartition();

另外關於 foreachRDD 須要注意的是:

DStreams 經過輸出操做進行延遲執行,就像 RDD 由 RDD 操做懶惰地執行.具體來講,DStream 輸出操做中的 RDD 動做強制處理接收到的數據.所以,若是您的應用程序沒有任何輸出操做,或者具備 dstream.foreachRDD() 等輸出操做,而在其中沒有任何 RDD 操做,則不會執行任何操做.系統將簡單地接收數據並將其丟棄.

默認狀況下,輸出操做是 one-at-a-time 執行的, 且按照它們在應用程序中定義的順序執行.

DataFrame and SQL

Spark SQL在之後的篇章在詳細介紹,在流數據上使用 DataFrame 和 SQL也是一件很簡單的事情.

首先須要經過 SparkStreaming 獲取對應的 SparkContext, 然後經過SparkContext建立 SparkSession. 而且必須建立, 這樣就能夠在 driver出現故障的時候, 從新啓動. 咱們能夠將 Spark RDD 轉換成 DataFrame,註冊爲臨時表,然後進行SQL查詢. 案例代碼以下:

/** Java Bean class for converting RDD to DataFrame */
public class JavaRow implements java.io.Serializable {
    private String word;

    public String getWord() {
        return word;
    }

    public void setWord(String word) {
        this.word = word;
    }
}

...

/** DataFrame operations inside your streaming program */

JavaDStream<String> words = ... 

words.foreachRDD((rdd, time) -> {
    // Get the singleton instance of SparkSession
    SparkSession spark = SparkSession.builder().config(rdd.sparkContext().getConf()).getOrCreate();

    // Convert RDD[String] to RDD[case class] to DataFrame
    JavaRDD<JavaRow> rowRDD = rdd.map(word -> {
        JavaRow record = new JavaRow();
        record.setWord(word);
        return record;
    });
    DataFrame wordsDataFrame = spark.createDataFrame(rowRDD, JavaRow.class);

    //能夠理解爲 定義表名
    wordsDataFrame.createOrReplaceTempView("words");

    // Do word count on table using SQL and print it
    DataFrame wordCountsDataFrame =
        spark.sql("select word, count(*) as total from words group by word");
    wordCountsDataFrame.show();
});

代碼全鏈接: source code

並不只僅是這些, SparkSQL的另外一大優勢是, 可以跨線程 訪問 其餘Streaming Data定義的 DataFrame.

須要主動將 SparkStreaming 設置爲記住足量數據. 由於 對於 當前線程的 StreamingData 並不知道此時還有來自其餘線程的SQL查詢, 會自動刪除清理 已經不須要的數據.

例如,若是要查詢最後一個批次,可是您的查詢可能須要5分鐘才能運行,則能夠調用 streamingContext.remember(Minutes(5))

緩存 / 持久性

與 RDD 相似,DStreams 還容許開發人員將流的數據保留在內存中.也就是說,在 DStream 上使用 persist() 方法會自動將該 DStream 的每一個 RDD 保留在內存中.若是 DStream 中的數據將被屢次計算(例如,相同數據上的多個操做),這將很是有用.對於基於窗口的操做,如 reduceByWindow 和 reduceByKeyAndWindow 以及基於狀態的操做,如 updateStateByKey,這是隱含的.所以,基於窗口的操做生成的 DStream 會自動保存在內存中,而不須要開發人員調用 persist().

對於經過網絡接收數據(例如:Kafka,Flume,sockets 等)的輸入流,默認持久性級別被設置爲將數據複製到兩個節點進行容錯.

請注意,與 RDD 不一樣,DStreams 的默認持久性級別將數據序列化在內存中.

spark的計算是lazy的,只有在執行action時才真正去計算每一個RDD的數據.要使RDD緩存,必須在執行某個action以前定義RDD.persist().

而在使用完畢以後, 最好也可以主動調用 unpersist() 釋放內存, 固然, 並不意味着, 若是不主動調用, 就不會釋放內存, 它會遵循 LRU原則, 進行內存的釋放, 無效cache的刪除.

參考:Spark cache的用法及其誤區分析

在參考文檔中,有一點我有不一樣的意見:

cache以後必定不能當即有其它算子,不能直接去接算子.由於在實際工做的時候,cache後有算子的話,它每次都會從新觸發這個計算過程.

測試代碼以下:

JavaSparkContext jsc = createJavaSparkContext();
List<Integer> list = new ArrayList<>();
for (int i = 0; i < 16; i++) {
    list.add(i);
}

JavaRDD<Integer> rdd = jsc.parallelize(list);
JavaRDD<Integer> rdd1 = rdd.filter(item -> {
    System.out.println("rdd1:" + item);
    return item > 3;
}).cache();
JavaRDD<Integer> rdd2 = rdd1.filter(item -> {
    System.out.println("rdd2:" + item);
    return item > 6;
}).cache();
rdd2.count();

輸出結果並無將 rdd1:1 重複輸出兩遍,也即意味着 cache以後有算子, 只會將cache以後的算子進行計算, 而已經計算過的並不會致使重複計算. 所以咱們能夠放心使用.

CheckPoint機制

鑑於 SparkStreaming 必須永久運行, 所以對於 程序無關的錯誤, 如系統錯誤, JVM崩潰等問題.具備可恢復的功能.所以 Spark必須保存部分信息, 到容錯存儲系統. check point有兩種類型:

  • Metadata checkpointing - 將定義 streaming 計算的信息保存到容錯存儲(如 HDFS)中,元數據包括:

    • Configuration - 用於建立流應用程序的配置.
    • DStream operations - 定義 streaming 應用程序的 DStream 操做集.
    • Incomplete batches - 已經進入隊列,可是未完成的 Job的 批次.
  • Data checkpointing - 將生成的 RDD 保存到可靠的存儲.這在 基於 批次之間數據具備關聯性 上的數據處理是有必要的, 如 reduceStateByKey,在這種轉換中,生成的 RDD 依賴於先前批次的 RDD,這致使依賴鏈的長度隨時間而增長.爲了不恢復時間的這種無限增長(與依賴關係鏈成比例),有狀態轉換的中間 RDD 會按期 checkpoint 到可靠的存儲(例如 HDFS)以切斷依賴關係鏈.

    同俗來講, 就是爲了防止 在恢復數據時, 須要從第一個RDD開始從新計算狀態,致使計算時間過長, 且保存的依賴鏈太長, 會在中間進行截斷, 保存中間點 RDD的狀態, 這樣在恢復時就無需從最開始進行恢復處理.

使用CheckPoint的時機

checkPoint 並不是老是必要的,當咱們依賴的是可靠數據源,(又或者是丟失部分數據 也無所謂) 而且有本身的方式可以 查找到上次執行的 offset, 則徹底無需checkpoint, 此時只須要自行再度拉取數據, 處理數據便可.

  1. 若是在應用程序中使用 updateStateByKey或 reduceByKeyAndWindow(具備反向功能),則必須提供 checkpoint 目錄以容許按期的 RDD checkpoint.

  2. 從運行應用程序的 driver 的故障中恢復 - 元數據 checkpoint 用於使用進度信息進行恢復.

如何使用checkPoint

首先你須要有一個 容錯的 可靠的 文件系統, 好比 HDFS,S3 去保存你的checkpoint信息.

然後調用 streamingContext.checkpoint(checkpointDirectory), 經過這種方式 就可使用上面提到的 updateStateByKey 或 reduceByKeyAndWindow(具備反向功能) 這類的狀態計算.

另外,若是要使應用程序從 driver 故障中恢復,您應該重寫 streaming:

  • 當程序第一次啓動時,它將建立一個新的 StreamingContext,設置全部流,而後調用 start().
  • 當程序在失敗後從新啓動時,它將從 checkpoint 目錄中的 checkpoint 數據從新建立一個 StreamingContext.

    // Create a factory object that can create and setup a new JavaStreamingContext
      JavaStreamingContextFactory contextFactory = new JavaStreamingContextFactory() {
          @Override public JavaStreamingContext create() {
              JavaStreamingContext jssc = new JavaStreamingContext(...);  // new context
              JavaDStream<String> lines = jssc.socketTextStream(...);     // create DStreams
              ...
              jssc.checkpoint(checkpointDirectory);                       // set checkpoint directory
              return jssc;
          }
      };
    
      //  這行代碼即可以知足上述兩種行爲.
      JavaStreamingContext context = JavaStreamingContext.getOrCreate(checkpointDirectory, contextFactory);
    
      // Do additional setup on context that needs to be done,
      // irrespective of whether it is being started or restarted
      context. ...
    
      // Start the context
      context.start();
      context.awaitTermination();

在上面的方法中, 若是是 第一次啓動, 會調用 contextFactory 建立 Streaming 環境, 同時須要在 factory中建立對應的流, 以及定義流的處理過程. 若是是第二次啓動, 則會經過 checkpoint 恢復程序當時的狀態.

這是故障恢復的一部分, 另外就是咱們須要當 driver 掛掉以後, 讓其可以自動重啓, 這部分會在接下來的部署中講到.

Accumulators,Broadcast 變量,和 Checkpoint

須要注意的是 Accumulators,Broadcast 沒法經過 checkpoint進行恢復, 其惟一處理方式是, 在程序執行時 建立延遲 實例化 的 對象.

class JavaWordBlacklist {

    private static volatile Broadcast<List<String>> instance = null;

    public static Broadcast<List<String>> getInstance(JavaSparkContext jsc) {
        if (instance == null) {
        synchronized (JavaWordBlacklist.class) {
            if (instance == null) {
            List<String> wordBlacklist = Arrays.asList("a", "b", "c");
            instance = jsc.broadcast(wordBlacklist);
            }
        }
        }
        return instance;
    }
}

class JavaDroppedWordsCounter {

    private static volatile LongAccumulator instance = null;

    public static LongAccumulator getInstance(JavaSparkContext jsc) {
        if (instance == null) {
        synchronized (JavaDroppedWordsCounter.class) {
            if (instance == null) {
            instance = jsc.sc().longAccumulator("WordsInBlacklistCounter");
            }
        }
        }
        return instance;
    }
}

wordCounts.foreachRDD((rdd, time) -> {
    // Get or register the blacklist Broadcast
    Broadcast<List<String>> blacklist = JavaWordBlacklist.getInstance(new JavaSparkContext(rdd.context()));
    // Get or register the droppedWordsCounter Accumulator
    LongAccumulator droppedWordsCounter = JavaDroppedWordsCounter.getInstance(new JavaSparkContext(rdd.context()));
    // Use blacklist to drop words and use droppedWordsCounter to count them
    String counts = rdd.filter(wordCount -> {
        if (blacklist.value().contains(wordCount._1())) {
        droppedWordsCounter.add(wordCount._2());
        return false;
        } else {
        return true;
        }
    }).collect().toString();
    String output = "Counts at time " + time + " " + counts;
}

Spark 部署

我的寫的Spark集羣部署相關, 詳細描述了 standalone模式

Spark集羣-Standalone 模式

要運行Spark 應用程序, 須要如下功能:

  1. 集羣管理器, 有這樣幾種

    type mean
    Standalone 一種簡單的 Spark 內置 集羣管理器
    Apache Mesos 經常使用的集羣管理器之一, 能夠運行 Hadoop MapReduce 和 service applications.
    Hadoop YARN Hadoop 2 的資源管理器.
    Kubernetes 簡稱K8S Kubernetes是Google開源的一個容器編排引擎,它支持自動化部署、大規模可伸縮、應用容器化管理
  2. 打包後的應用程序Jar, 要求是一個可以在 Spark環境下直接運行的Jar包,所以須要將引用的各類各樣的其餘jar包, 如kafkaUtils, ZK, Redis 等都打包到一個jar包中, 同時須要在打包時排除 沒必要要的 依賴jar, 如 spark core, scala lib 等,Maven的插件 Shade就能夠很好地實現這一點.

  3. 爲 executor 配置足夠的內存 - 因爲接收到的數據必須存儲在內存中,因此 executor 必須配置足夠的內存來保存接收到的數據.請注意,若是您正在進行10分鐘的窗口操做,系統必須至少保留最近10分鐘的內存中的數據.所以,應用程序的內存要求取決於其中使用的操做.

  4. 配置 checkpoint - 若是 streaming 應用程序須要它,則 Hadoop API 容錯存儲(例如:HDFS,S3等)中的目錄必須配置爲 checkpoint 目錄,而且流程應用程序以 checkpoint 信息的方式編寫Streaming代碼.

  5. 配置應用程序 driver 的自動從新啓動

    • standalone

      能夠提交 Spark 應用程序 driver 以在Spark Standalone集羣中運行即應用程序 driver 自己在其中一個工做節點上運行.此外,能夠指示獨立的羣集管理器來監督 driver,若是因爲 非正常退出(non-zero exit code)而致使 driver 發生故障,或因爲運行 driver 的節點發生故障,則能夠從新啓動它.

      能夠查看: Spark Standalone Mode

    • YARN

      Yarn支持相似的機制實現自啓動,能夠查看yarn相關文檔

    • Mesos Marathon 在 Mesos上實現了這一點.

  6. 配置預寫日誌 - 自 Spark 1.2 以來,咱們引入了寫入日誌來實現強大的容錯保證.

    若是啓用,則從 receiver 接收的全部數據都將寫入配置 checkpoint 目錄中的寫入日誌.這能夠防止 driver 恢復時的數據丟失,從而確保零數據丟失.

    能夠經過將 配置參數 spark.streaming.receiver.writeAheadLog.enable 設置爲 true來啓用此功能.

    然而,這些更強的語義可能以單個 receiver 的接收吞吐量爲代價.經過 並行運行更多的 receiver (會在稍後提到)能夠糾正這一點,以增長總吞吐量.

    另外,建議在啓用寫入日誌時,在日誌已經存儲在複製的存儲系統中時,禁用在 Spark 中接收到的數據的複製.這能夠經過將輸入流的存儲級別設置爲 StorageLevel.MEMORY_AND_DISK_SER 來完成.

    使用 S3(或任何不支持刷新的文件系統)寫入日誌時,請記住啓用 spark.streaming.driver.writeAheadLog.closeFileAfterWrite 和spark.streaming.receiver.writeAheadLog.closeFileAfterWrite.

    有關詳細信息,請參閱官方的 Spark Streaming配置.

    請注意,啓用 I/O 加密時,Spark 不會將寫入寫入日誌的數據加密.若是須要對提早記錄數據進行加密,則應將其存儲在本地支持加密的文件系統中.

  7. 設置最大接收速率 - 若是集羣資源不夠大,須要限制最大接收速率 能夠經過:

    receiver 的 spark.streaming.receiver.maxRate 和用於 Direct Kafka 方法的 spark.streaming.kafka.maxRatePerPartition 的 配置參數.

    而在Spark 1.5中,咱們引入了一個稱爲 backpressure (反壓)的功能,無需設置此速率限制,由於Spark Streaming會自動計算速率限制,並在處理條件發生變化時動態調整速率限制.

    能夠經過將 配置參數 spark.streaming.backpressure.enabled 設置爲 true 來啓用此 backpressure.

升級應用程序代碼

有兩種處理策略:

  1. 新舊代碼並行運行, 直到你認爲能夠的時候, 停掉原有代碼.

  2. 正常中止,在 JavaStreamingContext.stop() 方法中接收兩個參數, 第一個是 是否執行完當前數據, 第二個是是否中止 SparkContext. 在當前任務已經執行完以後, 再中止程序, 新的程序啓動以後, 會從上次的checkpoint去啓動, 而這也正是 checkpoint的缺點, 即 兩次的 代碼已經變動, 兩次的序列化結果, 反序列化並不一致, 所以必須刪除 checkpoint 目錄纔可以正常啓動, 這也就意味着在上次中止是 保存的 application 信息 已經消失.

性能調優

性能調優的核心在於:

  1. 減小數據處理的時間
  2. 設定合理的批處理間隔

當二者基本保持一致, 則就可以快速, 有效地處理數據. 至於如何減小數據處理的時間, 則是仁者見仁智者見智.

基本須要本身的大數據處理經驗, 更優良的算法, 再者就是所使用的 大數據處理框架相結合.

在這裏, 僅僅介紹框架相關的部分, 更多的 則會在 性能調優方面 詳細介紹.

鏈接以下: Spark調優

Level of Parallelism in Data Receiving(數據接收中的並行級別)

經過網絡接收數據(如Kafka,Flume,socket 等)須要 deserialized(反序列化)數據並存儲在 Spark 中.若是數據接收成爲系統的瓶頸,那麼考慮一下 parallelizing the data receiving(並行化數據接收).

注意每一個 input DStream 建立接收 single stream of data(單個數據流)的 single receiver(單個接收器)(在 worker 上運行).所以,能夠經過建立多個 input DStreams 來實現 Receiving multiple data streams(接收多個數據流)並配置它們以從 source(s) 接收 data stream(數據流)的 different partitions(不一樣分區).

例如,接收 two topics of data(兩個數據主題)的單個Kafka input DStream 能夠分爲兩個 Kafka input streams(輸入流),每一個只接收一個 topic(主題).這將運行兩個 receivers(接收器),容許 in parallel(並行)接收數據,從而提升 overall throughput(整體吞吐量).這些 multiple DStreams 能夠 unioned(聯合起來)建立一個 single DStream.而後 transformations(轉化)爲應用於 single input DStream 能夠應用於 unified stream.以下這樣作:

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<>(numStreams);
for (int i = 0; i < numStreams; i++) {
    kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();

與此同時, 須要關注的另外一個參數是: spark.streaming.blockInterval

spark.streaming.blockInterval:

Spark Receiver 接收到的數據 在存入 Spark以前 被進行 分塊操做, 分塊的間隔. 最低推薦是50ms.

每一個 receiver 每批次的任務數量將是大約(batch interval(批間隔)/ block interval(塊間隔)).例如,200 ms的 block interval(塊間隔)每 2 秒 batches(批次)建立 10 個 tasks(任務).若是 tasks(任務)數量太少(即少於每一個機器的內核數量),那麼它將無效,由於全部可用的內核都不會被使用處理數據.要增長 given batch interval(給定批間隔)的 tasks(任務)數量,請減小 block interval(塊間​​隔). 但不該該低於50ms.

使用 多個輸入流/ receivers 接收數據的替代方法是明確 repartition(從新分配)input data stream(輸入數據流)(使用 inputStream.repartition(<number of partitions>)).這會在進一步處理以前將收到的批次數據分發到集羣中指定數量的計算機.

Level of Parallelism in Data Processing(數據處理中的並行度)

若是任意 stage 的並行度設置的不夠, 則會致使 集羣資源 得不到充分利用.

至於並行度的設置

參考:Spark性能調優:合理設置並行度

網上相似的文章數不勝數, 就隨便找了一篇.

核心點在於, 並行度設置太低, 即便分配的CPU再多, 也用不到那麼多資源.

task數量,設置成spark Application 總cpu core數量的2~3倍 ,好比150個cpu core ,基本設置 task數量爲 300~ 500, 與理性狀況不一樣的,有些task 會運行快一點,好比50s 就完了,有些task 可能會慢一點,要一分半才運行完,因此若是你的task數量,恰好設置的跟cpu core 數量相同,可能會致使資源的浪費.

Data Serialization(數據序列化)

能夠經過調優 serialization formats(序列化格式)來減小數據 serialization(序列化)的開銷.在 streaming 的狀況下,有兩種類型的數據被 serialized(序列化).

  • Input data(輸入數據):默認狀況下,經過 Receivers 接收的 input data(輸入數據)經過 StorageLevel.MEMORY_AND_DISK_SER_2 存儲在 executors 的內存中, 數據首先保留在內存中,而且只有在內存不足以容納流計算所需的全部輸入數據時纔會 spilled over(溢出)到磁盤.這個序列化顯然具備開銷 - receiver(接收器)必須使接收的數據 deserialize(反序列化),並使用 Spark 的 serialization format(序列化格式)從新序列化它.

  • Persisted RDDs generated by Streaming Operations(流式操做生成的持久 RDDs):經過 streaming computations(流式計算)生成的 RDD 可能會持久存儲在內存中.例如,window operations(窗口操做)會將數據保留在內存中,由於它們將被處理屢次.可是,與 StorageLevel.MEMORY_ONLY 的 Spark Core 默認狀況不一樣,經過流式計算生成的持久化 RDD 將以 StorageLevel.MEMORY_ONLY_SER(即序列化),以最小化 GC 開銷.

在上述兩種狀況下, 使用kryo均可以有效減小CPU開銷 和 內存開銷.

然而, 序列化自己也是一種開銷, 在須要保存的數據量不大, 內存足夠的狀況下:

能夠將數據做爲 deserialized objects(反序列化對象)持久化,而不會致使過多的 GC 開銷.例如,若是你使用幾秒鐘的 batch intervals(批次間隔)而且沒有 window operations(窗口操做),那麼能夠經過明確地相應地設置 storage level(存儲級別)來嘗試禁用 serialization in persisted data(持久化數據中的序列化).這將減小因爲序列化形成的 CPU 開銷,潛在地提升性能,而不須要太多的 GC 開銷.

Task Launching Overheads(任務啓動開銷)

若是每秒啓動的任務數量很高(好比每秒 50 個或更多),那麼這個開銷向 slaves 發送任務多是重要的,而且將難以實現 sub-second latencies(次要的延遲).能夠經過如下更改減小開銷:

  • Execution mode(執行模式):以 Standalone 模式 或 coarse-grained Mesos 模式運行 Spark 比 fine-grained Mesos 模式更好的任務啓動時間.

這些更改可能會將 批處理時間 縮短 100 毫秒,從而容許 sub-second batch size(次秒批次大小)是可行的.

Setting the Right Batch Interval(設置正確的批次間隔)

毫無疑問,若是想要系統 穩定可持續, 咱們必須保證數據的流入流出速率保持均衡, 也即每批次接收的數據 與數據的 處理速度維持平衡.

而調試速率的辦法就是 增長batchInterval 和 減小 數據的流入速率, 經過 SparkUI Streaming頁簽下中能夠觀測到 處理延時, 也即 total delay列.

保持 total delay 小於等於 batch interval便可.

即便真實環境中, 數據有突發性也無需在意, 只須要保證數據在整個運行期間的速率基本保持均衡便可.

然而須要注意到的一點是, 增大 batchInterval 也意味着 有可能增長了內存開銷.

Memory Tuning(內存調優)

Spark Streaming application 所需的集羣內存量在很大程度上取決於所使用的 transformations 類型.例如,若是要在最近 10 分鐘的數據中使用 window operation(窗口操做),那麼您的集羣應該有足夠的內存來容納內存中 10 分鐘的數據.或者若是要使用大量 keys 的 updateStateByKey,那麼必要的內存將會很高.相反,若是你想作一個簡單的 map-filter-store 操做,那麼所需的內存就會很低.

通常來講, receiver中接收到的數據, 在內存溢出的時候 會序列化 存儲到硬盤中, 這可能會下降 streaming application 的性能,所以建議提供足夠的內存以供使用.最好仔細查看內存使用量並相應地進行估算.

內存調優的另外一個方面是 垃圾收集.對於須要低延遲的 streaming application,由 JVM 垃圾回收引發的大量暫停是不但願的.

就以上幾點來講:

  • DStreams 的持久性級別:如前面在 Data Serialization 部分中所述,input data 和 RDD 默認保持爲 serialized bytes(序列化字節).與 deserialized persistence(反序列化持久性)相比,這減小了內存使用量和 GC 開銷.

    能夠經過啓用 Kryo serialization 進一步減小了序列化大小和內存使用.

    以及能夠經過 compression(壓縮)來實現內存使用的進一步減小(參見Spark配置 spark.rdd.compress),代價是 CPU 時間.

  • Clearing old data(清除舊數據):默認狀況下,DStream 轉換生成的全部 input data 和 persisted RDDs 將自動清除.Spark Streaming 決定什麼時候根據所使用的 transformations 來清除數據.例如,若是您使用 10 分鐘的 window operation(窗口操做),則 Spark Streaming 將保留最近 10 分鐘的數據,並主動丟棄舊數據.數據能夠經過設置 streamingContext.remember 保持更長的持續時間(例如交互式查詢舊數據, 以前提到的跨線程 訪問StreamingData).

  • CMS Garbage Collector(CMS垃圾收集器):強烈建議使用 concurrent mark-and-sweep GC,以保持 GC 相關的暫停始終如一.CMS的優勢就是, 儘可能減小暫停式GC,經過與任務並行執行的方式, 執行GC.即便 concurrent GC 已知能夠減小 系統的總體處理吞吐量,但仍然建議實現更多一致的 batch processing times(批處理時間).確保在 driver( 在 spark-submit 中使用 --driver-java-options)和 executors(使用 Spark configuration spark.executor.extraJavaOptions)中設置 CMS GC.

  • Other tips(其餘提示):爲了進一步下降 GC 開銷,如下是一些更多的提示.

    使用 OFF_HEAP 存儲級別的保持 RDDs,使用更小的 heap sizes 的 executors.這將下降每一個 JVM heap 內的 GC 壓力.

    至於什麼是 OFF_HEAP?

    與ON_HEAP對立, 表示 存儲在 Java堆內存以外的數據.

    也即 將數據存儲在機器內存中.

    參考連接: Spark 內存管理之—OFF_HEAP

小結

總結來講:

  • 每一個DStream 都與 single receiver相關聯.爲了得到讀取並行性,須要建立多個 receivers,即 multiple DStreams.receiver 在一個 executor 中運行.它佔據一個 core(內核).確保在 receiver slots are booked 後有足夠的內核進行處理,即 spark.cores.max 應該考慮 receiver slots.receivers 以循環方式分配給 executors.

  • 當從 stream source 接收到數據時,receiver 建立數據 blocks(塊).每一個 blockInterval(默認200ms) 毫秒生成一個新的數據塊.在 N = batchInterval/blockInterval 的 batchInterval 期間建立 N 個數據塊.這些塊由當前 executor 的 BlockManager 分發給其餘執行程序的 block managers.以後,在驅動程序上運行的 Network Input Tracker(網絡輸入跟蹤器)通知有關進一步處理的塊位置

  • 在驅動程序中爲在 batchInterval 期間建立的塊建立一個 RDD.在 batchInterval 期間生成的塊是 RDD 的 partitions.每一個分區都是一個 spark 中的 task.blockInterval == batchinterval 意味着建立 single partition(單個分區),而且可能在本地進行處理.

  • 除非使用non-local(非本地調度)的方式, 不然這些塊上的map任務都運行在執行器單元上(一個在接收數據塊的位置,另外一個在數據塊被備份到的位置),而不會考慮block interval,而更大的 block interval 意味着更大的塊.

    增大spark.locality.wait 即增長了處理 local node(本地節點)上的塊的可能性.

    所以,須要在這兩個參數之間找到平衡,以確保在本地處理較大的塊.

  • 除了調整block interval 和 batch interval以外, 您能夠經過調用 inputDstream.repartition(n) 來定義分區數.

    這樣能夠隨機從新組合 RDD 中的數據,建立 n 個分區以求更大的並行性.雖然是 shuffle 的代價.

    可是咱們須要注意的是,若是數據自己每一個partition中的數據有較強的關聯性, 使用這種方法須要謹慎.

    另外,須要考慮到的問題是,雖然咱們有了更大的並行度, 但自身的集羣資源是否支持這樣高的並行性?即分配的核心數, executor數量是否足夠?

    RDD 的處理由 driver’s jobscheduler 做爲一項工做安排.在給定的時間點,只有一個 job 是 active 的.所以,若是一個做業正在執行,則其餘做業將排隊.

  • 若是您有兩個 dstream,將會有兩個 RDDs 被建立,而且會建立兩個任務,而後被一個接一個的調度.爲了不這種狀況,你能夠對這兩個DStream執行union操做.這保證了兩個DStream RDD會產生一個unionRDD,這個unionRDD會當作一個單獨的job.但 RDD 的 partitioning(分區)不受影響.

    而Spark Job: 每一個Action算子本質上是執行了sc的runJob方法,這是一個重載方法.核心是交給DAGScheduler中的submitJob執行, 進而建立了不一樣的job.

  • 若是 批處理時間)超過 batchinterval(批次間隔),那麼顯然 receiver 的內存將會開始填滿,最終會拋出 exceptions(最多是 BlockNotFoundException).目前沒有辦法暫停 receiver.使用 SparkConf 配置 spark.streaming.receiver.maxRate,receiver 的 rate 能夠受到限制.

相關文章
相關標籤/搜索