"You can do very little with faith, but you can do nothing without it." (Samuel Butler)
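In Spark Streaming, a large batch job causes the batch jobs behind it to back up. This article briefly explains how the same situation affects a Structured Streaming job.
The full title of this article is: once a trigger is set, how does a long-running query affect the submit time of the queries that follow it?
First, there are three kinds of trigger: OneTimeTrigger, ProcessingTime, and ContinuousTrigger. For an explanation of the three, see the description of triggers in the article "spark 集羣優化" (Spark cluster optimization).
OneTimeTrigger runs the query exactly once and then finishes, so it cannot affect any subsequent batch.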
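As the Submitted column in the two panel screenshots above shows, the submit time of each batch's query is determined by the end time of its predecessor query.
It is a handle to a query that executes continuously in the background as new data arrives. It manages the execution of a streaming Spark SQL query that runs in a separate thread. Unlike a standard query, a streaming query is executed repeatedly, each time new data arrives at any Source present in the query plan. Whenever new data arrives, a QueryExecution is created and the results are committed transactionally to the given Sink.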
(sink, trigger) match {
  case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
    if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
      UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
    }
    // A ContinuousTrigger selects ContinuousExecution
    new StreamingQueryWrapper(new ContinuousExecution(
      sparkSession,
      userSpecifiedName.orNull,
      checkpointLocation,
      analyzedPlan,
      v2Sink,
      trigger,
      triggerClock,
      outputMode,
      extraOptions,
      deleteCheckpointOnStop))
  case _ =>
    // A ProcessingTime trigger selects MicroBatchExecution
    new StreamingQueryWrapper(new MicroBatchExecution(
      sparkSession,
      userSpecifiedName.orNull,
      checkpointLocation,
      analyzedPlan,
      sink,
      trigger,
      triggerClock,
      outputMode,
      extraOptions,
      deleteCheckpointOnStop))
}
Trigger | StreamExecution implementation |
OneTimeTrigger | MicroBatchExecution |
ProcessingTime | MicroBatchExecution |
ContinuousTrigger | ContinuousExecution |
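The mapping in the table can be sketched as a small, Spark-free model. Everything here is a hypothetical stand-in introduced for illustration (`TriggerKind`, `ExecutionDispatch`, `chooseExecution`, `microBatchExecutor`, and the `sinkSupportsContinuous` flag are not Spark APIs). The sketch only mirrors the shape of the pattern matches quoted in this article, and the error message is copied from the triggerExecutor definition shown further down.

```scala
// Simplified stand-ins for Spark's trigger types (hypothetical names, not Spark classes).
sealed trait TriggerKind
case object OneTime extends TriggerKind
final case class ProcessingTime(intervalMs: Long) extends TriggerKind
final case class Continuous(intervalMs: Long) extends TriggerKind

object ExecutionDispatch {
  // Mirrors the (sink, trigger) match: only a continuous-capable sink combined
  // with a continuous trigger selects ContinuousExecution; everything else
  // falls through to MicroBatchExecution.
  def chooseExecution(sinkSupportsContinuous: Boolean, trigger: TriggerKind): String =
    (sinkSupportsContinuous, trigger) match {
      case (true, _: Continuous) => "ContinuousExecution"
      case _                     => "MicroBatchExecution"
    }

  // Mirrors the trigger executor selection inside the micro-batch engine:
  // a continuous trigger that ends up here is rejected with an exception.
  def microBatchExecutor(trigger: TriggerKind): String = trigger match {
    case _: ProcessingTime => "ProcessingTimeExecutor"
    case OneTime           => "OneTimeExecutor"
    case other             => throw new IllegalStateException(s"Unknown type of trigger: $other")
  }
}
```

In this model, a continuous trigger paired with a sink that cannot write continuously is first routed to the micro-batch engine and only then fails, which is why the sink type matters as much as the trigger.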
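In addition, the analyzedPlan constructor parameter of StreamExecution is a LogicalPlan. In other words, the LogicalPlan has already been generated before the first query starts. At this point it is an unresolved LogicalPlan: the source information for the data nodes that each AST depends on is not yet known, so the LogicalPlan cannot be optimized yet.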
Sink | Fully qualified class name |
console | org.apache.spark.sql.execution.streaming.ConsoleSinkProvider |
kafka | org.apache.spark.sql.kafka010.KafkaSourceProvider |
ForeachSink | org.apache.spark.sql.execution.streaming.sources.ForeachWriterProvider |
MemorySinkV2 | org.apache.spark.sql.execution.streaming.sources.MemorySinkV2 |
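Otherwise the match falls through to MicroBatchExecution. However, when its triggerExecutor member is initialized, only ProcessingTime and OneTimeTrigger are matched. ContinuousTrigger is not supported there, so an IllegalStateException ("Unknown type of trigger") is thrown.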
try {
  // When starting a query, it will call `StreamingQueryListener.onQueryStarted` synchronously.
  // As it's provided by the user and can run arbitrary codes, we must not hold any lock here.
  // Otherwise, it's easy to cause dead-lock, or block too long if the user codes take a long
  // time to finish.
  query.streamingQuery.start()
} catch {
  case e: Throwable =>
    activeQueriesLock.synchronized {
      activeQueries -= query.id
    }
    throw e
}
Here query.streamingQuery is the StreamExecution, that is, either a MicroBatchExecution or a ContinuousExecution.
The start method of StreamExecution is as follows:
/**
 * Starts the execution. This returns only after the thread has started and [[QueryStartedEvent]]
 * has been posted to all the listeners.
 */
def start(): Unit = {
  logInfo(s"Starting $prettyIdString. Use $resolvedCheckpointRoot to store the query checkpoint.")
  queryExecutionThread.setDaemon(true)
  queryExecutionThread.start()
  startLatch.await()  // Wait until thread started and QueryStart event has been posted
}
/**
 * The thread that runs the micro-batches of this stream. Note that this thread must be
 * [[org.apache.spark.util.UninterruptibleThread]] to workaround KAFKA-1894: interrupting a
 * running `KafkaConsumer` may cause endless loop.
 */
val queryExecutionThread: QueryExecutionThread =
  new QueryExecutionThread(s"stream execution thread for $prettyIdString") {
    override def run(): Unit = {
      // To fix call site like "run at <unknown>:0", we bridge the call site from the caller
      // thread to this micro batch thread
      sparkSession.sparkContext.setCallSite(callSite)
      runStream()
    }
  }
QueryExecutionThread is a subclass of UninterruptibleThread, which is itself a subclass of Thread, so QueryExecutionThread is a thread class. It runs the runStream method, whose key code is as follows:
try {
  // Preparation for running the stream query: send the QueryStartedEvent, count down the
  // start latch, apply the streaming configuration, and so on
  runActivatedStream(sparkSessionForStream) // run the stream
} catch {
  // exception handling
} finally {
  // Cleanup after the stream query has run: stop the sources, send the stream stop event,
  // delete the checkpoint (if deleteCheckpointOnStop is enabled), and so on
}
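The handshake between start and runStream (a daemon thread is started, the caller blocks on a latch until the thread signals that it is running, and cleanup happens in a finally block) can be modelled without Spark. This is only a sketch of the pattern. `MiniStreamRunner`, `cleanedUp`, and `awaitTermination` are hypothetical names chosen here, and the `body` function stands in for runActivatedStream.

```scala
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical miniature of the start()/runStream() handshake; not Spark code.
final class MiniStreamRunner(body: () => Unit) {
  private val startLatch = new CountDownLatch(1)

  // Set to true once the finally block has run (stands in for the real cleanup).
  val cleanedUp = new AtomicBoolean(false)

  private val worker = new Thread("mini stream execution thread") {
    override def run(): Unit = runStream()
  }

  private def runStream(): Unit = {
    try {
      // Preparation is done: release the caller blocked in start().
      startLatch.countDown()
      body() // stands in for runActivatedStream
    } catch {
      case _: InterruptedException => // treated as a normal stop in this sketch
    } finally {
      cleanedUp.set(true) // stands in for stopping sources and posting the stop event
    }
  }

  // Returns only after the worker thread has signalled that it has started.
  def start(): Unit = {
    worker.setDaemon(true)
    worker.start()
    startLatch.await()
  }

  // Blocks until the worker thread has finished, including its cleanup.
  def awaitTermination(): Unit = worker.join()
}
```

Because the latch is counted down inside the worker thread, `start()` cannot return before the thread is actually running, which is the guarantee stated in the comment on StreamExecution's start method.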
runActivatedStream is documented as "Run the activated stream until stopped." It is an abstract method that subclasses implement.
The logic of MicroBatchExecution's runActivatedStream method can be described as follows:
triggerExecutor.execute(() => {
  // submit and execute each query
})
triggerExecutor is defined as follows:
private val triggerExecutor = trigger match {
  case t: ProcessingTime => ProcessingTimeExecutor(t, triggerClock)
  case OneTimeTrigger => OneTimeExecutor()
  case _ => throw new IllegalStateException(s"Unknown type of trigger: $trigger")
}
That is, a ProcessingTime trigger uses a ProcessingTimeExecutor to generate batch queries periodically. Its execute method is as follows:
override def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    val triggerTimeMs = clock.getTimeMillis
    val nextTriggerTimeMs = nextBatchTime(triggerTimeMs)
    val terminated = !triggerHandler()
    if (intervalMs > 0) {
      val batchElapsedTimeMs = clock.getTimeMillis - triggerTimeMs
      if (batchElapsedTimeMs > intervalMs) {
        notifyBatchFallingBehind(batchElapsedTimeMs)
      }
      if (terminated) {
        return
      }
      clock.waitTillTime(nextTriggerTimeMs)
    } else {
      if (terminated) {
        return
      }
    }
  }
}
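def execute(triggerHandler: () => Boolean): Unit = {
  while (true) {
    get current_time
    compute the next batch's start_time from current_time and interval
    run the query job and obtain the flag that says whether the stream has ended
    if (interval > 0) {
      query elapsed time = newly read current_time - old current_time
      if (query elapsed time > interval) {
        notifyBatchFallingBehind // currently this only logs a warning
      }
      if (stream terminated flag is true) {
        return // leave the while loop and exit the method
      }
      // Clock.waitTillTime: the SystemClock subclass implements it with while + sleep(ms),
      // the other subclasses with while + wait(ms); the while loop guards against an
      // external interrupt cutting the wait short
    } else {
      if (stream terminated flag is true) {
        return // leave the while loop and exit the method
      }
    }
  }
}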
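That is, as long as the stream has not stopped, the submit time of the next batch = (time taken by the current batch > interval) ? end time of the current batch : (start time of this batch / interval * interval + interval), where the division is integer division. More precisely, the loop waits until the next trigger time computed at the start of the batch and does not wait at all if that time has already passed, so the next submit time is max(end time of the current batch, start time of this batch / interval * interval + interval). The two forms agree whenever a batch starts on an interval boundary. A batch that overruns is therefore followed immediately by the next batch, and the trigger points it missed are skipped rather than made up.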
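The rule can be checked with a small, Spark-free sketch. `TriggerTiming`, `nextSubmitTime`, and `submitTimes` are hypothetical helpers written for this illustration. The body of `nextBatchTime` is not quoted in this article, so the version below is an assumption that follows the formula in the text (integer division, and no rounding when the interval is 0). The next submit time is taken as the later of the batch's end time and the next trigger boundary computed at its start, which is what the execute loop above implies.

```scala
object TriggerTiming {
  // Next trigger boundary after nowMs; Long division truncates, so this rounds
  // down to a multiple of intervalMs and then adds one interval.
  def nextBatchTime(nowMs: Long, intervalMs: Long): Long =
    if (intervalMs == 0) nowMs else nowMs / intervalMs * intervalMs + intervalMs

  // When is the next batch submitted, given this batch's start time and duration?
  // The loop waits for the boundary, or not at all if the batch already ran past it.
  def nextSubmitTime(startMs: Long, durationMs: Long, intervalMs: Long): Long = {
    val endMs = startMs + durationMs
    math.max(endMs, nextBatchTime(startMs, intervalMs))
  }

  // Submit times for a run of batches with the given durations, starting at startMs.
  // The result has one more element than durationsMs (the submit time after the last batch).
  def submitTimes(startMs: Long, durationsMs: Seq[Long], intervalMs: Long): Seq[Long] =
    durationsMs.scanLeft(startMs)((start, d) => nextSubmitTime(start, d, intervalMs))
}
```

With a 1000 ms interval and batches that take 300, 2500, and 400 ms starting at time 0, `TriggerTiming.submitTimes(0L, Seq(300L, 2500L, 400L), 1000L)` gives submit times 0, 1000, 3500, and 4000. The slow second batch pushes the third batch's submit time to the moment the second one finishes, and the trigger points at 2000 and 3000 are simply skipped.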
override protected def runActivatedStream(sparkSessionForStream: SparkSession): Unit = {
  val stateUpdate = new UnaryOperator[State] {
    override def apply(s: State) = s match {
      // If we ended the query to reconfigure, reset the state to active.
      case RECONFIGURING => ACTIVE
      case _ => s
    }
  }

  do {
    runContinuous(sparkSessionForStream)
  } while (state.updateAndGet(stateUpdate) == ACTIVE)
}
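The restart loop above relies on AtomicReference.updateAndGet: a run that ended because of a reconfiguration is reset to ACTIVE and run again, while any other end state terminates the loop. A minimal stand-alone sketch of that pattern follows. `RestartLoop`, `runUntilStopped`, and the three-state `State` type are hypothetical simplifications, not Spark's own definitions. A plain while loop is used instead of do/while so that the sketch also compiles on newer Scala versions.

```scala
import java.util.concurrent.atomic.AtomicReference
import java.util.function.UnaryOperator

object RestartLoop {
  // Simplified state set for illustration only.
  sealed trait State
  case object ACTIVE extends State
  case object RECONFIGURING extends State
  case object TERMINATED extends State

  // Calls runOnce repeatedly. After each run, RECONFIGURING is atomically reset
  // to ACTIVE; the loop continues only while the resulting state is ACTIVE.
  // Returns how many times runOnce was called.
  def runUntilStopped(state: AtomicReference[State])(runOnce: () => Unit): Int = {
    val stateUpdate = new UnaryOperator[State] {
      override def apply(s: State): State = s match {
        case RECONFIGURING => ACTIVE
        case other         => other
      }
    }
    var runs = 0
    var keepGoing = true
    while (keepGoing) {
      runOnce()
      runs += 1
      keepGoing = state.updateAndGet(stateUpdate) == ACTIVE
    }
    runs
  }
}
```

If the first run leaves the state at RECONFIGURING and the second leaves it at TERMINATED, `runUntilStopped` performs exactly two runs, matching the comment in the original code about resetting the state after a reconfiguration.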
The source of runContinuous is as follows:
/**
 * Do a continuous run.
 * @param sparkSessionForQuery Isolated [[SparkSession]] to run the continuous query with.
 */
private def runContinuous(sparkSessionForQuery: SparkSession): Unit = {
  // A list of attributes that will need to be updated.
  val replacements = new ArrayBuffer[(Attribute, Attribute)]
  // Translate from continuous relation to the underlying data source.
  var nextSourceId = 0
  continuousSources = logicalPlan.collect {
    case ContinuousExecutionRelation(dataSource, extraReaderOptions, output) =>
      val metadataPath = s"$resolvedCheckpointRoot/sources/$nextSourceId"
      nextSourceId += 1

      dataSource.createContinuousReader(
        java.util.Optional.empty[StructType](),
        metadataPath,
        new DataSourceOptions(extraReaderOptions.asJava))
  }
  uniqueSources = continuousSources.distinct

  val offsets = getStartOffsets(sparkSessionForQuery)

  var insertedSourceId = 0
  val withNewSources = logicalPlan transform {
    case ContinuousExecutionRelation(source, options, output) =>
      val reader = continuousSources(insertedSourceId)
      insertedSourceId += 1
      val newOutput = reader.readSchema().toAttributes

      assert(output.size == newOutput.size,
        s"Invalid reader: ${Utils.truncatedString(output, ",")} != " +
          s"${Utils.truncatedString(newOutput, ",")}")
      replacements ++= output.zip(newOutput)

      val loggedOffset = offsets.offsets(0)
      val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
      reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
      StreamingDataSourceV2Relation(newOutput, source, options, reader)
  }

  // Rewire the plan to use the new attributes that were returned by the source.
  val replacementMap = AttributeMap(replacements)
  val triggerLogicalPlan = withNewSources transformAllExpressions {
    case a: Attribute if replacementMap.contains(a) =>
      replacementMap(a).withMetadata(a.metadata)
    case (_: CurrentTimestamp | _: CurrentDate) =>
      throw new IllegalStateException(
        "CurrentTimestamp and CurrentDate not yet supported for continuous processing")
  }

  val writer = sink.createStreamWriter(
    s"$runId",
    triggerLogicalPlan.schema,
    outputMode,
    new DataSourceOptions(extraOptions.asJava))
  val withSink = WriteToContinuousDataSource(writer, triggerLogicalPlan)

  val reader = withSink.collect {
    case StreamingDataSourceV2Relation(_, _, _, r: ContinuousReader) => r
  }.head

  reportTimeTaken("queryPlanning") {
    lastExecution = new IncrementalExecution(
      sparkSessionForQuery,
      withSink,
      outputMode,
      checkpointFile("state"),
      runId,
      currentBatchId,
      offsetSeqMetadata)
    lastExecution.executedPlan // Force the lazy generation of execution plan
  }

  sparkSessionForQuery.sparkContext.setLocalProperty(
    StreamExecution.IS_CONTINUOUS_PROCESSING, true.toString)
  sparkSessionForQuery.sparkContext.setLocalProperty(
    ContinuousExecution.START_EPOCH_KEY, currentBatchId.toString)
  // Add another random ID on top of the run ID, to distinguish epoch coordinators across
  // reconfigurations.
  val epochCoordinatorId = s"$runId--${UUID.randomUUID}"
  currentEpochCoordinatorId = epochCoordinatorId
  sparkSessionForQuery.sparkContext.setLocalProperty(
    ContinuousExecution.EPOCH_COORDINATOR_ID_KEY, epochCoordinatorId)
  sparkSessionForQuery.sparkContext.setLocalProperty(
    ContinuousExecution.EPOCH_INTERVAL_KEY,
    trigger.asInstanceOf[ContinuousTrigger].intervalMs.toString)

  // Use the parent Spark session for the endpoint since it's where this query ID is registered.
  val epochEndpoint = EpochCoordinatorRef.create(
    writer, reader, this, epochCoordinatorId, currentBatchId, sparkSession, SparkEnv.get)
  val epochUpdateThread = new Thread(new Runnable {
    override def run: Unit = {
      try {
        triggerExecutor.execute(() => {
          startTrigger()

          if (reader.needsReconfiguration() && state.compareAndSet(ACTIVE, RECONFIGURING)) {
            if (queryExecutionThread.isAlive) {
              queryExecutionThread.interrupt()
            }
            false
          } else if (isActive) {
            currentBatchId = epochEndpoint.askSync[Long](IncrementAndGetEpoch)
            logInfo(s"New epoch $currentBatchId is starting.")
            true
          } else {
            false
          }
        })
      } catch {
        case _: InterruptedException =>
          // Cleanly stop the query.
          return
      }
    }
  }, s"epoch update thread for $prettyIdString")

  try {
    epochUpdateThread.setDaemon(true)
    epochUpdateThread.start()

    reportTimeTaken("runContinuous") {
      SQLExecution.withNewExecutionId(sparkSessionForQuery, lastExecution) {
        lastExecution.executedPlan.execute()
      }
    }
  } catch {
    case t: Throwable
        if StreamExecution.isInterruptionException(t) && state.get() == RECONFIGURING =>
      logInfo(s"Query $id ignoring exception from reconfiguring: $t")
      // interrupted by reconfiguration - swallow exception so we can restart the query
  } finally {
    // The above execution may finish before getting interrupted, for example, a Spark job having
    // 0 partitions will complete immediately. Then the interrupted status will sneak here.
    //
    // To handle this case, we do the two things here:
    //
    // 1. Clean up the resources in `queryExecutionThread.runUninterruptibly`. This may increase
    //    the waiting time of `stop` but should be minor because the operations here are very fast
    //    (just sending an RPC message in the same process and stopping a very simple thread).
    // 2. Clear the interrupted status at the end so that it won't impact the `runContinuous`
    //    call. We may clear the interrupted status set by `stop`, but it doesn't affect the query
    //    termination because `runActivatedStream` will check `state` and exit accordingly.
    queryExecutionThread.runUninterruptibly {
      try {
        epochEndpoint.askSync[Unit](StopContinuousExecutionWrites)
      } finally {
        SparkEnv.get.rpcEnv.stop(epochEndpoint)
        epochUpdateThread.interrupt()
        epochUpdateThread.join()
        stopSources()
        // The following line must be the last line because it may fail if SparkContext is stopped
        sparkSession.sparkContext.cancelJobGroup(runId.toString)
      }
    }

    Thread.interrupted()
  }
}
// TODO: pseudocode, to be tidied up later