PIDRateEstimator是Spark Streaming用來實現backpressure的關鍵組件。html
看了一些博客文章,感受對它的解釋都沒有說到要點,仍是本身來研究一下比較好。app
首先,須要搞清楚的一個問題是Spark Streaming的backpressure是想讓系統達到怎麼樣的一種狀態。這個問題不明確,PIDRateEstimator的做用就搞不清楚。ide
首先,backpressure這套機制是系統(由應用程序和物理資源組成的總體)的內在性質對Spark Streaming的吞吐量的限制,而並不是是某種優化。能夠認爲,在固定的資源下(CPU、內存、IO),Spark Streaming程序存在吞吐量的上限。優化
放在非micro-batch的狀況下考慮,這意味着存在一個最大處理速度,RateEstimator認爲這個速度的單位爲records/second (不過實際上,每條消息的處理所耗的時間可能差異很大,因此這個速度的單位用records/second其實是可能並不合適,多是一種過分的簡化)。this
放在Spark Streaming的micro-batch的狀況下,因爲調度器每隔batch duration的時間間隔生成一個micro-batch,這個吞吐率的上限意味着每一個batch總的消息數量存在上限。若是給每一個batch分配率的消息總數超過這個上限,每秒處理消息條數是不變的,只會使得batch的處理時間延長,這樣對於系統沒有什麼好處,反而因爲每一個batch太大而可能致使OOM。spa
當達到這個最大處理速度時,表現就是batch duration等於batch的計算階段所花的時間,也就是batch duration == batch processing time。3d
那麼backpressure的目標,就是使得系統達到上邊這個狀態(這個並不是徹底對,下面的分析會給出具體的狀態)。它不會使得系統的累積未處理的數據減小,也不會使得系統的吞吐率提升(在不引發OOM,以及不計算GC的開銷的狀況下,當processing time > batch duration時,系統的吞吐量已經達到最高)。而只是使得系統的實際吞吐量穩定在最大吞吐量(除非你手動設置的rate的最大值小於最大吞吐量)code
首先,要明確PID控制器的做用。orm
引用一篇blog的說法:htm
PID控制器是一個在工業控制應用中常見的反饋迴路部件。
這個控制器把收集到的數據和一個參考值進行比較,而後把這個差異用於計算新的輸入值,
這個新的輸入值的目的是可讓系統的數據達到或者保持在參考值。
PID控制器能夠根據歷史數據和差異的出現率來調整輸入值,使系統更加準確而穩定。
重點在於它的目的是調整輸入,比而使得系統的某個咱們關注的目標指標到目標值。
PID的控制輸出的公式爲
這裏u(t)爲PID的輸出。
SP是setpoint, 就是參考值
PV是 process variable, 也就是測量值。
A PID controller continuously calculates an error value e(t) as the difference between a desired setpoint (SP) and a measured process variable (PV) and applies a correction based on proportional, integral, and derivative terms (denoted P, I, and D respectively), hence the name.
首先,看下RateEstimator的compute方法的定義
private[streaming] trait RateEstimator extends Serializable { /** * Computes the number of records the stream attached to this `RateEstimator` * should ingest per second, given an update on the size and completion * times of the latest batch. * * @param time The timestamp of the current batch interval that just finished * @param elements The number of records that were processed in this batch * @param processingDelay The time in ms that took for the job to complete * @param schedulingDelay The time in ms that the job spent in the scheduling queue */ def compute( time: Long, elements: Long, processingDelay: Long, schedulingDelay: Long): Option[Double] }
看下參數的含義
PIDRateEstimator是獲取當前這個結束的batch的數據,而後估計下一個batch的rate(注意,下一個batch並不必定跟當前結束的batch是連續兩個batch,可能會有積壓未處理的batch)。
PIDRateEstimator對於PID控制器裏的"error"這個值是這麼計算的:
// in seconds, should be close to batchDuration
val delaySinceUpdate = (time - latestTime).toDouble / 1000
// in elements/second
val processingRate = numElements.toDouble / processingDelay * 1000
// In our system `error` is the difference between the desired rate and the measured rate
// based on the latest batch information. We consider the desired rate to be latest rate,
// which is what this estimator calculated for the previous batch.
// in elements/second
val error = latestRate - processingRate
val historicalError = schedulingDelay.toDouble * processingRate / batchIntervalMillis
// in elements/(second ^ 2)
val dError = (error - latestError) / delaySinceUpdate
val newRate = (latestRate - proportional * error -
integral * historicalError -
derivative * dError).max(minRate)
這裏的latestRate是指PID控制器爲上一個batch,也就是當前結束的batch,在生成這個batch的時候估計的處理速度。
因此上邊代碼中,latestRate就是參考值, processingRate就是測量值。
這裏爲何如此計算我仍是沒搞清楚,由於latestRate是一個變化的值,不知道這樣在數學上會對後邊的積分、微分項的含義形成什麼影響。
能夠推導出來當batchDuration = processingDelay時候,這裏的error爲零。
推導過程爲:
latestRate實際上等於numElements / batchDuration,由於numElements是上次生成job時根據這個latestRate(也就是當時的estimated rate)算出來的。
那麼 error = (numElements / batchDuaration) - (numElements/processingDelay) 這裏的processingDelay就是processing time
因此,當processingDelay等於batchDuration時候,error爲零。
可是error爲零時,PID的輸出不必定爲零,由於須要考慮到歷史偏差和偏差的變化。這裏剛結束的batch可能並不是生成後就當即被執行,而是在調度隊列裏排了一會隊,因此仍是須要考慮schedulingDelay,它反應了歷史偏差。
當PID輸出爲0時,newRate就等於latestRate,此時系統達到了穩定狀態,error爲零,historicalError和dError都爲0。
這意味着:
這就是整個RateEstimator,也就是backpressure想要系統達到的狀態。
這裏能夠定性地分析一下達到穩定狀態的過程:
能夠看出來這個PIDRateEstimator並不是是廣泛最優的,由於它的假設是系統的動態特定不隨時間變化,可是實際上若是沒有頗有效的資源隔離,系統對於Spark Streaming程度來說,其資源是隨時間變化的,並且在某些時間可能發生劇烈的變化。此時,此時RateEstimator應該作出更劇烈的變化來應對,好比經過動態調整各個部分的係數。
若是用戶對本身的系統有深的瞭解,好比當資源和負載是週期性變化時,那就能夠定製更合適的RateEstimator,好比考慮到天天同比的流量變化來調整estimatedRate。