Spark Streaming反壓機制探祕

1.反壓機制原理

Spark Streaming中的反壓機制是Spark 1.5.0推出的新特性,能夠根據處理效率動態調整攝入速率。ide

當批處理時間(Batch Processing Time)大於批次間隔(Batch Interval,即 BatchDuration)時,說明處理數據的速度小於數據攝入的速度,持續時間過長或源頭數據暴增,容易形成數據在內存中堆積,最終致使Executor OOM或任務奔潰。性能

在這種狀況下,如果基於Kafka Receiver的數據源,能夠經過設置spark.streaming.receiver.maxRate來控制最大輸入速率;如果基於Direct的數據源(如Kafka Direct Stream),則能夠經過設置spark.streaming.kafka.maxRatePerPartition來控制最大輸入速率。固然,在事先通過壓測,且流量高峯不會超過預期的狀況下,設置這些參數通常沒什麼問題。但最大值,不表明是最優值,最好還能根據每一個批次處理狀況來動態預估下個批次最優速率。在Spark 1.5.0以上,就可經過背壓機制來實現。開啓反壓機制,即設置spark.streaming.backpressure.enabled爲true,Spark Streaming會自動根據處理能力來調整輸入速率,從而在流量高峯時仍能保證最大的吞吐和性能。大數據

override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
    val elements = batchCompleted.batchInfo.streamIdToInputInfo

    for {
      // 處理結束時間
      processingEnd <- batchCompleted.batchInfo.processingEndTime
      // 處理時間,即`processingEndTime` - `processingStartTime`
      workDelay <- batchCompleted.batchInfo.processingDelay
      // 在調度隊列中的等待時間,即`processingStartTime` - `submissionTime`
      waitDelay <- batchCompleted.batchInfo.schedulingDelay
      // 當前批次處理的記錄數
      elems <- elements.get(streamUID).map(_.numRecords)
    } computeAndPublish(processingEnd, elems, workDelay, waitDelay)
  }

能夠看到,接着又調用的是computeAndPublish方法,以下:spa

private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
    Future[Unit] {
      // 根據處理時間、調度時間、當前Batch記錄數,預估新速率
      val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
      newRate.foreach { s =>
      // 設置新速率
        rateLimit.set(s.toLong)
      // 發佈新速率
        publish(getLatestRate())
      }
    }

更深一層,具體調用的是rateEstimator.compute方法來預估新速率,以下:線程

def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]

2.反壓機制相關參數

  • spark.streaming.backpressure.enabled
    默認值false,是否啓用反壓機制。scala

  • spark.streaming.backpressure.initialRate
    默認值無,初始最大接收速率。只適用於Receiver Stream,不適用於Direct Stream。類型爲整數,默認直接讀取全部,在1開啓的狀況下,限制第一次批處理應該消費的數據,由於程序冷啓動隊列裏面有大量積壓,防止第一次所有讀取,形成系統阻塞code

  • spark.streaming.kafka.maxRatePerPartition
    類型爲整數,默認直接讀取全部,限制每秒每一個消費線程讀取每一個kafka分區最大的數據量生命週期

  • spark.streaming.stopGracefullyOnShutdown
    優雅關閉,確保在kill任務時,可以處理完最後一批數據,再關閉程序,不會發生強制kill致使數據處理中斷,沒處理完的數據丟失
注意: 只有 3 激活的時候,每次消費的最大數據量,就是設置的數據量,若是不足這個數,就有多少讀多少,若是超過這個數字,就讀取這個數字的設置的值
只有 1+3 激活的時候,每次消費讀取的數量最大會等於3設置的值,最小是spark根據系統負載自動推斷的值,消費的數據量會在這兩個範圍以內變化根據系統狀況,但第一次啓動會有多少讀多少數據。此後按 1+3 設置規則運行
1+2+3 同時激活的時候,跟上一個消費狀況基本同樣,但第一次消費會獲得限制,由於咱們設置第一次消費的頻率了。
  • spark.streaming.backpressure.rateEstimator
    默認值pid,速率控制器,Spark 默認只支持此控制器,可自定義。隊列

  • spark.streaming.backpressure.pid.proportional
    默認值1.0,只能爲非負值。當前速率與最後一批速率之間的差值對總控制信號貢獻的權重。用默認值便可。內存

  • spark.streaming.backpressure.pid.integral
    默認值0.2,只能爲非負值。比例偏差累積對總控制信號貢獻的權重。用默認值便可。

  • spark.streaming.backpressure.pid.derived
    默認值0.0,只能爲非負值。比例偏差變化對總控制信號貢獻的權重。用默認值便可。

  • spark.streaming.backpressure.pid.minRate

    默認值100,只能爲正數,最小速率。

3.反壓機制的使用

//啓用反壓機制
conf.set("spark.streaming.backpressure.enabled","true")
//最小攝入條數控制
conf.set("spark.streaming.backpressure.pid.minRate","1")
//最大攝入條數控制
conf.set("spark.streaming.kafka.maxRatePerPartition","12")
//初始最大接收速率控制
conf.set("spark.streaming.backpressure.initialRate","10")

要保證反壓機制真正起做用前Spark 應用程序不會崩潰,須要控制每一個批次最大攝入速率。以Direct Stream爲例,如Kafka Direct Stream,則能夠經過spark.streaming.kafka.maxRatePerPartition參數來控制。此參數表明了 每秒每一個分區最大攝入的數據條數。假設BatchDuration爲10秒,spark.streaming.kafka.maxRatePerPartition爲12條,kafka topic 分區數爲3個,則一個批(Batch)最大讀取的數據條數爲360條(31210=360)。同時,須要注意,該參數也表明了整個應用生命週期中的最大速率,即便是背壓調整的最大值也不會超過該參數。

相關文章
相關標籤/搜索