Structured Streaming提供一些API來管理Streaming對象。用戶能夠經過這些API來手動管理已經啓動的Streaming,保證在系統中的Streaming有序執行。html
在調用DataStreamWriter方法的start啓動Streaming後,會返回一個StreamingQuery對象。因此用戶就能夠經過這個對象來管理Streaming。apache
以下所示:ide
val query = df.writeStream.format("console").start() // get the query object ui
query.id // get the unique identifier of the running query that persists across restarts from checkpoint data this
query.runId // get the unique id of this run of the query, which will be generated at every start/restart spa
query.name // get the name of the auto-generated or user-specified name rest
query.explain() // print detailed explanations of the query orm
query.stop() // stop the query htm
query.awaitTermination() // block until query is terminated, with stop() or with error 對象
query.exception // the exception if the query has been terminated with error
query.recentProgress // an array of the most recent progress updates for this query
query.lastProgress // the most recent progress update of this streaming query |
Structured Streaming提供了另一個管理Streaming的接口是:StreamingQueryManager。用戶能夠經過SparkSession對象的streams方法得到。
以下所示:
val spark: SparkSession = ... val streamManager = spark.streams() streamManager.active // get the list of currently active streaming queries
streamManager.get(id) // get a query object by its unique id
streamManager.awaitAnyTermination() // block until any one of them terminates |
[2]. Kafka Integration Guide.