```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

spark = SparkSession \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()
```
Next, let's create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to compute word counts.
```python
# Create DataFrame representing the stream of input lines from connection to localhost:9999
lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Split the lines into words
words = lines.select(
    explode(
        split(lines.value, " ")
    ).alias("word")
)

# Generate running word count
wordCounts = words.groupBy("word").count()
```
```python
# Start running the query that prints the running counts to the console
query = wordCounts \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
```
You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server:

```bash
$ nc -lk 9999
```
Then, in a different terminal, you can start the example using:
```bash
$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
```
```
# TERMINAL 1:
# Running Netcat

$ nc -lk 9999
apache spark
apache hadoop
...
```
```
# TERMINAL 2: RUNNING structured_network_wordcount.py

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999

-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    2|
| spark|    1|
|hadoop|    1|
+------+-----+
...
```
Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming source, processes it incrementally to update the result, and then discards the source data. It keeps only the minimal intermediate state data required to update the result (for example, the intermediate counts in the earlier example).
| Source | Options | Fault-tolerant | Notes |
|---|---|---|---|
| File source | `path`: path to the input directory, common to all file formats. `maxFilesPerTrigger`: maximum number of new files to be considered in every trigger (default: no max). `latestFirst`: whether to process the latest new files first, useful when there is a large backlog of files (default: false). `fileNameOnly`: whether to check new files based on only the filename instead of the full path (default: false). With this set to `true`, the following files would be considered the same file, because their filename, "dataset.txt", is the same: "file:///dataset.txt", "s3://a/dataset.txt", "s3n://a/b/dataset.txt", "s3a://a/b/c/dataset.txt". For file-format-specific options, see the related methods in DataStreamReader (Scala/Java/Python/R), e.g. for the "parquet" format see DataStreamReader.parquet(). In addition, there are session configurations that affect certain file formats; see the SQL Programming Guide for more details, e.g. for "parquet" see the Parquet configuration section. | Yes | Supports glob paths, but does not support multiple comma-separated paths/globs. |
| Socket source | `host`: host to connect to, must be specified. `port`: port to connect to, must be specified. | No | |
| Rate source | `rowsPerSecond` (e.g. 100, default: 1): how many rows should be generated per second. `rampUpTime` (e.g. 5s, default: 0s): how long to ramp up before the generation speed becomes `rowsPerSecond`; granularities finer than seconds are truncated to integer seconds. `numPartitions` (e.g. 10, default: Spark's default parallelism): the number of partitions for the generated rows. The source will try its best to reach `rowsPerSecond`, but the query may be resource constrained; `numPartitions` can be tweaked to help reach the desired speed. | Yes | |
| Kafka source | See the Kafka Integration Guide. | Yes | |
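For instance, the Rate source described above can be configured as follows. This is a minimal sketch reusing the `spark` session created earlier; the option values are illustrative.

```python
# Rate source with the options listed in the table above (values are illustrative)
rateDF = spark \
    .readStream \
    .format("rate") \
    .option("rowsPerSecond", 100) \
    .option("rampUpTime", "5s") \
    .option("numPartitions", 10) \
    .load()

# The rate source emits rows with a `timestamp` column and a monotonically increasing `value` column
rateDF.printSchema()
```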
Here are some examples.
```python
from pyspark.sql.types import StructType

spark = SparkSession. ...

# Read text from socket
socketDF = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

socketDF.isStreaming    # Returns True for DataFrames that have streaming sources

socketDF.printSchema()

# Read all the csv files written atomically in a directory
userSchema = StructType().add("name", "string").add("age", "integer")
csvDF = spark \
    .readStream \
    .option("sep", ";") \
    .schema(userSchema) \
    .csv("/path/to/directory")  # Equivalent to format("csv").load("/path/to/directory")
```
```python
df = ...  # streaming DataFrame with IoT device data with schema { device: string, deviceType: string, signal: double, time: DateType }

# Select the devices which have signal more than 10
df.select("device").where("signal > 10")

# Running count of the number of updates for each device type
df.groupBy("deviceType").count()
```
You can also register a streaming DataFrame/Dataset as a temporary view and then apply SQL commands on it.
```python
df.createOrReplaceTempView("updates")
spark.sql("select count(*) from updates")  # returns another streaming DF
```
Note that you can use df.isStreaming to check whether a DataFrame/Dataset has streaming data.
```python
df.isStreaming
```
The result table would look like the following.
```python
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words.groupBy(
    window(words.timestamp, "10 minutes", "5 minutes"),
    words.word
).count()
```
```python
from pyspark.sql.functions import window

words = ...  # streaming DataFrame of schema { timestamp: Timestamp, word: String }

# Group the data by window and word and compute the count of each group
windowedCounts = words \
    .withWatermark("timestamp", "10 minutes") \
    .groupBy(
        window(words.timestamp, "10 minutes", "5 minutes"),
        words.word) \
    .count()
```
```python
staticDf = spark.read. ...
streamingDf = spark.readStream. ...

streamingDf.join(staticDf, "type")                 # inner equi-join with a static DF
streamingDf.join(staticDf, "type", "right_outer")  # right outer join with a static DF
```
In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join, making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let's discuss the different types of supported stream-stream joins and how to use them.
```python
from pyspark.sql.functions import expr

impressions = spark.readStream. ...
clicks = spark.readStream. ...

# Apply watermarks on event-time columns
impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours")
clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours")

# Join with event-time constraints
impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """)
)
```
```python
impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
    "leftOuter"  # can be "inner", "leftOuter", "rightOuter"
)
```
Outer joins have the same guarantees as inner joins regarding watermark delays and whether data will be dropped or not.
| Left Input | Right Input | Join Type | Support |
|---|---|---|---|
| Static | Static | All types | Supported, since it is not on streaming data even though it can be present in a streaming query |
| Stream | Static | Inner | Supported, not stateful |
| Stream | Static | Left Outer | Supported, not stateful |
| Stream | Static | Right Outer | Not supported |
| Stream | Static | Full Outer | Not supported |
| Static | Stream | Inner | Supported, not stateful |
| Static | Stream | Left Outer | Not supported |
| Static | Stream | Right Outer | Supported, not stateful |
| Static | Stream | Full Outer | Not supported |
| Stream | Stream | Inner | Supported; optionally specify watermark on both sides + time constraints for state cleanup |
| Stream | Stream | Left Outer | Conditionally supported; must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup |
| Stream | Stream | Right Outer | Conditionally supported; must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup |
| Stream | Stream | Full Outer | Not supported |
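As a concrete illustration of the stream-stream right outer row in the matrix above, here is a sketch reusing the `impressionsWithWatermark` and `clicksWithWatermark` DataFrames defined earlier. Per the matrix, the watermark on the left (impressions) side plus the time constraint is what makes the result well-defined.

```python
# Right outer stream-stream join: watermark on the left side and the time constraint
# are required for correct results, as described in the matrix above
rightOuterJoin = impressionsWithWatermark.join(
    clicksWithWatermark,
    expr("""
        clickAdId = impressionAdId AND
        clickTime >= impressionTime AND
        clickTime <= impressionTime + interval 1 hour
    """),
    "rightOuter"
)
```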
```python
streamingDf = spark.readStream. ...

# Without watermark using guid column
streamingDf.dropDuplicates(["guid"])

# With watermark using guid and eventTime columns
streamingDf \
    .withWatermark("eventTime", "10 seconds") \
    .dropDuplicates(["guid", "eventTime"])
```
| Query Type | | Supported Output Modes | Notes |
|---|---|---|---|
| Queries with aggregation | Aggregation on event-time with watermark | Append, Update, Complete | Append mode uses the watermark to drop old aggregation state. However, the output of a windowed aggregation is delayed by the late threshold specified in `withWatermark()`, since by this mode's semantics rows can be added to the Result Table only once they are finalized (i.e. after the watermark is crossed). See the Late Data section for more details. Update mode uses the watermark to drop old aggregation state. Complete mode does not drop old aggregation state, since by definition this mode preserves all data in the Result Table. |
| | Other aggregations | Complete, Update | Since no watermark is defined (it is only defined in the other category), old aggregation state is not dropped. Append mode is not supported, because aggregates can update, violating the semantics of this mode. |
| Queries with `mapGroupsWithState` | | Update | |
| Queries with `flatMapGroupsWithState` | Append operation mode | Append | Aggregations are allowed after `flatMapGroupsWithState`. |
| | Update operation mode | Update | Aggregations are not allowed after `flatMapGroupsWithState`. |
| Queries with joins | | Append | Update and Complete modes are not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported. |
| Other queries | | Append, Update | Complete mode is not supported, as it is infeasible to keep all unaggregated data in the Result Table. |
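For instance, the earlier watermarked windowed aggregation (`windowedCounts`) can be written in update mode, matching the first row of the table. This is a minimal sketch using the console sink.

```python
# Watermarked windowed aggregation in update mode: only rows updated since the last
# trigger are written, and the watermark lets old window state be dropped
query = windowedCounts \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .option("truncate", "false") \
    .start()
```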
There are several types of built-in output sinks.
writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
writeStream .foreach(...) .start()
writeStream .format("console") .start()
writeStream .format("memory") .queryName("tableName") .start()
| Sink | Supported Output Modes | Options | Fault-tolerant | Notes |
|---|---|---|---|---|
| File Sink | Append | `path`: path to the output directory, must be specified. For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R), e.g. for the "parquet" format see DataFrameWriter.parquet(). | Yes (exactly-once) | Supports writes to partitioned tables. Partitioning by time may be useful. |
| Kafka Sink | Append, Update, Complete | See the Kafka Integration Guide. | Yes (at-least-once) | More details in the Kafka Integration Guide. |
| Foreach Sink | Append, Update, Complete | None | Depends on the ForeachWriter implementation | More details in the next section. |
| ForeachBatch Sink | Append, Update, Complete | None | Depends on the implementation | More details in the next section. |
| Console Sink | Append, Update, Complete | `numRows`: number of rows to print every trigger (default: 20). `truncate`: whether to truncate the output if too long (default: true). | No | |
| Memory Sink | Append, Complete | None | No. But in Complete mode, a restarted query will recreate the full table. | Table name is the query name. |
```python
# ========== DF with no aggregations ==========
noAggDF = deviceDataDf.select("device").where("signal > 10")

# Print new data to console
noAggDF \
    .writeStream \
    .format("console") \
    .start()

# Write new data to Parquet files
noAggDF \
    .writeStream \
    .format("parquet") \
    .option("checkpointLocation", "path/to/checkpoint/dir") \
    .option("path", "path/to/destination/dir") \
    .start()

# ========== DF with aggregation ==========
aggDF = df.groupBy("device").count()

# Print updated aggregations to console
aggDF \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

# Have all the aggregates in an in-memory table. The query name will be the table name
aggDF \
    .writeStream \
    .queryName("aggregates") \
    .outputMode("complete") \
    .format("memory") \
    .start()

spark.sql("select * from aggregates").show()  # interactively query in-memory table
```
```python
def foreach_batch_function(df, epoch_id):
    # Transform and write batchDF
    pass

streamingDF.writeStream.foreachBatch(foreach_batch_function).start()
```
With foreachBatch, you can do things like reuse existing batch data writers or write the output of each micro-batch to multiple locations. For example (the snippet below uses the Scala API):
```scala
streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) =>
  batchDF.persist()
  batchDF.write.format(…).save(…)  // location 1
  batchDF.write.format(…).save(…)  // location 2
  batchDF.unpersist()
}
```
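Since the rest of this guide uses Python, here is a rough Python equivalent of the snippet above. This is only a sketch: the Parquet format and the two output paths are illustrative placeholders, not part of the original example.

```python
def write_to_two_locations(batch_df, batch_id):
    # Cache the batch so it is not recomputed for each write
    batch_df.persist()
    batch_df.write.format("parquet").save("/illustrative/location1")  # location 1
    batch_df.write.format("parquet").save("/illustrative/location2")  # location 2
    batch_df.unpersist()

streamingDF.writeStream.foreachBatch(write_to_two_locations).start()
```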
Note:

- By default, foreachBatch provides only at-least-once write guarantees. However, you can use the batchId provided to the function to deduplicate the output and get an exactly-once guarantee.
- foreachBatch does not work with the continuous processing mode, as it fundamentally relies on the micro-batch execution of a streaming query. If you write data in continuous mode, use foreach instead.
```python
def process_row(row):
    # Write row to storage
    pass

query = streamingDF.writeStream.foreach(process_row).start()
```
```python
class ForeachWriter:
    def open(self, partition_id, epoch_id):
        # Open connection. This method is optional in Python.
        pass

    def process(self, row):
        # Write row to connection. This method is NOT optional in Python.
        pass

    def close(self, error):
        # Close the connection. This method is optional in Python.
        pass

query = streamingDF.writeStream.foreach(ForeachWriter()).start()
```
Execution semantics: when the streaming query is started, Spark calls the function or the object's methods in the following way. The lifecycle of the methods is as follows:

- For each partition with partition_id:
  - For each batch/epoch of streaming data with epoch_id:
    - The method open(partitionId, epochId) is called.
    - If open(...) returns true, then for every row in the partition and batch/epoch, the method process(row) is called.
    - The method close(error) is called with the error (if any) seen while processing rows.
- The close() method (if it exists) is called if the open() method exists and returned successfully (irrespective of the return value), unless the JVM or Python process crashes in the middle.
Note: The partitionId and epochId available in the open() method can be used to deduplicate generated data when a failure causes some input data to be reprocessed. This depends on the execution mode of the query. If the streaming query is being executed in micro-batch mode, then every partition represented by a unique tuple (partition_id, epoch_id) is guaranteed to contain the same data. Hence, (partition_id, epoch_id) can be used to deduplicate and/or transactionally commit the data and achieve exactly-once guarantees. However, if the streaming query is being executed in continuous mode, this guarantee does not hold and should therefore not be used for deduplication.
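As a sketch of how this could look, the writer below skips a (partition, epoch) pair that was already committed by an earlier attempt; `already_committed` and `commit` are hypothetical helpers standing in for whatever transactional storage you use.

```python
class IdempotentWriter:
    def open(self, partition_id, epoch_id):
        # In micro-batch mode, (partition_id, epoch_id) identifies the same data across retries,
        # so already-committed partitions can be skipped to make reprocessing idempotent
        self.partition_id = partition_id
        self.epoch_id = epoch_id
        self.buffer = []
        return not already_committed(partition_id, epoch_id)  # hypothetical helper

    def process(self, row):
        self.buffer.append(row)

    def close(self, error):
        if error is None and self.buffer:
            # Hypothetical transactional write keyed by (partition_id, epoch_id)
            commit(self.partition_id, self.epoch_id, self.buffer)

streamingDF.writeStream.foreach(IdempotentWriter()).start()
```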
| Trigger Type | Description |
|---|---|
| unspecified (default) | If no trigger setting is explicitly specified, the query will by default be executed in micro-batch mode, where each micro-batch is generated as soon as the previous micro-batch has finished processing. |
| Fixed interval micro-batches | The query will be executed in micro-batch mode, where micro-batches are kicked off at the user-specified interval. |
| One-time micro-batch | The query will execute *only one* micro-batch to process all the available data and then stop on its own. This is useful in scenarios where you want to periodically spin up a cluster, process everything that is available since the last period, and then shut the cluster down. In some cases, this can lead to significant cost savings. |
| Continuous with fixed checkpoint interval (experimental) | The query will be executed in the new low-latency, continuous processing mode. Read more about this in the Continuous Processing section below. |
Here are some code examples:
```python
# Default trigger (runs micro-batch as soon as it can)
df.writeStream \
    .format("console") \
    .start()

# ProcessingTime trigger with two-second micro-batch interval
df.writeStream \
    .format("console") \
    .trigger(processingTime='2 seconds') \
    .start()

# One-time trigger
df.writeStream \
    .format("console") \
    .trigger(once=True) \
    .start()

# Continuous trigger with one-second checkpointing interval
df.writeStream \
    .format("console") \
    .trigger(continuous='1 second') \
    .start()
```
The StreamingQuery object created when a query is started can be used to monitor and manage the query.
```python
query = df.writeStream.format("console").start()  # get the query object

query.id                  # get the unique identifier of the running query that persists across restarts from checkpoint data

query.runId               # get the unique id of this run of the query, which is generated at every start/restart

query.name                # get the auto-generated or user-specified name of the query

query.explain()           # print detailed explanations of the query

query.stop()              # stop the query

query.awaitTermination()  # block until the query is terminated, with stop() or with an error

query.exception()         # the exception if the query has been terminated with an error

query.recentProgress      # a list of the most recent progress updates for this query

query.lastProgress        # the most recent progress update of this streaming query
```
```python
spark = ...  # spark session

spark.streams.active                 # get the list of currently active streaming queries

spark.streams.get(id)                # get a query object by its unique id

spark.streams.awaitAnyTermination()  # block until any one of them terminates
```
```python
query = ...  # a StreamingQuery
print(query.lastProgress)

'''
Will print something like the following.

{u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}}
'''

print(query.status)

'''
Will print something like the following.

{u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False}
'''
```
Reporting metrics programmatically using asynchronous APIs (StreamingQueryListener) is not available in Python.
```python
spark.conf.set("spark.sql.streaming.metricsEnabled", "true")
# or
spark.sql("SET spark.sql.streaming.metricsEnabled=true")
```
All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever sinks have been configured (e.g. Ganglia, Graphite, JMX, etc.).
```python
aggDF \
    .writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "path/to/HDFS/dir") \
    .format("memory") \
    .start()
```
The term "allowed" means you can do the specified change, but whether the semantics of its effect is well-defined depends on the query and the change.
Types of changes
- Changes in the number or type (i.e. different source) of input sources: This is not allowed.
- Changes in the parameters of input sources: Whether this is allowed and whether the semantics of the change are well-defined depends on the source and the query. Here are a few examples.
  - Addition or deletion of rate limits is allowed: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)`
  - Changes to subscribed topics/files are not allowed, as the results are unpredictable: `spark.readStream.format("kafka").option("subscribe", "topic")` to `spark.readStream.format("kafka").option("subscribe", "newTopic")`
- Changes in the type of output sink: Changes between a few specific combinations of sinks are allowed. Here are a few examples.
  - File sink to Kafka sink is allowed. Kafka will see only the new data.
  - Kafka sink to file sink is not allowed.
  - Kafka sink changed to foreach, or vice versa, is allowed.
- Changes in the parameters of the output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.
  - Changes to the output directory of a file sink are not allowed: `sdf.writeStream.format("parquet").option("path", "/somePath")` to `sdf.writeStream.format("parquet").option("path", "/anotherPath")`
  - Changes to the output topic are allowed: `sdf.writeStream.format("kafka").option("topic", "someTopic")` to `sdf.writeStream.format("kafka").option("topic", "anotherTopic")`
  - Changes to the user-defined foreach sink (that is, the ForeachWriter code) are allowed, but the semantics of the change depend on the code.
- Changes in projection / filter / map-like operations: Some cases are allowed. For example:
  - Addition / deletion of filters is allowed: `sdf.selectExpr("a")` to `sdf.where(...).selectExpr("a").filter(...)`.
  - Changes in projections with the same output schema are allowed: `sdf.selectExpr("stringColumn AS json").writeStream` to `sdf.selectExpr("anotherStringColumn AS json").writeStream`.
  - Changes in projections with a different output schema are conditionally allowed: `sdf.selectExpr("a").writeStream` to `sdf.selectExpr("b").writeStream` is allowed only if the output sink allows the schema change from `"a"` to `"b"`.
- Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains the same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:
  - Streaming aggregation: For example, `sdf.groupBy("a").agg(...)`. Any change in the number or type of grouping keys or aggregates is not allowed.
  - Streaming deduplication: For example, `sdf.dropDuplicates("a")`. Any change in the number or type of deduplication columns is not allowed.
  - Stream-stream join: For example, `sdf1.join(sdf2, ...)` (i.e. both inputs are generated with `sparkSession.readStream`). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) are not allowed. Other changes in the join condition are ill-defined.
  - Arbitrary stateful operation: For example, `sdf.groupByKey(...).mapGroupsWithState(...)` or `sdf.groupByKey(...).flatMapGroupsWithState(...)`. Any change to the schema of the user-defined state or the type of timeout is not allowed. Any change within the user-defined state-mapping function is allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro state schema between query restarts, as the binary state will always be restored successfully.
```python
# trigger(continuous="1 second") below is the only change from a standard micro-batch query
spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .trigger(continuous="1 second") \
    .start()
```
A checkpoint interval of 1 second means that the continuous processing engine will record the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started in micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.
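For instance, the query above could later be restarted from its checkpoint in micro-batch mode simply by changing the trigger. This is a sketch: the checkpoint path is an illustrative addition, and in practice both runs would need to use the same `checkpointLocation`.

```python
# Restart the earlier Kafka-to-Kafka query from the same (illustrative) checkpoint,
# this time with a fixed-interval micro-batch trigger instead of a continuous one
spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("subscribe", "topic1") \
    .load() \
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \
    .option("topic", "topic1") \
    .option("checkpointLocation", "/path/to/checkpoint/dir") \
    .trigger(processingTime="5 seconds") \
    .start()
```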
As of Spark 2.3, only the following types of queries are supported in the continuous processing mode.
For example, for the Rate source, the only options supported in continuous mode are numPartitions and rowsPerSecond.

Further Reading
Talks