Spark Streaming: Job Creation, Scheduling, and Submission

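The previous post traced, at the source level, how data received by a Receiver is handed over to BlockManager, so the whole data-receiving flow is now running. Let's return to the analysis of JobScheduler.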

// JobScheduler.scala line 62
  def start(): Unit = synchronized {
    if (eventLoop != null) return // scheduler has already been started

    logDebug("Starting JobScheduler")
    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
    eventLoop.start()

    // attach rate controllers of input streams to receive batch completion updates
    for {
      inputDStream <- ssc.graph.getInputStreams
      rateController <- inputDStream.rateController
    } ssc.addStreamingListener(rateController)

    listenerBus.start(ssc.sparkContext)
    receiverTracker = new ReceiverTracker(ssc)
    inputInfoTracker = new InputInfoTracker(ssc)
    receiverTracker.start()
    jobGenerator.start()
    logInfo("Started JobScheduler")
  }

The previous several posts all expanded on receiverTracker.start(). With that covered, we move on to the next step.

// JobScheduler.scala line 83
jobGenerator.start()

The instantiation of jobGenerator was analyzed earlier. Digging into the source of its start() method shows three steps:

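  1. Instantiate eventLoop. This eventLoop is not the one in JobScheduler: it is parameterized with a different type (JobGeneratorEvent rather than JobSchedulerEvent).
  2. Call eventLoop.start().
  3. On a first start (no checkpoint present), call startFirstTime.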
// JobGenerator.scala line 78
  /** Start generation of jobs */
  def start(): Unit = synchronized {
    if (eventLoop != null) return // generator has already been started

    // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
    // See SPARK-10125
    checkpointWriter

    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)
      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

    if (ssc.isCheckpointPresent) {
      restart()
    } else {
      startFirstTime()
    }
  }
// JobGenerator.scala line 189
  /** Starts the generator for the first time */
  private def startFirstTime() {
    val startTime = new Time(timer.getStartTime())
    graph.start(startTime - graph.batchDuration)
    timer.start(startTime.milliseconds)
    logInfo("Started JobGenerator at " + startTime)
  }

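startFirstTime first calls DStreamGraph.start, which does the following:

  1. Calls initialize on every outputStream, which sets the zero time used as the reference for the first batch; the DStreams it depends on are set as well.
  2. If a remember duration has been set, calls remember on every outputStream; the DStreams it depends on are set as well.
  3. Validates before starting, mainly checking that the checkpoint settings do not conflict and that the various Durations are valid.
  4. Starts all inputStreams. The author scanned InputDStream and all of its subclasses in the current version, 1.6.0, and found that their start methods do nothing. As earlier posts showed, the inputStreams have already been handed over to ReceiverTracker.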
// DStreamGraph.scala line 39
  def start(time: Time) {
    this.synchronized {
      require(zeroTime == null, "DStream graph computation already started")
      zeroTime = time
      startTime = time
      outputStreams.foreach(_.initialize(zeroTime))
      outputStreams.foreach(_.remember(rememberDuration))
      outputStreams.foreach(_.validateAtStart)
      inputStreams.par.foreach(_.start())
    }
  }

At this point only some simple initialization has been done; no data is being processed yet.

Back to JobGenerator. Next, the recurring timer is started:

// JobGenerator.scala line 193
    timer.start(startTime.milliseconds)

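The recurring timer starts. Does it look familiar? Have we seen this timer somewhere before?

Yes: in BlockGenerator.scala, lines 105 and 109. There are two threads there, one of which is a recurring timer that periodically moves data into the queue of blocks waiting to be pushed.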

// RecurringTimer.scala line 59
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }

The actual logic is the function passed in at construction time: longTime => eventLoop.post(GenerateJobs(new Time(longTime))).

Its input is a Long, and its body is eventLoop.post(GenerateJobs(new Time(longTime))).

// JobGenerator.scala line 58
  private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
    longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

As long as the timer has not been stopped, it keeps looping.

  1. At construction, the function above is passed in: callback: (Long) => Unit corresponds to longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
  2. start calls thread.start(), which runs the thread's run method and therefore loop.
  3. loop calls triggerActionForNextInterval.
  4. triggerActionForNextInterval invokes the callback passed to the constructor, i.e. longTime => eventLoop.post(GenerateJobs(new Time(longTime))).
private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
// RecurringTimer.scala line 27
  private val thread = new Thread("RecurringTimer - " + name) {
    setDaemon(true)
    override def run() { loop }
  }
// RecurringTimer.scala line 56
  /**
   * Start at the given start time.
   */
  def start(startTime: Long): Long = synchronized {
    nextTime = startTime
    thread.start()
    logInfo("Started timer for " + name + " at time " + nextTime)
    nextTime
  }
// RecurringTimer.scala line 92
  private def triggerActionForNextInterval(): Unit = {
    clock.waitTillTime(nextTime)
    callback(nextTime)
    prevTime = nextTime
    nextTime += period
    logDebug("Callback for " + name + " called at time " + prevTime)
  }

// RecurringTimer.scala line 100
  /**
   * Repeatedly call the callback every interval.
   */
  private def loop() {
    try {
      while (!stopped) {
        triggerActionForNextInterval()
      }
      triggerActionForNextInterval()
    } catch {
      case e: InterruptedException =>
    }
  }
// ... other code omitted
}
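The timer pattern above can be reproduced outside Spark. The following is a minimal sketch, not Spark's implementation: SimpleRecurringTimer and collectTicks are hypothetical names, and the wall clock via System.currentTimeMillis is assumed in place of Spark's Clock abstraction. It shows the same structure: a daemon thread that waits until nextTime, fires the callback with that time, and advances nextTime by one period.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object RecurringTimerSketch {
  // Hypothetical simplified timer: wait until nextTime, fire the callback, advance by period.
  class SimpleRecurringTimer(period: Long, callback: Long => Unit, name: String) {
    @volatile private var stopped = false
    private var nextTime = 0L

    private val thread = new Thread("SimpleRecurringTimer - " + name) {
      setDaemon(true)
      override def run(): Unit = loop()
    }

    def start(startTime: Long): Long = synchronized {
      nextTime = startTime
      thread.start()
      nextTime
    }

    def stop(): Unit = {
      stopped = true
      thread.interrupt()
      thread.join()
    }

    private def loop(): Unit = {
      try {
        while (!stopped) {
          val sleepMs = nextTime - System.currentTimeMillis()
          if (sleepMs > 0) Thread.sleep(sleepMs)
          callback(nextTime) // the callback receives the scheduled time, not the actual time
          nextTime += period
        }
      } catch {
        case _: InterruptedException => // stop() was called while waiting
      }
    }
  }

  // Runs the timer until `ticks` callbacks have fired and returns the scheduled times seen.
  def collectTicks(ticks: Int, period: Long): List[Long] = {
    val seen = ArrayBuffer.empty[Long]
    val latch = new CountDownLatch(ticks)
    val timer = new SimpleRecurringTimer(period, t => seen.synchronized {
      if (seen.size < ticks) { seen += t; latch.countDown() }
    }, "sketch")
    timer.start(System.currentTimeMillis())
    latch.await(5, TimeUnit.SECONDS)
    timer.stop()
    seen.synchronized(seen.toList)
  }
}
```

Because the callback is given the scheduled time rather than the time it actually ran, consecutive values differ by exactly one period, which is what lets each tick be treated as a batch time.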

The timer thus periodically posts GenerateJobs events; eventLoop.post adds each event to eventQueue.

// EventLoop.scala line 102
  def post(event: E): Unit = {
    eventQueue.put(event)
  }

Meanwhile, another member of this EventLoop, eventThread, keeps taking events from the queue and passes each one to onReceive. That onReceive was overridden when the EventLoop was instantiated.

// JobGenerator.scala line 86
    eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
      override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = {
        jobScheduler.reportError("Error in job generator", e)
      }
    }
    eventLoop.start()

onReceive calls processEvent:

// JobGenerator.scala line 177
  /** Processes all events */
  private def processEvent(event: JobGeneratorEvent) {
    logDebug("Got event " + event)
    event match {
      case GenerateJobs(time) => generateJobs(time)
      // other cases omitted
    }
  }
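The post / queue / onReceive cycle can be sketched in isolation. This is a simplified illustration under stated assumptions, not Spark's EventLoop: SimpleEventLoop, SketchEvent and run are hypothetical names, and the two case classes only stand in for the real JobGeneratorEvent hierarchy. post enqueues an event, a daemon thread dequeues it, and the overridden onReceive dispatches by pattern matching.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingDeque, TimeUnit}
import scala.collection.mutable.ArrayBuffer

object EventLoopSketch {
  // Hypothetical event types standing in for JobGeneratorEvent and its case classes.
  sealed trait SketchEvent
  case class GenerateJobs(time: Long) extends SketchEvent
  case class DoCheckpoint(time: Long) extends SketchEvent

  // Simplified event loop: post() enqueues, a daemon thread dequeues and calls onReceive.
  abstract class SimpleEventLoop[E](name: String) {
    private val eventQueue = new LinkedBlockingDeque[E]()
    @volatile private var stopped = false

    private val eventThread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped) onReceive(eventQueue.take())
        } catch {
          case _: InterruptedException => // exit quietly on stop()
        }
      }
    }

    def start(): Unit = eventThread.start()
    def stop(): Unit = { stopped = true; eventThread.interrupt(); eventThread.join() }
    def post(event: E): Unit = eventQueue.put(event)

    protected def onReceive(event: E): Unit
  }

  // Posts the given events and returns a description of each, in handling order.
  def run(events: Seq[SketchEvent]): List[String] = {
    val handled = ArrayBuffer.empty[String]
    val done = new CountDownLatch(events.size)
    val loop = new SimpleEventLoop[SketchEvent]("sketch-loop") {
      override protected def onReceive(event: SketchEvent): Unit = {
        val line = event match {
          case GenerateJobs(t) => s"generate@$t"
          case DoCheckpoint(t) => s"checkpoint@$t"
        }
        handled.synchronized { handled += line }
        done.countDown()
      }
    }
    loop.start()
    events.foreach(loop.post)
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    handled.synchronized { handled.toList }
  }
}
```

Since a single thread drains the queue, events are handled one at a time in the order they were posted, so the handler itself needs no locking for its own state.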

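A GenerateJobs event is matched and handled by generateJobs(time: Time), which:

  1. Allocates to the current batch all the blocks that ReceiverTracker has collected; if the WAL is enabled, the allocation is written to the WAL.
  2. Has DStreamGraph generate the jobs.
  3. Submits the jobs.
  4. Posts a DoCheckpoint event, so that a checkpoint is performed if checkpointing is configured.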
// JobGenerator.scala line 240
  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

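This code is not especially easy to read, so let's break it down. At first glance it looks like try {} catch { case ... }; on closer inspection it is Try {} match {}.

Tracing the code shows that Try, with a capital T, is a companion object whose apply takes a by-name parameter (a block of code evaluated inside apply) and returns an instance of Try. The code in scala.util.Try.scala is as follows: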

// scala.util.Try.scala line 155
object Try {
  /** Constructs a `Try` using the by-name parameter.  This
   * method will ensure any non-fatal exception is caught and a
   * `Failure` object is returned.
   */
  def apply[T](r: => T): Try[T] =
    try Success(r) catch {
      case NonFatal(e) => Failure(e)
    }

}

Try has two subclasses, both case classes: Success and Failure.
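A small standalone example may make the Try { ... } match { ... } shape easier to recognize. In this sketch, generate and describe are hypothetical helpers invented for illustration; only scala.util.Try, Success and Failure come from the standard library.

```scala
import scala.util.{Failure, Success, Try}

object TrySketch {
  // Hypothetical stand-in for the job-generation step, which may throw.
  def generate(batchTime: Long): Seq[String] = {
    require(batchTime >= 0, "negative batch time")
    Seq(s"job-$batchTime")
  }

  // Same shape as generateJobs: evaluate the block inside Try, then match on the outcome.
  def describe(batchTime: Long): String =
    Try {
      generate(batchTime)
    } match {
      case Success(jobs) => s"submitted ${jobs.size} job(s)"
      case Failure(e)    => s"error: ${e.getMessage}"
    }
}
```

Here describe(5000L) yields "submitted 1 job(s)", while describe(-1L) yields "error: requirement failed: negative batch time", because the IllegalArgumentException thrown by require is non-fatal and is therefore captured as a Failure.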

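Back at the call site, the last expression in the Try block is graph.generateJobs(time). Tracing into it:

For each output stream it calls outputStream.generateJob(time) and returns the resulting jobs.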

// DStreamGraph.scala line 111
  def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

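As earlier posts showed, every outputStream is actually a ForEachDStream. ForEachDStream overrides generateJob:

  1. parent.getOrCompute(time) returns an Option[RDD[T]].
  2. If an RDD is present, the result is Some(new Job(time, jobFunc)); otherwise it is None.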
// ForEachDStream.scala line 46
  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
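One point worth noting in generateJob is that building a Job does not run anything: the work is wrapped in a function and executed later. The following sketch illustrates only that idea. SketchJob, the sink parameter and the Option[Seq[Int]] input are all hypothetical simplifications, not Spark's Job class or RDDs.

```scala
object DeferredJobSketch {
  // Hypothetical simplified Job: it only stores the function; nothing runs until run() is called.
  class SketchJob(val time: Long, func: () => Unit) {
    def run(): Unit = func()
  }

  // Mirrors the shape of generateJob: Some(job) when there is data for the batch, None otherwise.
  def generateJob(time: Long, data: Option[Seq[Int]], sink: String => Unit): Option[SketchJob] =
    data match {
      case Some(batch) =>
        val jobFunc = () => sink(s"batch $time sum=${batch.sum}")
        Some(new SketchJob(time, jobFunc))
      case None => None
    }
}
```

A caller would obtain the job first and invoke run() on it separately, which is the separation between job generation and job execution that the scheduler relies on.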

So what is the parent of the ForEachDStream? Consider our example:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Durations, StreamingContext}

object StreamingWordCountSelfScala {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setMaster("spark://master:7077").setAppName("StreamingWordCountSelfScala")
    val ssc = new StreamingContext(sparkConf, Durations.seconds(5)) // collect data every 5 seconds
    val lines = ssc.socketTextStream("localhost", 9999) // listen on local socket port 9999
    val words = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _) // flatMap, map, then reduce by key
    words.print() // print the results
    ssc.start() // start
    ssc.awaitTermination()
    ssc.stop(true)
  }
}

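As described in the earlier posts, the DStream dependency chain in this example is SocketInputDStream << FlatMappedDStream << MappedDStream << ShuffledDStream << ForEachDStream.

The author scanned DStream and all of its subclasses: only DStream defines getOrCompute, and since it is declared final no subclass overrides it. So the call here is ShuffledDStream.getOrCompute, inherited from DStream.

In the usual case the RDD for this time does not exist yet, so the orElse block is executed: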

// DStream.scala line 338
  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)  // line 352
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }

ShuffledDStream.compute in turn calls parent.getOrCompute:

// ShuffledDStream.scala line 40
  override def compute(validTime: Time): Option[RDD[(K, C)]] = {
    parent.getOrCompute(validTime) match {
      case Some(rdd) => Some(rdd.combineByKey[C](
          createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine))
      case None => None
    }
  }
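To make the roles of createCombiner and mergeValue concrete, here is a local analogue over an in-memory Seq. It is only a sketch under simplifying assumptions: there is a single "partition", so mergeCombiner, partitioner and mapSideCombine have no counterpart, and the combineByKey and wordCount functions below are hypothetical helpers, not Spark's API.

```scala
object CombineByKeySketch {
  // Hypothetical local analogue of combineByKey over a Seq (no partitioning, no shuffle).
  def combineByKey[K, V, C](
      records: Seq[(K, V)],
      createCombiner: V => C,
      mergeValue: (C, V) => C): Map[K, C] =
    records.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
      val combined = acc.get(k) match {
        case Some(c) => mergeValue(c, v)  // key seen before: fold the value into its combiner
        case None    => createCombiner(v) // first value for this key: start a combiner
      }
      acc.updated(k, combined)
    }

  // Word count expressed with the same three steps as the example: flatMap, map, combine by key.
  def wordCount(lines: Seq[String]): Map[String, Int] = {
    val pairs = lines.flatMap(_.split(" ").toSeq).map((_, 1))
    combineByKey[String, Int, Int](pairs, v => v, (c, v) => c + v)
  }
}
```

With the identity function as createCombiner and addition as mergeValue, this reduces to what reduceByKey(_ + _) computes for a single partition.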

MappedDStream.compute again calls getOrCompute on its parent DStream, which in turn calls that parent's compute, and so on up the chain.

// MappedDStream.scala line 34
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }

FlatMappedDStream.compute likewise calls getOrCompute on its parent DStream, which again calls compute, and so on.

// FlatMappedDStream.scala line 34
  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
  }

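This continues until the DStream is the SocketInputDStream, i.e. the input stream, whose compute is inherited from its superclass ReceiverInputDStream.

Ignore the logic in the if branch for now and go straight to the else block, which calls createBlockRDD.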

// ReceiverInputDStream.scala line 69
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }
The RDD is instantiated by new BlockRDD[T](ssc.sc, validBlockIds) at line 127:
// ReceiverInputDStream.scala line 94
  private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds) // line 127
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
  }

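This BlockRDD is a subclass of Spark Core's RDD and has no parent RDDs (its dependency list is Nil). With that, the instantiation of the RDD is complete.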

// BlockRDD.scala line 30
private[spark]
class BlockRDD[T: ClassTag](sc: SparkContext, @transient val blockIds: Array[BlockId])
  extends RDD[T](sc, Nil) 

// RDD.scala line 74
abstract class RDD[T: ClassTag](
    @transient private var _sc: SparkContext,
    @transient private var deps: Seq[Dependency[_]]
  ) extends Serializable with Logging

Unwinding the recursion, the RDD that is finally produced is, in pseudocode:

new BlockRDD[T](ssc.sc, validBlockIds).flatMap(flatMapFunc).map[U](mapFunc).combineByKey[C](createCombiner, mergeValue, mergeCombiner, partitioner, mapSideCombine)

In this example that becomes

new BlockRDD[T](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map[U](t => (t, 1)).combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true)

and the function behind the final print is

() => foreachFunc(new BlockRDD[T](ssc.sc, validBlockIds).flatMap(t => t.split(" ")).map[U](t => (t, 1)).combineByKey[C](t => t, (t1, t2) => t1 + t2, (t1, t2) => t1 + t2, partitioner, true), time)

where foreachFunc is defined at DStream.scala line 766.

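At this point the RDD has been instantiated through the DStream chain. Looking back, it should now be clear why a DStream can be seen as a template for RDDs.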
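The "template" idea and the getOrCompute / compute recursion can be modeled with plain collections. This is a toy model under stated assumptions, not Spark code: ToyStream, ToyInput, FlatMapped and Mapped are hypothetical classes, a Seq stands in for an RDD, and a Map from batch time to lines stands in for the received blocks.

```scala
import scala.collection.mutable

object DStreamTemplateSketch {
  // Hypothetical toy model: a stream yields one Seq per batch time, standing in for an RDD.
  abstract class ToyStream[T] {
    private val generated = mutable.HashMap.empty[Long, Seq[T]]
    var computeCalls = 0

    def compute(time: Long): Option[Seq[T]]

    // Return the cached batch if present; otherwise compute it and cache it.
    final def getOrCompute(time: Long): Option[Seq[T]] =
      generated.get(time).orElse {
        computeCalls += 1
        val result = compute(time)
        result.foreach(batch => generated.put(time, batch))
        result
      }
  }

  // Input stream: the end of the recursion, it looks up the data assigned to the batch.
  class ToyInput(blocks: Map[Long, Seq[String]]) extends ToyStream[String] {
    def compute(time: Long): Option[Seq[String]] = Some(blocks.getOrElse(time, Seq.empty))
  }

  class FlatMapped[T, U](parent: ToyStream[T], f: T => Iterable[U]) extends ToyStream[U] {
    def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.flatMap(f))
  }

  class Mapped[T, U](parent: ToyStream[T], f: T => U) extends ToyStream[U] {
    def compute(time: Long): Option[Seq[U]] = parent.getOrCompute(time).map(_.map(f))
  }
}
```

The chain of stream objects is built once, and each call to getOrCompute with a new batch time stamps out a fresh result by walking from the last stream back to the input; asking for the same time twice reuses the cached result instead of recomputing.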

But we are not done yet. Back in ForEachDStream.scala line 46, the function above is passed to Job as a constructor argument.

 

-------------Divider--------------

Supplement: a flowchart of Job creation, taken from a blog by a student of the version customization class, slightly modified.

 

 

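Supplement: a flowchart of how the RDD DAG is created by tracing the lineage back from the OutputDStream, taken from a blog by a student of the version customization class.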

 

 

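Supplement: the same flowchart for this post's example, showing how the RDD DAG is created by tracing the lineage back from the OutputDStream, taken from a blog by a student of the version customization class.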

 

 

The next post analyzes Job submission at the source level. Stay tuned.
