Spark Streaming Source Code Walkthrough: A Thorough Study of the Full RDD Generation Lifecycle

Questions to think about:

1. How are RDDs generated, and what do they rely on to be generated?
2. At execution time, is there any difference from how RDDs execute on Spark Core?
3. After execution, how should we deal with the RDDs?

Why the third question: because in Spark Streaming, RDDs keep being produced as trigger conditions fire and windows slide. At the most basic level an RDD is just an object, and new RDDs are produced every batch interval, so can memory hold them all, and how are they managed once each batch has been processed? Case code:

import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by hadoop on 2016/4/18.
  * Background:
  *
  * 1. DT大數據夢工廠 WeChat official account: DT_Spark
  * 2. IMF 8 pm big-data hands-on YY live channel: 68917580
  * 3. Sina Weibo: http://www.weibo.com/ilovepains
  */
object HostSerachTop extends App{

  val conf = new SparkConf().setAppName("SparkStreamingOnHDFS")
  if(args.length == 0) conf.setMaster("local[2]")
  val ssc = new StreamingContext(conf, Seconds(30))
  // Checkpointing is required by the invertible reduceByKeyAndWindow used below; the directory is a placeholder
  ssc.checkpoint("/tmp/spark-streaming-checkpoint")
  val blackList = Array(("hadoop", true) , ("mahout", true), ("spark", false))
  val backListRDD = ssc.sparkContext.parallelize(blackList)
  val hosttedStream = ssc.socketTextStream("192.168.74.132", 9000, StorageLevel.MEMORY_AND_DISK_SER_2)

  val searchWord =  hosttedStream.map(_.split(" ")(1)).map(item => (item, 1))

  //val hostedWords = searchWord.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, Seconds(60))
  // Window and slide durations must both be multiples of the 30-second batch interval
  val hostedWords = searchWord.reduceByKeyAndWindow((v1: Int, v2: Int) => v1 + v2, (v1: Int, v2: Int) => v1 - v2, Seconds(60), Seconds(30))
  hostedWords.transform(item => {
    val top5 = item.map(pair => (pair._2, pair._1))
      .sortByKey(false)
      .map(pair => (pair._1, pair._2))
    top5
  }).print()
  /**
    * The user search record format is simplified to: name item
    */
  ssc.start()
  ssc.awaitTermination()
  ssc.stop(true, true)
}

1. A Thorough Study of the Relationship Between DStream and RDD

We start from the print method as the entry point and work from the source code:

/**
   * Print the first num elements of each RDD generated in this DStream. This is an output
   * operator, so this DStream will be registered as an output stream and there materialized.
   */
  def print(num: Int): Unit = ssc.withScope {
    //The function that will be applied to each generated RDD
    def foreachFunc: (RDD[T], Time) => Unit = {
      (rdd: RDD[T], time: Time) => {
        val firstNum = rdd.take(num + 1)
        // scalastyle:off println
        println("-------------------------------------------")
        println("Time: " + time)
        println("-------------------------------------------")
        firstNum.take(num).foreach(println)
        if (firstNum.length > num) println("...")
        println()
        // scalastyle:on println
      }
    }
    //Create a ForEachDStream and register it as an output stream
    foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
  }
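
In the case code we call print() with no arguments; that overload simply delegates to the print(num) method shown above with a default of 10 elements (paraphrased from the DStream source):

  /** Print the first ten elements of each RDD generated in this DStream. */
  def print(): Unit = ssc.withScope {
    print(10)
  }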

Continuing to trace the source code:

private def foreachRDD(
      foreachFunc: (RDD[T], Time) => Unit,
      displayInnerRDDOps: Boolean): Unit = {
    //Create a ForEachDStream and pass in foreachFunc (the foreachFunc from the print method above, which will later be applied to each RDD)
    new ForEachDStream(this,
      context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
  }
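
The register() call at the end is what makes this ForEachDStream an output stream: it simply adds the DStream to the DStreamGraph's list of output streams, and only output streams are later asked to generate jobs (a simplified paraphrase of DStream.register):

  private[streaming] def register(): DStream[T] = {
    //Add this DStream to the DStreamGraph's output streams
    ssc.graph.addOutputStream(this)
    this
  }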

Next let's look at the generateJob method of ForEachDStream (its source is shown in the next section). There we can see that the work is performed on an RDD: calling an action on that RDD is what triggers job submission. So we now have a first impression: ForEachDStream is the last DStream in the chain, and the action logic we pass in is applied to the RDD that ForEachDStream obtains. Below we dig into how that RDD is actually generated.

2. A Thorough Study of How RDDs Are Produced in Spark Streaming

From the discussion above we know that a DStream ultimately generates the RDDs that are used to trigger job execution:

override def generateJob(time: Time): Option[Job] = {
    parent.getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
          foreachFunc(rdd, time)
        }
        //Wrap the function that acts on the RDD into a Job; if no action is ever invoked on the RDD, no task is submitted to SparkContext.
        //The DStream is only responsible for generating the RDD and wrapping it into a Job.
        Some(new Job(time, jobFunc))
      case None => None
    }
  }
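
For context on who calls this: when a batch time fires, the JobGenerator asks the DStreamGraph for that batch's jobs, and the graph calls generateJob on every registered output stream, such as our ForEachDStream (a simplified paraphrase of DStreamGraph.generateJobs):

  def generateJobs(time: Time): Seq[Job] = {
    this.synchronized {
      //Only output streams (e.g. the ForEachDStream registered by print) generate jobs
      outputStreams.flatMap { outputStream =>
        val jobOption = outputStream.generateJob(time)
        jobOption.foreach(_.setCallSite(outputStream.creationSite))
        jobOption
      }
    }
  }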

We saw earlier that generateJob is invoked during job generation and that the JobScheduler then runs the resulting Job, executing the wrapped function jobFunc. In other words, by the time the job is generated the RDD has already been created, and it is the last RDD in the chain. Now let's focus on the parent.getOrCompute method:

/**
   * Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
   */
  private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
    // If RDD was already generated, then retrieve it from HashMap,
    // or else compute the RDD
    generatedRDDs.get(time).orElse {
      // Compute the RDD if time is valid (e.g. correct time in a sliding window)
      // of RDD generation, else generate nothing.
      if (isTimeValid(time)) {

        val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
          //Recurse up the DStream chain by calling compute
          PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
            compute(time)
          }
        }

        rddOption.foreach { case newRDD =>
          // Register the generated RDD for caching and checkpointing
          if (storageLevel != StorageLevel.NONE) {
            newRDD.persist(storageLevel)
            logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
          }
          if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
            newRDD.checkpoint()
            logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
          }
          //Cache the generated RDD; note that every DStream keeps its own generatedRDDs map
          generatedRDDs.put(time, newRDD)
        }
        rddOption
      } else {
        None
      }
    }
  }
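
generatedRDDs itself is simply a per-DStream map from batch time to the RDD generated for that batch, and it does not grow without bound: once a batch has been fully processed, clearMetadata forgets and unpersists RDDs older than the remember duration, which answers the third question raised at the beginning (a simplified paraphrase of the DStream source):

  //One entry per batch time; every DStream instance has its own map
  private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()

  private[streaming] def clearMetadata(time: Time) {
    val unpersistData = ssc.conf.getBoolean("spark.streaming.unpersist", true)
    //Forget RDDs older than the remember duration and optionally unpersist them
    val oldRDDs = generatedRDDs.filter(_._1 <= (time - rememberDuration))
    generatedRDDs --= oldRDDs.keys
    if (unpersistData) {
      oldRDDs.values.foreach(_.unpersist(false))
    }
    dependencies.foreach(_.clearMetadata(time))
  }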

Taking our code as an example, the last DStream is a ForEachDStream, and the DStream it depends on is a ShuffledDStream, and so on back up the dependency chain, as sketched below.
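
Based on the operators used in the case code, the dependency chain and the per-batch RDDs look roughly like this (an illustrative sketch drawn from the example, not program output; the exact classes created by reduceByKeyAndWindow depend on which variant is used):

  // DStream dependency chain (child -> parent) for the case code:
  //   ForEachDStream (print)
  //     <- TransformedDStream (transform)
  //       <- windowed/shuffled DStreams created by reduceByKeyAndWindow
  //         <- MappedDStream (map(item => (item, 1)))
  //           <- MappedDStream (map(_.split(" ")(1)))
  //             <- SocketInputDStream (socketTextStream)
  //
  // For each batch, every DStream caches the RDD it produces in its own generatedRDDs map,
  // e.g. the SocketInputDStream caches a BlockRDD and each MappedDStream a MapPartitionsRDD.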

The chain eventually ends at the SocketInputDStream. Whenever a DStream needs its parent's RDD it goes through getOrCompute defined in DStream, so the call recurses back through the DStream chain, and every DStream keeps its own generatedRDDs. In other words, the operations we express on DStreams ultimately produce RDDs: starting from the very first RDD, operators such as map and flatMap are applied one by one to produce the corresponding RDDs, and in the end only the last RDD is needed to run the computation, because an RDD records its lineage. Next, let's look at how generatedRDDs is actually populated.

Since the data-source DStream in our example is a ReceiverInputDStream (concretely, a SocketInputDStream), we start there. SocketInputDStream does not override compute, so when compute is called on it, the compute method defined in its parent class ReceiverInputDStream runs:

/**
   * Generates RDDs with blocks received by the receiver of this stream. */
  override def compute(validTime: Time): Option[RDD[T]] = {
    val blockRDD = {

      if (validTime < graph.startTime) {
        // If this is called for any time before the start time of the context,
        // then this returns an empty RDD. This may happen when recovering from a
        // driver failure without any write ahead log to recover pre-failure data.
        new BlockRDD[T](ssc.sc, Array.empty)
      } else {
        // Otherwise, ask the tracker for all the blocks that have been allocated to this stream
        // for this batch
        val receiverTracker = ssc.scheduler.receiverTracker
        
        val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)

        // Register the input blocks information into InputInfoTracker
        val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
        //Create the RDD from the block infos, which are kept by the ReceiverTracker
        // Create the BlockRDD
        createBlockRDD(validTime, blockInfos)
      }
    }
    Some(blockRDD)
  }

 createBlockRDD:

private[streaming] def createBlockRDD(time: Time, blockInfos: Seq[ReceivedBlockInfo]): RDD[T] = {

    if (blockInfos.nonEmpty) {
      val blockIds = blockInfos.map { _.blockId.asInstanceOf[BlockId] }.toArray

      // Are WAL record handles present with all the blocks
      val areWALRecordHandlesPresent = blockInfos.forall { _.walRecordHandleOption.nonEmpty }

      if (areWALRecordHandlesPresent) {
        // If all the blocks have WAL record handle, then create a WALBackedBlockRDD
        val isBlockIdValid = blockInfos.map { _.isBlockIdValid() }.toArray
        val walRecordHandles = blockInfos.map { _.walRecordHandleOption.get }.toArray
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, blockIds, walRecordHandles, isBlockIdValid)
      } else {
        // Else, create a BlockRDD. However, if there are some blocks with WAL info but not
        // others then that is unexpected and log a warning accordingly.
        if (blockInfos.find(_.walRecordHandleOption.nonEmpty).nonEmpty) {
          if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
            logError("Some blocks do not have Write Ahead Log information; " +
              "this is unexpected and data may not be recoverable after driver failures")
          } else {
            logWarning("Some blocks have Write Ahead Log information; this is unexpected")
          }
        }
        val validBlockIds = blockIds.filter { id =>
          ssc.sparkContext.env.blockManager.master.contains(id)
        }
        if (validBlockIds.size != blockIds.size) {
          logWarning("Some blocks could not be recovered as they were not found in memory. " +
            "To prevent such data loss, enabled Write Ahead Log (see programming guide " +
            "for more details.")
        }
        new BlockRDD[T](ssc.sc, validBlockIds)
      }
    } else {
      // If no block is ready now, creating WriteAheadLogBackedBlockRDD or BlockRDD
      // according to the configuration
      if (WriteAheadLogUtils.enableReceiverLog(ssc.conf)) {
        new WriteAheadLogBackedBlockRDD[T](
          ssc.sparkContext, Array.empty, Array.empty, Array.empty)
      } else {
        new BlockRDD[T](ssc.sc, Array.empty)
      }
    }
  }
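
The BlockRDD created here is backed directly by the receiver's data: each of its partitions corresponds to one block written by the receiver, and computing a partition just fetches that block from the BlockManager (a simplified paraphrase of BlockRDD.compute):

  override def compute(split: Partition, context: TaskContext): Iterator[T] = {
    assertValid()
    val blockManager = SparkEnv.get.blockManager
    //Each partition is backed by one block stored by the receiver
    val blockId = split.asInstanceOf[BlockRDDPartition].blockId
    blockManager.get(blockId) match {
      case Some(block) => block.data.asInstanceOf[Iterator[T]]
      case None =>
        throw new Exception("Could not compute split, block " + blockId + " not found")
    }
  }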

So this is how the very first RDD of each batch comes into being. Now consider a question: how do the operators we wrote get applied on top of this initial RDD?

class MappedDStream[T: ClassTag, U: ClassTag] (
    parent: DStream[T],
    mapFunc: T => U
  ) extends DStream[U](parent.ssc) {

  override def dependencies: List[DStream[_]] = List(parent)

  override def slideDuration: Duration = parent.slideDuration

  override def compute(validTime: Time): Option[RDD[U]] = {
    parent.getOrCompute(validTime).map(_.map[U](mapFunc))
  }
}

In other words, each time a DStream's compute is called, it obtains its parent's RDD for that batch (via parent.getOrCompute), applies the operator we passed in, and returns the RDD that the child DStream needs. Finally, an action is invoked on the last RDD, which triggers job submission!
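
Putting it all together, for one batch the DStream chain in the case code effectively builds the following RDD lineage and then runs a single action on the last RDD. The sketch below is only an illustration of that idea in plain Spark Core terms; the object name and the sample input lines are made up, and the window is collapsed to a single batch:

import org.apache.spark.{SparkConf, SparkContext}

object OneBatchSketch extends App {
  val sc = new SparkContext(new SparkConf().setMaster("local[2]").setAppName("OneBatchSketch"))

  //Stands in for the data the receiver stored as blocks for this batch (format: name item)
  val blockRDD = sc.parallelize(Seq("alice spark", "bob hadoop", "carol spark"))

  val pairs  = blockRDD.map(_.split(" ")(1)).map(item => (item, 1))         //the two MappedDStreams
  val counts = pairs.reduceByKey(_ + _)                                     //the windowed reduce, collapsed to one batch
  val sorted = counts.map(_.swap).sortByKey(ascending = false).map(_.swap)  //the transform
  sorted.take(11).foreach(println)                                          //the action run inside print's foreachFunc

  sc.stop()
}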
