StreamingContext Startup Flow and DStream Template Source Code Analysis - Spark Streaming in Commercial Environments

This blog series summarizes and shares cases drawn from real commercial environments, together with Spark source-code analysis and practical guidance; please keep following the series. Copyright notice: this Spark source-code analysis and commercial practice series belongs to the author (Qin Kaixin). Reproduction is prohibited; you are welcome to learn from it.

1 What is Spark Streaming?

Spark Streaming is a stream computing platform built on top of batch processing (micro-batching). Received data is cut into blocks, by default every 200 ms (spark.streaming.blockInterval; see the configuration sketch after the list below). Spark Streaming packages the data stream into batches, converts the data of each batch into RDDs managed by the BlockManager, and finally submits the computation as jobs whose DAGs (directed acyclic graphs) Spark executes.

  • Discretized stream (DStream): a DStream can be thought of as an unbounded collection with no size limit. Because it adds the dimension of time, it can be viewed as a space-time model. A DStream is of course a logical concept, since Spark Streaming builds its operator system and dependency relationships entirely on DStreams. Note, however, that DStream is abstract: the computation is ultimately carried out as RDDs, so the RDD, as the lowest-level foundation, is what has physical meaning.
  • As time passes, a DStream keeps producing RDDs, so an operation on a DStream is really an operation on its RDDs at fixed points in time.
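
To make the two intervals concrete, here is a minimal configuration sketch (the app name is made up; spark.streaming.blockInterval and its 200 ms default come from the Spark configuration documentation, and the batch interval is whatever the application passes to the StreamingContext constructor):

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // The batch interval is chosen by the user when constructing the StreamingContext:
    // one RDD is produced per batch. The block interval controls how often a receiver
    // cuts its buffered data into blocks handed to the BlockManager; it defaults to 200 ms.
    val conf = new SparkConf()
      .setAppName("IntervalSketch")                      // hypothetical app name
      .set("spark.streaming.blockInterval", "200ms")     // the default, shown explicitly
    val ssc = new StreamingContext(conf, Seconds(2))     // 2-second batches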

2 The true face of the StreamingContext entry class

2.1 The StreamingContext Scaladoc in detail

* Main entry point for Spark Streaming functionality. It provides methods used to create
 * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either
 * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf
 * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
 * The associated SparkContext can be accessed using `context.sparkContext`. After
 * creating and transforming DStreams, the streaming computation can be started and stopped
 * using `context.start()` and `context.stop()`, respectively.
 * `context.awaitTermination()` allows the current thread to wait for the termination
 * of the context by `stop()` or by an exception.

2.2 A basic Spark Streaming example and a deeper dive

  • A basic Spark Streaming example

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    object NetworkWordCount {
        def main(args: Array[String]) {
          if (args.length < 2) {
            System.err.println("Usage: NetworkWordCount <hostname> <port>")
            System.exit(1)
          }
      
          StreamingExamples.setStreamingLogLevels()
          
          // Create the context with a 1 second batch size
          val sparkConf = new SparkConf().setAppName("NetworkWordCount")
          val ssc = new StreamingContext(sparkConf, Seconds(1))
      
          // Create a socket stream on target ip:port and count the
          // words in input stream of \n delimited text (eg. generated by 'nc')
          // Note that no duplication in storage level only for running locally.
          // Replication necessary in distributed scenario for fault tolerance.
          val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
          val words = lines.flatMap(_.split(" "))
          val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
          wordCounts.print()
          ssc.start()
          ssc.awaitTermination()
        }
      }
  • The StreamingContext-to-SparkContext conversion

    def this(conf: SparkConf, batchDuration: Duration) = {
          this(StreamingContext.createNewSparkContext(conf), null, batchDuration)
        }
        
      private[streaming] def createNewSparkContext(conf: SparkConf): SparkContext = {
          new SparkContext(conf)
        }
  • The socketTextStream dependency chain

(1) The super parent class's logical template, as described in its Scaladoc:

* A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
     * sequence of RDDs (of the same type) representing a continuous stream of data (see
     * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
     * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
     * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
     * transforming existing DStreams using operations such as `map`,
     * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
     * periodically generates a RDD, either from live data or by transforming the RDD generated by a
     * parent DStream.
     *
     * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
     * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
     * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
     * `join`. These operations are automatically available on any DStream of pairs
     * (e.g., DStream[(Int, Int)] through implicit conversions.
     *
     * A DStream internally is characterized by a few basic properties:
     *  - A list of other DStreams that the DStream depends on
     *  - A time interval at which the DStream generates an RDD
     *  - A function that is used to generate an RDD after each time interval

(2) Excerpts from the DStream source code

Except for the first DStream, every DStream depends on the DStream before it.
A DStream generates one RDD in every time interval (the batch interval):
abstract class DStream[T: ClassTag] (
        @transient private[streaming] var ssc: StreamingContext
      ) extends Serializable with Logging {
    
      validateAtInit()
    
      // =======================================================================
      // Methods that should be implemented by subclasses of DStream
      // =======================================================================
    
      /** Time interval after which the DStream generates an RDD */
      def slideDuration: Duration
    
      /** List of parent DStreams on which this DStream depends on */
      def dependencies: List[DStream[_]]
    
      /** Method that generates an RDD for the given time */
      def compute(validTime: Time): Option[RDD[T]]
    
      // =======================================================================
      // Methods and fields available on all DStreams
      // =======================================================================
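
To illustrate what a concrete subclass has to supply, here is a hypothetical minimal DStream subclass (illustration only, not Spark source; the name ConstantDStream and its constructor are invented, and real input sources would normally extend InputDStream or ReceiverInputDStream instead):

    import scala.reflect.ClassTag
    import org.apache.spark.rdd.RDD
    import org.apache.spark.streaming.{Duration, StreamingContext, Time}
    import org.apache.spark.streaming.dstream.DStream

    class ConstantDStream[T: ClassTag](
        _ssc: StreamingContext,
        batchDuration: Duration,
        rdd: RDD[T]
      ) extends DStream[T](_ssc) {

      // No parent DStreams: this stream is a source, not a transformation.
      override def dependencies: List[DStream[_]] = List()

      // Produce one RDD per batch interval (passed in explicitly here).
      override def slideDuration: Duration = batchDuration

      // For every valid batch time, hand back the same pre-built RDD.
      override def compute(validTime: Time): Option[RDD[T]] = Some(rdd)
    }
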
generatedRDDs holds the RDD that the DStream generates for each batch:
// RDDs generated, marked as private[streaming] so that testsuites can access it
      @transient
      private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
    
      // Time zero for the DStream
      private[streaming] var zeroTime: Time = null
    
      // Duration for which the DStream will remember each RDD created
      private[streaming] var rememberDuration: Duration = null
    
      // Storage level of the RDDs in the stream
      private[streaming] var storageLevel: StorageLevel = StorageLevel.NONE
The compute-or-retrieve function getOrCompute:
* Get the RDD corresponding to the given time; either retrieve it from cache
   * or compute-and-cache it.
      private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
        // If RDD was already generated, then retrieve it from HashMap,
        // or else compute the RDD
        generatedRDDs.get(time).orElse {
          // Compute the RDD if time is valid (e.g. correct time in a sliding window)
          // of RDD generation, else generate nothing.
          if (isTimeValid(time)) {
    
            val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
              // Disable checks for existing output directories in jobs launched by the streaming
              // scheduler, since we may need to write output to an existing directory during checkpoint
              // recovery; see SPARK-4835 for more details. We need to have this call here because
              // compute() might cause Spark jobs to be launched.
              SparkHadoopWriterUtils.disableOutputSpecValidation.withValue(true) {
                compute(time)
              }
            }
            rddOption.foreach { case newRDD =>
              // Register the generated RDD for caching and checkpointing
              if (storageLevel != StorageLevel.NONE) {
                newRDD.persist(storageLevel)
                logDebug(s"Persisting RDD ${newRDD.id} for time $time to $storageLevel")
              }
              if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
                newRDD.checkpoint()
                logInfo(s"Marking RDD ${newRDD.id} for time $time for checkpointing")
              }
              generatedRDDs.put(time, newRDD)
            }
            rddOption
          } else {
            None
          }
        }
      }
The job generation function generateJob:
* Generate a SparkStreaming job for the given time. This is an internal method that
   * should not be called directly. This default implementation creates a job
   * that materializes the corresponding RDD. Subclasses of DStream may override this
   * to generate their own jobs.
   
  private[streaming] def generateJob(time: Time): Option[Job] = {
    getOrCompute(time) match {
      case Some(rdd) =>
        val jobFunc = () => {
          val emptyFunc = { (iterator: Iterator[T]) => {} }
          context.sparkContext.runJob(rdd, emptyFunc)
        }
        Some(new Job(time, jobFunc))
      case None => None
    }
  }

2.3 The socketTextStream dependency chain
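
The chain behind ssc.socketTextStream is short. Abridged from Spark's StreamingContext.scala and SocketInputDStream.scala (signatures may differ slightly between Spark versions): socketTextStream wraps socketStream, which builds a SocketInputDStream, which is a ReceiverInputDStream, which is an InputDStream, which is a DStream.

    // In StreamingContext.scala (abridged)
    def socketTextStream(
        hostname: String,
        port: Int,
        storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
      ): ReceiverInputDStream[String] = withNamedScope("socket text stream") {
      socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
    }

    def socketStream[T: ClassTag](
        hostname: String,
        port: Int,
        converter: (InputStream) => Iterator[T],
        storageLevel: StorageLevel
      ): ReceiverInputDStream[T] = {
      new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
    }

    // In SocketInputDStream.scala (abridged): the DStream only knows how to build its
    // Receiver; the Receiver is what actually opens the socket on the Executor.
    private[streaming] class SocketInputDStream[T: ClassTag](
        _ssc: StreamingContext,
        host: String,
        port: Int,
        bytesToObjects: InputStream => Iterator[T],
        storageLevel: StorageLevel
      ) extends ReceiverInputDStream[T](_ssc) {

      def getReceiver(): Receiver[T] = {
        new SocketReceiver(host, port, bytesToObjects, storageLevel)
      }
    }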

2.4 DStream, its subclasses, and DStream action (output) triggers

The DStream output (trigger) operations currently available are print, saveAsTextFiles, saveAsObjectFiles, saveAsHadoopFiles, and foreachRDD. Each of these output operations creates a ForEachDStream object and registers it in the outputStreams member of DStreamGraph.

final private[streaming] class DStreamGraph extends Serializable with Logging {
  private val inputStreams = new ArrayBuffer[InputDStream[_]]()
  private val outputStreams = new ArrayBuffer[DStream[_]]()

Within StreamingContext, the DStreamGraph is a key member; it is the component dedicated to tracking the output (action) operations.

private[streaming] val graph: DStreamGraph = {
    if (isCheckpointPresent) {
      _cp.graph.setContext(this)
      _cp.graph.restoreCheckpointData()
      _cp.graph
    } else {
      require(_batchDur != null, "Batch duration for StreamingContext cannot be null")
      val newGraph = new DStreamGraph()
      newGraph.setBatchDuration(_batchDur)
      newGraph
    }
  }

2.5 Tracing back through DStreamGraph

How do the dependency relationships and operators of the individual DStream objects finally get chained together? DStreamGraph traces back from its outputStreams to generate jobs, and the operator chain is only actually executed once the StreamingContext is started, as the abridged generateJobs excerpt below shows.
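
The traceback is driven by DStreamGraph.generateJobs, which walks the registered outputStreams and asks each one to produce a job for the given batch time (abridged from Spark's DStreamGraph.scala; log lines and call-site handling may differ slightly by version):

    def generateJobs(time: Time): Seq[Job] = {
      logDebug("Generating jobs for time " + time)
      val jobs = this.synchronized {
        outputStreams.flatMap { outputStream =>
          // Each output stream (e.g. a ForEachDStream) triggers getOrCompute on its
          // parents, recursively materializing the whole RDD lineage for this batch.
          val jobOption = outputStream.generateJob(time)
          jobOption.foreach(_.setCallSite(outputStream.creationSite))
          jobOption
        }
      }
      logDebug("Generated " + jobs.length + " jobs for time " + time)
      jobs
    }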

3 ForEachDStream and foreachRDD: inseparable partners

Looking at the abstract DStream parent template, the print function actually defines a foreachFunc and passes it to foreachRDD; foreachRDD creates a ForEachDStream, and that ForEachDStream is ultimately registered with the StreamingContext (see the abridged excerpt after the list).

  • The print function

    * Print the first num elements of each RDD generated in this DStream. This is an output
       * operator, so this DStream will be registered as an output stream and there materialized.
        def print(num: Int): Unit = ssc.withScope {
          def foreachFunc: (RDD[T], Time) => Unit = {
            (rdd: RDD[T], time: Time) => {
              val firstNum = rdd.take(num + 1)
              // scalastyle:off println
              println("-------------------------------------------")
              println(s"Time: $time")
              println("-------------------------------------------")
              firstNum.take(num).foreach(println)
              if (firstNum.length > num) println("...")
              println()
              // scalastyle:on println
            }
          }
          foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
        }
  • The saveAsTextFiles function

    def saveAsTextFiles(prefix: String, suffix: String = ""): Unit = ssc.withScope {
          val saveFunc = (rdd: RDD[T], time: Time) => {
            val file = rddToFileName(prefix, suffix, time)
            rdd.saveAsTextFile(file)
          }
          this.foreachRDD(saveFunc, displayInnerRDDOps = false)
        }
  • The register function

    * Register this streaming as an output stream. This would ensure that RDDs of this
       * DStream will be generated.
        private[streaming] def register(): DStream[T] = {
          ssc.graph.addOutputStream(this)
          this
        }
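
Putting the pieces together: foreachRDD is what constructs the ForEachDStream and calls register() on it, and ForEachDStream overrides generateJob so that the user's foreachFunc becomes the body of the streaming job. The following is abridged from Spark's DStream.scala and ForEachDStream.scala; minor details vary by version:

    // In DStream.scala (abridged)
    private def foreachRDD(
        foreachFunc: (RDD[T], Time) => Unit,
        displayInnerRDDOps: Boolean): Unit = {
      new ForEachDStream(this,
        context.sparkContext.clean(foreachFunc, false), displayInnerRDDOps).register()
    }

    // In ForEachDStream.scala (abridged)
    private[streaming] class ForEachDStream[T: ClassTag] (
        parent: DStream[T],
        foreachFunc: (RDD[T], Time) => Unit,
        displayInnerRDDOps: Boolean
      ) extends DStream[Unit](parent.ssc) {

      override def dependencies: List[DStream[_]] = List(parent)

      override def slideDuration: Duration = parent.slideDuration

      // An output stream computes nothing itself ...
      override def compute(validTime: Time): Option[RDD[Unit]] = None

      // ... instead it turns the parent's RDD for this batch into a Job that runs foreachFunc.
      override def generateJob(time: Time): Option[Job] = {
        parent.getOrCompute(time) match {
          case Some(rdd) =>
            val jobFunc = () => createRDDWithLocalProperties(time, displayInnerRDDOps) {
              foreachFunc(rdd, time)
            }
            Some(new Job(time, jobFunc))
          case None => None
        }
      }
    }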

4 Summary: input streams and output streams

  • An InputDStream defines the processing logic for the business side's data source, but the Receiver is the real tap on the stream: on the Executor, the Receiver receives the stream data, buffers it, accumulates it into blocks, and hands the blocks to the BlockManager. The blocks are finally assigned to the job of the corresponding batch interval.
  • On the Driver side there is a tracker, the ReceiverTracker, which keeps supervising the Executors to launch Receivers (for example, by sending the Receiver to an Executor wrapped in an RDD; note that the Receiver itself is shipped to the Executor as an RDD, as the abridged excerpt below shows). It also manages the metadata of the data blocks waiting to be assigned to jobs.
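
The "Receiver shipped as an RDD" point can be seen in ReceiverTracker.startReceiver, which wraps each Receiver in a one-element RDD and submits a long-running job so that the Receiver runs on an Executor. The excerpt below is abridged from Spark's ReceiverTracker.scala (startReceiverFunc is defined earlier in the same method and is not shown; details vary by version):

    // Wrap the single Receiver in an RDD with one partition, optionally with preferred locations.
    val receiverRDD: RDD[Receiver[_]] =
      if (scheduledLocations.isEmpty) {
        ssc.sc.makeRDD(Seq(receiver), 1)
      } else {
        val preferredLocations = scheduledLocations.map(_.toString).distinct
        ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
      }
    receiverRDD.setName(s"Receiver $receiverId")

    // Submit a job whose only task starts a ReceiverSupervisor around the Receiver on the
    // Executor; the "job" therefore runs for as long as the Receiver keeps receiving data.
    val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
      receiverRDD, startReceiverFunc, Seq(0), (_, _) => (), ())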

5 The secrets of starting a StreamingContext

5.1 Starting the StreamingContext sets the JobScheduler in motion (close partners)

private[streaming] val scheduler = new JobScheduler(this)

def start(): Unit = synchronized {
    state match {
      case INITIALIZED =>
        startSite.set(DStream.getCreationSite())
        StreamingContext.ACTIVATION_LOCK.synchronized {
          StreamingContext.assertNoOtherContextIsActive()
          try {
            validate()
            // Start the streaming scheduler in a new thread, so that thread local properties
            // like call sites and job groups can be reset without affecting those of the
            // current thread.
            ThreadUtils.runInNewThread("streaming-start") {
              sparkContext.setCallSite(startSite.get)
              sparkContext.clearJobGroup()
              sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
              savedProperties.set(SerializationUtils.clone(sparkContext.localProperties.get()))
              
              scheduler.start()     // <= the crucial call
              
            }
            state = StreamingContextState.ACTIVE
            scheduler.listenerBus.post(
              StreamingListenerStreamingStarted(System.currentTimeMillis()))
          } catch {
            case NonFatal(e) =>
              logError("Error starting the context, marking it as stopped", e)
              scheduler.stop(false)
              state = StreamingContextState.STOPPED
              throw e
          }
          StreamingContext.setActiveContext(this)
        }
        logDebug("Adding shutdown hook") // force eager creation of logger
        shutdownHookRef = ShutdownHookManager.addShutdownHook(
          StreamingContext.SHUTDOWN_HOOK_PRIORITY)(() => stopOnShutdown())
        // Registering Streaming Metrics at the start of the StreamingContext
        assert(env.metricsSystem != null)
        env.metricsSystem.registerSource(streamingSource)
        uiTab.foreach(_.attach())
        logInfo("StreamingContext started")
      case ACTIVE =>
        logWarning("StreamingContext has already been started")
      case STOPPED =>
        throw new IllegalStateException("StreamingContext has already been stopped")
    }
  }

5.2 Once started, the JobScheduler holds everything together

  • Driver side: start the receiverTracker => used for data reception, data buffering, and block generation

  • Driver side: start the jobGenerator => used for DStreamGraph initialization, DStream-to-RDD conversion, job generation, and submission for execution. Both are started inside JobScheduler.start(), shown below.

    def start(): Unit = synchronized {
          if (eventLoop != null) return // scheduler has already been started
      
      logDebug("Starting JobScheduler")
      eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
        override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)
    
        override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
      }
      
      eventLoop.start()                                // <= the crucial call: listens in every direction
    
      // attach rate controllers of input streams to receive batch completion updates
      for { 
      
        inputDStream <- ssc.graph.getInputStreams      // <= the crucial step
       
        rateController <- inputDStream.rateController
      } ssc.addStreamingListener(rateController)
    
      listenerBus.start()
      
      receiverTracker = new ReceiverTracker(ssc)        // <= the crucial step
      
      inputInfoTracker = new InputInfoTracker(ssc)      // <= manages all input streams and their input-data statistics
    
      val executorAllocClient: ExecutorAllocationClient = ssc.sparkContext.schedulerBackend match {
        case b: ExecutorAllocationClient => b.asInstanceOf[ExecutorAllocationClient]
        case _ => null
      }
    
      executorAllocationManager = ExecutorAllocationManager.createIfEnabled(
        executorAllocClient,
        receiverTracker,
        ssc.conf,
        ssc.graph.batchDuration.milliseconds,
        clock)
      executorAllocationManager.foreach(ssc.addStreamingListener)
      
      receiverTracker.start()                         // <= the crucial call
      jobGenerator.start()                            // <= the crucial call
      
      executorAllocationManager.foreach(_.start())
      logInfo("Started JobScheduler")
    }

5.3 The JobScheduler listens in every direction

private var eventLoop: EventLoop[JobSchedulerEvent] = null

    eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
      override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

      override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
    }
  • JobStarted

  • JobCompleted

  • ErrorReported

    private def processEvent(event: JobSchedulerEvent) {
          try {
            event match {
              case JobStarted(job, startTime) => handleJobStart(job, startTime)
              case JobCompleted(job, completedTime) => handleJobCompletion(job, completedTime)
              case ErrorReported(m, e) => handleError(m, e)
            }
          } catch {
            case e: Throwable =>
              reportError("Error in job scheduler", e)
          }
        }
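
The three events handled above correspond to the event case classes defined alongside JobScheduler (abridged from Spark's JobScheduler.scala):

    // Events posted to the JobScheduler's EventLoop and dispatched by processEvent.
    private[scheduler] sealed trait JobSchedulerEvent
    private[scheduler] case class JobStarted(job: Job, startTime: Long) extends JobSchedulerEvent
    private[scheduler] case class JobCompleted(job: Job, completedTime: Long) extends JobSchedulerEvent
    private[scheduler] case class ErrorReported(msg: String, e: Throwable) extends JobSchedulerEvent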

6 Summary

This article focuses on dissecting the StreamingContext startup flow and the DStream template source code. It does not draw on any online blog posts; it is hereby declared original content, and reproduction or commercial use is prohibited.

Qin Kaixin (秦凱新), Shenzhen, 2018
