流的讀取是從DataStreamReader和DataStreamWriter開始的。sql
DataStreamReader是生成流讀取者的入口所在,關鍵方法是load。這段代碼很關鍵,因此把所有代碼先貼出來,慢慢分析。apache
def load(): DataFrame = { val ds = DataSource.lookupDataSource(source, sparkSession.sqlContext.conf). getConstructor().newInstance() val v1DataSource = DataSource( sparkSession, userSpecifiedSchema = userSpecifiedSchema, className = source, options = extraOptions.toMap) val v1Relation = ds match { case _: StreamSourceProvider => Some(StreamingRelation(v1DataSource)) case _ => None } ds match { case provider: TableProvider => val sessionOptions = DataSourceV2Utils.extractSessionConfigs( source = provider, conf = sparkSession.sessionState.conf) val options = sessionOptions ++ extraOptions val dsOptions = new CaseInsensitiveStringMap(options.asJava) val table = userSpecifiedSchema match { case Some(schema) => provider.getTable(dsOptions, schema) case _ => provider.getTable(dsOptions) } import org.apache.spark.sql.execution.datasources.v2.DataSourceV2Implicits._ table match { case _: SupportsRead if table.supportsAny(MICRO_BATCH_READ, CONTINUOUS_READ) => Dataset.ofRows( sparkSession, StreamingRelationV2( provider, source, table, dsOptions, table.schema.toAttributes, v1Relation)( sparkSession)) // fallback to v1 // TODO (SPARK-27483): we should move this fallback logic to an analyzer rule. case _ => Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource)) } case _ => // Code path for data source v1. Dataset.ofRows(sparkSession, StreamingRelation(v1DataSource)) } }
有好多分支,重要的是區分開V1和V2。session
V1用的邏輯關係是StreamingRelation;而V2用的邏輯關係是StreamingRelationV2。這裏先看看他們對應的物理計劃是什麼?app
在SparkStrategies.scala文件中,定義了物理計劃:ide
/** * This strategy is just for explaining `Dataset/DataFrame` created by `spark.readStream`. * It won't affect the execution, because `StreamingRelation` will be replaced with * `StreamingExecutionRelation` in `StreamingQueryManager` and `StreamingExecutionRelation` will * be replaced with the real relation using the `Source` in `StreamExecution`. */ object StreamingRelationStrategy extends Strategy { def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match { case s: StreamingRelation => StreamingRelationExec(s.sourceName, s.output) :: Nil case s: StreamingExecutionRelation => StreamingRelationExec(s.toString, s.output) :: Nil case s: StreamingRelationV2 => StreamingRelationExec(s.sourceName, s.output) :: Nil case _ => Nil } }
物理計劃都是StreamingRelationExec,StreamingRelationExec的代碼其實啥都沒實現,因此最後其實看代碼註釋StreamingRelationExec也不是真正的物理計劃。函數
這裏先記得相關的類ContinuousExecution和MicroBatchExecution。一時找不到怎麼執行到具體的物理計劃ContinuousExecution和MicroBatchExecution的,咱們就試試反推把。先看看ContinuousExecution的代碼。oop
StreamExecution是抽象類。其抽象方法runActivatedStream是執行具體的連續流讀取任務的,子類會重寫該函數。post
runStream方法封裝了runActivatedStream方法,額外加了些事件通知等處理機制,知道這一點就好了。this
這裏先嚐試看看StreamingQueryManager是幹什麼用的,看註釋應該是管理全部的StreamingQuery的。spa
private def createQuery(...): StreamingQueryWrapper ={ (sink, trigger) match { case (table: SupportsWrite, trigger: ContinuousTrigger) => new StreamingQueryWrapper(new ContinuousExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, table, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop)) case _ => if (operationCheckEnabled) { UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode) } new StreamingQueryWrapper(new MicroBatchExecution( sparkSession, userSpecifiedName.orNull, checkpointLocation, analyzedPlan, sink, trigger, triggerClock, outputMode, extraOptions, deleteCheckpointOnStop)) } }
對於連續流,返回一個:
new StreamingQueryWrapper(new ContinuousExecution))
StreamingQueryWrapper的做用,就是將StreamingQuery封裝成可序列化的,別的和StreamingQuery沒什麼區別。這裏對於連續流就是包裝了ContinuousExecution。
ContinuousExecution看名稱應該是對應連續流的物理執行計劃的,繼承自StreamExecution(抽象類)。看看主要代碼其實就是重寫了runActivatedStream方法。
override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = { val stateUpdate = new UnaryOperator[State] { override def apply(s: State) = s match { // If we ended the query to reconfigure, reset the state to active. case RECONFIGURING => ACTIVE case _ => s } } do { runContinuous(sparkSessionForStream) } while (state.updateAndGet(stateUpdate) == ACTIVE) stopSources() }
真正的執行邏輯代碼在私有方法runContinuous中,這裏就不詳細展開了,知道了主要流程就能夠了。
下面就是要看看ContinuousExecution究竟是在哪裏被從邏輯計劃轉換到物理計劃的。
搜索全文,找到了StreamingQueryManager.scala這個文件。對了,就是從上面的StreamingQueryManager找到這個ContinuousExecution。
DataStreamWriter是真正觸發流計算開始啓動執行的地方。
start()方法獲得要給StreamingQuery,方法裏的關鍵代碼片斷:
df.sparkSession.sessionState.streamingQueryManager.startQuery( extraOptions.get("queryName"), extraOptions.get("checkpointLocation"), df, extraOptions.toMap, sink, outputMode, useTempCheckpointLocation = source == "console" || source == "noop", recoverFromCheckpointLocation = true, trigger = trigger)
跟蹤進去到了StreamingQueryManager,看它的startQuery方法。
startQuery方法分爲幾步:
調用createQuery方法返回StreamingQuery。
val query = createQuery( userSpecifiedName, userSpecifiedCheckpointLocation, df, extraOptions, sink, outputMode, useTempCheckpointLocation, recoverFromCheckpointLocation, trigger, triggerClock)
query就是StreamingQueryWrapper,就是相似這樣的代碼:
new StreamingQueryWrapper(new ContinuousExecution))
二、啓動上一步的query
try { query.streamingQuery.start() } catch { }
這裏的代碼直接調用到StreamingQuery的父類StreamExecution的start方法。代碼定義:
def start(): Unit = { logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.") queryExecutionThread.setDaemon(true) queryExecutionThread.start() startLatch.await() // Wait until thread started and QueryStart event has been posted }
queryExecutionThread線程的定義又是這樣的:
val queryExecutionThread: QueryExecutionThread = new QueryExecutionThread(s"stream execution thread for $prettyIdString") { override def run(): Unit = { sparkSession.sparkContext.setCallSite(callSite) runStream() } }
最後在線程中啓動runStream這個私有方法。
三、返回query
最後返回query,注意這裏的query在上面的代碼中已經start運行了。