Spark Streaming編程指南

  本文基於Spark Streaming Programming Guide原文翻譯, 加上一些本身的理解和小實驗的結果。
  php

1、概述

  Spark Streaming是基於Core Spark API的可擴展,高吞吐量,並具備容錯能力的用於處理實時數據流的一個組件。Spark Streaming能夠接收各類數據源傳遞來的數據,好比Kafka, Flume, Kinesis或者TCP等,對接收到的數據還可使用一些用高階函數(好比map, reduce, joinwindow)進行封裝的複雜算法作進一步的處理。最後,處理好的數據能夠寫入到文件系統,數據庫,或者直接用於實時展現。除此以外,還能夠在數據流上應用一些機器學習或者圖計算等算法。
  這裏寫圖片描述html

  上圖展現了Spark Streaming的總體數據流轉狀況。在Spark Streaming中的處理過程能夠參考下圖,Spark Streaming接收實時數據,而後把這些數據分割成一個個batch,而後經過Spark Engine分別處理每個batch並輸出。
  這裏寫圖片描述java

  Spark Streaming中一個最重要的概念是DStream,即離散化數據流(discretized stream),DStream由一系列連續的數據集組成。DStream的建立有兩種辦法,一種是從數據源接收數據生成初始DStream,另外一種是由DStream A經過轉換生成DStream B。一個DStream實質上是由一系列的RDDs組成。
  本文介紹瞭如何基於DStream寫出Spark Streaming程序。Spark Streaming提供了Scala, Java以及Python接口,在官方文檔中對這三種語言都有示例程序的實現,在這裏只分析Scala寫的程序。python

2、示例程序

  在深刻分析Spark Streaming的特性和原理以前,以寫一個簡單的Spark Streaming程序並運行起來爲入口先了解一些相關的基礎知識。這個示例程序從TCP socket中接收數據,進行Word Count操做。git

一、Streaming程序編寫

  首先須要導入Spark Streaming相關的類,其中StreamingContext是全部Streaming程序的主要入口。接下來的代碼中建立一個local StreamingContext,batch時間爲1秒,execution線程數爲2。github

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// 建立一個local StreamingContext batch時間爲1秒,execution線程數爲2
// master的線程數數最少爲2,後面會詳細解釋

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, econds(1))

使用上面這個ssc對象,就能夠建立一個lines變量用來表示從TCP接收的數據流了,指定機器名爲localhost端口號爲9999web

// 建立一個鏈接到hostname:port的DStream, 下面代碼中使用的是localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)

lines中的每一條記錄都是TCP中的一行文本信息。接下來,使用空格將每一行語句進行分割。算法

// 將每一行分割成單詞
val words = lines.flatMap(_.split(" "))

上面使用的flatMap操做是一個一對多的DStream操做,在這裏表示的是每輸入一行記錄,會根據空格生成多個單詞,這些單詞造成一個新的DStream words。接下來統計單詞個數。sql

import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// 統計每一個batch中的不一樣單詞個數
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// 打印出其中前10個單詞出現的次數
wordCounts.print()

  上面代碼中,將每個單詞使用map方法映射成(word, 1)的形式,即paris變量。而後調用reduceByKey方法,將相同單詞出現的次數進行疊加,最終打印出統計的結果。數據庫

  寫完上面的代碼,Spark Streaming程序尚未運行起來,須要寫入如下兩行代碼使Spark Streaming程序可以真正的開始執行。

ssc.start()            // 開始計算
ssc.awaitTermination()  // 等待計算結束

二、TCP發送數據並運行Spark Streaming程序

(1)運行Netcat
  使用如下命令啓動一個Netcat

nc -lk 9999

  接下來就能夠在命令行中輸入任意語句了。

(2)運行Spark Streaming程序

./bin/run-example streaming.NetworkWordCount localhost 9999

  程序運行起來後Netcat中輸入的任何語句,都會被統計每一個單詞出現的次數,例如
  這裏寫圖片描述

3、基本概念

  這一部分詳細介紹Spark Streaming中的基本概念。

一、依賴配置

  Spark Streaming相關jar包的依賴也可使用Maven來管理,寫一個Spark Streaming程序的時候,須要將下面的內容寫入到Maven項目中

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.11</artifactId>
    <version>2.0.0</version>
</dependency>

  對於從Kafka,Flume,Kinesis這些數據源接收數據的狀況,Spark Streaming core API中不提供這些類和接口,須要添加下面這些依賴。
  

Source Artifact
Kafka spark-streaming-kafka-0-8_2.11
Flume spark-streaming-flume_2.11
Kinesis spark-streaming-kinesis-asl_2.11 [Amazon Software License]

二、初始化StreamingContext

  Spark Streaming程序的主要入口是一個StreamingContext對象,在程序的開始,須要初始化該對象,代碼以下

import org.apache.spark._
import org.apache.spark.streaming._

val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))

  其中的參數appName是當前應用的名稱,能夠在Cluster UI上進行顯示。master是Spark的運行模式,能夠參考 Spark, Mesos or YARN cluster URL,或者設置成local[*]的形式在本地模式下運行。在生產環境中運行Streaming應用時,通常不會將master參數寫死在代碼中,而是在使用spark-submit命令提交時動態傳入--master參數,具體能夠參考 launch the application with spark-submit
  至於batch時間間隔的設置,須要綜合考慮程序的性能要求以及集羣可提供的資源狀況。

  也能夠基於SparkContext對象,生成一個StreamingContext對象,使用以下代碼

import org.apache.spark.streaming._

val sc = ...                // 已有的SparkContext對象
val ssc = new StreamingContext(sc, Seconds(1))

  當context初始化後,還須要作的工做有:

  1. 根據數據源類型生成輸入DStreams
  2. 經過調用transformation以及輸出操做處理輸入的DStreams
  3. 使用代碼streamingContext.start()啓動程序,開始接收並處理數據
  4. 使用代碼streamingContext.awaitTermination()等待程序運行終止(包括手動中止,或者遇到Error後退出應用)
  5. 可使用streamingContext.stop()手動中止應用

須要注意的點:

  • 當一個context開始運行後,不能再往其中添加新的計算邏輯
  • 當一個context被中止後,不能restart
  • 在一個JVM中只能同時有一個StreamingContext對象處於運行狀態
  • StreamingContext中的stop()方法一樣會終止SparkContext。若是隻須要中止StreamingContext,將stop()方法的可選參數設置成false,避免SparkContext被終止
  • 一個SparkContext對象,能夠用於構造多個StreamingContext對象,只要在新的StreamingContext對象被建立前,舊的StreamingContext對象被中止便可。

三、離散化數據流(Discretized Streams, DStreams)

  DStream是Spark Streaming中最基本最重要的一個抽象概念。DStream由一系列的數據組成,這些數據既能夠是從數據源接收到的數據,也能夠是從數據源接收到的數據通過transform操做轉換後的數據。從本質上來講一個DStream是由一系列連續的RDDs組成,DStream中的每個RDD包含了一個batch的數據。
  這裏寫圖片描述

  DStream上的每個操做,最終都反應到了底層的RDDs上。好比,在前面那個Word Count代碼中將lines轉化成words的邏輯,lines上的flatMap操做就如下圖中所示的形式,做用到了每個底層的RDD上。
  這裏寫圖片描述

  這些底層RDDs上的轉換操做會有Spark Engine進行計算。對於開發者來講,DStream提供了一個更方便使用的高階API,從而開發者無需過多的關注每個轉換操做的細節。
  DStream上能夠執行的操做後續文章中會有進一步的介紹。

四、輸入和接收DStream

  
(1)基本數據源
  在前面Word Count的示例程序中,已經使用到了ssc.socketTextStream(...),這個會根據TCP socket中接收到的數據建立一個DStream。除了sockets以外,StreamingContext API還支持以文件爲數據源生成DStream

  • 文件數據源:若是須要從文件系統,好比HDFS,S3,NFS等中接收數據,可使用如下代碼
streamingContext.fileStream[KeyClass, ValueClass, InputFormatClass](dataDirectory)

Spark Streaming程序會監控用戶輸入的dataDirectory路徑,接收並處理該路徑中的全部文件,不過不支持子文件夾中的文件。
須要注意的地方有:
a、全部的文件數據格式必須相同
b、該路徑下的文件應該是原子性的移動到該路徑,或者重命名到該路徑
c、文件進入該路徑後不可再發生變化,因此這種數據源不支持數據連續寫入的形式
  對於簡單的text文件,有一個簡單的StreamingContext.textFileStream(dataDirectory)方法來進行處理。而且文件數據源的形式不須要運行一個receiver進程,因此對Execution的核數沒有要求。

  • 基於自定義Receiver的數據源:DStream也支持從用戶自定義Receivers中讀取數據。
  • RDDs序列數據源:使用streamingContext.queueStream(queueOfRDDs),能夠將一系列的RDDs轉化成一個DStream。該queue中的每個RDD會被當作DStream中的一個batcn,而後以Streaming的形式處理這些數據。

(2)高階數據源
  
(3)自定義數據源
  除了上面兩類數據源以外,也能夠支持自定義數據源。自定義數據源時,須要實現一個能夠從自定義數據源接收數據併發送到Spark中的用戶自定義receiver。具體能夠參考 Custom Receiver Guide

(4)數據接收的可靠性

五、DStreams上的Transformations

  相似於RDDs,transformations可使輸入DStream中的數據內容根據特定邏輯發生轉換。DStreams上支持不少RDDs上相同的一些transformations
  具體含義和使用方法可參考另外一篇博客:Spark Streaming中的操做函數分析

  在上面這些transformations中,有一些須要進行進一步的分析
(1)UpdateStateByKey操做

(2)Transform操做
  transform操做及其相似的一些transformwith操做,可使DStream中的元素可以調用任意的RDD-to-RDD的操做。可使DStream調用一些只有RDD纔有而DStream API沒有提供的算子。例如,DStream API就不支持一個data DStream中的每個batch數據能夠直接和另外的一個數據集作join操做,可是使用transform就能夠實現這一功能。這個操做能夠說進一步豐富了DStream的操做功能。
  再列舉一個這個操做的使用場景,將某處計算到的重複信息與實時數據流中的記錄進行join,而後進行filter操做,能夠當作一種數據清理的方法。

val spamInfoRDD = ssc.sparkContext.newAPIHadoopRDD(...) // 一個包含重複信息的RDD

val cleanedDStream = wordCounts.transform(rdd => {
  rdd.join(spamInfoRDD).filter(...) // 將重複信息與實時數據作join,而後根據指定規則filter,用於數據清洗
  ...})

  這裏須要注意的是,transform傳入的方法是被每個batch調用的。這樣能夠支持在RDD上作一些時變的操做,即RDD,分區數以及廣播變量能夠在不一樣的batch之間發生變化。

(3)Window操做
  Spark Streaming提供一類基於窗口的操做,這類操做能夠在一個滑動窗口中的數據集上進行一些transformations操做。下圖展現了窗口操做的示例
  這裏寫圖片描述

  上圖中,窗口在一個DStream源上滑動,DStream源中暴露在該窗口中的RDDs可讓這個窗口進行相關的一些操做。在上圖中能夠看到,該窗口中任一時刻都只能看到3個RDD,而且這個窗口每2秒中往前滑動一次。這裏提到的兩個參數,正好是任意一個窗口操做都必須指定的。

  • 窗口長度:例如上圖中,窗口長度爲3
  • 滑動間隔:指窗口多長時間往前滑動一次,上圖中爲2。

      須要注意的一點是,上面這兩個參數,必須是batch時間的整數倍,上圖中的batch時間爲1。

      接下來展現一個簡單的窗口操做示例。好比說,在前面那個word count示例程序的基礎上,我但願每隔10秒鐘就統計一下當前30秒時間內的每一個單詞出現的次數。這一功能的簡單描述是,在paris DStream的當前30秒的數據集上,調用reduceByKey操做進行統計。爲了實現這一功能,可使用窗口操做reduceByKeyAndWindow

val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))

  更多的窗口操做能夠參考:Spark Streaming中的操做函數分析
  

六、DStreams上的輸出操做

  DStream上的輸出操做,可使DStream中的數據發送到外部系統,好比數據庫或文件系統中。DStream只有通過輸出操做,其中的數據才能被外部系統使用。而且下面這些輸出操做才真正的觸發DStream對象上調用的transformations操做。這一點相似於RDDs上的Actions算子。
  輸出操做的使用和功能請參考:Spark Streaming中的操做函數分析

  下面主要進一步分析foreachRDD操做往外部數據庫寫入數據的一些注意事項。
  
  dstream.foreachRDD是DStream輸出操做中最經常使用也最重要的一個操做。關於這個操做如何正確高效的使用,下面會列舉出一些使用方法和案例,能夠幫助讀者在使用過程當中避免踩到一些坑。
  一般狀況下,若是想把數據寫入到某個外部系統中時,須要爲之建立一個鏈接對象(好比提供一個TCP鏈接工具用於鏈接遠程服務器),使用這個鏈接工具才能將數據發送到遠程系統。在Spark Streaming中,開發者極可能會在Driver端建立這個對象,而後又去Worker端使用這個對象處理記錄。好比下面這個例子

dstream.foreachRDD { rdd =>
  val connection = createNewConnection()  // 在driver端執行
  rdd.foreach { record =>
    connection.send(record) // 在wroker端執行
  }}

  上面這個使用方法實際上是錯誤的,當在driver端建立這個鏈接對象後,須要將這個鏈接對象序列化併發送到wroker端。一般狀況下,鏈接對象都是不可傳輸的,即wroker端沒法獲取該鏈接對象,固然也就沒法將記錄經過這個鏈接對象發送出去了。這種狀況下,應用系統的報錯提示多是序列化錯誤(鏈接對象沒法序列化),或者初始化錯誤(鏈接對象須要在wroker端完成初始化),等等。
  正確的作法是在worker端建立這個鏈接對象。
  可是,即便是在worker建立這個對象,又可能會犯如下錯誤。

dstream.foreachRDD { rdd =>
  rdd.foreach { record =>
    val connection = createNewConnection()
    connection.send(record)
    connection.close()
  }}

  上面代碼會爲每一條記錄建立一個鏈接對象,致使鏈接對象太多。 鏈接對象的建立個數會受到時間和系統資源狀況的限制,所以爲每一條記錄都建立一個鏈接對象會致使系統出現沒必要要的高負載,進一步致使系統吞吐量下降。
  一個好的辦法是使用rdd.foreachPartition操做,而後爲RDD的每個partition,使一個partition中的記錄使用同一個鏈接對象。以下面代碼所示

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    val connection = createNewConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    connection.close()
  }}

  
  最後,能夠經過使用鏈接對象池進一步對上面的代碼進行優化。使用鏈接對象池能夠進一步提升鏈接對象的使用效率,使得多個RDDs/batches之間能夠重複使用鏈接對象。

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // 鏈接對象池是靜態的,而且創建對象只有在真正使用時才被建立
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // 使用完以後,將鏈接對象歸還到池中以便下一次使用
  }}

  須要注意的是,鏈接對象池中的對象最好設置成懶生成模式,即在真正使用時纔去建立鏈接對象,而且給鏈接對象設置一個生命週期,必定時間不使用則註銷該鏈接對象。

總結一下關鍵點:

  • DStreamstransformations操做是由輸出操做觸發的,相似於RDDs中的actions操做。上面列舉出某些DStream的輸出操做中能夠將其中的元素轉化成RDD,進而能夠調用RDD提供的一些API操做,這時若是對RDD調用actions操做會當即強制對接收到的數據進行處理。所以,若是用戶應用程序中DStream不須要任何的輸出操做,或者僅僅對DStream使用一些相似於dstream.foreachRDD操做可是在這個操做中不調用任何的RDD action操做時,程序是不會進行任何實際運算的。系統只會簡單的接收數據,任何丟棄數據。
  • 默認狀況下,輸出操做是順序執行的。

七、累加器和廣播變量

  Spark Streaming的累加器和廣播變量沒法從checkpoint恢復。若是在應用中既使用到checkpoint又使用了累加器和廣播變量的話,最好對累加器和廣播變量作懶實例化操做,這樣纔可使累加器和廣播變量在driver失敗重啓時可以從新實例化。參考下面這段代碼

object WordBlacklist {

  @volatile private var instance: Broadcast[Seq[String]] = null

  def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          val wordBlacklist = Seq("a", "b", "c")
          instance = sc.broadcast(wordBlacklist)
        }
      }
    }
    instance
  }}

object DroppedWordsCounter {

  @volatile private var instance: Accumulator[Long] = null

  def getInstance(sc: SparkContext): Accumulator[Long] = {
    if (instance == null) {
      synchronized {
        if (instance == null) {
          instance = sc.accumulator(0L, "WordsInBlacklistCounter")
        }
      }
    }
    instance
  }}

wordCounts.foreachRDD((rdd: RDD[(String, Int)], time: Time) => {
  // Get or register the blacklist Broadcast
  val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
  // Get or register the droppedWordsCounter Accumulator
  val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
  // Use blacklist to drop words and use droppedWordsCounter to count them
  val counts = rdd.filter { case (word, count) =>
    if (blacklist.value.contains(word)) {
      droppedWordsCounter += count
      false
    } else {
      true
    }
  }.collect()
  val output = "Counts at time " + time + " " + counts})

  查看完整代碼請移步 source code

八、DataFrame和SQL操做

  在streaming數據上也能夠很方便的使用到DataFrames和SQL操做。爲了支持這種操做,須要用StreamingContext對象使用的SparkContext對象初始化一個SQLContext對象出來,SQLContext對象設置成一個懶初始化的單例對象。下面代碼對前面的Word Count進行一些修改,經過使用DataFramesSQL來實現Word Count的功能。每個RDD都被轉化成一個DataFrame對象,而後註冊成一個臨時表,最後就能夠在這個臨時表上進行SQL查詢了。

val words: DStream[String] = ...

words.foreachRDD { rdd =>

  // 獲取單例SQLContext對象
  val sqlContext = SQLContext.getOrCreate(rdd.sparkContext)
  import sqlContext.implicits._

  // 將RDD[String]轉化成DataFrame
  val wordsDataFrame = rdd.toDF("word")

  // 註冊表
  wordsDataFrame.registerTempTable("words")

  // 在該臨時表上執行sql語句操做
  val wordCountsDataFrame =
    sqlContext.sql("select word, count(*) as total from words group by word")
  wordCountsDataFrame.show()}

  查看完整代碼請移步 source code.
  也能夠在另外一線程獲取到的Streaming數據上進行SQL操做(這裏涉及到異步運行StreamingContext)。StreamingContext對象沒法感知到異步SQL查詢的存在,所以有StreamingContext對象有可能在SQL查詢完成以前把歷史數據刪除掉。爲了保證StreamingContext不刪除須要用到的歷史數據,須要告訴StreamingContext保留必定量的歷史數據。例如,若是你想在某一個batch的數據上執行SQL查詢操做,可是你這個SQL須要執行5分鐘的時間,那麼,須要執行streamingContext.remember(Minutes(5))語句告訴StreamingContext將歷史數據保留5分鐘。
  有關DataFrames的更多介紹,參考另外一篇博客:Spark-SQL之DataFrame操做大全

九、MLlib操做

十、緩存和持久化

  相似於RDDsDStreams也容許開發者將stream中的數據持久化到內存中。在DStream對象上使用persist()方法會將DStream對象中的每個RDD自動持久化到內存中。這個功能在某個DStream的數據須要進行屢次計算時特別有用。對於窗口操做好比reduceByWindow,以及涉及到狀態的操做好比updateStateByKey,默認會對DStream對象執行持久化。所以,程序在運行時會自動將窗口操做和涉及到狀態的這些操做生成的DStream對象持久化到內存中,不須要開發者顯示的執行persist()操做。
  對那些經過網絡接收到的streams數據(好比Kafka, Flume, Socket等),默認的持久化等級是將數據持久化到兩個節點上,以保證其容錯能力。
  注意,不一樣於RDDs,默認狀況下DStream的持久化等級是將數據序列化保存在內存中。這一特性會在後面的性能調優中進一步分析。有關持久化級別的介紹,能夠參考rdd-persistence

十一、檢查點

  當Streaming應用運行起來時,基本上須要7 * 24的處於運行狀態,因此須要有必定的容錯能力。檢查點的設置就是可以支持Streaming應用程序快速的從失敗狀態進行恢復的。檢查點保存的數據主要有兩種:  

1 . 元數據(Metadata)檢查點:保存Streaming應用程序的定義信息。主要用於恢復運行Streaming應用程序的driver節點上的應用。元數據包括:
  a、配置信息:建立Streaming應用程序的配置信息
  b、DStream操做:在DStream上進行的一系列操做方法
  c、未處理的batch:記錄進入等待隊列可是還未處理完成的批次

2 . 數據(Data)檢查點:將計算獲得的RDD保存起來。在一些跨批次計算並保存狀態的操做時,必須設置檢查點。由於在這些操做中基於其餘批次數據計算獲得的RDDs,隨着時間的推移,計算鏈路會愈來愈長,若是發生錯誤重算的代價會特別高。

  元數據檢查點信息主要用於恢復driver端的失敗,數據檢查點主要用於計算的恢復。

(1)何時須要使用檢查點

  當應用程序出現如下兩種狀況時,須要配置檢查點。
  
- 使用到狀態相關的操做算子-好比updateStateByKey或者reduceByKeyAndWindow等,這種狀況下必須爲應用程序設置檢查點,用於按期的對RDD進行檢查點設置。
- Driver端應用程序恢復-當應用程序從失敗狀態恢復時,須要從檢查點中讀取相關元數據信息。

(2)檢查點設置

  通常是在具備容錯能力,高可靠的文件系統上(好比HDFS, S3等)設置一個檢查點路徑,用於保存檢查點數據。設置檢查點能夠在應用程序中使用streamingContext.checkpoint(checkpointDirectory)來指定路徑。
  若是想要應用程序在失敗重啓時使用到檢查點存儲的元數據信息,須要應用程序具備如下兩個特性,須要使用StreamingContext.getOrCreate代碼在失敗時從新建立StreamingContext對象:

  • 當應用程序是第一次運行時,建立一個新的StreamingContext對象,而後開始執行程序處理DStream。
  • 當應用程序失敗重啓時,能夠從設置的檢查點路徑獲取元數據信息,建立一個StreamingContext對象,並恢復到失敗前的狀態。

      下面用Scala代碼實現上面的要求。

def functionToCreateContext(): StreamingContext = {
    val ssc = new StreamingContext(...)   // 建立一個新的StreamingContext對象
    val lines = ssc.socketTextStream(...) // 獲得DStreams
    ...
    ssc.checkpoint(checkpointDirectory)   // 設置checkpoint路徑
    ssc
}

// 用checkpoint元數據建立StreamingContext對象或根據上面的函數建立新的對象
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

// 設置context的其餘參數
context. ...

// 啓動context
context.start()
context.awaitTermination()

  若是checkpointDirectory路徑存在,會使用檢查點元數據恢復一個StreamingContext對象。若是路徑不存在,或者程序是第一次運行,則會使用functionToCreateContext來建立一個新的StreamingContext對象。
  RecoverableNetWorkWordCount示例代碼演示了一個從檢查點恢復應用程序的示例。
  
  須要注意的是,想要用到上面的getOrCreate功能,須要在應用程序運行時使其支持失敗自動重跑的功能。這一功能,在接下來一節中有分析。

  另外,在往檢查點寫入數據這一過程,是會增長系統負荷的。所以,須要合理的設置寫入檢查點數據的時間間隔。對於小批量時間間隔(好比1秒)的應用,若是每個batch都執行檢查點寫入操做,會顯著的下降系統的吞吐性能。相反的,若是寫入檢查點數據間隔過久,會致使lineage過長。對那些狀態相關的須要對RDD進行檢查點寫入的算子,檢查點寫入時間間隔最好設置成batch時間間隔的整數倍。好比對於1秒的batch間隔,設置成10秒。有關檢查點時間間隔,可使用dstream.checkpoint(checkpointInterval)。通常來講,檢查點時間間隔設置成5~10倍滑動時間間隔是比較合理的。

十二、部署應用程序

  這一節主要討論如何將一個Spark Streaming應用程序部署起來。
  
(1)需求
  運行一個Spark Streaming應用程序,須要知足一下要求。

  • 須要有一個具備集羣管理器的集羣 - 能夠參考Spark應用部署文檔
  • 應用程序打成JAR包 - 須要將應用程序打成JAR包。接下來使用spark-submit命令來運行應用程序的話,在該JAR包中無需打入Spark和Spark Streaming相關JAR包。然而,若是應用程序中使用到了好比Kafka或者Flume等高階數據源的話,須要將這些依賴的JAR包,以及這些依賴進一步的依賴都打入到應用JAR包中。好比,應用中使用到了KafkaUtils的話,須要將spark-streaming-kafka-0.8_2.11以及其依賴都打入到應用程序JAR包中。
  • 爲Executor設置足夠的內存 - 因爲接收到的數據必須保存在內存中,必須爲Executor設置足夠的內存能容納這些接收到的數據。注意,若是在應用程序中作了10分鐘長度的窗口操做,系統會保存最少10分鐘的數據在內存中。因此應用程序須要的內存除了由接收的數據決定以外,還須要考慮應用程序中的操做類型。
  • 設置檢查點 - 若是應用程序須要用到檢查點,則須要在文件存儲系統上設置好檢查點路徑。
  • 爲應用程序的Driver設置自動重啓 - 爲了實現driver失敗後自動重啓的功能,應用程序運行的系統必須可以監控driver進程,而且若是發現driver失敗時可以自動重啓應用。不一樣的集羣使用不一樣的方式實現自動重啓功能。
    • Spark Standalone - 在這種模式下,driver程序運行在某個wroker節點上。而且,Standalone集羣管理器會監控driver程序,若是發現driver中止運行,而且其狀態碼爲非零值或者因爲運行driver程序的節點失敗致使driver失敗,就會自動重啓該應用。具體的監控和失敗重啓能夠進一步參考Spark Standalone guide
    • YARN - Yarn支持相似的自動重啓應用的機制。更多的細節能夠進一步參考YARN的相關文檔
    • Mesos - Mesos使用Marathon實現了自動重啓功能
  • 設置write ahead logs - 從Spark-1.2版本開始,引入了一個write ahead log機制來實現容錯。若是設置了WAL功能,全部接收到的數據會寫入write ahead log中。這樣能夠避免driver重啓時出現數據丟失,所以能夠保證數據的零丟失,這一點能夠參考前面有關介紹。經過將spark.streaming.receiver.writeAheadLog.enable=true來開啓這一功能。然而,這一功能的開啓會下降數據接收的吞吐量。這是能夠經過同時並行運行多個接收進程(這一點在後面的性能調優部分會有介紹)進行來抵消該負面影響。另外,若是已經設置了輸入數據流的存儲級別爲Storagelevel.MEMORY_AND_DISK_SET,因爲接收到的數據已經會在文件系統上保存一份,這樣就能夠關閉WAL功能了。當使用S3以及其餘任何不支持flushng功能的文件系統來write ahead logs時,要記得設置spark.streaming.driver.writeAheadLog.closeFileAfterWrite以及spark.streaming.receiver.writeAheadLog.closeFileAfterWrite兩個參數。
  • 設置Spark Streaming最大數據接收率 - 若是運行Streaming應用程序的資源不是不少,數據處理能力跟不上接收數據的速率,能夠爲應用程序設置一個每秒最大接收記錄數進行限制。對於Receiver模式的應用,設置spark.streaming.receiver.maxRate,對於Direct Kafka模式,設置spark.streaming.kafka.maxRatePerPartition限制從每一個Kafka的分區讀取數據的速率。假如某個Topic有8個分區,spark.streaming.kafka.maxRatePerpartition=100,那麼每一個batch最大接收記錄爲800。從Spark-1.5版本開始,引入了一個backpressure的機制來避免設置這個限制閾值。Spark Streaming會自動算出當前的速率限制,而且動態調整這個閾值。經過將spark.streaming.backpressure.enabledtrue開啓backpressure功能。

(2)升級應用代碼
  若是運行中的應用程序有更新,須要運行更新後的代碼,有如下兩種機制。

  • 升級後的應用程序直接啓動,與現有的應用程序並行執行。在新舊應用並行運行的過程當中,會接收和處理一部分相同的數據。
  • Gracefully停掉正在運行的應用,而後啓動升級後的應用程序,新的應用程序會從舊的應用程序中止處開始繼續處理數據。須要注意的是,使用這種方式,須要其數據源具備緩存數據的能力,不然在新舊應用程序銜接的間歇期內,數據沒法被處理。好比Kafka和Flume都具備數據緩存的能力。而且,在這種狀況下,再從舊應用程序的檢查點從新構造SparkStreamingContext對象再也不合適,由於檢查點中的信息可能不包含更新的代碼邏輯,這樣會致使程序出現錯誤。在這種狀況下,要麼從新指定一個檢查點,要麼刪除以前的檢查點。

1三、監控應用程序

  在Spark Streaming應用程序運行時,Spark Web UI頁面上會多出一個Streaming的選項卡,在這裏面能夠顯示一些Streaming相關的參數,好比Receiver是否在運行,接收了多少記錄,處理了多少記錄等。以及Batch相關的信息,包括batch的執行時間,等待時間,完成的batch數,運行中的batch數等等。這裏面有兩個時間參數須要注意理解一些:

  • Processing Time - 每個batch中數據的處理時間
  • Scheduling Delay - 當前batch從進入隊列到開始執行的延遲時間

      若是處理時間一直比batch時間跨度要長,或者延遲時間逐漸增加,表示系統已經沒法處理當前的數據量了,這時候就須要考慮如何去下降每個batch的處理時間。如何下降batch處理時間,能夠參考第四節。

      除了監控頁面以外,Spark還提供了StreamingListener接口,經過這個接口能夠獲取到receiver以及batch的處理時間等信息。

4、性能調優

  爲了使Spark Streaming應用可以更好的運行,須要進行一些調優設置,這一節會分析一些性能調優中的參數和設置規則。在性能調優方面,主要須要考慮如下兩個問題:

  • 如何充分利用集羣資源下降每一個Batch的處理時間
  • 如何設置合理的Batch大小,以便應用可以及時處理接收到的這些數據

一、下降每一個Batch的處理時間

  接下來的內容在Spark性能調優中已有介紹,這裏再次強調一下在Streaming中須要注意的一些地方。
  
(1)接收數據進程的並行度
  經過網絡(好比Kafka, Flume, socket等)接收到的數據,首先須要反序列化而後保存在Spark中。當數據接收成爲系統的瓶頸時,就須要考慮如何提升系統接收數據的能力了。每個輸入的DStream會在一個Worker節點上運行一個接收數據流的進程。若是建立了多個接收數據流進程,就能夠生成多個輸入DStream了。好比說,對於Kafka數據源,若是使用的是一個DStream接收來自兩個Topic中的數據的話,就能夠將這兩個Topic拆開,由兩個數據接收進程分開接收。當用兩個receiver接收到DStream後,能夠在應用中將這兩個DStream再進行合併。好比下面代碼中所示

val numStreams = 5
val kafkaStreams = (1 to numStreams).map { i => KafkaUtils.createStream(...) }
val unifiedStream = streamingContext.union(kafkaStreams)
unifiedStream.print()

  須要注意一個參數spark.streaming.blockInterval。對於Receiver來講,接收到的數據在保存到Spark內存中以前,會以block的形式匯聚到一塊兒。每一個Batch中block的個數決定了程序運行時處理這些數據的task的個數。每個receiver的每個batck對應的task個數大體爲(batch時間間隔 / block時間間隔)。好比說對於一個2m的batch,若是block時間間隔爲200ms那麼,將會有10個task。若是task的數量太少,對數據的處理就不會很高效。在batch時間固定的狀況下,若是想增大task個數,那麼就須要下降blockInterval參數了,這個參數默認值爲200ms,官方建議的該參數下限爲50ms,若是低於50ms可能會引發其餘問題。
  另外一個提升數據併發處理能力的方法是顯式的對接收數據從新分區,inputStream.repartition(<number of partitions>)
  
(2)數據處理的並行度
  對於reduceByKeyreduceByKeyAndWindow操做來講,並行task個數由參數spark.default.parallelism來控制。若是想要提升數據處理的並行度,能夠在調用這類方法時,指定並行參數,或者將spark.default.parallelism參數根據集羣實際狀況進行調整。

(3)數據序列化
  能夠經過調整序列化相關的參數,來提升數據序列化性能。在Streaming應用中,有兩類數據須要序列化操做。

  • 輸入數據:默認狀況下,Receiver接收到的數據以StorageLevel.MEMORY_AND_DIS_SER_2的形式保存在Executor的內存中。也就是說,爲了下降GC開銷,這些數據會被序列化成bytes形式,而且還考慮到executor失敗的容錯。這些數據首先會保存在內存中,當內存不足時會spill到磁盤上。使用這種方式的一個明顯問題是,Spark接收到數據後,首先須要反序列化這些數據,而後再按照Spark的方式對這些數據從新序列化。
  • Streaming操做中持久化的RDD:Streaming計算產生的RDD可能也會持久化到內存中。好比窗口操做函數會將數據緩存起來以便後續屢次使用。而且Streaming應用中,這些數據的存儲級別是StorageLevel.MEMORY_ONLY_SET(Spark Core的默認方式是StorageLevel.MEMORY_ONLY)。Streaming對這些數據多了一個序列化操做,這主要也是爲了下降GC開銷。

      在上面這兩種狀況中,可使用Kyro方式對數據進行序列化,同時下降CPU和內存的開銷。有關序列化能夠進一步參考Spark調優。對於Kyro方式的參數設置,請參考Spark Kyro參數設置
      通常狀況下,若是須要緩存的數據量不大,能夠直接將數據以非序列化的形式進行存儲,這樣不會明顯的帶來GC的開銷。好比說,batch時間只有若干秒,而且沒有使用到窗口函數操做,那麼能夠在持久化時顯示的指定存儲級別,避免持久化數據時對數據的序列化操做。
      
    (4)提升task啓動性能
      若是每秒啓動的task個數太多(通常指50個以上),那麼對task的頻繁啓動也是一個不容忽視的損耗。遇到這種狀況時,須要考慮一下Execution模式了。通常來講,在Spark的Standalone模式以及coarse-grained Mesos模式下task的啓動時間會比fine-grained Mesos模式要低。

二、如何正確設置Batch時間間隔

  爲了使一個Spark Streaming應用在集羣上穩定運行,須要保證應用在接收到數據時可以及時處理。若是處理速率不匹配,隨着時間的積累,等待處理的數據將會愈來愈多,最終致使應用沒法正常運行。最好的狀況是batch的處理時間小於batch的間隔時間。因此,正確合理的設置Batch時間間隔是很重要的。
  

三、內存調整

  有關Spark內存的使用以及Spark應用的GC性能調節的更多細節在Spark調優中已經有了更加詳細的描述。這裏簡單分析一些Spark Streaming應用程序會用到的參數。
  
  一個Spark Streaming應用程序須要使用集羣多少內存資源,很大程度上是由該應用中的具體邏輯來決定的,即須要看應用程序中的transformations的類型。好比代碼中使用到長達10分鐘的窗口操做時,就須要使用到可以把10分鐘的數據都保存到內存中的內存量。若是使用updateStateByKey這種操做,而數據中不一樣key特別多,也會使用更多的內存。若是應用的邏輯比較簡單,僅僅是接收-過濾-存儲等一系列操做時,消耗的內存量會明顯減小。
  
  默認狀況下,receivers接收到的數據會以StorageLevel.MEMORY_AND_DISK_SER_2級別進程存儲,當內存中容納不下時會spill到磁盤上,可是這樣會下降應用的處理性能,因此爲了應用可以更高效的運行,最好仍是多分配一些內存以供使用。通常能夠經過在少許數據的狀況下,評估一下數據使用的內存量,繼而計算出應用正式部署時須要分配的總內存量大小。
  
  內存調節的另外一方面是垃圾回收的設置。對一個低延遲的應用系統來講,JVM在垃圾回收時致使應用長時間暫停運行是一個很討厭的場景。

  下面有一些可用於調節內存使用量和GC性能的方面:

  • DStreams的持久化級別:在前面已經提到,輸入數據在默認狀況下會以序列化的字節形式進行持久化。與非序列化存儲相比,這樣會下降內存使用率和下降垃圾回收的負擔。使用Kryo方式進行序列化可以進一步下降序列化後數據大小和內存的使用。想要進一步下降內存的使用量,能夠在數據上再增長一個壓縮功能,經過參數spark.rdd.compress來設置。
  • 清除舊數據:默認狀況下,全部輸入數據和DStream經過不一樣的transformations持久化的數據都會自動進行清理。Spark Streaming根據transformations的不一樣來決定哪些數據須要被清理掉。例如,當使用10分鐘的窗口函數時,Spark Streaming會保存最少10分鐘的數據。想要數據保存更長時間,能夠設置streamingContext.remenber參數。
  • 使用CMS垃圾回收算法:特別建議使用CMS垃圾回收機制來下降GC壓力。driver上經過設置spark-submit命令的--driver-java-options參數來指定,executor上經過設置spark.executor.extraJavaOptions參數來指定。
  • 其餘建議:進一步下降GC負擔,可使用如下一些方法。
    • 使用Tachyon提供的OFF_HEAP存儲級別來持久化RDDs,能夠參考RDD Persistence
    • 下降heap大小,使用更多executors。這樣能夠下降每一個JVM堆的GC壓力。

5、容錯性

  本節主要討論Spark Streaming應用程序失敗後的處理辦法。

一、背景

二、定義

三、基本概念

四、數據接收方式

(1)Files輸入

(2)基於Receiverd 數據源

(3)Kafka Direct輸入方式

五、輸出操做

6、Spark Streaming的升級

7、繼續

相關文章
相關標籤/搜索