Spark Streaming Backpressure分析


                默認狀況下,Spark Streaming經過Receiver以生產者生產數據的速率接收數據,計算過程當中會出現batch processing time > batch interval的狀況,其中batch processing time 爲實際計算一個批次花費時間, batch interval爲Streaming應用設置的批處理間隔。這意味着Spark Streaming的數據接收速率高於Spark從隊列中移除數據的速率,也就是數據處理能力低,在設置間隔內不能徹底處理當前接收速率接收的數據。若是這種狀況持續過長的時間,會形成數據在內存中堆積,致使Receiver所在Executor內存溢出等問題(若是設置StorageLevel包含disk, 則內存存放不下的數據會溢寫至disk, 加大延遲)。Spark 1.5之前版本,用戶若是要限制Receiver的數據接收速率,能夠經過設置靜態配製參數「spark.streaming.receiver.maxRate」的值來實現,此舉雖然能夠經過限制接收速率,來適配當前的處理能力,防止內存溢出,但也會引入其它問題。好比:producer數據生產高於maxRate,當前集羣處理能力也高於maxRate,這就會形成資源利用率降低等問題。爲了更好的協調數據接收速率與資源處理能力,Spark Streaming從v1.5開始引入反壓機制(back-pressure),經過動態控制數據接收速率來適配集羣數據處理能力。併發


                Spark Streaming Backpressure:  根據JobScheduler反饋做業的執行信息來動態調整Receiver數據接收率。經過屬性「spark.streaming.backpressure.enabled」來控制是否啓用backpressure機制,默認值false,即不啓用。ide

2.1 Streaming架構以下圖所示(詳見Streaming數據接收過程文檔和Streaming 源碼解析)oop

2.2 BackPressure執行過程以下圖所示:post

  在原架構的基礎上加上一個新的組件RateController,這個組件負責監聽「OnBatchCompleted」事件,而後從中抽取processingDelay 及schedulingDelay信息.  Estimator依據這些信息估算出最大處理速度(rate),最後由基於Receiver的Input Stream將rate經過ReceiverTracker與ReceiverSupervisorImpl轉發給BlockGenerator(繼承自RateLimiter).this

三、BackPressure 源碼解析url

3.1 RateController類體系spa

                RateController 繼承自StreamingListener. 用於處理BatchCompleted事件。核心代碼爲:線程

 * A StreamingListener that receives batch completion updates, and maintains * an estimate of the speed at which this stream should ingest messages, * given an estimate computation from a `RateEstimator` */
private[streaming] abstract class RateController(val streamUID: Int, rateEstimator: RateEstimator) extends StreamingListener with Serializable { …… …… /** * Compute the new rate limit and publish it asynchronously. */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } } def getLatestRate(): Long = rateLimit.get() override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { processingEnd <- batchCompleted.batchInfo.processingEndTime workDelay <- batchCompleted.batchInfo.processingDelay waitDelay <- batchCompleted.batchInfo.schedulingDelay elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) } } 

3.2 RateController的註冊

                JobScheduler啓動時會抽取在DStreamGraph中註冊的全部InputDstream中的rateController,並向ListenerBus註冊監聽. 此部分代碼以下:

def start(): Unit = synchronized { if (eventLoop != null) return // scheduler has already been started
 logDebug("Starting JobScheduler") eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") { override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event) override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e) } eventLoop.start() // attach rate controllers of input streams to receive batch completion updates
   for { inputDStream <- ssc.graph.getInputStreams rateController <- inputDStream.rateController } ssc.addStreamingListener(rateController) listenerBus.start() receiverTracker = new ReceiverTracker(ssc) inputInfoTracker = new InputInfoTracker(ssc) receiverTracker.start() jobGenerator.start() logInfo("Started JobScheduler") }

3.3 BackPressure執行過程分析

                BackPressure 執行過程分爲BatchCompleted事件觸發時機和事件處理兩個過程

3.3.1 BatchCompleted觸發過程


                Streaming 應用中JobGenerator每一個Batch Interval都會爲應用中的每一個Output Stream創建一個Job, 該批次中的全部Job組成一個JobSet.使用JobScheduler的submitJobSet進行批量Job提交。此部分代碼結構以下所示

/** Generate jobs and perform checkpoint for the given `time`. */
private def generateJobs(time: Time) { // Set the SparkEnv in this thread, so that job generation code can access the environment // Example: BlockRDDs are created in this thread, and it needs to access BlockManager // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env) // Checkpoint all RDDs marked for checkpointing to ensure their lineages are // truncated periodically. Otherwise, we may run into stack overflows (SPARK-6847).
  ssc.sparkContext.setLocalProperty(RDD.CHECKPOINT_ALL_MARKED_ANCESTORS, "true") Try { jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    graph.generateJobs(time) // generate jobs using allocated block
 } match { case Success(jobs) => val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time) jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos)) case Failure(e) => jobScheduler.reportError("Error generating jobs for time " + time, e) }, clearCheckpointDataLater = false)) }

 其中,sumitJobSet會建立固定數量的後臺線程(具體由「spark.streaming.concurrentJobs」指定),去處理Job Set中的Job. 具體實現邏輯爲:

def submitJobSet(jobSet: JobSet) { if ( { logInfo("No jobs added for time " + jobSet.time) } else { jobSets.put(jobSet.time, jobSet) => jobExecutor.execute(new JobHandler(job))) logInfo("Added jobs for time " + jobSet.time) } }

其中JobHandler用於執行Job及處理Job執行結果信息。當Job執行完成時會產生JobCompleted事件. JobHandler的具體邏輯以下面代碼所示:

private def handleJobCompletion(job: Job, completedTime: Long) { val jobSet = jobSets.get(job.time) jobSet.handleJobCompletion(job) job.setEndTime(completedTime) logInfo("Finished job " + + " from job set of time " + jobSet.time) if (jobSet.hasCompleted) { jobSets.remove(jobSet.time) jobGenerator.onBatchCompletion(jobSet.time) logInfo("Total delay: %.3f s for time %s (execution: %.3f s)".format( jobSet.totalDelay / 1000.0, jobSet.time.toString, jobSet.processingDelay / 1000.0 )) } job.result match { case Failure(e) => reportError("Error running job " + job, e) case _ => } }



override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) { val elements = batchCompleted.batchInfo.streamIdToInputInfo for { processingEnd <- batchCompleted.batchInfo.processingEndTime workDelay <- batchCompleted.batchInfo.processingDelay waitDelay <- batchCompleted.batchInfo.schedulingDelay elems <- elements.get(streamUID).map(_.numRecords) } computeAndPublish(processingEnd, elems, workDelay, waitDelay) }

onBatchCompleted會從完成的任務中抽取任務的執行延遲和調度延遲,而後用這兩個參數用RateEstimator(目前存在惟一實現PIDRateEstimator,proportional-integral-derivative (PID) controller, PID控制器)估算出新的rate併發布。代碼以下:

/** * Compute the new rate limit and publish it asynchronously. */
  private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit = Future[Unit] { val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay) newRate.foreach { s => rateLimit.set(s.toLong) publish(getLatestRate()) } }


 /** * A RateController that sends the new rate to receivers, via the receiver tracker. */
  private[streaming] class ReceiverRateController(id: Int, estimator: RateEstimator) extends RateController(id, estimator) { override def publish(rate: Long): Unit = ssc.scheduler.receiverTracker.sendRateUpdate(id, rate) }

publish的功能爲新生成的rate 藉助ReceiverTracker進行轉發。ReceiverTracker將rate包裝成UpdateReceiverRateLimit事交ReceiverTrackerEndpoint

/** Update a receiver's maximum ingestion rate */ def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized { if (isTrackerStarted) { endpoint.send(UpdateReceiverRateLimit(streamUID, newRate)) } }


/** RpcEndpointRef for receiving messages from the ReceiverTracker in the driver */
private val endpoint = env.rpcEnv.setupEndpoint( "Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint { override val rpcEnv: RpcEnv = env.rpcEnv override def receive: PartialFunction[Any, Unit] = { case StopReceiver => logInfo("Received stop signal") ReceiverSupervisorImpl.this.stop("Stopped by driver", None) case CleanupOldBlocks(threshTime) => logDebug("Received delete old batch signal") cleanupOldBlocks(threshTime) case UpdateRateLimit(eps) => logInfo(s"Received a new rate limit: $eps.") registeredBlockGenerators.asScala.foreach { bg => bg.updateRate(eps) } } })


/** * Set the rate limit to `newRate`. The new rate will not exceed the maximum rate configured by * {{{spark.streaming.receiver.maxRate}}}, even if `newRate` is higher than that. * * @param newRate A new rate in events per second. It has no effect if it's 0 or negative. */
 private[receiver] def updateRate(newRate: Long): Unit =
   if (newRate > 0) { if (maxRateLimit > 0) { rateLimiter.setRate(newRate.min(maxRateLimit)) } else { rateLimiter.setRate(newRate) } }


public final void setRate(double permitsPerSecond) { Preconditions.checkArgument(permitsPerSecond > 0.0
        && !Double.isNaN(permitsPerSecond), "rate must be positive"); synchronized (mutex) { resync(readSafeMicros()); double stableIntervalMicros = TimeUnit.SECONDS.toMicros(1L) / permitsPerSecond;  //固定間隔
      this.stableIntervalMicros = stableIntervalMicros; doSetRate(permitsPerSecond, stableIntervalMicros); } }



  當Receiver開始接收數據時,會經過supervisor.pushSingle()方法將接收的數據存入currentBuffer等待BlockGenerator定時將數據取走,包裝成block. 在將數據存放入currentBuffer之時,要獲取許可(令牌)。若是獲取到許可就能夠將數據存入buffer, 不然將被阻塞,進而阻塞Receiver從數據源拉取數據。

/** * Push a single data item into the buffer. */ def addData(data: Any): Unit = { if (state == Active) { waitToPush() //獲取令牌
 synchronized { if (state == Active) { currentBuffer += data } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } } } else { throw new SparkException( "Cannot add data as BlockGenerator has not been started or has been stopped") } }

   其令牌投放採用令牌桶機制進行, 原理以下圖所示:

令牌桶機制: 大小固定的令牌桶可自行以恆定的速率源源不斷地產生令牌。若是令牌不被消耗,或者被消耗的速度小於產生的速度,令牌就會不斷地增多,直到把桶填滿。後面再產生的令牌就會從桶中溢出。最後桶中能夠保存的最大令牌數永遠不會超過桶的大小。當進行某操做時須要令牌時會從令牌桶中取出相應的令牌數,若是獲取到則繼續操做,不然阻塞。用完以後不用放回。

  Streaming 數據流被Receiver接收後,按行解析後存入iterator中。而後逐個存入Buffer,在存入buffer時會先獲取token,若是沒有token存在,則阻塞;若是獲取到則將數據存入buffer.  而後等價後續生成block操做。
