Spark Streaming的PIDRateEstimator與backpressure

PIDRateEstimator是Spark Streaming用來實現backpressure的關鍵組件。html

看了一些博客文章,感受對它的解釋都沒有說到要點,仍是本身來研究一下比較好。app

首先,須要搞清楚的一個問題是Spark Streaming的backpressure是想讓系統達到怎麼樣的一種狀態。這個問題不明確,PIDRateEstimator的做用就搞不清楚。ide

backpressure的目標

首先,backpressure這套機制是系統(由應用程序和物理資源組成的總體)的內在性質對Spark Streaming的吞吐量的限制,而並不是是某種優化。能夠認爲,在固定的資源下(CPU、內存、IO),Spark Streaming程序存在吞吐量的上限。優化

放在非micro-batch的狀況下考慮,這意味着存在一個最大處理速度,RateEstimator認爲這個速度的單位爲records/second (不過實際上,每條消息的處理所耗的時間可能差異很大,因此這個速度的單位用records/second其實是可能並不合適,多是一種過分的簡化)this

放在Spark Streaming的micro-batch的狀況下,因爲調度器每隔batch duration的時間間隔生成一個micro-batch,這個吞吐率的上限意味着每一個batch總的消息數量存在上限。若是給每一個batch分配率的消息總數超過這個上限,每秒處理消息條數是不變的,只會使得batch的處理時間延長,這樣對於系統沒有什麼好處,反而因爲每一個batch太大而可能致使OOM。spa

當達到這個最大處理速度時,表現就是batch duration等於batch的計算階段所花的時間,也就是batch duration == batch processing time。3d

那麼backpressure的目標,就是使得系統達到上邊這個狀態(這個並不是徹底對,下面的分析會給出具體的狀態)。它不會使得系統的累積未處理的數據減小,也不會使得系統的吞吐率提升(在不引發OOM,以及不計算GC的開銷的狀況下,當processing time > batch duration時,系統的吞吐量已經達到最高)。而只是使得系統的實際吞吐量穩定在最大吞吐量(除非你手動設置的rate的最大值小於最大吞吐量)code

 

PIDRateEstimator

首先,要明確PID控制器的做用。orm

引用一篇blog的說法:htm

PID控制器是一個在工業控制應用中常見的反饋迴路部件。

這個控制器把收集到的數據和一個參考值進行比較,而後把這個差異用於計算新的輸入值,

這個新的輸入值的目的是可讓系統的數據達到或者保持在參考值。

PID控制器能夠根據歷史數據和差異的出現率來調整輸入值,使系統更加準確而穩定。

重點在於它的目的是調整輸入,比而使得系統的某個咱們關注的目標指標到目標值。

PID的控制輸出的公式爲

這裏u(t)爲PID的輸出。

SP是setpoint, 就是參考值

PV是 process variable, 也就是測量值。

A PID controller continuously calculates an error value e(t) as the difference between a desired setpoint (SP) and a measured process variable (PV) and applies a correction based on proportionalintegral, and derivative terms (denoted PI, and D respectively), hence the name.

計算邏輯

首先,看下RateEstimator的compute方法的定義

複製代碼
private[streaming] trait RateEstimator extends Serializable {

  /**
   * Computes the number of records the stream attached to this `RateEstimator`
   * should ingest per second, given an update on the size and completion
   * times of the latest batch.
   *
   * @param time The timestamp of the current batch interval that just finished
   * @param elements The number of records that were processed in this batch
   * @param processingDelay The time in ms that took for the job to complete
   * @param schedulingDelay The time in ms that the job spent in the scheduling queue
   */
  def compute(
      time: Long,
      elements: Long,
      processingDelay: Long,
      schedulingDelay: Long): Option[Double]
}
複製代碼

看下參數的含義

  • time: 從它的來源看,它來源於BatchInfo的processingEndTime, 準確含義是 「Clock time of when the last job of this batch finished processing」,也就是這個batch處理結束的時間
  • elements: 這個batch處理的消息條數
  • processingDelay: 這個job在實際計算階段花的時間(不算調度延遲)
  • schedulingDelay:這個job花在調度隊列裏的時間

PIDRateEstimator是獲取當前這個結束的batch的數據,而後估計下一個batch的rate(注意,下一個batch並不必定跟當前結束的batch是連續兩個batch,可能會有積壓未處理的batch)。

PIDRateEstimator對於PID控制器裏的"error"這個值是這麼計算的:

複製代碼
 
 
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000

// in elements/second
val processingRate = numElements.toDouble / processingDelay * 1000

// In our system `error` is the difference between the desired rate and the measured rate
// based on the latest batch information. We consider the desired rate to be latest rate,
// which is what this estimator calculated for the previous batch.
// in elements/second
val error = latestRate - processingRate

val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis

// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate

val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
  
複製代碼

這裏的latestRate是指PID控制器爲上一個batch,也就是當前結束的batch,在生成這個batch的時候估計的處理速度。

因此上邊代碼中,latestRate就是參考值, processingRate就是測量值。

這裏爲何如此計算我仍是沒搞清楚,由於latestRate是一個變化的值,不知道這樣在數學上會對後邊的積分、微分項的含義形成什麼影響。

error什麼時候爲0

能夠推導出來當batchDuration = processingDelay時候,這裏的error爲零。

推導過程爲:

latestRate實際上等於numElements / batchDuration,由於numElements是上次生成job時根據這個latestRate(也就是當時的estimated rate)算出來的。

那麼 error = (numElements / batchDuaration) - (numElements/processingDelay)             這裏的processingDelay就是processing time

因此,當processingDelay等於batchDuration時候,error爲零。

可是error爲零時,PID的輸出不必定爲零,由於須要考慮到歷史偏差和偏差的變化。這裏剛結束的batch可能並不是生成後就當即被執行,而是在調度隊列裏排了一會隊,因此仍是須要考慮schedulingDelay,它反應了歷史偏差。

那麼何時達到穩定狀態呢?

 當PID輸出爲0時,newRate就等於latestRate,此時系統達到了穩定狀態,error爲零,historicalError和dError都爲0。

這意味着:

  • 沒有schedulingDelay,意味着job等待被調度的時間爲0. 若是沒有累積的未執行的job,那麼schedulingDelay大體等於0.
  • error爲零,意味着batchDuration等於processingDelay
  • dError爲零,在error等於0時,意味着上一次計算的error也爲零。

這就是整個RateEstimator,也就是backpressure想要系統達到的狀態。

 

這裏能夠定性地分析一下達到穩定狀態的過程:

  • 若是batch分配的消息少於最高吞吐量,就會有processingRate  > latestRate, 從而使得error爲負,若是忽略積分和微分項的影響,就會使得newRate = latestRate - propotional * rate,從而使得newRate增大,所以下一個batch處理的消息會變多。
  • 若是batch分配的消息大於最高吞吐量,就會有processingRate < latestRate,從而使得error爲正,若是此前已經有job被積累,那麼historicalError也爲正,考慮到dError的係數默認爲0,因此此時newRate  = latestRate - proportional * error -integral * historicalError  使得newRate變小,從而使得下一個batch處理的消息變少,當newRate == latestRate時,有 -proportional * error == integral * historicalError,即error爲一個負值,也即processingRate > latestRate,也就是說會使得給每一個batch分配的消息小於它的最大處理量。此時,因爲processingDelay小於batchDuration,會使得歷史上累積的job有機會獲得處理,從而逐漸減小在等待的job數量。

 

能夠看出來這個PIDRateEstimator並不是是廣泛最優的,由於它的假設是系統的動態特定不隨時間變化,可是實際上若是沒有頗有效的資源隔離,系統對於Spark Streaming程度來說,其資源是隨時間變化的,並且在某些時間可能發生劇烈的變化。此時,此時RateEstimator應該作出更劇烈的變化來應對,好比經過動態調整各個部分的係數。

若是用戶對本身的系統有深的瞭解,好比當資源和負載是週期性變化時,那就能夠定製更合適的RateEstimator,好比考慮到天天同比的流量變化來調整estimatedRate。

相關文章
相關標籤/搜索