Spark Streaming學習——DStream

其餘更多java基礎文章:
java基礎學習(目錄)java


概述

SparkStreaming是流式處理框架,是Spark API的擴展,支持可擴展、高吞吐量、容錯的實時數據流處理,實時數據的來源能夠是:Kafka, Flume, Twitter, ZeroMQ或者TCP sockets,而且可使用高級功能的複雜算子來處理流數據。例如:map,reduce,join,window 。最終,處理後的數據能夠存放在文件系統,數據庫等,方便實時展示。node

運行原理

Spark Streaming架構

Spark Streaming是將流式計算分解成一系列短小的批處理做業。這裏的批處理引擎是Spark Core,也就是把Spark Streaming的輸入數據按照batch interval(如5秒)分紅一段一段的數據(Discretized Stream),每一段數據都轉換成Spark中的RDD(Resilient Distributed Dataset),而後將Spark Streaming中對DStream的Transformation操做變爲針對Spark中對RDD的Transformation操做,將RDD通過操做變成中間結果保存在內存中。整個流式計算根據業務的需求能夠對中間的結果進行疊加或者存儲到外部設備數據庫

DStream

DStream(Discretized Stream)做爲Spark Streaming的基礎抽象,它表明持續性的數據流。這些數據流既能夠經過外部輸入源賴獲取,也能夠經過現有的Dstream的transformation操做來得到。在內部實現上,DStream由一組時間序列上連續的RDD來表示。每一個RDD都包含了本身特定時間間隔內的數據流。bash

下面是DStream的建立例子:

SparkConf conf = new SparkConf().setMaster("local[4]").setAppName("NetworkWordCount")
            .set("spark.testing.memory","2147480000");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
複製代碼

API

transform算子

Transformation 含義
map(func) 對DStream中的各個元素進行func函數操做,而後返回一個新的DStream
flatMap(func) 與map方法相似,只不過各個輸入項能夠被輸出爲零個或多個輸出項
filter(func) 過濾出全部函數func返回值爲true的DStream元素並返回一個新的DStream
repartition(numPartitions) 增長或減小DStream中的分區數,從而改變DStream的並行度
union(otherStream) 將源DStream和輸入參數爲otherDStream的元素合併,並返回一個新的DStream.
count() 經過對DStream中的各個RDD中的元素進行計數,而後返回只有一個元素的RDD構成的DStream
reduce(func) 對源DStream中的各個RDD中的元素利用func進行聚合操做,而後返回只有一個元素的RDD構成的新的DStream.
countByValue() 對於元素類型爲K的DStream,返回一個元素爲(K,Long)鍵值對形式的新的DStream,Long對應的值爲源DStrea
reduceByKey(func, [numTasks]) 利用func函數對源DStream中的key進行聚合操做,而後返回新的(K,V)對構成的DStream
join(otherStream, [numTasks]) 輸入爲(K,V)、(K,W)類型的DStream,返回一個新的(K,(V,W)類型的DStream
cogroup(otherStream, [numTasks]) 輸入爲(K,V)、(K,W)類型的DStream,返回一個新的 (K, Seq[V], Seq[W]) 元組類型的DStream
transform(func) 經過RDD-to-RDD函數做用於DStream中的各個RDD,能夠是任意的RDD操做,從而返回一個新的RDD
updateStateByKey(func) 根據於key的前置狀態和key的新值,對key進行更新,返回一個新狀態的DStream

Windows Operation

總結:

  • batch interval:5s
    每隔5秒切割一次batch,封裝成DStream
  • window length:15s
    進行計算的DStream中包含15s的數據。即3個batch interval
  • sliding interval:10s
    每隔10s取最近3個batch(window length)封裝的DStream,封裝成一個更大的DStream進行計算
/**
* batch interval:5s
* sliding interval:10s
* window length: 60s
* 因此每隔 10s 會取 12 個 rdd,在計算的時候會將這 12 個 rdd 聚合起來
* 而後一塊兒執行 reduceByKeyAndWindow 操做
* reduceByKeyAndWindow 是針對窗口操做的而不是針對 DStream 操做的
*/
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
}, Durations.seconds(60), Durations.seconds(10));
複製代碼

優化Windows Operation

假設 batch=1s, window length=5s, sliding interval=1s, 那麼每一個 DStream 重複計算了 5 次,優化後, (t+4)時刻的 Window 由(t+3)時刻的 Window 和(t+4)時刻的 DStream 組成, 因爲(t+3)時刻的 Window 包含(t-1)時刻的 DStream,而(t+4)時刻的 Window 中不須要包含(t-1) 時刻的 DStream,因此還須要減去(t-1)時刻的 DStream,因此: Window(t+4) = Window(t+3) + DStream(t+4) - DStream(t-1)。 注意,使用此方法必須設置checkpoint目錄,用來保存Window(t+3)的數據

//必須設置 checkpoint 目錄
jssc.checkpoint("hdfs://node01:8020/spark/checkpoint");
JavaPairDStream<String, Integer> searchWordCountsDStream =
searchWordPairDStream.reduceByKeyAndWindow(new Function2<Integer,
Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 + v2;
}
},new Function2<Integer, Integer, Integer>() {
private static final long serialVersionUID = 1L;
@Override
public Integer call(Integer v1, Integer v2) throws Exception {
return v1 - v2;
}
}, Durations.seconds(60), Durations.seconds(10));
複製代碼

Driver HA

提交任務時設置

spark-submit –supervise
複製代碼

以集羣方式提交到 yarn 上時, Driver 掛掉會自動重啓,不須要任何設置
提交任務,在客戶端啓動 Driver,那麼不論是提交到 standalone 仍是 yarn, Driver 掛掉後 都沒法重啓網絡

代碼中配置

上面的方式從新啓動的 Driver 須要從新讀取 application 的信息而後進行任務調度,實 際需求是,新啓動的 Driver 能夠直接恢復到上一個 Driver 的狀態(能夠直接讀取上一個 StreamingContext 的 DSstream 操做邏輯和 job 執行進度,因此須要把上一個 StreamingContext 的元數據保存到 HDFS 上) ,直接進行任務調度,這就須要在代碼層面進 行配置。架構

public class SparkStreamingOnHDFS {
    public static void main(String[] args) {
        final SparkConf conf = new SparkConf()
            .setMaster("local[1]")
            .setAppName("SparkStreamingOnHDFS");
    //這裏能夠設置一個線程,由於不須要一個專門接收數據的線程,而是監控一個目錄
        final String checkpointDirectory = "hdfs://node01:9000/spark/checkpoint";
        JavaStreamingContextFactory factory = new JavaStreamingContextFactory() {
            @Override
            public JavaStreamingContext create() {
                return createContext(checkpointDirectory,conf);
            }
        };
        JavaStreamingContext jsc = JavaStreamingContext.getOrCreate(checkpointDirectory, factory);
        jsc.start();
        jsc.awaitTermination();
        // jsc.close();
    }
    @SuppressWarnings("deprecation")
    private static JavaStreamingContext createContext(String checkpointDirectory,SparkConf conf) {
        System.out.println("Creating new context");
        SparkConf sparkConf = conf;
        //每隔 15s 查看一下監控的目錄中是否新增了文件
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, Durations.seconds(15));
        ssc.checkpoint(checkpointDirectory);
        /**
        * 只是監控文件夾下新增的文件,減小的文件是監控不到的,
        文件內容有改動也是監控不到
        */
        JavaDStream<String> lines = ssc.textFileStream("hdfs://node01:8020/spark");
        /**
        * 接下來能夠寫業務邏輯,好比 wordcount
        */
        return ssc;
    }
}
複製代碼

執行一次程序後, JavaStreamingContext 會在 checkpointDirectory 中保存,當修 改了業務邏輯後,再次運行程序, JavaStreamingContext.getOrCreate(checkpointDirectory, factory); 由於 checkpointDirectory 中有這個 application 的 JavaStreamingContext,因此不會 調用 JavaStreamingContextFactory 來建立 JavaStreamingContext,而是直接 checkpointDirectory 中的 JavaStreamingContext,因此即便業務邏輯改變了,執行的效 果也是以前的業務邏輯, 若是須要執行修改過的業務邏輯,能夠修改或刪除 checkpointDirectory併發

與Kafka的兩種鏈接方式

Receiver模式

獲取 kafka 傳遞的數據來計算:

SparkConf conf = new SparkConf()
    .setAppName("SparkStreamingOnKafkaReceiver")
    .setMaster("local[2]")
    .set("spark.streaming.receiver.writeAheadLog.enable","true");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(5));
//設置持久化數據的目錄
jsc.checkpoint("hdfs://node01:8020/spark/checkpoint");
Map<String, Integer> topicConsumerConcurrency = new HashMap<String,Integer>();
//topic 名 receiver task 數量
topicConsumerConcurrency.put("test_create_topic", 1);
JavaPairReceiverInputDStream<String,String> lines =
KafkaUtils.createStream(
    jsc,
    "node02:2181,node03:2181,node04:2181",
    "MyFirstConsumerGroup",
    topicConsumerConcurrency,
    StorageLevel.MEMORY_AND_DISK_SER());
/*
* 第一個參數是 StreamingContext
* 第二個參數是 ZooKeeper 集羣信息(接受 Kafka 數據的時候會從 Zookeeper 中得到
Offset 等元數據信息)
* 第三個參數是 Consumer Group
* 第四個參數是消費的 Topic 以及併發讀取 Topic 中 Partition 的線程數
* 第五個參數是持久化數據的級別,能夠自定義
*/
//對 lines 進行其餘操做……
複製代碼

kafka 客戶端生產數據的代碼:app

public class SparkStreamingDataManuallyProducerForKafka extends Thread {
    private String topic; //發送給 Kafka 的數據的類別
    private Producer<Integer, String> producerForKafka;
    public SparkStreamingDataManuallyProducerForKafka(String topic){
        this.topic = topic;
        Properties conf = new Properties();
        conf.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
        conf.put("serializer.class", StringEncoder.class.getName());
        producerForKafka = new Producer<Integer, String>(new ProducerConfig(conf)) ;
    }
    @Override
    public void run() {
        while(true){
            counter ++;
            String userLog = createUserLog();
            //生產數據這個方法能夠根據實際需求本身編寫
            producerForKafka.send(new KeyedMessage<Integer, String>(topic, userLog));
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
    public static void main(String[] args) {
        new SparkStreamingDataManuallyProducerForKafka("test_create_topic").start();
        //test_create_topic 是 topic 名
    }
}
複製代碼

Direct 方式

把 kafka 看成一個存儲系統,直接從 kafka 中讀數據, SparkStreaming 本身維護消費者 的消費偏移量框架

SparkConf conf = new SparkConf()
    .setAppName("SparkStreamingOnKafkaDirected")
    .setMaster("local[1]");
JavaStreamingContext jsc = new JavaStreamingContext(conf,Durations.seconds(10));
Map<String, String> kafkaParameters = new HashMap<String, String>();
kafkaParameters.put("metadata.broker.list","node01:9092,node02:9092,node03:9092");
HashSet<String> topics = new HashSet<String>();
topics.add("test_create_topic");
JavaPairInputDStream<String,String> lines =KafkaUtils.createDirectStream(jsc,
    String.class,
    String.class,
    StringDecoder.class,
    StringDecoder.class,
    kafkaParameters,
    topics);
//對 lines 進行其餘操做……
複製代碼

Direct方式優劣

在實際的應用中,Direct Approach方式很好地知足了咱們的須要,與Receiver-based方式相比,有如下幾方面的優點:異步

  1. 下降資源。Direct不須要Receivers,其申請的Executors所有參與到計算任務中;而Receiver-based則須要專門的Receivers來讀取Kafka數據且不參與計算。所以相同的資源申請,Direct 可以支持更大的業務。
  2. 下降內存。Receiver-based的Receiver與其餘Exectuor是異步的,並持續不斷接收數據,對於小業務量的場景還好,若是遇到大業務量時,須要提升Receiver的內存,可是參與計算的Executor並沒有需那麼多的內存。而Direct 由於沒有Receiver,而是在計算時讀取數據,而後直接計算,因此對內存的要求很低。實際應用中咱們能夠把原先的10G降至如今的2-4G左右。
  3. 魯棒性更好。Receiver-based方法須要Receivers來異步持續不斷的讀取數據,所以遇到網絡、存儲負載等因素,致使實時任務出現堆積,但Receivers卻還在持續讀取數據,此種狀況很容易致使計算崩潰。Direct 則沒有這種顧慮,其Driver在觸發batch 計算任務時,纔會讀取數據並計算。隊列出現堆積並不會引發程序的失敗。

可是也存在一些不足,具體以下:

  1. 提升成本。Direct須要用戶採用checkpoint或者第三方存儲來維護offsets,而不像Receiver-based那樣,經過ZooKeeper來維護Offsets,此提升了用戶的開發成本。
  2. 監控可視化。Receiver-based方式指定topic指定consumer的消費狀況均能經過ZooKeeper來監控,而Direct則沒有這種便利,若是作到監控並可視化,則須要投入人力開發。 接!

兩種方式下提升 SparkStreaming 並行度的方法

Receiver 方式調整 SparkStreaming 的並行度的方法:

  • 假設 batch interval 爲 5s, Receiver Task 會每隔 200ms(spark.streaming.blockInterval 默 認)將接收來的數據封裝到一個 block 中,那麼每一個 batch 中包括 25 個 block, batch 會被封 裝到 RDD 中,因此 RDD 中會包含 25 個 partition,因此提升接收數據時的並行度的方法 是:調低 spark.streaming.blockInterval 的值,建議不低於 50ms

其餘配置:

  • spark.streaming.backpressure.enable 默認 false, 設置爲 true 後, sparkstreaming 會根 據上一個 batch 的接收數據的狀況來動態的調整本次接收數據的速度,可是最大速度不能 超過 spark.streaming.receiver.maxRate 設置的值(設置爲 n,那麼速率不能超過 n/s)
  • spark.streaming.receiver.writeAheadLog.enable 默認 false 是否開啓 WAL 機制 Direct 方式並行度的設置:
  • 第一個 DStream 的分區數是由讀取的 topic 的分區數決定的,能夠經過增長 topic 的 partition 數來提升 SparkStreaming 的並行度

優化

1. 並行化數據接收:處理多個topic的數據時比較有效

int numStreams = 5;
List<JavaPairDStream<String, String>> kafkaStreams = new ArrayList<JavaPairDStream<String, String>>(numStreams);
for (int i = 0; i < numStreams; i++) {
  kafkaStreams.add(KafkaUtils.createStream(...));
}
JavaPairDStream<String, String> unifiedStream = streamingContext.union(kafkaStreams.get(0), kafkaStreams.subList(1, kafkaStreams.size()));
unifiedStream.print();
複製代碼

2. spark.streaming.blockInterval:增長block數量,增長每一個batch rdd的partition數量,增長處理並行度

receiver從數據源源源不斷地獲取到數據;首先是會按照block interval,將指定時間間隔的數據,收集爲一個block;默認時間是200ms,官方推薦不要小於50ms;接着呢,會將指定batch interval時間間隔內的block,合併爲一個batch;建立爲一個rdd,而後啓動一個job,去處理這個batch rdd中的數據

batch rdd,它的partition數量是多少呢?一個batch有多少個block,就有多少個partition;就意味着並行度是多少;就意味着每一個batch rdd有多少個task會並行計算和處理。

固然是但願能夠比默認的task數量和並行度再多一些了;能夠手動調節block interval;減小block interval;每一個batch能夠包含更多的block;有更多的partition;也就有更多的task並行處理每一個batch rdd。

3. inputStream.repartition():重分區,增長每一個batch rdd的partition數量

有些時候,但願對某些dstream中的rdd進行定製化的分區 對dstream中的rdd進行重分區,去重分區成指定數量的分區,這樣也能夠提升指定dstream的rdd的計算並行度

4. 調節並行度

spark.default.parallelism  
reduceByKey(numPartitions)
複製代碼

5. 使用Kryo序列化機制:

spark streaming,也是有很多序列化的場景的 提升序列化task發送到executor上執行的性能,若是task不少的時候,task序列化和反序列化的性能開銷也比較可觀 默認輸入數據的存儲級別是StorageLevel.MEMORY_AND_DISK_SER_2,receiver接收到數據,默認就會進行持久化操做;首先序列化數據,存儲到內存中;若是內存資源不夠大,那麼就寫入磁盤;並且,還會寫一份冗餘副本到其餘executor的block manager中,進行數據冗餘。

6. batch interval:每一個的處理時間必須小於batch interval

實際上你的spark streaming跑起來之後,其實都是能夠在spark ui上觀察它的運行狀況的;能夠看到batch的處理時間; 若是發現batch的處理時間大於batch interval,就必須調節batch interval 儘可能不要讓batch處理時間大於batch interval 好比你的batch每隔5秒生成一次;你的batch處理時間要達到6秒;就會出現,batch在你的內存中日積月累,一直囤積着,無法及時計算掉,釋放內存空間;並且對內存空間的佔用愈來愈大,那麼此時會致使內存空間快速消耗

若是發現batch處理時間比batch interval要大,就儘可能將batch interval調節大一些

相關文章
相關標籤/搜索