Lesson 6: Spark Streaming Source Code Walkthrough: Dynamic Job Generation and Deeper Reflections

In this lesson:

1. Reflections on Spark Streaming Job generation

2. Source code analysis of Spark Streaming Job generation

 

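Start with the JobGenerator class. Its constructor takes a JobScheduler, and JobScheduler is the core of Spark Streaming's Job generation and of submitting Jobs to the cluster. JobGenerator generates Jobs from the DStreamGraph. To stress the point again: a Job here is comparable to Java's Runnable interface wrapping business logic, and it is not the same concept as a Job in Spark Core. A Spark Core Job is an actual running job, while a Spark Streaming Job is a higher-level abstraction.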

/**
 * This class generates jobs from DStreams as well as drives checkpointing and cleaning
 * up DStream metadata.
 */
private[streaming]
class JobGenerator(jobScheduler: JobScheduler) extends Logging {

  private val ssc = jobScheduler.ssc
  private val conf = ssc.conf
  private val graph = ssc.graph

 

A Job in Spark Streaming is just a simple holder object, much like a Java Bean; the business logic lives in its func function.

/**
 * Class representing a Spark computation. It may contain multiple Spark jobs.
 */
private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

 

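There are three kinds of DStream. The first is input streams built from different sources, such as Socket, Kafka, and Flume. The second is output: outputStreams are logical-level actions, and because they still live at the Spark Streaming framework level, they must eventually be turned into physical-level actions. The third is transformations, which convert one DStream into another, that is, DStreams derived from other DStreams. The DStreamGraph class records the input DStreams and the output DStreams.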

// DStreamGraph is the static template for RDDs; it describes the processing steps formed by the RDD dependencies
final private[streaming] class DStreamGraph extends Serializable with Logging {

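  // Growable array (ArrayBuffer) of InputDStream
  // Input streams: the data sources
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  // Output streams: the output operations that correspond to concrete actions
  private val outputStreams = new ArrayBuffer[DStream[_]]()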

 

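As time passes, JobGenerator keeps producing Jobs at every BatchDuration interval; it also drives checkpointing and the cleanup of earlier DStream metadata.

Some thoughts on stream processing versus batch processing: if the batch interval is short enough, batch processing effectively becomes stream processing. Spark Streaming's stream processing is triggered by time, whereas Storm's is triggered by events. Related trigger styles to compare: scheduled (timer) tasks, stream processing, and J2EE-triggered jobs.

A question to consider: when the logical-level DStreamGraph is translated into a physical-level RDD graph, the last operation is an RDD action. Does that trigger a Job immediately?

The Job produced by JobGenerator is a Runnable-style wrapper. It turns the dependencies between DStreams into dependencies between RDDs, and the final operation is an action. Because all of these operations sit inside a function that has not been called yet, no Job is triggered at translation time. If a Job were triggered during translation, Job submission in Spark Streaming would no longer be under the scheduler's control.

When JobScheduler is ready to schedule a Job, it takes a thread from the thread pool and uses it to run the function that wraps the DStream-to-RDD translation.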
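The deferral described above can be illustrated with a minimal sketch. It assumes a simplified stand-in for the streaming Job; the names `DeferredJobSketch`, `SketchJob`, `translate`, and `schedule` are hypothetical and are not part of Spark. Building the closure does not run it; only the pool thread does.

```scala
import java.util.concurrent.{Executors, TimeUnit}
import java.util.concurrent.atomic.AtomicInteger

object DeferredJobSketch {
  // Hypothetical stand-in for the streaming Job: it only stores a function.
  final class SketchJob(val time: Long, func: () => Any) {
    def run(): Any = func()
  }

  // Counts how many times the wrapped "action" has actually executed.
  val actionsRun = new AtomicInteger(0)

  // "Translation": build the closure that ends in an action, without calling it.
  def translate(time: Long): SketchJob =
    new SketchJob(time, () => actionsRun.incrementAndGet())

  // "Scheduling": a pool thread finally invokes the closure.
  def schedule(job: SketchJob): Unit = {
    val pool = Executors.newFixedThreadPool(1)
    pool.execute(new Runnable { def run(): Unit = { job.run(); () } })
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    ()
  }
}
```

After `translate` the counter is still zero; it only moves once `schedule` hands the job to a thread.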

 

 

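Next, we look at Job generation from three angles: JobGenerator, JobScheduler, and ReceiverTracker. JobGenerator is responsible for generating Jobs, JobScheduler for scheduling them, and ReceiverTracker for tracking where the data comes from. JobGenerator and ReceiverTracker are both members of JobScheduler.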

/**
 * This class schedules jobs to be run on Spark. It uses the JobGenerator to generate
 * the jobs and runs them using a thread pool.
 * In short: JobGenerator generates the Jobs, and they run in a thread pool.
 */
private[streaming]
class JobScheduler(val ssc: StreamingContext) extends Logging {

  // Use of ConcurrentHashMap.keySet later causes an odd runtime problem due to Java 7/8 diff
  // https://gist.github.com/AlainODea/1375759b8720a3f9f094
  private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]
  // The default number of concurrent Jobs is 1
  private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1)
  // Jobs are executed on a thread pool
  private val jobExecutor =
    ThreadUtils.newDaemonFixedThreadPool(numConcurrentJobs, "streaming-job-executor")
  // Create the JobGenerator; it is analyzed in detail later
  private val jobGenerator = new JobGenerator(this)
  val clock = jobGenerator.clock
  val listenerBus = new StreamingListenerBus()

  // These two are created only when scheduler starts.
  // eventLoop not being null means the scheduler has been started and not stopped
  var receiverTracker: ReceiverTracker = null
  // A tracker to track all the input stream information as well as processed record number
  var inputInfoTracker: InputInfoTracker = null

 

JobScheduler's start method calls the start methods of both ReceiverTracker and JobGenerator.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  // Message-driven system
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  // Start the thread that runs the message loop
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the receiverTracker
  receiverTracker.start()
  // Start the Job generator
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

 

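First, look at JobGenerator's start method: it initializes the checkpoint writer, instantiates and starts the EventLoop message loop, and starts the timer that generates Jobs periodically.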

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the thread that runs the message loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that generates Jobs periodically
    startFirstTime()
  }
}

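The EventLoop class holds a LinkedBlockingDeque that stores messages, plus a background thread. The background thread takes messages from the queue and calls onReceive on each one. Here, onReceive is the method overridden in the anonymous subclass, which delegates to processEvent.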

private[spark] abstract class EventLoop[E](name: String) extends Logging {

  private val eventQueue: BlockingQueue[E] = new LinkedBlockingDeque[E]()

  private val stopped = new AtomicBoolean(false)

  private val eventThread = new Thread(name) {
    setDaemon(true)

    override def run(): Unit = {
      try {
        while (!stopped.get) {
          val event = eventQueue.take()
          try {
            onReceive(event)
          } catch {
            case NonFatal(e) => {
              try {
                onError(e)
              } catch {
                case NonFatal(e) => logError("Unexpected error in " + name, e)
              }
            }
          }
        }
      } catch {
        case ie: InterruptedException => // exit even if eventQueue is not empty
        case NonFatal(e) => logError("Unexpected error in " + name, e)
      }
    }

  }

  def start(): Unit = {
    if (stopped.get) {
      throw new IllegalStateException(name + " has already been stopped")
    }
    // Call onStart before starting the event thread to make sure it happens before onReceive
    onStart()
    eventThread.start()
  }
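To make the queue-plus-thread structure concrete, here is a minimal sketch of the same pattern using only the JDK and the Scala standard library. `MiniEventLoop` and `handleAll` are hypothetical names for illustration; this is a simplified model, not Spark's EventLoop.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, CountDownLatch, LinkedBlockingDeque, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

object EventLoopSketch {
  // Simplified event loop: one queue, one daemon thread, one handler.
  abstract class MiniEventLoop[E](name: String) {
    private val queue = new LinkedBlockingDeque[E]()
    private val stopped = new AtomicBoolean(false)

    private val thread = new Thread(name) {
      setDaemon(true)
      override def run(): Unit = {
        try {
          while (!stopped.get) {
            val event = queue.take() // blocks until an event is posted
            try onReceive(event)
            catch { case NonFatal(e) => onError(e) }
          }
        } catch {
          case _: InterruptedException => // stop() interrupted the blocking take()
        }
      }
    }

    def start(): Unit = thread.start()
    def post(event: E): Unit = queue.put(event)
    def stop(): Unit = if (stopped.compareAndSet(false, true)) thread.interrupt()

    protected def onReceive(event: E): Unit
    protected def onError(e: Throwable): Unit
  }

  // Posts the events and returns them in the order the loop thread handled them.
  def handleAll(events: Seq[String]): Seq[String] = {
    val handled = new ConcurrentLinkedQueue[String]()
    val done = new CountDownLatch(events.size)
    val loop = new MiniEventLoop[String]("sketch-loop") {
      override protected def onReceive(event: String): Unit = {
        handled.add(event)
        done.countDown()
      }
      override protected def onError(e: Throwable): Unit = ()
    }
    loop.start()
    events.foreach(e => loop.post(e))
    done.await(5, TimeUnit.SECONDS)
    loop.stop()
    handled.toArray(new Array[String](0)).toSeq
  }
}
```

Because a single thread drains a FIFO queue, events are handled in the order they were posted, which is why handlers should stay short and hand heavy work to other threads.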

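processEvent pattern-matches on the message type and routes each message to the method that handles it. Message handling is usually handed off to another thread; the message loop itself does not run time-consuming business logic.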

/** Processes all events */
private def processEvent(event: JobGeneratorEvent) {
  logDebug("Got event " + event)
  event match {
    case GenerateJobs(time) => generateJobs(time)
    case ClearMetadata(time) => clearMetadata(time)
    case DoCheckpoint(time, clearCheckpointDataLater) =>
      doCheckpoint(time, clearCheckpointDataLater)
    case ClearCheckpointData(time) => clearCheckpointData(time)
  }
}
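The routing idea can be shown in isolation. The sketch below uses hypothetical event types (`Generate`, `Clear`, `Checkpoint`) that only mirror the shape of the messages matched above; the handlers return a description instead of doing real work.

```scala
object EventRoutingSketch {
  // Hypothetical event types; a sealed trait lets the compiler check the match for exhaustiveness.
  sealed trait SketchEvent
  final case class Generate(time: Long) extends SketchEvent
  final case class Clear(time: Long) extends SketchEvent
  final case class Checkpoint(time: Long, clearLater: Boolean) extends SketchEvent

  // Route each event to its handler; here a handler just describes what it would do.
  def route(event: SketchEvent): String = event match {
    case Generate(t)          => s"generate jobs for $t"
    case Clear(t)             => s"clear metadata for $t"
    case Checkpoint(t, later) => s"checkpoint at $t (clearLater=$later)"
  }
}
```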

Take generateJobs, the handler for the GenerateJobs message, as an example: after obtaining the data, it calls DStreamGraph's generateJobs method to generate the Jobs.

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Get the data that belongs to this specific batch time
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Call DStreamGraph's generateJobs to generate the Jobs
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}

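In generateJobs, outputStreams holds the last DStreams in the whole DStream chain. outputStream.generateJob(time) works backward from there, similar to how RDD lineage is traced from the last RDD back to the first.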

def generateJobs(time: Time): Seq[Job] = {
  logDebug("Generating jobs for time " + time)
  val jobs = this.synchronized {
    outputStreams.flatMap { outputStream =>
      val jobOption = outputStream.generateJob(time)
      jobOption.foreach(_.setCallSite(outputStream.creationSite))
      jobOption
    }
  }
  logDebug("Generated " + jobs.length + " jobs for time " + time)
  jobs
}
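The `flatMap` over `Option` in this method is what drops output streams that have nothing to run for a given time. A minimal sketch, assuming a hypothetical `SketchOutput` that only yields a job when the time falls on its own slide boundary (that rule is an assumption for illustration):

```scala
object OutputStreamSketch {
  final case class SketchJob(time: Long, label: String)

  // Hypothetical output stream: produces a job only on its own slide boundary.
  final case class SketchOutput(label: String, slideMs: Long) {
    def generateJob(time: Long): Option[SketchJob] =
      if (time % slideMs == 0) Some(SketchJob(time, label)) else None
  }

  // flatMap keeps the Some(...) results and silently discards the None ones.
  def generateJobs(outputs: Seq[SketchOutput], time: Long): Seq[SketchJob] =
    outputs.flatMap(_.generateJob(time))
}
```

With one output sliding every 1000 ms and another every 2000 ms, time 1000 yields one job and time 2000 yields two.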

In generateJob, jobFunc wraps context.sparkContext.runJob(rdd, emptyFunc).

private[streaming] def generateJob(time: Time): Option[Job] = {
  getOrCompute(time) match {
    case Some(rdd) => {

      // The function wraps the Job itself; it is not executed at this point
      val jobFunc = () => {
        val emptyFunc = { (iterator: Iterator[T]) => {} }
        context.sparkContext.runJob(rdd, emptyFunc)
      }
      Some(new Job(time, jobFunc))
    }
    case None => None
  }
}

In the Job object, calling run causes the func that was passed in to be invoked.

/**
 * Class representing a Spark computation. It may contain multiple Spark jobs.
 */
private[streaming]
class Job(val time: Time, func: () => _) {
  private var _id: String = _
  private var _outputOpId: Int = _
  private var isSet = false
  private var _result: Try[_] = null
  private var _callSite: CallSite = null
  private var _startTime: Option[Long] = None
  private var _endTime: Option[Long] = None

  def run() {
    _result = Try(func())
  }
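Wrapping the call in `Try` means a failing `func` is recorded as a result rather than thrown out of `run`. A minimal sketch of that behavior, using a hypothetical `SketchJob` (it stores the result in an `Option` for clarity, which differs from the field layout above):

```scala
import scala.util.Try

object JobResultSketch {
  // Hypothetical job: run() never throws for non-fatal errors, it records the outcome.
  final class SketchJob(val time: Long, func: () => Any) {
    private var _result: Option[Try[Any]] = None
    def run(): Unit = { _result = Some(Try(func())) }
    def result: Option[Try[Any]] = _result
  }
}
```

A caller can then inspect `result` after `run()` to see a `Success` or a `Failure` without needing its own try/catch.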

 

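getOrCompute first looks up the given time in a HashMap to see whether the RDD already exists. If not, it calls compute to produce the RDD, persists it if storageLevel requires that, marks it for checkpointing if a checkpoint time point has been reached, and finally puts the RDD into the HashMap.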

/**
 * Get the RDD corresponding to the given time; either retrieve it from cache
 * or compute-and-cache it.
 */
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
  // If RDD was already generated, then retrieve it from HashMap,
  // or else compute the RDD
  generatedRDDs.get(time).orElse {
    // Compute the RDD if time is valid (e.g. correct time in a sliding window)
    // of RDD generation, else generate nothing.
    if (isTimeValid(time)) {

      val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
        // Disable checks for existing output directories in jobs launched by the streaming
        // scheduler, since we may need to write output to an existing directory during checkpoint
        // recovery; see SPARK-4835 for more details. We need to have this call here because
        // compute() might cause Spark jobs to be launched.
        PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
          compute(time)
        }
      }

      rddOption.foreach { case newRDD =>
        // Register the generated RDD for caching and checkpointing
        if (storageLevel != StorageLevel.NONE) {
          newRDD.persist(storageLevel)
          logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
        }
        if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
          newRDD.checkpoint()
          logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
        }
        generatedRDDs.put(time, newRDD)
      }
      rddOption
    } else {
      None
    }
  }
}
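Stripped of persistence and checkpointing, this is a compute-once cache keyed by batch time. The sketch below models only that part; `SketchStream`, its string "RDD", and the validity rule (time after `zeroTime` and on a slide boundary) are assumptions for illustration.

```scala
import scala.collection.mutable

object GetOrComputeSketch {
  // Hypothetical stream that "computes" one value per batch time and caches it.
  final class SketchStream(zeroTime: Long, slideMs: Long) {
    private val generated = mutable.HashMap.empty[Long, String]
    var computeCalls = 0

    // Assumed validity rule: strictly after zeroTime and on a slide boundary.
    private def isTimeValid(time: Long): Boolean =
      time > zeroTime && (time - zeroTime) % slideMs == 0

    private def compute(time: Long): Option[String] = {
      computeCalls += 1
      Some(s"rdd@$time")
    }

    // Return the cached value if present; otherwise compute, cache, and return it.
    def getOrCompute(time: Long): Option[String] =
      generated.get(time).orElse {
        if (isTimeValid(time)) {
          val result = compute(time)
          result.foreach(value => generated.put(time, value))
          result
        } else None
      }
  }
}
```

Asking twice for the same valid time computes only once, and an invalid time never reaches `compute`.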

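Back in the JobGenerator class, look at what start does once the message loop is running. It first checks whether a checkpoint exists. If so, state is read from the checkpoint directory and restart is called to restart JobGenerator; if this is the first run, startFirstTime is called.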

/** Start generation of jobs */
def start(): Unit = synchronized {
  if (eventLoop != null) return // generator has already been started

  // Call checkpointWriter here to initialize it before eventLoop uses it to avoid a deadlock.
  // See SPARK-10125
  checkpointWriter

  eventLoop = new EventLoop[JobGeneratorEvent]("JobGenerator") {
    override protected def onReceive(event: JobGeneratorEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = {
      jobScheduler.reportError("Error in job generator", e)
    }
  }
  // Start the thread that runs the message loop
  eventLoop.start()

  if (ssc.isCheckpointPresent) {
    restart()
  } else {
    // Start the timer that generates Jobs periodically
    startFirstTime()
  }
}

JobGenerator's startFirstTime method starts the Timer that generates Jobs periodically.

/** Starts the generator for the first time */
private def startFirstTime() {
  val startTime = new Time(timer.getStartTime())
  graph.start(startTime - graph.batchDuration)
  timer.start(startTime.milliseconds)
  logInfo("Started JobGenerator at " + startTime)
}

The timer object is a RecurringTimer. Its start method starts a thread, and that thread repeatedly calls triggerActionForNextInterval.

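// Recurring timer: periodically calls back eventLoop.post(GenerateJobs(new Time(longTime))).
// This defines the function triggered on each tick; it posts an event of type GenerateJobs.
// Note that only the callback is defined here; nothing is started yet.
// GenerateJobs messages are posted at the batchInterval passed in when the StreamingContext was created.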
private val timer = new RecurringTimer(clock, ssc.graph.batchDuration.milliseconds,
  longTime => eventLoop.post(GenerateJobs(new Time(longTime))), "JobGenerator")

 

/**
 * Start at the given start time.
 */
def start(startTime: Long): Long = synchronized {
  nextTime = startTime
  thread.start()
  logInfo("Started timer for " + name + " at time " + nextTime)
  nextTime
}

 

// A daemon thread is created here
private val thread = new Thread("RecurringTimer - " + name) {
  setDaemon(true)
  override def run() { loop }
}

 

/**
 * Repeatedly call the callback every interval.
 */
private def loop() {
  try {
    while (!stopped) {
      triggerActionForNextInterval()
    }
    triggerActionForNextInterval()
  } catch {
    case e: InterruptedException =>
  }
}

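triggerActionForNextInterval waits until the next BatchDuration boundary and then invokes callback. The callback is the function passed in when the RecurringTimer was constructed, namely longTime => eventLoop.post(GenerateJobs(new Time(longTime))), so GenerateJobs messages are posted to the message loop continuously.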

private def triggerActionForNextInterval(): Unit = {
  clock.waitTillTime(nextTime)
  callback(nextTime)
  prevTime = nextTime
  nextTime += period
  logDebug("Callback for " + name + " called at time " + prevTime)
}

 

private[streaming]
class RecurringTimer(clock: Clock, period: Long, callback: (Long) => Unit, name: String)
  extends Logging {
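The timing logic can be sketched without threads or a clock. The hypothetical `SketchTimer` below is stepped manually; it omits the wait on the clock and only shows that each tick fires for `nextTime` and then advances by exactly one period.

```scala
import scala.collection.mutable.ArrayBuffer

object RecurringTimerSketch {
  // Hypothetical timer that is stepped manually instead of waiting on a thread.
  final class SketchTimer(period: Long, callback: Long => Unit) {
    private var nextTime = 0L

    def start(startTime: Long): Unit = { nextTime = startTime }

    // One loop iteration: fire for the current boundary, then advance by one period.
    def triggerActionForNextInterval(): Unit = {
      callback(nextTime)
      nextTime += period
    }
  }

  // Collects the batch times that `ticks` iterations would hand to the callback.
  def batchTimes(startTime: Long, period: Long, ticks: Int): Seq[Long] = {
    val posted = ArrayBuffer.empty[Long]
    val timer = new SketchTimer(period, t => { posted += t; () })
    timer.start(startTime)
    (1 to ticks).foreach(_ => timer.triggerActionForNextInterval())
    posted.toList
  }
}
```

Because the next time is derived from the previous boundary rather than from the moment the callback returned, the batch times stay on a fixed grid.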

 

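Let us focus once more on the steps generateJobs takes to generate Jobs:

Step 1: Get the data for the current batch interval.

Step 2: Generate the Jobs, that is, the dependencies between RDDs.

Step 3: Get the input information, keyed by stream ID, for the generated Jobs.

Step 4: Wrap everything into a JobSet and hand it to JobScheduler.

Step 5: Perform the checkpoint.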

/** Generate jobs and perform checkpoint for the given `time`.  */
private def generateJobs(time: Time) {
  // Set the SparkEnv in this thread, so that job generation code can access the environment
  // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
  // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
  SparkEnv.set(ssc.env)
  Try {
    // Step 1: Get the data for the current batch interval.
    jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
    // Step 2: Generate the Jobs, that is, the dependencies between RDDs.
    graph.generateJobs(time) // generate jobs using allocated block
  } match {
    case Success(jobs) =>
      // Step 3: Get the input information, keyed by stream ID, for the generated Jobs.
      val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
      // Step 4: Wrap everything into a JobSet and hand it to JobScheduler.
      jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
    case Failure(e) =>
      jobScheduler.reportError("Error generating jobs for time " + time, e)
  }

  // Step 5: Perform the checkpoint.
  eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
}
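The control flow of these five steps can be sketched with plain strings in place of the real collaborators. Everything named here (`GenerateJobsSketch`, `buildJobs`, the step descriptions) is hypothetical; the point is only the `Try { ... } match` shape and the fact that the checkpoint event is posted after the match, on both the success and the failure path.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.{Failure, Success, Try}

object GenerateJobsSketch {
  // Returns a trace of the steps taken for one batch time.
  def generateJobs(time: Long, buildJobs: Long => Seq[String]): List[String] = {
    val steps = ListBuffer.empty[String]
    Try {
      steps += s"allocate blocks for $time" // step 1
      buildJobs(time)                       // step 2, may throw
    } match {
      case Success(jobs) =>
        steps += s"submit JobSet($time, ${jobs.size} jobs)" // steps 3 and 4
      case Failure(e) =>
        steps += s"report error: ${e.getMessage}"
    }
    steps += s"post DoCheckpoint($time)" // step 5, outside the match
    steps.toList
  }
}
```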

submitJobSet simply puts the JobSet into a ConcurrentHashMap, wraps each Job in a JobHandler, and submits it to the jobExecutor thread pool.

def submitJobSet(jobSet: JobSet) {
  if (jobSet.jobs.isEmpty) {
    logInfo("No jobs added for time " + jobSet.time)
  } else {
    listenerBus.post(StreamingListenerBatchSubmitted(jobSet.toBatchInfo))
    jobSets.put(jobSet.time, jobSet)
    jobSet.jobs.foreach(job => jobExecutor.execute(new JobHandler(job)))
    logInfo("Added jobs for time " + jobSet.time)
  }
}
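The effect of the pool size on execution order can be sketched with a plain JDK executor. This assumes `Executors.newFixedThreadPool` as a stand-in for the daemon pool used above; `runAll` and its labels are hypothetical.

```scala
import java.util.concurrent.{ConcurrentLinkedQueue, Executors, TimeUnit}

object SubmitJobSetSketch {
  // Submits one task per label and returns the labels in completion order.
  def runAll(labels: Seq[String], numConcurrentJobs: Int): Seq[String] = {
    val pool = Executors.newFixedThreadPool(numConcurrentJobs)
    val finished = new ConcurrentLinkedQueue[String]()
    labels.foreach { label =>
      pool.execute(new Runnable { def run(): Unit = { finished.add(label); () } })
    }
    pool.shutdown()
    pool.awaitTermination(5, TimeUnit.SECONDS)
    finished.toArray(new Array[String](0)).toSeq
  }
}
```

With a pool of size 1, which matches the default of 1 for spark.streaming.concurrentJobs shown earlier, tasks run one at a time in submission order. With a larger pool they all still run, but the completion order is no longer guaranteed.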

 

private val jobSets: java.util.Map[Time, JobSet] = new ConcurrentHashMap[Time, JobSet]

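JobHandler implements the Runnable interface. Its run method calls the Job's run method, which in turn invokes func, that is, the business logic defined on the DStreams.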

private class JobHandler(job: Job) extends Runnable with Logging {
    import JobScheduler._

    def run() {
      try {
        val formattedTime = UIUtils.formatBatchTime(
          job.time.milliseconds, ssc.graph.batchDuration.milliseconds, showYYYYMMSS = false)
        val batchUrl = s"/streaming/batch/?id=${job.time.milliseconds}"
        val batchLinkText = s"[output operation ${job.outputOpId}, batch time ${formattedTime}]"

        ssc.sc.setJobDescription(
          s"""Streaming job from <a href="$batchUrl">$batchLinkText</a>""")
        ssc.sc.setLocalProperty(BATCH_TIME_PROPERTY_KEY, job.time.milliseconds.toString)
        ssc.sc.setLocalProperty(OUTPUT_OP_ID_PROPERTY_KEY, job.outputOpId.toString)

        // We need to assign `eventLoop` to a temp variable. Otherwise, because
        // `JobScheduler.stop(false)` may set `eventLoop` to null when this method is running, then
        // it's possible that when `post` is called, `eventLoop` happens to null.
        var _eventLoop = eventLoop
        if (_eventLoop != null) {
          _eventLoop.post(JobStarted(job, clock.getTimeMillis()))
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            job.run()
          }
          _eventLoop = eventLoop
          if (_eventLoop != null) {
            _eventLoop.post(JobCompleted(job, clock.getTimeMillis()))
          }
        } else {
          // JobScheduler has been stopped.
        }
      } finally {
        ssc.sc.setLocalProperty(JobScheduler.BATCH_TIME_PROPERTY_KEY, null)
        ssc.sc.setLocalProperty(JobScheduler.OUTPUT_OP_ID_PROPERTY_KEY, null)
      }
    }
  }
}
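The shape of this handler, with events posted around the job and cleanup in `finally`, can be sketched as a trace. `JobHandlerSketch`, `handle`, and the `schedulerRunning` flag (standing in for the null check on the event loop) are hypothetical simplifications.

```scala
import scala.collection.mutable.ListBuffer

object JobHandlerSketch {
  // Returns the sequence of actions the handler performs for one job.
  def handle(label: String, body: () => Unit, schedulerRunning: Boolean): List[String] = {
    val trace = ListBuffer.empty[String]
    try {
      trace += s"set local properties for $label"
      if (schedulerRunning) {
        trace += s"post JobStarted($label)"
        body() // the job itself
        trace += s"post JobCompleted($label)"
      }
    } finally {
      // Cleanup runs whether or not the job was executed.
      trace += "clear local properties"
    }
    trace.toList
  }
}
```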

Notes:

1. DT大數據夢工廠 WeChat official account: DT_Spark  2. IMF 8 p.m. big data hands-on YY live channel: 68917580  3. Sina Weibo: http://www.weibo.com/ilovepains
