Structured Streaming on a SparkSession is ultimately triggered for execution through Dataset's writeStream, which differs from the traditional Spark SQL path. writeStream leads to StreamingQueryManager's startQuery method, and from there step by step into MicroBatchExecution and ContinuousExecution.
The key point: inside MicroBatchExecution and ContinuousExecution, StreamingRelationV2 is converted into StreamingDataSourceV2Relation. MicroBatchExecution and ContinuousExecution are only instantiated in StreamingQueryManager's createQuery method. So where does createQuery get called? Tracing the code shows that DataStreamWriter calls StreamingQueryManager's startQuery method, which in turn calls createQuery.
And the DataStreamWriter itself is created by Dataset's writeStream.
[The above describes the write path of the stream.]
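To make that call chain concrete, here is a minimal usage sketch of the write path (the rate source, the console sink, the master setting and the checkpoint path are illustrative choices only):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("write-path-demo")
  .master("local[2]")   // illustrative; normally supplied by spark-submit
  .getOrCreate()

// Dataset.writeStream returns a DataStreamWriter; start() goes through
// StreamingQueryManager.startQuery, which calls createQuery and builds a
// MicroBatchExecution (or ContinuousExecution, depending on the trigger).
val query = spark.readStream
  .format("rate")
  .load()
  .writeStream
  .format("console")
  .option("checkpointLocation", "/tmp/checkpoints/rate-demo")
  .start()

query.awaitTermination()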
Key class: BaseSessionStateBuilder, which contains the definition of the analyzer.
protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {
  override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
    new FindDataSourceTable(session) +:
      new ResolveSQLOnFile(session) +:
      new FallBackFileSourceV2(session) +:
      DataSourceResolution(conf, this.catalogManager) +:
      customResolutionRules

  override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
    new DetectAmbiguousSelfJoin(conf) +:
      PreprocessTableCreation(session) +:
      PreprocessTableInsertion(conf) +:
      DataSourceAnalysis(conf) +:
      customPostHocResolutionRules

  override val extendedCheckRules: Seq[LogicalPlan => Unit] =
    PreWriteCheck +:
      PreReadCheck +:
      HiveOnlyCheck +:
      TableCapabilityCheck +:
      customCheckRules
}
There is nothing here that needs special attention, so we can skip it for now.
DataSourceV2 refers to the V2 data source framework that Structured Streaming in Spark is built on. The logical plan discussed here is StreamingDataSourceV2Relation, and the corresponding physical plans fall into two classes: MicroBatchScanExec and ContinuousScanExec. Their use cases can be told apart from the names alone: one is the micro-batch mode, the other is the continuous streaming mode.
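Which of the two physical plans you end up with is decided by the trigger set on the DataStreamWriter. A quick sketch, where df is assumed to be a streaming Dataset from a source that supports both modes (e.g. rate or Kafka):

import org.apache.spark.sql.streaming.Trigger

// Micro-batch mode -> MicroBatchExecution -> MicroBatchScanExec
df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()

// Continuous mode -> ContinuousExecution -> ContinuousScanExec
// (source and sink must support continuous processing)
df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()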
Let's start the analysis from the physical plans.
Both physical plans share the same parent class, DataSourceV2ScanExecBase. Let's look at the parent class first.
The key code:
override def doExecute(): RDD[InternalRow] = {
  val numOutputRows = longMetric("numOutputRows")
  inputRDD.map { r =>
    numOutputRows += 1
    r
  }
}
Subclasses need to override inputRDD.
Two important checkpoint properties:
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
offsetLog records which offset has been read so far, and commitLog records which offset has been fully processed. These two logs are very important; together they are what guarantees exactly-once semantics.
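A conceptual sketch of how the two logs cooperate on restart (this is not the actual MicroBatchExecution code, just the idea): the newest batch recorded in offsetLog that has no matching entry in commitLog is the batch that must be re-executed.

// Both logs are HDFSMetadataLog instances, so getLatest() returns Option[(batchId, payload)].
val lastPlanned   = offsetLog.getLatest()   // Option[(Long, OffsetSeq)]
val lastCommitted = commitLog.getLatest()   // Option[(Long, CommitMetadata)]

val nextAction = (lastPlanned, lastCommitted) match {
  case (Some((planned, _)), Some((committed, _))) if planned == committed =>
    s"batch $planned fully committed, plan batch ${planned + 1} next"
  case (Some((planned, _)), _) =>
    s"batch $planned was planned but never committed, replay it with the saved offsets"
  case (None, _) =>
    "no checkpoint yet, start from the initial offsets"
}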
OK, let's first look at how MicroBatchScanExec overrides inputRDD.
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)

override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()

override lazy val inputRDD: RDD[InternalRow] = {
  new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}
There are three overrides here. The first overrides partitions (a Seq[InputPartition]) by calling the stream's planInputPartitions method; note that the stream here is of type MicroBatchStream. The second overrides readerFactory to obtain the reader factory. The third overrides inputRDD by creating a DataSourceRDD, with the partitions and readerFactory from the first two overrides passed in as constructor arguments.
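To see where planInputPartitions and createReaderFactory come from on the source side, here is a minimal sketch of a custom MicroBatchStream. All of the Words* names and the offset encoding are hypothetical; the WordsReaderFactory it returns is sketched after the DataSourceRDD discussion below.

import org.apache.spark.sql.connector.read.{InputPartition, PartitionReaderFactory}
import org.apache.spark.sql.connector.read.streaming.{MicroBatchStream, Offset}

// Hypothetical offset: a single monotonically increasing Long, serialized as its string form.
case class WordsOffset(value: Long) extends Offset {
  override def json(): String = value.toString
}

// Hypothetical partition that simply carries its data inline.
case class WordsPartition(words: Seq[String]) extends InputPartition

class WordsMicroBatchStream extends MicroBatchStream {
  override def initialOffset(): Offset = WordsOffset(0L)
  override def latestOffset(): Offset = WordsOffset(System.currentTimeMillis() / 1000)
  override def deserializeOffset(json: String): Offset = WordsOffset(json.toLong)

  // Called via MicroBatchScanExec.partitions: split the (start, end] range into partitions.
  override def planInputPartitions(start: Offset, end: Offset): Array[InputPartition] =
    Array(WordsPartition(Seq("hello", "spark")), WordsPartition(Seq("structured", "streaming")))

  // Called via MicroBatchScanExec.readerFactory; WordsReaderFactory is sketched further below.
  override def createReaderFactory(): PartitionReaderFactory = new WordsReaderFactory

  override def commit(end: Offset): Unit = {}   // offsets up to `end` have been processed
  override def stop(): Unit = {}
}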
Let's first take a rough look at what DataSourceRDD does.
The code of DataSourceRDD is very short and easy to follow. The most important part is the compute method; here is the full code:
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
  val inputPartition = castPartition(split).inputPartition
  val reader: PartitionReader[_] = if (columnarReads) {
    partitionReaderFactory.createColumnarReader(inputPartition)
  } else {
    partitionReaderFactory.createReader(inputPartition)
  }

  context.addTaskCompletionListener[Unit](_ => reader.close())
  val iter = new Iterator[Any] {
    private[this] var valuePrepared = false

    override def hasNext: Boolean = {
      if (!valuePrepared) {
        valuePrepared = reader.next()
      }
      valuePrepared
    }

    override def next(): Any = {
      if (!hasNext) {
        throw new java.util.NoSuchElementException("End of stream")
      }
      valuePrepared = false
      reader.get()
    }
  }
  // TODO: SPARK-25083 remove the type erasure hack in data source scan
  new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
}
It first creates a PartitionReader from the reader factory, then wraps it in an iterator whose hasNext calls the reader's next method and whose next calls the reader's get method to fetch the data. It's that simple!
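For reference, the PartitionReader on the source side can be equally simple. The sketch below continues the hypothetical WordsPartition from the MicroBatchStream sketch above; the names are made up.

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.{InputPartition, PartitionReader, PartitionReaderFactory}
import org.apache.spark.unsafe.types.UTF8String

class WordsReaderFactory extends PartitionReaderFactory {
  override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
    val words = partition.asInstanceOf[WordsPartition].words
    new PartitionReader[InternalRow] {
      private val it = words.iterator
      private var current: String = _

      // next() advances the cursor; DataSourceRDD.compute calls it from hasNext.
      override def next(): Boolean =
        if (it.hasNext) { current = it.next(); true } else false

      // get() returns the current row; strings are stored as UTF8String in an InternalRow.
      override def get(): InternalRow = InternalRow(UTF8String.fromString(current))

      override def close(): Unit = {}
    }
  }
}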
Finally, let's look at the definition of ContinuousScanExec.
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)

override lazy val readerFactory: ContinuousPartitionReaderFactory = {
  stream.createContinuousReaderFactory()
}

override lazy val inputRDD: RDD[InternalRow] = {
  EpochCoordinatorRef.get(
      sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
      sparkContext.env)
    .askSync[Unit](SetReaderPartitions(partitions.size))
  new ContinuousDataSourceRDD(
    sparkContext,
    sqlContext.conf.continuousStreamingExecutorQueueSize,
    sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
    partitions,
    schema,
    readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
}
Similar to the micro-batch MicroBatchScanExec, there are three overrides. The first overrides partitions (a Seq[InputPartition]) by calling the stream's planInputPartitions method; note that the stream here is of type ContinuousStream. The second overrides readerFactory to obtain a ContinuousPartitionReaderFactory. The third overrides inputRDD by creating a ContinuousDataSourceRDD, with the partitions and readerFactory from the first two overrides passed in as constructor arguments.
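On the source side, the main difference from the micro-batch reader is that a ContinuousPartitionReader never runs out of data and must report a per-partition offset. A minimal hypothetical sketch (all names are made up):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.connector.read.streaming.{ContinuousPartitionReader, ContinuousPartitionReaderFactory, PartitionOffset}

// Hypothetical per-partition offset: how many rows this partition has emitted so far.
case class CounterPartitionOffset(count: Long) extends PartitionOffset

class CounterContinuousReaderFactory extends ContinuousPartitionReaderFactory {
  override def createReader(partition: InputPartition): ContinuousPartitionReader[InternalRow] =
    new ContinuousPartitionReader[InternalRow] {
      private var count = 0L

      // Unlike the micro-batch reader, next() never returns false; it blocks (here: sleeps)
      // until the next row is available, and ContinuousDataSourceRDD keeps polling it.
      override def next(): Boolean = { Thread.sleep(10); count += 1; true }

      override def get(): InternalRow = InternalRow(count)

      // Per-partition progress, used so the coordinator can track offsets per epoch.
      override def getOffset(): PartitionOffset = CounterPartitionOffset(count)

      override def close(): Unit = {}
    }
}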
Let's take a rough look at what ContinuousDataSourceRDD does.
The code of ContinuousDataSourceRDD is basically the same as DataSourceRDD, so just read the source directly; there is no need to go through it in detail here, it would only get repetitive.
For Kafka, ContinuousDataSourceRDD and DataSourceRDD end up doing essentially the same thing; for the details, see the code in the spark-sql-kafka-0-10 project.