Structured Streaming入門

基於spark 2.2.0html

Structured Streaming 編程指南

容錯語義

提供 end-to-end exactly-once semantics (端到端的徹底一次性語義)是 Structured Streaming 設計背後的關鍵目標之一java

Datasets 和 DataFrames

自從 Spark 2.0 , DataFrame 和 Datasets 能夠表示 static (靜態), bounded data(有界數據),以及 streaming , unbounded data (無界數據)sql

Input Sources

在 Spark 2.0 中,有一些內置的 sources 。apache

  1. File source
    以文件流的形式讀取目錄中寫入的文件。支持的文件格式爲 text , csv , json , parquet 。文件必須以 atomically (原子方式)放置在給定的目錄中
  2. Kafka source
    來自 Kafka 的 Poll 數據。它與 Kafka broker 的 0.10.0 或者更高的版本兼容。
  3. Socket source (for testing)
    從一個 socket 鏈接中讀取 UTF8 文本數據。 監聽服務器 socket位於 driver 。只能用於測試,由於它不提供 端到端的容錯保證。

Window Operations on Event Time

每一個事件有一個event-time,假設運行Word count,計算10分鐘內的window Word count,每5分鐘更新一次,延遲超過10分鐘纔到達的數據再也不從新接收編程

Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

// Group the data by window and word and compute the count of each group
Dataset<Row> windowedCounts = words
//watermarking:超過該值到達的數據再也不更新
    .withWatermark("timestamp", "10 minutes")
    .groupBy(
        functions.window(words.col("timestamp"), "10 minutes", "5 minutes"),
        words.col("word"))
    .count();
複製代碼

Window Operations on Event Time

在 non-streaming Dataset (非流數據集)上使用 withWatermark 是不可行的json

Streaming 去重

event中有unique identifier (惟一標識符),可用於刪除重複記錄服務器

Dataset<Row> streamingDf = spark.readStream. ...;  // columns: guid, eventTime, ...

// 按照惟一標識符對全部數據去重
streamingDf.dropDuplicates("guid");

// 以惟一標識符刪除10s內的重複數據
streamingDf
  .withWatermark("eventTime", "10 seconds")
  .dropDuplicates("guid", "eventTime");
複製代碼

有狀態操做

自從 Spark 2.2 ,可使用 mapGroupsWithState 操做和更強大的操做 flatMapGroupsWithState 來完成,這兩個操做都容許在分組的數據集上應用用戶定義的代碼來更新用戶定義的狀態異步

Streaming Queries

得到窗口數據後,須要進行流計算,需指定如下一個或多個:socket

  1. output sink 的詳細信息: Data format, location, etc.
  2. Output mode : 指定寫入 output sink 的內容。
  3. Query name : 可選,指定用於標識的查詢的惟一名稱。
  4. Trigger interval (觸發間隔): 可選,指定觸發間隔。 若是未指定,則系統將在上一次處理完成後當即檢查新數據的可用性。 若是因爲先前的處理還沒有完成而致使觸發時間錯誤,則系統將嘗試在下一個觸發點觸發,而不是在處理完成後當即觸發。
  5. Checkpoint location : 對於能夠保證端對端容錯能力的某些 output sinks ,定系統將寫入全部 checkpoint 信息的位置。 這應該是與 HDFS 兼容的容錯文件系統中的目錄。

Output Modes

不一樣類型的 streaming queries 支持不一樣的 output modeside

查詢類型 查詢細分 支持的輸出模式 說明
Queries with aggregation (聚合) Aggregation on event-time with watermark Append, Update, Complete Append mode使用 watermark 來刪除舊聚合狀態。Update mode 使用 watermark 刪除舊的聚合狀態。Complete mode 不會刪除舊的聚合狀態,由於在定義上Complete mode會保存Result Table 中的全部數據。
Queries with aggregation (聚合) Other aggregations Complete, Update 因爲沒有定義 watermark,舊的聚合狀態不會刪除。不支持 Append mode ,由於aggregates聚合能夠更新,違反了模式語義。
Queries with mapGroupsWithState Update
Queries with flatMapGroupsWithState Append operation mode Append Aggregations are allowed after flatMapGroupsWithState
Queries with flatMapGroupsWithState Update operation mode Update Aggregations not allowed after flatMapGroupsWithState.
Other queries Append, Update 不支持 Complete mode ,由於將全部未分組數據保存在 Result Table 中是不可行的 。

Output Sinks

//streamingQuery對象有stop(),awaitTermination(),explain()等多個方法
val streamingQuery  =writeStream
    .format("parquet")        // can be "orc", "json", "csv", etc.
    .queryName("aggregatestable")    // this query name will be the table name
    .option("checkpointLocation", "path/to/checkpoint/dir")
    .option("path", "path/to/destination/dir")
    .start() //call start() to actually start the execution of the query
spark.sql("select * from aggregatestable").show(); 
複製代碼

一樣,不一樣sink支持不一樣的output modes

Sink 支持的輸出模式 選項 Fault-tolerant 容錯 說明
File Sink Append path: 必須指定輸出目錄的路徑。 Yes 支持對 partitioned tables的寫入。Partitioning by time may be useful.
Foreach Sink Append, Update, Compelete None 取決於 ForeachWriter 的實現。
Console Sink Append, Update, Complete numRows: 每一個觸發器須要打印的行數(默認:20) truncate: 若是輸出太長是否截斷(默認: true) No
Memory Sink Append, Complete None 否。可是在 Complete Mode 模式從新啓動查詢將重建 full table。 Table name is the query name

Foreach

datasetOfString.writeStream().foreach(
  //writer必須是可序列化的,發送到executors執行
  new ForeachWriter<String>() {

    @Override
    // version 是每一個觸發器增長的單調遞增的 id
    public boolean open(long partitionId, long version) {
      // open connection or init
      //返回fasle,則process不會被調用
    }

    @Override
    public void process(String value) {
      // write string to connection
    }

    @Override
    //當 open 被調用時(不管是返回true/false), close 也將被調用
    public void close(Throwable errorOrNull) {
      // close the connection
    }
  });
複製代碼

異步監視

添加StreamingQueryListener對象,有相關任務時將收到callback

SparkSession spark = ...

spark.streams().addListener(new StreamingQueryListener() {
    @Override
    public void onQueryStarted(QueryStartedEvent queryStarted) {
        System.out.println("Query started: " + queryStarted.id());
    }
    @Override
    public void onQueryTerminated(QueryTerminatedEvent queryTerminated) {
        System.out.println("Query terminated: " + queryTerminated.id());
    }
    @Override
    public void onQueryProgress(QueryProgressEvent queryProgress) {
        System.out.println("Query made progress: " + queryProgress.progress());
    }
});
複製代碼

參考資料

  1. Structured Streaming Programming Guide
相關文章
相關標籤/搜索