Spark Streaming: A Deep Dive into the Source Code of JobGenerator's Periodic Job and Data Processing Logic (Spark in Production)

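This blog series distills and shares cases drawn from real production environments, with Spark source-code walkthroughs and practical guidance for commercial use; please keep following the series. Copyright notice: this series on Spark source-code analysis and production practice belongs to the author (秦凱新); reposting is prohibited, but you are welcome to learn from it.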

1 The Past Life of JobGenerator

1.1 ReceiverTracker, JobGenerator's Brother in Arms

1.2 JobGenerator, ReceiverTracker's Brother in Arms

JobGenerator periodically generates Jobs, and each Job is ultimately executed on the Executors.

1.3 The Love-Hate Relationship between ReceiverTracker and receivedBlockTracker

  • As the code shows, receivedBlockTracker is a member of ReceiverTracker. Most importantly, receivedBlockTracker internally maintains streamIdToUnallocatedBlockQueues, which tracks the Blocks reported by the Executors.

    class ReceiverTracker(ssc: StreamingContext, skipReceiverLaunch: Boolean = false) extends Logging {
        private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
        private val receiverInputStreamIds = receiverInputStreams.map { _.id }
        private val receivedBlockTracker = new ReceivedBlockTracker(
          ssc.sparkContext.conf,
          ssc.sparkContext.hadoopConfiguration,
          receiverInputStreamIds,
          ssc.scheduler.clock,
          ssc.isCheckpointPresent,
          Option(ssc.checkpointDir)
        )
  • The key metadata structure inside receivedBlockTracker:

    private val streamIdToUnallocatedBlockQueues = new mutable.HashMap[Int, ReceivedBlockQueue]

1.4 How StreamingContext Brings the Two Together

JobScheduler contains two heavyweight core members: jobGenerator and receiverTracker. They are initialized as follows:

Note: the JobGenerator constructor takes the JobScheduler itself as its argument.

    private val jobGenerator = new JobGenerator(this)

    receiverTracker = new ReceiverTracker(ssc)

2 The Present Life of JobGenerator

  • A key member of JobGenerator is the RecurringTimer, which fires once per user-defined batch interval
    private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
          longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")
  • JobGenerator is started through StreamingContext; when no checkpoint is present, it ends up calling startFirstTime
    def start(): Unit = synchronized {
      if (eventLoop != null) return // generator has already been started
      // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
      // See SPARK-10125
      checkpointWriter
    
      eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
        override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = {
          jobScheduler.reportError("Error in job generator", e)
        }
      }
      eventLoop.start()
    
      if (ssc.isCheckpointPresent) {
        restart()
      } else {
        startFirstTime()
      }
    }
  • Finally, JobGenerator starts ssc.graph and the timer, and with that the whole processing pipeline is running.
    private def startFirstTime() {
      val startTime = new Time(timer.getStartTime())
      graph.start(startTime - graph.batchDuration)
      timer.start(startTime.milliseconds)
      logInfo("Started JobGenerator at " + startTime)
    }
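The timer-to-event-loop handoff above can be modeled without any threads. The sketch below is a simplified illustration, not Spark's RecurringTimer: the object RecurringTimerSketch, its methods, and the Long-based GenerateJobs event are hypothetical names, and the rule that the first tick is rounded up to the next multiple of the batch interval is an assumption of this sketch.

```scala
object RecurringTimerSketch {
  // Hypothetical event standing in for JobGenerator's GenerateJobs(time).
  final case class GenerateJobs(timeMs: Long)

  // Assumption of this sketch: the first tick is aligned to the next
  // multiple of the batch interval after the current clock time.
  def startTime(nowMs: Long, periodMs: Long): Long =
    (nowMs / periodMs + 1) * periodMs

  // Mirrors graph.start(startTime - graph.batchDuration): the graph's
  // zero time sits one batch interval before the first tick.
  def graphZeroTime(nowMs: Long, periodMs: Long): Long =
    startTime(nowMs, periodMs) - periodMs

  // The events the timer callback would post for the first n ticks.
  def ticks(nowMs: Long, periodMs: Long, n: Int): List[GenerateJobs] = {
    val first = startTime(nowMs, periodMs)
    List.tabulate(n)(i => GenerateJobs(first + i * periodMs))
  }
}
```

Each GenerateJobs event produced this way corresponds to one batch, which is what the event loop hands to processEvent.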

2.1 The Four Core Processing Steps of JobGenerator
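Sections 2.2 to 2.5 walk through the four steps one by one. As a rough orientation, the order of the calls can be sketched as follows. This is a toy model: the Steps trait, the Recorder class, and the use of Long and String in place of Time and Job are hypothetical, and wrapping the first two steps in a Try is an assumption of this sketch rather than a quotation of Spark's code.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object FourStepsSketch {
  // Hypothetical stand-ins for the collaborators named in this article.
  trait Steps {
    def allocateBlocksToBatch(time: Long): Unit                                  // step 1
    def generateJobs(time: Long): Seq[String]                                    // step 2
    def getInfo(time: Long): Map[Int, Long]                                      // step 3
    def submitJobSet(time: Long, jobs: Seq[String], info: Map[Int, Long]): Unit  // step 4
    def reportError(msg: String, e: Throwable): Unit
  }

  // Runs the four steps in order for one batch time.
  def generateJobs(time: Long, s: Steps): Unit =
    Try {
      s.allocateBlocksToBatch(time)
      s.generateJobs(time)
    } match {
      case Success(jobs) => s.submitJobSet(time, jobs, s.getInfo(time))
      case Failure(e)    => s.reportError(s"Error generating jobs for time $time", e)
    }

  // Records the call order so the flow can be inspected.
  final class Recorder extends Steps {
    val calls: ListBuffer[String] = ListBuffer.empty[String]
    def allocateBlocksToBatch(time: Long): Unit = calls += "allocate"
    def generateJobs(time: Long): Seq[String] = { calls += "generate"; Seq("job-0") }
    def getInfo(time: Long): Map[Int, Long] = { calls += "getInfo"; Map(0 -> 10L) }
    def submitJobSet(time: Long, jobs: Seq[String], info: Map[Int, Long]): Unit =
      calls += "submit"
    def reportError(msg: String, e: Throwable): Unit = calls += "error"
  }
}
```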

2.2 Step 1: allocateBlocksToBatch

  • JobGenerator holds a reference to jobScheduler, and jobScheduler holds a reference to receiverTracker

    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
  • receiverTracker holds a reference to receivedBlockTracker

  • For each streamId, all of its blocks (by default one block is cut every 200 ms) are taken out of streamIdToUnallocatedBlockQueues and put into timeToAllocatedBlocks.

        /**
         * Allocate all unallocated blocks to the given batch.
         * This event will get written to the write ahead log (if enabled).
         */
        def allocateBlocksToBatch(batchTime: Time): Unit = synchronized {
          if (lastAllocatedBatchTime == null || batchTime > lastAllocatedBatchTime) {
          
          
            // First, for the current batch window, drain every Block from
            // streamIdToUnallocatedBlockQueues.
            val streamIdToBlocks = streamIds.map { streamId =>
                (streamId, getReceivedBlockQueue(streamId).dequeueAll(x => true))
            }.toMap // key line

            // Then wrap the previously unallocated blocks so they can be stored
            // in timeToAllocatedBlocks under this batch time.
            val allocatedBlocks = AllocatedBlocks(streamIdToBlocks) // key line
            
            
            if (writeToLog(BatchAllocationEvent(batchTime, allocatedBlocks))) {
              timeToAllocatedBlocks.put(batchTime, allocatedBlocks)
              lastAllocatedBatchTime = batchTime
            } else {
              logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
            }
          } else {
            // This situation occurs when:
            // 1. WAL is ended with BatchAllocationEvent, but without BatchCleanupEvent,
            // possibly processed batch job or half-processed batch job need to be processed again,
            // so the batchTime will be equal to lastAllocatedBatchTime.
            // 2. Slow checkpointing makes recovered batch time older than WAL recovered
            // lastAllocatedBatchTime.
            // This situation will only occurs in recovery time.
            logInfo(s"Possibly processed batch $batchTime needs to be processed again in WAL recovery")
          }
        }
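  • timeToAllocatedBlocks is a core member held by receiverTracker (through its member receivedBlockTracker). Walking the call chain backward up to its top, generatedRDDs is built from timeToAllocatedBlocks.
  • streamIdToUnallocatedBlockQueues: the Blocks that have not yet been allocated to a batch
  • timeToAllocatedBlocks: the Blocks that have already been allocated to a batch
  • Below is the template code of DStream, whose purpose is to generate RDDs. The getOrCompute method is defined only on DStream, so once the parent level has generated an RDD, it is stored in generatedRDDs.
  • If generatedRDDs has no entry for the batch time, compute is called, and compute in turn calls the parent's getOrCompute, which calls compute again. The recursion repeats until it reaches the compute method of the InputDStream.
    /**
     * Get the RDD corresponding to the given time; either retrieve it from cache
     * or compute-and-cache it.
     */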
    private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    
      // If RDD was already generated, then retrieve it from HashMap,
      // or else compute the RDD
      
      generatedRDDs.get(time).orElse {
        // Compute the RDD if time is valid (e.g. correct time in a sliding window)
        // of RDD generation, else generate nothing.
        if (isTimeValid(time)) {
    
          val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
            // Disable checks for existing output directories in jobs launched by the streaming
            // scheduler, since we may need to write output to an existing directory during checkpoint
            // recovery; see SPARK-4835 for more details. We need to have this call here because
            // compute() might cause Spark jobs to be launched.
            SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
              compute(time)
            }
          }
    
          rddOption.foreach { case newRDD =>
            // Register the generated RDD for caching and checkpointing
            if (storageLevel != StorageLevel.NONE) {
              newRDD.persist(storageLevel)
              logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
            }
            if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
              newRDD.checkpoint()
              logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
            }
            
            generatedRDDs.put(time, newRDD) // key line: cache the RDD under its batch time
            
          }
          rddOption
        } else {
          None
        }
      }
    }
  • The compute method of MapPartitionedDStream
    override def compute(validTime: Time): Option[RDD[U]] = {
      parent.getOrCompute(validTime).map(_.mapPartitions[U](mapPartFunc, preservePartitioning))
    }
  • The compute method of ReceiverInputDStream
    /**
     * Generates RDDs with blocks received by the receiver of this stream.
     */
    override def compute(validTime: Time): Option[RDD[T]] = {
      val blockRDD = {

        if (validTime < graph.startTime) {
          // If this is called for any time before the start time of the context,
          // then this returns an empty RDD. This may happen when recovering from a
          // driver failure without any write ahead log to recover pre-failure data.
          new BlockRDD[T](ssc.sc, Array.empty)
        } else {
          // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
          // for this batch

          // Key lines: the data is read from timeToAllocatedBlocks.
          val receiverTracker = ssc.scheduler.receiverTracker
          val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

          // Register the input blocks information into InputInfoTracker
          val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
          ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

          // Create the BlockRDD
          // Key line: build the RDD from the blocks taken out of timeToAllocatedBlocks,
          // so the rest of the call chain can reuse it through generatedRDDs.
          createBlockRDD(validTime, blockInfos)
        }
      }
      Some(blockRDD)
    }
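The back-and-forth between getOrCompute and compute is a memoized recursion over the parent chain. The toy model below illustrates only that pattern: ToyDStream, ToyInput, and ToyMapped are hypothetical classes, an "RDD" is represented by a plain List[Int], and persistence, checkpointing, and time validation are left out.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Toy stand-in for DStream: an "RDD" is just a List[Int] here.
  abstract class ToyDStream {
    // Plays the role of generatedRDDs: batch time -> generated "RDD".
    private val generated = mutable.HashMap.empty[Long, List[Int]]
    var computeCalls = 0

    protected def compute(time: Long): Option[List[Int]]

    // Return the cached result if present, otherwise compute and cache it.
    final def getOrCompute(time: Long): Option[List[Int]] =
      generated.get(time).orElse {
        computeCalls += 1
        val rdd = compute(time)
        rdd.foreach(r => generated.put(time, r))
        rdd
      }
  }

  // Bottom of the recursion, in the role of the input stream.
  final class ToyInput(blocks: Map[Long, List[Int]]) extends ToyDStream {
    protected def compute(time: Long): Option[List[Int]] =
      Some(blocks.getOrElse(time, Nil))
  }

  // A derived stream: compute delegates to the parent's getOrCompute.
  final class ToyMapped(parent: ToyDStream, f: Int => Int) extends ToyDStream {
    protected def compute(time: Long): Option[List[Int]] =
      parent.getOrCompute(time).map(_.map(f))
  }
}
```

Calling getOrCompute twice for the same batch time on a ToyMapped triggers compute only once per level, because the second call is served from the cache.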

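As we can see, the purpose of allocateBlocksToBatch is to put the Blocks that belong to the current batch window into timeToAllocatedBlocks, so that the rest of the call chain can use them.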
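The movement of blocks between the two maps can be sketched in isolation. This is a simplified model under stated assumptions: ToyBlockTracker and addBlock are hypothetical names, block metadata is reduced to a String id, batch times are plain Long values, and the write-ahead-log step is omitted.

```scala
import scala.collection.mutable

object BlockAllocationSketch {
  final class ToyBlockTracker(streamIds: Seq[Int]) {
    // Blocks reported by receivers but not yet assigned to any batch.
    private val unallocated = mutable.HashMap.empty[Int, mutable.Queue[String]]
    // Batch time -> (stream id -> blocks assigned to that batch).
    private val allocated = mutable.HashMap.empty[Long, Map[Int, List[String]]]
    private var lastAllocatedBatchTime: Option[Long] = None

    def addBlock(streamId: Int, blockId: String): Unit = synchronized {
      unallocated.getOrElseUpdate(streamId, mutable.Queue.empty[String]).enqueue(blockId)
    }

    // Drain every queue and file the drained blocks under batchTime.
    // Batches that are not newer than the last allocated one are ignored.
    def allocateBlocksToBatch(batchTime: Long): Unit = synchronized {
      if (lastAllocatedBatchTime.forall(batchTime > _)) {
        val streamIdToBlocks = streamIds.map { id =>
          val queue = unallocated.getOrElseUpdate(id, mutable.Queue.empty[String])
          val blocks = queue.toList
          queue.clear()
          id -> blocks
        }.toMap
        allocated.put(batchTime, streamIdToBlocks)
        lastAllocatedBatchTime = Some(batchTime)
      }
    }

    def getBlocksOfBatch(batchTime: Long): Map[Int, List[String]] = synchronized {
      allocated.getOrElse(batchTime, Map.empty[Int, List[String]])
    }
  }
}
```

A block added after one allocation only shows up in the next batch, which is the behavior the input stream's compute relies on when it asks for the blocks of a given batch time.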

2.3 Step 2: graph.generateJobs

  • The core role of DStreamGraph is to hold the registered outputStreams. So when are they registered?
  • Through the output operation chain: print -> foreachRDD -> ForEachDStream -> register -> ssc.graph.addOutputStream(this)
  • DStreamGraph.generateJobs ultimately calls outputStream.generateJob(time)
    private val inputStreams = new ArrayBuffer[InputDStream[_]]()
    private val outputStreams = new ArrayBuffer[DStream[_]]()

    def generateJobs(time: Time): Seq[Job] = {
      logDebug("Generating jobs for time " + time)
      val jobs = this.synchronized {
        outputStreams.flatMap { outputStream =>
          val jobOption = outputStream.generateJob(time)
          jobOption.foreach(_.setCallSite(outputStream.creationSite))
          jobOption
        }
      }
      logDebug("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }
  • outputStream.generateJob defines jobFunc and creates new Job(time, jobFunc)
    private[streaming] def generateJob(time: Time): Option[Job] = {
      getOrCompute(time) match {
        case Some(rdd) =>
          val jobFunc = () => {
            val emptyFunc = { (iterator: Iterator[T]) => {} }
            context.sparkContext.runJob(rdd, emptyFunc)
          }
          Some(new Job(time, jobFunc))
        case None => None
      }
    }
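Registration and job generation can be tried out with a small stand-alone model. ToyGraph, ToyOutputStream, and ToyJob below are hypothetical names, and the "job" merely appends a string to a buffer. The point of the sketch is that generateJob only defines the job body; nothing runs until the job is executed later.

```scala
import scala.collection.mutable.ArrayBuffer

object OutputStreamSketch {
  final case class ToyJob(time: Long, run: () => Unit)

  // Toy output stream: yields a job only when it has data for the batch.
  final class ToyOutputStream(
      name: String,
      data: Map[Long, List[Int]],
      sink: ArrayBuffer[String]) {
    def generateJob(time: Long): Option[ToyJob] =
      data.get(time).map { rdd =>
        // The body is only defined here; it runs when the job is executed.
        ToyJob(time, () => { sink += s"$name@$time: ${rdd.sum}"; () })
      }
  }

  final class ToyGraph {
    private val outputStreams = ArrayBuffer.empty[ToyOutputStream]

    def addOutputStream(s: ToyOutputStream): Unit = synchronized {
      outputStreams += s
    }

    // One job per output stream that has something to do for this batch.
    def generateJobs(time: Long): Seq[ToyJob] = synchronized {
      outputStreams.flatMap(_.generateJob(time)).toList
    }
  }
}
```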

2.4 Step 3: jobScheduler.inputInfoTracker.getInfo(time)

  • This step fetches the metadata of the input Blocks that belong to the given batch

    // Map to track all the InputInfo related to specific batch time and input stream.
    private val batchTimeToInputInfos =
      new mutable.HashMap[Time, mutable.HashMap[Int, StreamInputInfo]]

    case class StreamInputInfo(
        inputStreamId: Int, numRecords: Long, metadata: Map[String, Any] = Map.empty)
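The two-level map can be illustrated with a minimal model of reporting and lookup. ToyInputInfoTracker and ToyInputInfo are hypothetical names, batch times are plain Long values, and the sketch simply overwrites a repeated report for the same stream, which may differ from what Spark does.

```scala
import scala.collection.mutable

object InputInfoSketch {
  final case class ToyInputInfo(inputStreamId: Int, numRecords: Long)

  final class ToyInputInfoTracker {
    // Batch time -> (input stream id -> info reported for that batch).
    private val batchTimeToInputInfos =
      mutable.HashMap.empty[Long, mutable.HashMap[Int, ToyInputInfo]]

    // Called by an input stream while it computes the RDD of a batch.
    def reportInfo(batchTime: Long, info: ToyInputInfo): Unit = synchronized {
      val perStream = batchTimeToInputInfos.getOrElseUpdate(
        batchTime, mutable.HashMap.empty[Int, ToyInputInfo])
      perStream.put(info.inputStreamId, info)
    }

    // Called in step 3 to collect everything reported for the batch.
    def getInfo(batchTime: Long): Map[Int, ToyInputInfo] = synchronized {
      batchTimeToInputInfos.get(batchTime)
        .map(_.toMap)
        .getOrElse(Map.empty[Int, ToyInputInfo])
    }
  }
}
```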

2.5 Step 4: jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))

  • JobGenerator holds a reference to JobScheduler, which finally submits the Jobs and starts driving the computation on the Executors.
    def submitJobSet(jobSet: JobSet) {
      if (jobSet.jobs.isEmpty) {
        logInfo("No jobs added for time " + jobSet.time)
      } else {
        listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
        logInfo("Added jobs for time " + jobSet.time)
      }
    }
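The hand-off from a job set to an executor pool can be sketched with plain java.util.concurrent classes. ToyScheduler, ToyJobSet, and ToyJob are hypothetical names, the pool size parameter is an assumption of this sketch, and listener notifications and the JobHandler wrapper are left out.

```scala
import java.util.concurrent.{ConcurrentHashMap, Executors, TimeUnit}

object SubmitJobSetSketch {
  final case class ToyJob(id: String, body: () => Unit)
  final case class ToyJobSet(time: Long, jobs: Seq[ToyJob])

  final class ToyScheduler(numConcurrentJobs: Int) {
    private val jobSets = new ConcurrentHashMap[Long, ToyJobSet]()
    private val jobExecutor = Executors.newFixedThreadPool(numConcurrentJobs)

    // Empty job sets are skipped; otherwise the set is remembered under its
    // batch time and each job is handed to the thread pool.
    def submitJobSet(jobSet: ToyJobSet): Unit =
      if (jobSet.jobs.nonEmpty) {
        jobSets.put(jobSet.time, jobSet)
        jobSet.jobs.foreach { job =>
          jobExecutor.execute(new Runnable { def run(): Unit = job.body() })
        }
      }

    def trackedBatches: Int = jobSets.size()

    // Stop accepting work and wait for the submitted jobs to finish.
    def shutdown(): Boolean = {
      jobExecutor.shutdown()
      jobExecutor.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

With a pool of size 1 the jobs of a batch run one after another, so a slow job delays every job queued behind it.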

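3 Summary

The author spent a great deal of time compiling this article. Please do not just take without giving back. Reposting is prohibited, learning from it is welcome, and if you have questions, please leave a comment.

秦凱新, Shenzhen, 2018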
