基於spark 2.2.0html
提供 end-to-end exactly-once semantics (端到端的徹底一次性語義)是 Structured Streaming 設計背後的關鍵目標之一java
自從 Spark 2.0 , DataFrame 和 Datasets 能夠表示 static (靜態), bounded data(有界數據),以及 streaming , unbounded data (無界數據)sql
在 Spark 2.0 中,有一些內置的 sources 。apache
每一個事件有一個event-time,假設運行Word count,計算10分鐘內的window Word count,每5分鐘更新一次,延遲超過10分鐘纔到達的數據再也不從新接收編程
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }
// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
.withWatermark("timestamp", "10 minutes")
functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
在 non-streaming Dataset (非流數據集)上使用 withWatermark 是不可行的json
event中有unique identifier (惟一標識符),可用於刪除重複記錄服務器
Dataset<Row> streamingDf = spark.readStream. ...; // columns: guid, eventTime, ...
// 按照惟一標識符對全部數據去重
// 以惟一標識符刪除10s內的重複數據
.withWatermark("eventTime", "10 seconds")
.dropDuplicates("guid", "eventTime");
自從 Spark 2.2 ,可使用 mapGroupsWithState 操做和更強大的操做 flatMapGroupsWithState 來完成,這兩個操做都容許在分組的數據集上應用用戶定義的代碼來更新用戶定義的狀態異步
不一樣類型的 streaming queries 支持不一樣的 output modeside
查詢類型 | 查詢細分 | 支持的輸出模式 | 說明 |
Queries with aggregation (聚合) | Aggregation on event-time with watermark | Append, Update, Complete | Append mode使用 watermark 來刪除舊聚合狀態。Update mode 使用 watermark 刪除舊的聚合狀態。Complete mode 不會刪除舊的聚合狀態,由於在定義上Complete mode會保存Result Table 中的全部數據。 |
Queries with aggregation (聚合) | Other aggregations | Complete, Update | 因爲沒有定義 watermark,舊的聚合狀態不會刪除。不支持 Append mode ,由於aggregates聚合能夠更新,違反了模式語義。 |
Queries with mapGroupsWithState | Update | ||
Queries with flatMapGroupsWithState | Append operation mode | Append | Aggregations are allowed after flatMapGroupsWithState |
Queries with flatMapGroupsWithState | Update operation mode | Update | Aggregations not allowed after flatMapGroupsWithState. |
Other queries | Append, Update | 不支持 Complete mode ,由於將全部未分組數據保存在 Result Table 中是不可行的 。 |
val streamingQuery =writeStream
.format("parquet") // can be "orc", "json", "csv", etc.
.queryName("aggregatestable") // this query name will be the table name
.option("checkpointLocation", "path/to/checkpoint/dir")
.option("path", "path/to/destination/dir")
.start() //call start() to actually start the execution of the query
spark.sql("select * from aggregatestable").show();
一樣,不一樣sink支持不一樣的output modes
Sink | 支持的輸出模式 | 選項 | Fault-tolerant 容錯 | 說明 |
File Sink | Append | path: 必須指定輸出目錄的路徑。 | Yes | 支持對 partitioned tables的寫入。Partitioning by time may be useful. |
Foreach Sink | Append, Update, Compelete | None | 取決於 ForeachWriter 的實現。 | |
Console Sink | Append, Update, Complete | numRows: 每一個觸發器須要打印的行數(默認:20) truncate: 若是輸出太長是否截斷(默認: true) | No | |
Memory Sink | Append, Complete | None | 否。可是在 Complete Mode 模式從新啓動查詢將重建 full table。 | Table name is the query name |
new ForeachWriter<String>() {
// version 是每一個觸發器增長的單調遞增的 id
public boolean open(long partitionId, long version) {
// open connection or init
public void process(String value) {
// write string to connection
//當 open 被調用時(不管是返回true/false), close 也將被調用
public void close(Throwable errorOrNull) {
// close the connection
SparkSession spark = ...
spark.streams().addListener(new StreamingQueryListener() {
public void onQueryStarted(QueryStartedEvent queryStarted) {
System.out.println("Query started: " + queryStarted.id());
public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
System.out.println("Query terminated: " + queryTerminated.id());
public void onQueryProgress(QueryProgressEvent queryProgress) {
System.out.println("Query made progress: " + queryProgress.progress());