An analysis of the main flow of Spark SQL stream processing

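Spark SQL supports stream processing, which is built around a Source and a Sink. The Source defines where a stream comes from and the Sink defines where it goes. Execution of a stream is triggered from the Sink side.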

A Dataset's writeStream defines the destination of the stream and leads to its actual execution, so the analysis starts from writeStream.

writeStream = new DataStreamWriter[T](this)

DataStreamWriter

The role of DataStreamWriter is to write the dataset it is given to external storage, for example Kafka, a database, or text files.

The main trigger is the start method, which returns a StreamingQuery object. The code:

def start(): StreamingQuery = {    
    if (source == "memory") {
      assertNotPartitioned("memory")
      val (sink, resultDf) = trigger match {
        case _: ContinuousTrigger =>
          val s = new MemorySinkV2()
          val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes))
          (s, r)
        case _ =>
          val s = new MemorySink(df.schema, outputMode)
          val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s))
          (s, r)
      }
      val chkpointLoc = extraOptions.get("checkpointLocation")
      val recoverFromChkpoint = outputMode == OutputMode.Complete()
      val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        chkpointLoc,
        df,
        extraOptions.toMap,
        sink,
        outputMode,
        useTempCheckpointLocation = true,
        recoverFromCheckpointLocation = recoverFromChkpoint,
        trigger = trigger)
      resultDf.createOrReplaceTempView(query.name)
      query
    } else if (source == "foreach") {
      assertNotPartitioned("foreach")
      val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
      df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        extraOptions.get("checkpointLocation"),
        df,
        extraOptions.toMap,
        sink,
        outputMode,
        useTempCheckpointLocation = true,
        trigger = trigger)
    } else {
      val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
      val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
      val sink = ds.newInstance() match {
        case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
        case _ =>
          val ds = DataSource(
            df.sparkSession,
            className = source,
            options = extraOptions.toMap,
            partitionColumns = normalizedParCols.getOrElse(Nil))
          ds.createSink(outputMode)
      }

      df.sparkSession.sessionState.streamingQueryManager.startQuery(
        extraOptions.get("queryName"),
        extraOptions.get("checkpointLocation"),
        df,
        extraOptions.toMap,
        sink,
        outputMode,
        useTempCheckpointLocation = source == "console",
        recoverFromCheckpointLocation = true,
        trigger = trigger)
    }
  }

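Here we look at the last branch. ds is the data source class looked up by name through DataSource.lookupDataSource. If an instance of it implements StreamWriteSupport and its class is not listed among the disabled V2 streaming writers, that instance itself is used as the sink; otherwise a sink is created through DataSource.createSink. Finally, streamingQueryManager.startQuery starts the streaming computation and returns the running StreamingQuery object.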
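The sink selection in that last branch can be sketched without Spark. This is a minimal illustration, not Spark's code: Provider, V2WriteSupport, ChosenSink and chooseSink are hypothetical stand-ins for the instantiated data source, StreamWriteSupport, and the two outcomes of the match.

```scala
// Simplified stand-ins for Spark's types; every name here is hypothetical.
object SinkSelectionSketch {
  trait Provider                                 // the instantiated data source
  trait V2WriteSupport extends Provider          // plays the role of StreamWriteSupport
  final class NewStyleProvider extends V2WriteSupport
  final class OldStyleProvider extends Provider

  sealed trait ChosenSink
  case object ProviderItself extends ChosenSink  // the provider instance is used directly as the sink
  case object CreatedV1Sink extends ChosenSink   // a sink is built through the createSink path

  // Mirrors the shape of the match in start(): a type test plus a guard on the disabled list.
  def chooseSink(provider: Provider, disabledWriters: Set[String]): ChosenSink =
    provider match {
      case w: V2WriteSupport if !disabledWriters.contains(w.getClass.getName) => ProviderItself
      case _ => CreatedV1Sink
    }
}
```

A provider that implements the V2 trait is used as the sink directly unless its class name appears in the disabled set, in which case it falls through to the older path just like a provider that never implemented the trait.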

Inside startQuery, StreamingQueryManager mainly calls the createQuery method to create a StreamingQueryWrapper object. createQuery is a private method:

private def createQuery(
      userSpecifiedName: Option[String],
      userSpecifiedCheckpointLocation: Option[String],
      df: DataFrame,
      extraOptions: Map[String, String],
      sink: BaseStreamingSink,
      outputMode: OutputMode,
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean,
      trigger: Trigger,
      triggerClock: Clock): StreamingQueryWrapper = {
    var deleteCheckpointOnStop = false
    val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
      new Path(userSpecified).toUri.toString
    }.orElse {
      df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
        new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
      }
    }.getOrElse {
      if (useTempCheckpointLocation) {
        // Delete the temp checkpoint when a query is being stopped without errors.
        deleteCheckpointOnStop = true
        Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
      } else {
        throw new AnalysisException(
          "checkpointLocation must be specified either " +
            """through option("checkpointLocation", ...) or """ +
            s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
      }
    }

    // If offsets have already been created, we trying to resume a query.
    if (!recoverFromCheckpointLocation) {
      val checkpointPath = new Path(checkpointLocation, "offsets")
      val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
      if (fs.exists(checkpointPath)) {
        throw new AnalysisException(
          s"This query does not support recovering from checkpoint location. " +
            s"Delete $checkpointPath to start over.")
      }
    }

    val analyzedPlan = df.queryExecution.analyzed
    df.queryExecution.assertAnalyzed()

    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
    }

    if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
      logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
          "is not supported in streaming DataFrames/Datasets and will be disabled.")
    }

    (sink, trigger) match {
      case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
        UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
        new StreamingQueryWrapper(new ContinuousExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          v2Sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
      case _ =>
        new StreamingQueryWrapper(new MicroBatchExecution(
          sparkSession,
          userSpecifiedName.orNull,
          checkpointLocation,
          analyzedPlan,
          sink,
          trigger,
          triggerClock,
          outputMode,
          extraOptions,
          deleteCheckpointOnStop))
    }
  }
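The first part of createQuery resolves the checkpoint location by precedence: the per-query option first, then the session-wide setting combined with the query name (or a random UUID), and finally a temporary directory if that is allowed. A minimal sketch of that precedence, assuming plain strings instead of Hadoop Path objects; Resolved and resolve are hypothetical names:

```scala
import java.nio.file.Files
import java.util.UUID

object CheckpointResolutionSketch {
  // The resolved location, plus whether it should be deleted when the query stops.
  final case class Resolved(location: String, deleteOnStop: Boolean)

  def resolve(
      userSpecified: Option[String],   // value of option("checkpointLocation", ...)
      sessionDefault: Option[String],  // session-wide checkpoint location setting
      queryName: Option[String],
      useTemp: Boolean): Either[String, Resolved] = {
    // 1) explicit option wins; 2) otherwise a child directory under the session default.
    val configured: Option[Resolved] =
      userSpecified.map(loc => Resolved(loc, deleteOnStop = false)).orElse {
        sessionDefault.map { base =>
          val child = queryName.getOrElse(UUID.randomUUID().toString)
          Resolved(s"$base/$child", deleteOnStop = false)
        }
      }
    configured match {
      case Some(r) => Right(r)
      // 3) fall back to a temp directory that is cleaned up on stop, if permitted.
      case None if useTemp =>
        Right(Resolved(Files.createTempDirectory("temporary").toString, deleteOnStop = true))
      case None => Left("checkpointLocation must be specified")
    }
  }
}
```

In the start method shown earlier, useTempCheckpointLocation is true for the memory and foreach branches and for the console source, which is why those can run without an explicit checkpoint location.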

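Depending on whether the query is a continuous one or a micro-batch one, createQuery builds a ContinuousExecution or a MicroBatchExecution: ContinuousExecution is chosen only when the sink is a StreamWriteSupport and the trigger is a ContinuousTrigger, and every other combination goes to MicroBatchExecution. Both are subclasses of StreamExecution, the abstract class for stream processing. The class structure of StreamExecution will be analyzed later.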

The code structure and functionality of ContinuousExecution and MicroBatchExecution are actually very similar, so let's take ContinuousExecution as the example first.

ContinuousExecution

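First, a ContinuousExecution never finishes on its own: it processes an unbounded stream, and when the stream temporarily has no data it keeps waiting for new data to arrive. The awaitEpoch method lets a caller block its thread until a given epoch has been committed.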

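Next, the commit method is triggered once an epoch has been fully processed (not after every single record). It records that epoch in the commitLog, which marks the offsets processed in that epoch as committed.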
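How commit and awaitEpoch can cooperate is sketched below, assuming a lock and a condition variable guard the latest committed epoch. EpochTracker and demo are hypothetical names; the real class also persists the epoch to the commitLog, which is left out here.

```scala
import java.util.concurrent.locks.ReentrantLock

object AwaitEpochSketch {
  final class EpochTracker {
    private val lock = new ReentrantLock()
    private val advanced = lock.newCondition()
    private var latestCommitted = -1L // guarded by lock; -1 means nothing committed yet

    // Record that `epoch` is committed and wake up any waiting callers.
    def commit(epoch: Long): Unit = {
      lock.lock()
      try {
        latestCommitted = math.max(latestCommitted, epoch)
        advanced.signalAll()
      } finally lock.unlock()
    }

    // Block the calling thread until `epoch` or a later one has been committed.
    def awaitEpoch(epoch: Long): Unit = {
      lock.lock()
      try {
        while (latestCommitted < epoch) advanced.await()
      } finally lock.unlock()
    }

    def latest: Long = {
      lock.lock()
      try latestCommitted finally lock.unlock()
    }
  }

  // Usage: a background thread commits epochs 0..3 while the caller waits for epoch 2.
  def demo(): Long = {
    val tracker = new EpochTracker
    val committer = new Thread(() => (0L to 3L).foreach { e =>
      Thread.sleep(10)
      tracker.commit(e)
    })
    committer.start()
    tracker.awaitEpoch(2L) // returns once epoch 2 (or a later one) is committed
    committer.join()
    tracker.latest         // the committer has finished, so this is its last epoch
  }
}
```

The while loop around await guards against spurious wakeups, and signalAll is used because several callers may be waiting for different epochs.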

Then there is logicalPlan. In ContinuousExecution, the StreamingRelationV2 nodes of the incoming logical plan are converted into ContinuousExecutionRelation nodes:

analyzedPlan.transform {
  case r @ StreamingRelationV2(
      source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    toExecutionRelationMap.getOrElseUpdate(r, {
      ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    })
}

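There is also the addOffset method. Once the offsets read in an epoch are known, it writes them to the offsetLog so that a restarted query knows where to resume from. Together, addOffset and commit provide the bookkeeping behind the fault-tolerance guarantee. (The author describes this as exactly-once semantics; Spark's documentation states at-least-once guarantees for continuous processing.)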
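The way the two logs work together on restart can be sketched with in-memory maps. This is a simplified model under stated assumptions: a single Long stands in for an offset, Logs and restartOffset are hypothetical names, and the real logs are durable files under the checkpoint location.

```scala
import scala.collection.mutable

object EpochLogSketch {
  // In-memory stand-ins for offsetLog and commitLog, both keyed by epoch.
  final class Logs {
    private val offsetLog = mutable.SortedMap.empty[Long, Long] // epoch -> end offset read in it
    private val commitLog = mutable.SortedSet.empty[Long]       // epochs that have been committed

    // Record the offset reached by `epoch` before its output is committed.
    def addOffset(epoch: Long, offset: Long): Unit = offsetLog(epoch) = offset

    // Mark `epoch` as committed; its offset must already be in the offset log.
    def commit(epoch: Long): Unit = {
      require(offsetLog.contains(epoch), s"epoch $epoch has no recorded offset")
      commitLog += epoch
    }

    // Where a restarted query resumes: the offset of the latest committed epoch, if any.
    def restartOffset: Option[Long] = commitLog.lastOption.flatMap(offsetLog.get)
  }
}
```

For example, after addOffset(0, 100), commit(0) and addOffset(1, 250), a crash before commit(1) leaves restartOffset at Some(100). Data between offsets 100 and 250 is then processed again after the restart, which is why a sink must tolerate replays for the end-to-end result to be free of duplicates.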

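Finally, look at the runActivatedStream method. It is a protected method that in turn calls runContinuous to read and process the unbounded stream. The detailed flow is not covered here; interested readers can work through the code.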

MicroBatchExecution

MicroBatchExecution is not covered here either. Its flow is similar, so refer to the code directly.
