End-to-end exactly-once delivery is one of the key design goals of Structured Streaming.
To achieve it, Structured Streaming sources, sinks and the execution engine are designed to track the exact processing progress, so that work can be restarted or rerun to handle any failure.
A streaming source exposes Kafka-like offsets to track the read position in the stream, and the execution engine uses checkpointing and write-ahead logs to record the offset range processed in each trigger.
Streaming sinks are designed to be idempotent, so reprocessing the same data leaves the result unchanged.
Together, replayable sources and idempotent sinks are what allow Structured Streaming to guarantee end-to-end exactly-once semantics under any failure.
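As a sketch of what sink-side idempotence looks like from user code: with foreachBatch, the batchId passed to the function is stable across replays, so a sink can skip work it has already committed. This is an illustration, not Spark's internal mechanism; alreadyCommitted and markCommitted are hypothetical bookkeeping helpers, and the rate source merely stands in for any replayable source:

import org.apache.spark.sql.{DataFrame, SparkSession}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val df = spark.readStream.format("rate").load()  // any replayable source

// Hypothetical bookkeeping; in practice a transactional store or a
// batch-scoped output path plays this role.
def alreadyCommitted(batchId: Long): Boolean = false
def markCommitted(batchId: Long): Unit = ()

// On recovery the same batchId is replayed; checking committed ids (or
// overwriting a batch-scoped location) makes the write idempotent.
val query = df.writeStream
  .foreachBatch { (batch: DataFrame, batchId: Long) =>
    if (!alreadyCommitted(batchId)) {
      batch.write.mode("overwrite").parquet(s"/tmp/out/batch_$batchId")
      markCommitted(batchId)
    }
  }
  .start()

With the guarantees covered, back to the canonical word-count example: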
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))

// Generate running word count
val wordCounts = words.groupBy("value").count()
Here, spark is a SparkSession and lines is a DataFrame; a DataFrame is simply a Dataset[Row].
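For completeness, nothing actually runs until the query is started against a sink; a minimal sketch of starting it (the console sink and checkpoint path are illustrative choices, not part of the quoted example):

// Start the query; offsets and state are checkpointed under the given path.
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .option("checkpointLocation", "/tmp/wordcount-ckpt")
  .start()

query.awaitTermination()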
Dataset
Let's look at how a Dataset action triggers execution, taking the foreach operation as an example:
def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
  rdd.foreach(f)
}

private def withNewRDDExecutionId[U](body: => U): U = {
  SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
    rddQueryExecution.executedPlan.foreach { plan =>
      plan.resetMetrics()
    }
    body
  }
}
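foreach is an eager action, so calling it is what pulls the whole pipeline into execution; a trivial sketch on a local batch Dataset (the session setup is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// This call enters withNewRDDExecutionId and executes the physical plan.
Seq("a", "b", "c").toDS().foreach(s => println(s))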
Following the call into SQLExecution.withNewExecutionId:
def withNewExecutionId[T](
    sparkSession: SparkSession,
    queryExecution: QueryExecution,
    name: Option[String] = None)(body: => T): T = {
  val sc = sparkSession.sparkContext
  val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
  val executionId = SQLExecution.nextExecutionId
  sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
  executionIdToQueryExecution.put(executionId, queryExecution)
  try {
    withSQLConfPropagated(sparkSession) {
      try {
        body
      } catch {
        // ... elided in this excerpt
      } finally {
        // ... elided in this excerpt
      }
    }
  } finally {
    executionIdToQueryExecution.remove(executionId)
    sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
  }
}
The object that actually carries the execution is queryExecution: QueryExecution.
@transient private lazy val rddQueryExecution: QueryExecution = {
  val deserialized = CatalystSerde.deserialize[T](logicalPlan)
  sparkSession.sessionState.executePlan(deserialized)
}
There it is: sessionState.executePlan is applied to the logicalPlan to obtain the QueryExecution.
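That QueryExecution is also reachable from user code, which is handy for inspecting each stage of the translation. A small sketch on a batch Dataset (a streaming Dataset's plan cannot be executed before the query is started, and the session setup is illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

val ds = Seq("a", "b", "a").toDS().groupBy("value").count()

val qe = ds.queryExecution
println(qe.logical)        // parsed logical plan
println(qe.optimizedPlan)  // after Catalyst optimization
println(qe.executedPlan)   // the physical SparkPlan

For a running streaming query, the equivalent view is StreamingQuery.explain(), which prints the physical plan of the most recent batch.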
Returning to the internals: sessionState.executePlan does little more than construct a QueryExecution object; evaluating its executedPlan then yields the physical plan, a SparkPlan. How is that plan generated?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
  SparkSession.setActiveSession(sparkSession)
  planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}
It is generated by the planner.plan method.
planner is a SparkPlanner, defined in the BaseSessionStateBuilder class:
protected def planner: SparkPlanner = {
  new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
    override def extraPlanningStrategies: Seq[Strategy] =
      super.extraPlanningStrategies ++ customPlanningStrategies
  }
}
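Note the experimentalMethods argument: strategies registered there are consulted by the SparkPlanner ahead of the built-in ones, which gives user code a hook into planning. A minimal sketch, where NoopStrategy is a hypothetical strategy that matches nothing:

import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan

// Returning Nil means "no match", so planning falls through to the next
// strategy in the list (eventually DataSourceV2Strategy and friends).
object NoopStrategy extends Strategy {
  override def apply(plan: LogicalPlan): Seq[SparkPlan] = Nil
}

val spark = SparkSession.builder().master("local[*]").getOrCreate()
spark.experimental.extraStrategies = Seq(NoopStrategy)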
The SparkPlanner class
SparkPlanner applies a series of strategies to the LogicalPlan and returns the corresponding SparkPlan. For streaming applications, the relevant strategy is DataSourceV2Strategy.
Its typical logical-to-physical plan mappings are:
StreamingDataSourceV2Relation -> ContinuousScanExec
StreamingDataSourceV2Relation -> MicroBatchScanExec
The former covers the case where the relation has a start offset but no endOffset; the latter the case where an endOffset is present. That is, the former is an unbounded continuous stream, while the latter is a micro-batch stream over a bounded offset range.
The continuous path can push latency down to about 1 ms, whereas the micro-batch path only reaches about 100 ms; how a query chooses between the two is sketched after the code below.
The code:
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
  val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
  val scanExec = MicroBatchScanExec(
    r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)

  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }

  withProjection :: Nil

case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
  val continuousStream = r.stream.asInstanceOf[ContinuousStream]
  val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)

  val withProjection = if (scanExec.supportsColumnar) {
    scanExec
  } else {
    // Add a Project here to make sure we produce unsafe rows.
    ProjectExec(r.output, scanExec)
  }

  withProjection :: Nil
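Which branch fires is decided by the trigger the query is started with. A minimal sketch of selecting each path (the console sink and intervals are illustrative, and the rate source is used because continuous processing supports only map-like queries over sources that implement it):

import org.apache.spark.sql.streaming.Trigger

// Micro-batch path -> MicroBatchScanExec: each batch gets a start and end offset.
val microBatch = spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("100 milliseconds"))
  .start()

// Continuous path -> ContinuousScanExec: a start offset only, no end offset.
val continuous = spark.readStream.format("rate").load()
  .writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start()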