Spark Customization, Part 8: Spark Streaming Source Code Analysis - A Thorough Study of the Full Lifecycle of RDD Generation

Contents of this session:

I. A thorough study of the relationship between DStream and RDD

II. A thorough study of RDD generation in Spark Streaming

1. A thorough study of the relationship between DStream and RDD

Questions to think about before we start:

How are RDDs generated?

What are RDDs generated from? They come from DStreams.

What is the basis on which an RDD is generated?

Is RDD execution in Spark Streaming any different from RDD execution in Spark Core?

After a batch has run, what do we do with its RDDs?

ForEachDStream does not necessarily trigger the execution of a Job, but it always triggers the generation of a Job; Job generation is independent of whether the Job actually executes.

 

Q: What are RDDs generated from?

Let's use the official NetworkWordCount example that ships with Spark to see that RDDs are produced from DStreams:

object NetworkWordCount {
  def main(args: Array[String]) {
    // Create the context with a 1 second batch size
    val sparkConf = new SparkConf().setAppName("NetworkWordCount")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    val lines = ssc.socketTextStream("Master", 9999) // the input DStream
    val words = lines.flatMap(_.split(" ")) // everything between input and output is a transformation DStream
    val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
    wordCounts.print() // internally triggers an Action-level operation; print() creates the output DStream
    ssc.start()
    ssc.awaitTermination()
  }
}

Analyzing the highlighted calls in the code above, this example produces the following DStreams in order, and they depend on one another from back to front:

ReceiverInputDStream-->FlatMappedDStream-->MappedDStream-->ShuffledDStream-->ForEachDStream
如何證實DStream之間是相互依賴的呢,咱們隨便挑一個子DStream做爲入口進行分析,好比MappedDStream:
/** Return a new DStream by applying a function to all elements of this DStream. */
  def map[U: ClassTag](mapFunc: T => U): DStream[U] = ssc.withScope {
    new MappedDStream(this, context.sparkContext.clean(mapFunc))
  }
The MappedDStream class:
private[streaming]
class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent) 
  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

In MappedDStream's compute method, the parent DStream is computed first via getOrCompute, and then map is applied to its result; mapFunc is the business logic we passed in. This proves the dependency relationship between them!
Q: Why do DStreams depend on each other from back to front?

Because a DStream represents the Spark Streaming business logic, RDDs depend on each other from back to front, and DStreams are lazy. The DStream dependency relationship must therefore stay highly consistent with the RDD dependency relationship.
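To make the back-to-front dependency concrete, here is a minimal sketch (not part of the Spark source; the helper name printLineage is hypothetical, and it assumes the dependencies method is accessible from user code, as it appears to be in the version traced here) that walks a DStream chain through its dependencies list, from the last DStream back to the input DStream:

import org.apache.spark.streaming.dstream.DStream

// Hypothetical helper: recursively print the class of a DStream and of everything it depends on.
def printLineage(stream: DStream[_], depth: Int = 0): Unit = {
  println(("  " * depth) + stream.getClass.getSimpleName)
  stream.dependencies.foreach(child => printLineage(child, depth + 1))
}

// For the wordCounts DStream from the NetworkWordCount example above, this prints something like:
//   ShuffledDStream
//     MappedDStream
//       FlatMappedDStream
//         SocketInputDStream   (a ReceiverInputDStream)
printLineage(wordCounts)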

All of the DStream subclasses produced above extend DStream, so let's start from DStream itself:
/*
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */

Roughly, this means:

1. A DStream depends on other DStreams, except for the first DStream, which is created from the data source and receives the data, so it has no further dependency. This again shows that DStreams depend on each other from back to front!

2. How does a DStream produce RDDs? Every BatchDuration, the DStream generates one RDD.

3. Every BatchDuration, the DStream's internal generator function produces that RDD.

abstract class DStream[T: ClassTag] (
    @transient private[streaming] var ssc: StreamingContext
  ) extends Serializable with Logging {


  // RDDs generated, marked as private[streaming] so that testsuites can access it
  // A DStream is a template for RDDs: every batchInterval an RDD is generated from this
  // template and stored in the DStream's generatedRDDs data structure.
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

generatedRDDs is a member of DStream, which means every DStream instance has it. At runtime, however, we only need to know the last DStream, because all the DStreams before it can be derived from that one!

This verifies the conclusion that RDDs are produced by DStreams!

In the next part we analyze exactly how a DStream generates its RDDs.

2. A thorough study of RDD generation in Spark Streaming

  // A DStream is a template for RDDs: every batchInterval an RDD is generated from this
  // template and stored in the DStream's generatedRDDs data structure.
  @transient
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

Where is generatedRDDs instantiated and filled? Once we work out where this HashMap gets its entries, we will know how RDDs are produced.

1. Let's get straight to the point and look at DStream's getOrCompute method:

  /**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   * First check whether the HashMap already contains an RDD for this time; if not,
   * call compute to obtain the RDD and put it into the HashMap.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    // Check the cache first; if the RDD is already there, return it directly
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          // Disable checks for existing output directories in jobs launched by the streaming
          // scheduler, since we may need to write output to an existing directory during checkpoint
          // recovery; see SPARK-4835 for more details. We need to have this call here because
          // compute() might cause Spark jobs to be launched.
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time) // compute the RDD for this time
          }
        }

        // rddOption contains the generated RDD, which is then put into generatedRDDs
        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }
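Stripped of time validation, persistence, and checkpointing, the heart of getOrCompute is a simple look-up-or-compute (memoization) pattern over the generatedRDDs HashMap. A minimal standalone sketch of that pattern, using generic placeholder types purely for illustration:

import scala.collection.mutable

// Minimal sketch: return the cached value for the key if present,
// otherwise compute it, cache it, and return it.
class GeneratedCache[K, V] {
  private val generated = new mutable.HashMap[K, V]()

  def getOrCompute(key: K)(compute: K => Option[V]): Option[V] =
    generated.get(key).orElse {
      val result = compute(key)
      result.foreach(value => generated.put(key, value))
      result
    }
}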

Looking at compute, we find that DStream itself provides no concrete implementation, which tells us it is overridden in the subclasses, and that is where the RDD is actually generated:

  /** Method that generates a RDD for the given time */
  def compute(validTime: Time): Option[RDD[T]]

2. Next, the compute method of ReceiverInputDStream:

  /**
   * Generates RDDs with blocks received by the receiver of this stream.
   */

  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        // receiverTracker tracks the data that has been received
        val receiverTracker = ssc.scheduler.receiverTracker
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

        // Create the BlockRDD
        // Create and return the BlockRDD. Since ReceiverInputDStream has no parent dependency,
        // it generates the RDD itself. If there is no input data, a series of empty RDDs is produced.
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

Note: Spark Streaming still produces RDDs (empty BlockRDDs) even when there is no input data, so this is a place where the source could be modified to improve performance. Looked at the other way around, stream processing is really just batch processing carried out over extremely short time intervals!
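One practical consequence of this behavior: even an idle stream generates a Job for every batch. Short of modifying the Spark source, application code often guards against doing expensive output work on empty batches; here is a sketch using the standard foreachRDD and RDD.isEmpty APIs (the output path below is hypothetical):

wordCounts.foreachRDD { (rdd, time) =>
  // compute() produced an (empty) RDD and a Job was generated anyway;
  // skip the expensive output step when the batch carries no data.
  if (!rdd.isEmpty()) {
    rdd.saveAsTextFile(s"/tmp/wordcounts-${time.milliseconds}") // hypothetical output location
  }
}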

 

3. Next, the compute method of MappedDStream:

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {
  
  // Except for the first DStream, which produces an RDD by itself, every other DStream
  // starts its computation from the RDD produced by the DStream in front of it.
  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    // getOrCompute operates on the parent's RDD, and the map that follows also operates on an RDD.
    // The computation inside a DStream is really a computation over RDDs, and mapFunc is the
    // concrete business logic we want to apply.
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}
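Note that parent.getOrCompute(validTime).map(_.map[U](mapFunc)) contains two different map calls: the outer one is Option.map on the result of getOrCompute, and the inner one is RDD.map applying the user's function. A tiny sketch of the same shape, with a Seq standing in for the RDD (all names here are illustrative only):

// maybeParentRDD plays the role of parent.getOrCompute(validTime);
// the Seq inside it plays the role of the parent's RDD for this batch.
val maybeParentRDD: Option[Seq[String]] = Some(Seq("spark", "streaming"))
val mapFunc: String => Int = _.length

// The outer .map unwraps the Option; the inner .map transforms each element, as RDD.map would.
val maybeChildRDD: Option[Seq[Int]] = maybeParentRDD.map(parentRDD => parentRDD.map(mapFunc))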

4. Next, the compute method of ForEachDStream:

  We find that its compute method does nothing at all, but it overrides the generateJob method!

 

private[streaming]
class ForEachDStream[T: ClassTag] (
    parent: DStream[T],
    foreachFunc: (RDD[T], Time) => Unit,
    displayInnerRDDOps: Boolean
  ) extends DStream[Unit](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[Unit]] = None

  override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }

        // jobFunc is expected to contain an action operation,
        // so when jobFunc is invoked, that action is triggered.

        Some(new Job(time, jobFunc))
      case None => None
    }
  }
}
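Where does a ForEachDStream come from in the first place? Output operations such as print() are built on foreachRDD, which wraps the user function in a ForEachDStream and registers it as an output stream of the DStreamGraph; that registered list is the outputStreams collection that DStreamGraph.generateJobs iterates over below. A user-level sketch:

// Each foreachRDD call registers one more ForEachDStream, and therefore one more Job per batch.
wordCounts.foreachRDD { (rdd, time) =>
  // This function body becomes the jobFunc of the Job generated for each batch;
  // count() is the action that actually triggers execution on the cluster.
  println(s"Batch $time contains ${rdd.count()} distinct words")
}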

5. Finally, Job generation. JobGenerator's generateJobs method internally calls DStreamGraph's generateJobs method:

  /** Generate jobs and perform checkpoint for the given `time`.  */
  private def generateJobs(time: Time) {
    // Set the SparkEnv in this thread, so that job generation code can access the environment
    // Example: BlockRDDs are created in this thread, and it needs to access BlockManager
    // Update: This is probably redundant after threadlocal stuff in SparkEnv has been removed.
    SparkEnv.set(ssc.env)
    Try {
      // Allocate the data received for this particular time to the batch
      jobScheduler.receiverTracker.allocateBlocksToBatch(time) // allocate received blocks to batch
      // Call DStreamGraph's generateJobs to generate the Jobs
      graph.generateJobs(time) // generate jobs using allocated block
    } match {
      case Success(jobs) =>
        val streamIdToInputInfos = jobScheduler.inputInfoTracker.getInfo(time)
        jobScheduler.submitJobSet(JobSet(time, jobs, streamIdToInputInfos))
      case Failure(e) =>
        jobScheduler.reportError("Error generating jobs for time " + time, e)
    }
    eventLoop.post(DoCheckpoint(time, clearCheckpointDataLater = false))
  }

  

DStreamGraph's generateJobs method calls generateJob on each output stream, and the output stream here is the ForEachDStream:

 def generateJobs(time: Time): Seq[Job] = {
    logDebug("Generating jobs for time " + time)
    val jobs = this.synchronized {
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
    logDebug("Generated " + jobs.length + " jobs for time " + time)
    jobs
  }

  

Summary: A DStream is a template for RDDs, and its internal generatedRDDs map holds the RDD instance generated for each BatchDuration. The dependencies among DStreams define the dependencies among RDDs, so when computing from back to front we only need to deal with the last DStream. Every BatchDuration, JobGenerator calls DStreamGraph's generateJobs method, which calls ForEachDStream's generateJob method; that method first calls the parent DStream's getOrCompute to obtain the RDD and then performs the computation. Pushing back through the chain, the first DStream is the ReceiverInputDStream, whose compute method fetches the block metadata for the corresponding time interval from the receiverTracker, creates a BlockRDD, and puts it into generatedRDDs!
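The per-batch flow traced above can be condensed into the following call chain (a simplified sketch built from the method names seen in the source, not a complete picture of every step):

// JobGenerator.generateJobs(time)
//   -> ReceiverTracker.allocateBlocksToBatch(time)              // bind the received blocks to this batch
//   -> DStreamGraph.generateJobs(time)
//        -> ForEachDStream.generateJob(time)                    // only output streams generate Jobs
//             -> parent.getOrCompute(time)                      // walks the DStream chain from back to front
//                  -> ... -> ReceiverInputDStream.compute(time) // builds the BlockRDD from block metadata
//                                                               // and stores it in generatedRDDs
//   -> JobScheduler.submitJobSet(JobSet(time, jobs, ...))       // hand the generated Jobs to the scheduler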

 

Special thanks to Wang Jialin for his distinctive style of teaching.

Wang Jialin's contact card:

China's foremost Spark expert

Sina Weibo: http://weibo.com/ilovepains

WeChat public account: DT_Spark

Blog: http://blog.sina.com.cn/ilovepains

QQ: 1740415547

YY classroom: live lectures every day at 20:00 on channel 68917580
