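Spark SQL supports stream processing. A stream has a Source and a Sink: the Source defines where the stream originates, the Sink defines where it ends up, and execution of the stream is triggered from the Sink side.
Dataset's writeStream defines the stream's destination and is where the actual execution of the stream gets triggered, so the analysis starts from writeStream.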
writeStream = new DataStreamWriter[T](this)
DataStreamWriter
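DataStreamWriter's job is to write the Dataset passed to it to external storage, such as Kafka, a database, or text files.
The main triggering method is start, which returns a StreamingQuery object. The code: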
def start(): StreamingQuery = {
  if (source == "memory") {
    assertNotPartitioned("memory")
    val (sink, resultDf) = trigger match {
      case _: ContinuousTrigger =>
        val s = new MemorySinkV2()
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes))
        (s, r)
      case _ =>
        val s = new MemorySink(df.schema, outputMode)
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s))
        (s, r)
    }
    val chkpointLoc = extraOptions.get("checkpointLocation")
    val recoverFromChkpoint = outputMode == OutputMode.Complete()
    val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      chkpointLoc,
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      recoverFromCheckpointLocation = recoverFromChkpoint,
      trigger = trigger)
    resultDf.createOrReplaceTempView(query.name)
    query
  } else if (source == "foreach") {
    assertNotPartitioned("foreach")
    val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      trigger = trigger)
  } else {
    val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
    val sink = ds.newInstance() match {
      case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
      case _ =>
        val ds = DataSource(
          df.sparkSession,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil))
        ds.createSink(outputMode)
    }
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = source == "console",
      recoverFromCheckpointLocation = true,
      trigger = trigger)
  }
}
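Here we look at the code in the last conditional branch. ds is the corresponding data source, looked up through DataSource.lookupDataSource, and the sink is sometimes simply an instance of ds (when that instance implements StreamWriteSupport and its V2 streaming writer has not been disabled). Finally, streamingQueryManager's startQuery starts the stream computation and returns the running StreamingQuery object.
Inside startQuery, streamingQueryManager mainly calls createQuery to create a StreamingQueryWrapper object. createQuery is a private method: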
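The three branches of start differ mainly in the checkpoint flags they hand to startQuery. The following is a minimal, Spark-free sketch of that difference; StartBranches, StartFlags and flagsFor are hypothetical names introduced only for illustration. It also assumes that startQuery defaults recoverFromCheckpointLocation to true, because the foreach branch does not pass that argument explicitly.

```scala
// Simplified model of the flags each branch of DataStreamWriter.start() passes on.
// StartFlags and flagsFor are hypothetical names, not part of Spark.
object StartBranches {
  final case class StartFlags(
      useTempCheckpointLocation: Boolean,
      recoverFromCheckpointLocation: Boolean)

  def flagsFor(source: String, isCompleteMode: Boolean): StartFlags = source match {
    case "memory" =>
      // The memory branch recovers from a checkpoint only in Complete output mode.
      StartFlags(useTempCheckpointLocation = true, recoverFromCheckpointLocation = isCompleteMode)
    case "foreach" =>
      // Assumption: startQuery's default for recoverFromCheckpointLocation is true.
      StartFlags(useTempCheckpointLocation = true, recoverFromCheckpointLocation = true)
    case other =>
      // Any other sink may fall back to a temporary checkpoint only if it is "console".
      StartFlags(useTempCheckpointLocation = other == "console", recoverFromCheckpointLocation = true)
  }
}
```

Read this way, a sink such as "kafka" must be given a checkpoint location, while "memory", "foreach" and "console" can run against a temporary one.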
private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }

  // If offsets have already been created, we trying to resume a query.
  if (!recoverFromCheckpointLocation) {
    val checkpointPath = new Path(checkpointLocation, "offsets")
    val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
    if (fs.exists(checkpointPath)) {
      throw new AnalysisException(
        s"This query does not support recovering from checkpoint location. " +
          s"Delete $checkpointPath to start over.")
    }
  }

  val analyzedPlan = df.queryExecution.analyzed
  df.queryExecution.assertAnalyzed()

  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
  }

  if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
    logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
      "is not supported in streaming DataFrames/Datasets and will be disabled.")
  }

  (sink, trigger) match {
    case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
      UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
      new StreamingQueryWrapper(new ContinuousExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        v2Sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
    case _ =>
      new StreamingQueryWrapper(new MicroBatchExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
  }
}
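Depending on whether the query is a continuous-processing operation or a micro-batch operation, createQuery builds either a ContinuousExecution or a MicroBatchExecution. ContinuousExecution is chosen only when the sink is a StreamWriteSupport and the trigger is a ContinuousTrigger; every other combination gets a MicroBatchExecution. Both are subclasses of StreamExecution, the abstract class for stream processing. The class structure of StreamExecution will be analyzed later.
The code structure and functionality of ContinuousExecution and MicroBatchExecution are in fact very similar, so let's take ContinuousExecution as the example first.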
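Two decisions inside createQuery are easy to lose in the full listing: how the checkpoint location is resolved, and how the execution engine is picked. Below is a rough, Spark-free sketch of both. Every name in it (CreateQuerySketch, resolveCheckpoint, chooseExecution, the Sink and Trigger stand-ins) is hypothetical, plain string concatenation replaces Hadoop's Path, and IllegalArgumentException stands in for AnalysisException.

```scala
import java.util.UUID

// Simplified model of two decisions made in createQuery. Not Spark code.
object CreateQuerySketch {
  // 1. Checkpoint location precedence:
  //    explicit option > session-level default + query name (or a UUID) > temp dir.
  //    Returns (checkpointLocation, deleteCheckpointOnStop).
  def resolveCheckpoint(
      userSpecified: Option[String],
      sessionDefault: Option[String],
      queryName: Option[String],
      useTemp: Boolean,
      newTempDir: () => String): (String, Boolean) = {
    val explicit = userSpecified.map(loc => (loc, false))
    val fromSession = sessionDefault.map { base =>
      (s"$base/${queryName.getOrElse(UUID.randomUUID().toString)}", false)
    }
    explicit.orElse(fromSession).getOrElse {
      // Only a temporary checkpoint is flagged for deletion when the query stops.
      if (useTemp) (newTempDir(), true)
      else throw new IllegalArgumentException("checkpointLocation must be specified")
    }
  }

  // 2. Engine selection: hypothetical stand-ins for the sink and trigger types.
  sealed trait Sink
  case object V2StreamWriteSink extends Sink // plays the role of StreamWriteSupport
  case object LegacySink extends Sink

  sealed trait Trigger
  final case class Continuous(intervalMs: Long) extends Trigger
  final case class ProcessingTime(intervalMs: Long) extends Trigger

  def chooseExecution(sink: Sink, trigger: Trigger): String = (sink, trigger) match {
    case (V2StreamWriteSink, _: Continuous) => "ContinuousExecution"
    case _ => "MicroBatchExecution"
  }
}
```

Note that a continuous trigger paired with a sink that is not a V2 stream writer still ends up in the micro-batch case of this match, which mirrors the fallthrough `case _` in the original.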
ContinuousExecution
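First, a ContinuousExecution never finishes: it represents an unbounded stream. When the stream temporarily has no data, ContinuousExecution blocks the thread and waits for new data to arrive; this waiting is controlled through the awaitEpoch method.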
Second, the commit method is triggered once the data for an epoch has been processed; it records the completed progress (offset) in commitLog.
Next, look at logicalPlan. In ContinuousExecution the incoming logical plan contains StreamingRelationV2 nodes, which are converted into LogicalPlan nodes of type ContinuousExecutionRelation:
analyzedPlan.transform {
  case r @ StreamingRelationV2(
      source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    toExecutionRelationMap.getOrElseUpdate(r, {
      ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    })
}
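The pattern in this snippet, a tree rewrite combined with getOrElseUpdate on a map, guarantees that the same streaming relation is always replaced by the same execution relation even if it appears more than once in the plan. Here is a toy illustration with a hypothetical three-node plan type; it is not Catalyst, and its transform is deliberately simpler than Catalyst's (it applies the rule where it matches and otherwise only recurses into children).

```scala
import scala.collection.mutable

// Toy plan tree, hypothetical and unrelated to Catalyst's real classes.
object PlanRewrite {
  sealed trait Plan
  final case class StreamingRelation(name: String) extends Plan
  final case class ExecutionRelation(name: String, id: Int) extends Plan
  final case class Union(left: Plan, right: Plan) extends Plan

  // Apply the rule where it is defined; otherwise recurse into the children.
  def transform(plan: Plan)(rule: PartialFunction[Plan, Plan]): Plan =
    if (rule.isDefinedAt(plan)) rule(plan)
    else plan match {
      case Union(l, r) => Union(transform(l)(rule), transform(r)(rule))
      case leaf => leaf
    }

  def rewrite(plan: Plan): Plan = {
    // The cache plays the role of toExecutionRelationMap.
    val cache = mutable.Map.empty[StreamingRelation, ExecutionRelation]
    var nextId = 0
    transform(plan) {
      case r: StreamingRelation =>
        // A new ExecutionRelation is created only the first time r is seen.
        cache.getOrElseUpdate(r, { nextId += 1; ExecutionRelation(r.name, nextId) })
    }
  }
}
```

In this toy, a self-union of the same relation yields two references to one ExecutionRelation, while two distinct relations get two distinct ids.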
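There is also the addOffset method: each time offsets have been read, it writes the current read offset to offsetLog, so that the next recovery knows where to start from. Together, addOffset and commit are what support exactly-once semantics.
Finally, look at the runActivatedStream method. It is an internal method that in turn calls runContinuous to read and process the unbounded stream. The detailed flow is not covered here; interested readers can work through the code at their own pace.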
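To see why two logs are kept rather than one, here is a minimal in-memory sketch of the idea. It is an assumption-laden model, not Spark's implementation: Logs, restartOffset and the epoch-to-offset map are hypothetical, and real offsets are richer than a single Long. An offset that was logged but never committed marks work that may be incomplete, so a restart resumes from the offset of the last committed epoch.

```scala
import scala.collection.mutable

// In-memory stand-ins for offsetLog and commitLog. Hypothetical, not Spark classes.
object TwoLogRecovery {
  final class Logs {
    private val offsetLog = mutable.Map.empty[Long, Long] // epoch -> offset read for it
    private val commitLog = mutable.Set.empty[Long]       // epochs fully processed

    // Record what was read for an epoch.
    def addOffset(epoch: Long, offset: Long): Unit = {
      offsetLog.update(epoch, offset)
    }

    // Mark an epoch as done; it must already have an offset entry.
    def commit(epoch: Long): Unit = {
      require(offsetLog.contains(epoch), s"no offset recorded for epoch $epoch")
      commitLog += epoch
    }

    // On restart, resume from the offset of the latest committed epoch,
    // or from the given initial offset if nothing has been committed yet.
    def restartOffset(initial: Long): Long =
      if (commitLog.isEmpty) initial else offsetLog(commitLog.max)
  }
}
```

Under this model, data belonging to an uncommitted epoch is read again after a restart, so the end-to-end guarantee still depends on the sink handling replays safely.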
MicroBatchExecution
MicroBatchExecution is not covered in detail either. Its flow is similar, so it is best to read the code directly.