結構化流是一種基於Spark SQL引擎構建的可擴展且容錯的流處理引擎。您可使用與表達靜態數據的批量計算相同的方式來表達流式計算。Spark SQL引擎會逐步持續運行,並在流數據持續到達時更新最終結果。您可使用Scala,Java或Python中的Dataset / DataFrame API來表達流聚合,事件時間窗口,流到批處理鏈接等。計算在相同的優化的Spark SQL引擎上執行。最後,系統經過檢查點和預寫日誌確保端到端的一次容錯保證。簡而言之,結構化流提供快速,可擴展,容錯,端到端的一次性流處理,而無需用戶理解流。java
結構化Streaming仍然是Spark 2.1中的ALPHA, API仍然是實驗性的。在本指南中,咱們將介紹編程模型和API。首先,咱們從一個簡單的例子開始 - 一個流字數。python
假設您但願保持從監聽TCP套接字的數據服務器接收的文本數據的運行字數。咱們來看看如何使用結構化流式表達這一點。您能夠在Scala / Java /Python中看到完整的代碼 。若是您下載Spark,能夠直接運行該示例。在任何狀況下,讓咱們逐步瞭解示例並瞭解它的工做原理。首先,咱們必須導入必要的類並建立一個本地SparkSession,這是與Spark相關的全部功能的起點。git
import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.sql.*; import org.apache.spark.sql.streaming.StreamingQuery; import java.util.Arrays; import java.util.Iterator; SparkSession spark = SparkSession .builder() .appName("JavaStructuredNetworkWordCount") .getOrCreate();
接下來,咱們建立一個流DataFrame,它表示從本地主機偵聽的服務器接收的文本數據:9999,並轉換DataFrame以計算字數。github
// Create DataFrame representing the stream of input lines from connection to localhost:9999 Dataset<Row> lines = spark .readStream() .format("socket") .option("host", "localhost") .option("port", 9999) .load(); // Split the lines into words Dataset<String> words = lines .as(Encoders.STRING()) .flatMap( new FlatMapFunction<String, String>() { @Override public Iterator<String> call(String x) { return Arrays.asList(x.split(" ")).iterator(); } }, Encoders.STRING()); // Generate running word count Dataset<Row> wordCounts = words.groupBy("value").count();
該lines
DataFrame表示包含流文本數據的無界表。此表包含一列名爲「value」的字符串,而且流文本數據中的每一行都將成爲表中的一行。請注意,這並非正在收到任何數據,由於咱們只是設置轉換,尚未開始。接下來,咱們已經將DataFrame轉換爲String的數據集.as(Encoders.STRING())
,所以咱們能夠應用該flatMap
操做將每行劃分紅多個單詞。所得words
數據集包含全部單詞。最後,咱們已經wordCounts
經過數據集中惟一的值進行分組並對它們進行計數來定義了DataFrame。請注意,這是一個流式DataFrame,它表示流的運行字數。sql
咱們如今已經設置了關於流數據的查詢。剩下的就是實際開始接收數據並計算計數。爲此,咱們將其設置爲outputMode("complete")
每次更新時將完整的計數(由指定)計算到控制檯。而後開始使用流計算start()
。shell
// Start running the query that prints the running counts to the console StreamingQuery query = wordCounts.writeStream() .outputMode("complete") .format("console") .start(); query.awaitTermination();
執行此代碼後,流式計算將在後臺啓動。該query
對象是該活動流查詢的句柄,咱們已經決定等待查詢的終止query.awaitTermination()
以阻止進程在查詢處於活動狀態時退出。數據庫
要實際執行此示例代碼,您能夠在本身的Spark應用程序中編譯代碼 ,或者一旦下載了Spark 便可 運行該示例。咱們正在展現後者。您將首先須要運行Netcat(大多數類Unix系統中的一個小型實用程序)做爲數據服務器express
$ nc -lk 9999
而後,在不一樣的終端中,您可使用啓動示例apache
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
而後,在運行netcat服務器的終端中輸入的任何行將每秒計數並打印在屏幕上。它會看起來像下面這樣。
# TERMINAL 1: # Running Netcat $ nc -lk 9999 apache spark apache hadoop ... |
# TERMINAL 2: RUNNING JavaStructuredNetworkWordCount $ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999 ------------------------------------------- Batch: 0 ------------------------------------------- +------+-----+ | value|count| +------+-----+ |apache| 1| | spark| 1| +------+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +------+-----+ | value|count| +------+-----+ |apache| 2| | spark| 1| |hadoop| 1| +------+-----+ ... |
結構化流式傳輸的關鍵思想是將實時數據流視爲不斷附加的表格。這致使與批處理模型很是類似的新流處理模型。您將在靜態表格上將流式計算表示爲標準批量查詢,而Spark會在無界輸入表上將其做爲增量查詢來運行。讓咱們更詳細地瞭解這個模型。
將輸入數據流視爲「輸入表」。到達流的每一個數據項都像追加到輸入表的新行同樣。
對輸入的查詢將生成「結果表」。每一個觸發間隔(例如,每1秒),新行將附加到輸入表,最終更新結果表。不管什麼時候更新結果表,咱們都但願將更改的結果行寫入外部接收器。
「輸出」定義爲寫入外部存儲器的內容。能夠以不一樣的模式定義輸出
完整模式 - 整個更新的結果表將寫入外部存儲。由存儲鏈接器決定如何處理整個表的寫入。
附加模式 - 只有結果表中自上次觸發後附加的新行將被寫入外部存儲。這僅適用於不指望更改結果表中現有行的查詢。
更新模式 - 只有在上次觸發後,結果表中更新的行將被寫入外部存儲(Spark 2.0中尚不可用)。請注意,這與徹底模式不一樣,由於該模式不會輸出未更改的行。
請注意,每一個模式適用於某些類型的查詢。這將在後面詳細討論。
爲了說明這個模型的使用,讓咱們在上面的快速示例的上下文中瞭解模型。第一個lines
DataFrame是輸入表,最後的wordCounts
DataFrame是結果表。須要注意的是在流媒體的查詢lines
數據幀生成wordCounts
是徹底同樣的,由於它是一個靜態的數據幀。可是,當該查詢啓動時,Spark將不斷地從套接字鏈接檢查新數據。若是有新數據,Spark將運行一個「增量」查詢,將先前的運行計數與新數據相結合,以計算更新的計數,以下所示。
這種模式與許多其餘流處理引擎顯着不一樣。許多流媒體系統須要用戶本身維護運行聚合,所以必須對容錯和數據一致性(至少一次,或最多一次,或一次)進行說明。在這個模型中,當有新數據時,Spark負責更新結果表,從而減輕用戶對它的推理。例如,咱們來看看這個模型如何處理基於事件時間的處理和遲到的數據。
事件時間是嵌入數據自己的時間。對於許多應用程序,您可能但願在此事件時間進行操做。例如,若是要每分鐘獲取IoT設備生成的事件數,那麼您可能但願使用數據生成的時間(即數據中的事件時間),而不是Spark接收到的時間他們。這個事件時間在這個模型中很是天然地表現出來 - 來自設備的每一個事件都是表中的一行,事件時間是行中的列值。這容許基於窗口的聚合(例如,每分鐘的事件數)只是偶數列上的特殊類型的分組和聚合 - 每一個時間窗口是一個組,每行能夠屬於多個窗口/組。所以,能夠在靜態數據集(例如來自收集的設備事件日誌)以及數據流上一致地定義基於事件時間窗口的聚合查詢,從而使用戶的使用壽命更加容易。
此外,該模型天然地處理基於事件時間晚於預期的數據。因爲Spark正在更新結果表,它能夠徹底控制更新舊的聚合,當有遲到的數據,以及清理舊的聚合以限制中間狀態數據的大小。因爲Spark 2.1,咱們支持水印,容許用戶指定後期數據的閾值,並容許引擎相應地清理舊狀態。稍後將在「窗口操做」部分中詳細介紹這些。
提供端到端的一次性語義是結構化流媒體設計背後的關鍵目標之一。爲了實現這一點,咱們設計告終構化流源,匯和執行引擎,以即可靠地跟蹤處理的確切進度,以便它能夠經過從新啓動和/或從新處理來處理任何類型的故障。假設每一個流源具備偏移量(相似於Kafka偏移量或Kinesis序列號)以跟蹤流中的讀取位置。引擎使用檢查點和寫入日誌記錄每一個觸發器中正在處理的數據的偏移範圍。流宿設計爲處理後處理的冪等。一塊兒使用可重放源和冪等接收器,結構化流能夠在任何故障下確保端到端徹底一次的語義。
因爲Spark 2.0,DataFrames和Datasets能夠表示靜態,有界數據,以及流式傳輸,無界數據。與靜態數據集/ DataFrames相似,您可使用通用入口點SparkSession
(Scala / Java / Python文檔)從流源建立流式DataFrames / Datasets,並將其做爲靜態DataFrames / Datasets應用相同的操做。若是您不熟悉Datasets / DataFrames,強烈建議您使用DataFrame / Dataset編程指南來熟悉它們 。
Streaming DataFrames能夠經過返回的DataStreamReader
接口(Scala / Java / Python文檔)建立SparkSession.readStream()
。與建立靜態DataFrame的讀取界面相似,您能夠指定源 - 數據格式,模式,選項等的詳細信息。
在Spark 2.0中,有幾個內置源代碼。
文件源 - 將目錄中寫入的文件做爲數據流讀取。支持的文件格式爲text,csv,json,parquet。有關更多最新列表,請參閱DataStreamReader界面的文檔,以及每一個文件格式的支持選項。請注意,文件必須以原子方式放置在給定的目錄中,這在大多數文件系統中能夠經過文件移動操做實現。
卡夫卡來源 - 來自卡夫卡的投票數據。它與Kafka代理商版本0.10.0或更高版本兼容。有關詳細信息,請參閱「 卡夫卡集成指南」。
套接字源(用於測試) - 從套接字鏈接讀取UTF8文本數據。偵聽服務器插座位於驅動程序中。請注意,這僅應用於測試,由於這不提供端到端容錯保證。
這裏有些例子。
SparkSession spark = ... // Read text from socket Dataset[Row] socketDF = spark .readStream() .format("socket") .option("host", "localhost") .option("port", 9999) .load(); socketDF.isStreaming(); // Returns True for DataFrames that have streaming sources socketDF.printSchema(); // Read all the csv files written atomically in a directory StructType userSchema = new StructType().add("name", "string").add("age", "integer"); Dataset[Row] csvDF = spark .readStream() .option("sep", ";") .schema(userSchema) // Specify schema of the csv files .csv("/path/to/directory"); // Equivalent to format("csv").load("/path/to/directory")
這些示例生成無類型的流DataFrames,這意味着DataFrame的架構在編譯時未被檢查,只在運行時在查詢提交時進行檢查。像某些操做map
,flatMap
等須要在編譯時已知的類型。要作到這一點,您可使用與靜態DataFrame相同的方法將這些未類型化的流DataFrames轉換爲類型化的流數據集。有關詳細信息,請參閱SQL編程指南。此外,有關支持的流媒體源的更多詳細信息將在文檔後面討論。
默認狀況下,基於文件的源的結構化流式傳輸須要您指定模式,而不是依靠Spark自動推斷。這種限制確保了即便在出現故障的狀況下,一致的模式將被用於流式查詢。對於特殊用例,您能夠經過設置spark.sql.streaming.schemaInference
來從新啓用模式推斷true
。
當存在命名的子目錄/key=value/
而且列表將自動遞歸到這些目錄中時,會發生分區發現。若是這些列顯示在用戶提供的模式中,則它們將根據正在讀取的文件的路徑由Spark填充。構成分區方案的目錄必須在查詢啓動時存在,而且必須保持靜態。例如,能夠添加/data/year=2016/
什麼時候/data/year=2015/
存在,可是更改分區列(即經過建立目錄/data/date=2016-04-17/
)無效。
您能夠將各類操做上的流DataFrames /數據集-從無類型,相似於SQL的操做(例如select
,where
,groupBy
),爲鍵入RDD般的操做(例如map
,filter
,flatMap
)。有關詳細信息,請參閱SQL編程指南。咱們來看一下可使用的幾個示例操做。
DataFrame / Dataset的大多數常見操做都支持流式傳輸。本節稍後將討論不支持的少數操做。
import org.apache.spark.api.java.function.*; import org.apache.spark.sql.*; import org.apache.spark.sql.expressions.javalang.typed; import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder; public class DeviceData { private String device; private String type; private Double signal; private java.sql.Date time; ... // Getter and setter methods for each field } Dataset<Row> df = ...; // streaming DataFrame with IOT device data with schema { device: string, type: string, signal: double, time: DateType } Dataset<DeviceData> ds = df.as(ExpressionEncoder.javaBean(DeviceData.class)); // streaming Dataset with IOT device data // Select the devices which have signal more than 10 df.select("device").where("signal > 10"); // using untyped APIs ds.filter(new FilterFunction<DeviceData>() { // using typed APIs @Override public boolean call(DeviceData value) throws Exception { return value.getSignal() > 10; } }).map(new MapFunction<DeviceData, String>() { @Override public String call(DeviceData value) throws Exception { return value.getDevice(); } }, Encoders.STRING()); // Running count of the number of updates for each device type df.groupBy("type").count(); // using untyped API // Running average signal for each device type ds.groupByKey(new MapFunction<DeviceData, String>() { // using typed API @Override public String call(DeviceData value) throws Exception { return value.getType(); } }, Encoders.STRING()).agg(typed.avg(new MapFunction<DeviceData, Double>() { @Override public Double call(DeviceData value) throws Exception { return value.getSignal(); } }));
經過結構化流式,滑動事件時間窗口的聚合很簡單。瞭解基於窗口的聚合的關鍵思想與分組聚合很是類似。在分組聚合中,爲用戶指定的分組列中的每一個惟一值維護聚合值(例如計數)。在基於窗口的聚合的狀況下,對於行的事件時間的每一個窗口,維護聚合值。讓咱們以一個例證瞭解這一點。
想象一下,咱們的快速示例被修改,而且流如今包含生成行的時間和行。咱們不想運行字數,而是要在10分鐘的窗口內計數單詞,每5分鐘更新一次。也就是說,在10分鐘的窗口12:00 - 12:10,12:05 - 12:15,12:10 - 12:20等之間收到的單詞計數。請注意,12:00 - 12:10表示數據12點後到12點10分抵達。如今,考慮在12:07收到的一個字。這個詞應該增長對應於兩個窗口的計數12:00 - 12:10和12:05 - 12:15。因此計數將由二者分組鍵(即單詞)和窗口(能夠從事件時間計算)進行索引。
結果表將以下所示。
因爲此窗口相似於分組,所以在代碼中,您可使用groupBy()
和window()
操做來表達窗口聚合。您能夠在Scala / Java / Python中看到如下示例的完整代碼 。
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } // Group the data by window and word and compute the count of each group Dataset<Row> windowedCounts = words.groupBy( functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), words.col("word") ).count();
如今考慮若是一個事件遲到應用程序會發生什麼。例如,說在12:04(即事件時間)生成的一個字能夠在12:11被應用程序收到。應用程序應該使用12:04而不是12:11來更新窗口的較舊計數12:00 - 12:10
。這在咱們基於窗口的分組中天然發生 - 結構化流能夠長時間維持部分聚合的中間狀態,以便後期數據能夠正確地更新舊窗口的聚合,以下所示。
可是,爲了運行這個查詢幾天,系統必須綁定其累積的內存中間狀態的數量。這意味着系統須要知道什麼時候能夠從內存狀態中刪除舊聚合,由於應用程序不會再爲該集合接收到較晚的數據。爲了實現這一點,在Spark 2.1中,咱們引入了 水印,讓引擎自動跟蹤數據中的當前事件時間,並嘗試相應地清除舊狀態。您能夠經過指定事件時間列來定義查詢的水印,並根據事件時間來肯定數據預計的延遲。對於從時間開始的特定窗口T
,引擎將保持狀態,並容許延遲數據更新狀態,直到(max event time seen by the engine - late threshold > T)
。換句話說,閾值內的晚期數據將被聚合,可是晚於閾值的數據將被丟棄。讓咱們以一個例子來理解這一點。咱們能夠很容易地定義上一個例子中的水印withWatermark()
,以下所示。
Dataset<Row> words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String } // Group the data by window and word and compute the count of each group Dataset<Row> windowedCounts = words .withWatermark("timestamp", "10 minutes") .groupBy( functions.window(words.col("timestamp"), "10 minutes", "5 minutes"), words.col("word")) .count();
在這個例子中,咱們正在定義「timestamp」列的查詢的水印,並將「10分鐘」定義爲容許數據延遲的閾值。若是這個查詢運行在追加輸出模式(稍後在輸出模式部分中討論),引擎將從列「時間戳」跟蹤當前事件時間,並在事件時間以前等待額外的「10分鐘」,而後才能完成加窗計數並添加他們到結果表。這是一個例子。
如圖所示,發動機跟蹤的最大事件時間是 藍色虛線,而且(max event time - '10 mins')
每一個觸發開始時設置的水印是紅線。例如,當引擎觀察數據時 (12:14, dog)
,它將爲下一個觸發器爲12:04
。對於窗口12:00 - 12:10
,當系統等待後期數據時,部分計數被保持爲內部狀態。在系統發現(12:21, owl)
水印超過12:10的數據(即)以後,部分計數被肯定並附加到表中。因爲全部超過12:10的「太晚」數據將被忽略,所以此計數將不會改變。
請注意,在附加輸出模式下,系統必須等待「晚期門檻」時間才能輸出窗口的聚合。若是數據可能很是遲,(如1天),而且您喜歡在沒有等待一天的狀況下進行部分計數,這可能不是理想的。未來,咱們將添加更新輸出模式,這將容許每一個更新聚合被寫入以吸取每一個觸發。
水印清除聚合狀態 的條件重要的是要注意,爲了清理聚合查詢中的狀態,必須知足如下條件(從Spark 2.1開始,未來會有變化)。
輸出模式必須爲「追加」。完整模式要求保留全部聚合數據,所以不能使用水印來中斷狀態。有關每種輸出模式的語義的詳細說明,請參見「 輸出模式」部分。
聚合必須具備事件時間列或window
事件時間列上的一個。
withWatermark
必須在與聚合中使用的時間戳列相同的列上調用。例如, df.withWatermark("time", "1 min").groupBy("time2").count()
在附加輸出模式中無效,由於水印在不一樣的列上定義爲聚合列。
withWatermark
必須在聚合以前調用要使用的水印細節。例如,df.groupBy("time").count().withWatermark("time", "1 min")
在追加輸出模式中無效。
流數據幀能夠與靜態DataFrames一塊兒建立新的流數據幀。這裏有幾個例子。
Dataset<Row> staticDf = spark.read. ...; Dataset<Row> streamingDf = spark.readStream. ...; streamingDf.join(staticDf, "type"); // inner equi-join with a static DF streamingDf.join(staticDf, "type", "right_join"); // right outer join with a static DF
可是請注意,靜態DataFrames / Datasets中的全部操做都不支持流式傳輸DataFrames / Datasets。雖然其中一些不受支持的操做將在將來版本的Spark中獲得支持,但還有一些在基本上很難在流數據上高效地實現。例如,輸入流數據集不支持排序,由於它須要跟蹤流中接收到的全部數據。所以,從根本上難以有效執行。從Spark 2.0開始,一些不受支持的操做以下
流數據集不支持多個流聚合(即流DF上的聚合鏈)。
流數據集不支持限制並採起前N行。
不支持流數據集上的不一樣操做。
只有在聚合和徹底輸出模式下,流數據集才支持排序操做。
有條件地支持流和靜態數據集之間的外鏈接。
不支持與流數據集徹底外鏈接
不支持左側外鏈接與右側的流數據集
右側外鏈接與左側的流數據集不支持
兩個流數據集之間的任何形式的鏈接都不支持。
此外,還有一些Dataset方法將不適用於流數據集。它們是當即運行查詢並返回結果的操做,這在流數據集上沒有意義。相反,這些功能能夠經過顯式啓動流式查詢來完成(參見下一節)。
count()
- 不能從流數據集返回單個計數。而是使用ds.groupBy.count()
它返回包含運行計數的流數據集。
foreach()
- 而是使用ds.writeStream.foreach(...)
(見下一節)。
show()
- 而是使用控制檯接收器(見下一節)。
若是您嘗試任何這些操做,您將看到一個AnalysisException,如「操做XYZ不支持流數據幀/數據集」。
一旦定義了最終結果DataFrame / Dataset,剩下的就是開始流計算。爲此,您必須使用返回的DataStreamWriter
(Scala / Java / Python文檔)Dataset.writeStream()
。您將必須在此界面中指定如下一個或多個。
輸出接收器的詳細信息:數據格式,位置等
輸出模式:指定寫入輸出接收器的內容。
查詢名稱:可選地,指定用於標識的查詢的惟一名稱。
觸發間隔:可選,指定觸發間隔。若是未指定,系統將在上一次處理完成後當即檢查新數據的可用性。若是因爲先前的處理還沒有完成而致使觸發時間錯誤,則系統將嘗試在下一個觸發點觸發,而不是在處理完成後當即觸發。
檢查點位置:對於能夠保證端到端容錯的某些輸出接收器,請指定系統將寫入全部檢查點信息的位置。這應該是與HDFS兼容的容錯文件系統中的目錄。檢查點的語義將在下一節中進行更詳細的討論。
有幾種類型的輸出模式。
附加模式(默認) - 這是默認模式,只有從上次觸發後添加到結果表的新行將被輸出到接收器。只有那些添加到結果表中的行從不會改變的查詢才支持這一點。所以,該模式保證每行只能輸出一次(假定容錯接收器)。例如,僅查詢select
, where
,map
,flatMap
,filter
,join
,等會支持追加模式。
完成模式 - 每次觸發後,整個結果表將被輸出到接收器。聚合查詢支持這一點。
更新模式 - (Spark 2.1中不可用)只有結果表中自上次觸發後更新的行纔會被輸出到接收器。更多信息將在之後的版本中添加。
不一樣類型的流式查詢支持不一樣的輸出模式。如下是兼容性矩陣。
查詢類型 | 支持的輸出模式 | 筆記 | |
---|---|---|---|
沒有聚合的查詢 |
附加 | 支持完整模式備註,由於將全部數據保存在結果表中是不可行的。 | |
聚合查詢 | 使用水印對事件時間進行聚合 | 追加,完成 | 附加模式使用水印來刪除舊的聚合狀態。可是,窗口聚合的輸出會延遲「withWatermark()」中指定的晚期閾值,由於模式語義能夠在結果表中定義後將其添加到結果表中一次(即在水印被交叉以後)。有關詳細信息,請參閱延遲數據部分。 徹底模式不會下降舊的聚合狀態,由於根據定義,此模式保留結果表中的全部數據。 |
其餘彙總 | 完成 | 不支持附加模式,由於聚合能夠更新,從而違反了此模式的語義。 徹底模式不會下降舊的聚合狀態,由於根據定義,此模式保留結果表中的全部數據。 |
|
有幾種類型的內置輸出接收器。
文件宿 - 將輸出存儲到目錄中。
Foreach sink - 對輸出中的記錄運行任意計算。有關詳細信息,請參閱本節後面部分。
控制檯接收器(用於調試) - 每次觸發時,將輸出打印到控制檯/ stdout。都支持「追加」和「完成」輸出模式。這應該用於低數據量的調試目的,由於在每次觸發後,整個輸出被收集並存儲在驅動程序的存儲器中。
存儲器接收器(用於調試) - 輸出做爲內存表存儲在存儲器中。都支持「追加」和「完成」輸出模式。這應該用於低數據量的調試目的,由於在每次觸發後,整個輸出被收集並存儲在驅動程序的存儲器中。
這是一個表的全部的接收器和相應的設置。
水槽 | 支持的輸出模式 | 用法 | 容錯 | 筆記 |
---|---|---|---|---|
文件槽 | 附加 | writeStream .format(「parquet」).start () |
是 | 支持寫入分區表。按時間劃分多是有用的。 |
Foreach水槽 | 全部模式 | writeStream .foreach(...).start () |
取決於ForeachWriter的實現 | 更多細節在下一節 |
控制檯接收器 | 追加,完成 | writeStream .format(「console」).start () |
沒有 | |
內存槽 | 追加,完成 | writeStream .format(「memory」). queryName(「table」). start() |
沒有 | 將輸出數據保存爲表,進行交互式查詢。表名是查詢名。 |
最後,您必須調用start()
實際啓動查詢的執行。這將返回一個StreamingQuery對象,它是持續運行的執行的句柄。您可使用此對象來管理查詢,咱們將在下一小節中討論。如今,讓咱們經過幾個例子瞭解全部這些。
// ========== DF with no aggregations ========== Dataset<Row> noAggDF = deviceDataDf.select("device").where("signal > 10"); // Print new data to console noAggDF .writeStream() .format("console") .start(); // Write new data to Parquet files noAggDF .writeStream() .parquet("path/to/destination/directory") .start(); // ========== DF with aggregation ========== Dataset<Row> aggDF = df.groupBy("device").count(); // Print updated aggregations to console aggDF .writeStream() .outputMode("complete") .format("console") .start(); // Have all the aggregates in an in-memory table aggDF .writeStream() .queryName("aggregates") // this query name will be the table name .outputMode("complete") .format("memory") .start(); spark.sql("select * from aggregates").show(); // interactively query in-memory table
該foreach
操做容許在輸出數據上計算任意操做。從Spark 2.1起,這隻適用於Scala和Java。要使用它,您將必須實現接口ForeachWriter
(Scala /Java文檔),該接口具備在觸發器以後生成做爲輸出的行序列時調用的方法。請注意如下要點。
做者必須是可序列化的,由於它將被序列化併發送給執行者執行。
全部這三種方法,open
,process
而且close
將在執行者調用。
只有在open
調用該方法時,寫入程序才能執行全部的初始化(例如打開鏈接,啓動事務等)。請注意,若是在建立對象以後,類中有任何初始化,那麼該初始化將在驅動程序中發生(由於這是正在建立的實例),這可能不是您打算的。
version
而且partition
是兩個參數,open
其中惟一地表示須要被推出的一組行。version
是每一個觸發器增長的單調遞增的id。partition
是表示輸出分區的id,由於輸出是分佈式的,而且將在多個執行器上進行處理。
open
可使用version
和partition
選擇是否須要寫入行的順序。所以,它能夠返回true
(繼續寫入),或false
(不須要寫入)。若是false
返回,則process
不會在任何行上調用。例如,在部分故障以後,失敗的觸發器的一些輸出分區可能已經被提交到數據庫。基於存儲在數據庫中的元數據,做者能夠識別已經被提交的分區,並所以返回false以跳過再次提交它們。
不管什麼時候open
被調用,close
也將被調用(除非JVM因爲某些錯誤而退出)。即便open
返回false 也是如此。若是在處理和寫入數據時有任何錯誤,close
將被調用帶有錯誤。清理已經建立的狀態(例如鏈接,事務等)是您的責任open
,由於沒有資源泄漏。
StreamingQuery
查詢啓動時建立的對象可用於監視和管理查詢。
StreamingQuery query = df.writeStream().format("console").start(); // get the query object query.id(); // get the unique identifier of the running query query.name(); // get the name of the auto-generated or user-specified name query.explain(); // print detailed explanations of the query query.stop(); // stop the query query.awaitTermination(); // block until query is terminated, with stop() or with error query.exception(); // the exception if the query has been terminated with error query.sourceStatus(); // progress information about data has been read from the input sources query.sinkStatus(); // progress information about data written to the output sink
您能夠在單個SparkSession中啓動任意數量的查詢。他們都將同時運行共享羣集資源。您可使用sparkSession.streams()
來獲取StreamingQueryManager
(斯卡拉 / Java的 / Python的文檔),可用於管理當前活動查詢。
SparkSession spark = ... spark.streams().active(); // get the list of currently active streaming queries spark.streams().get(id); // get a query object by its unique id spark.streams().awaitAnyTermination(); // block until any one of them terminates
有兩個API用於監視和調試活動查詢 - 以交互方式和異步方式。
您可使用streamingQuery.lastProgress()
和直接獲取活動查詢的當前狀態和指標 streamingQuery.status()
。 在Scala 和Java中lastProgress()
返回一個StreamingQueryProgress
對象,並 在Python中返回與該字段相同的字典。它具備關於流的最後一個觸發器的進展的全部信息 - 處理哪些數據,處理速率,延遲等等。還有 哪些返回最後幾個進度的數組。 streamingQuery.recentProgress
此外,在Scala 和Java中streamingQuery.status()
返回StreamingQueryStatus
對象以及 Python中具備相同字段的字典。它提供有關查詢當即執行的信息 - 觸發器是活動的,正在處理的數據等。
這裏有幾個例子。
StreamingQuery query = ... System.out.println(query.lastProgress()); /* Will print something like the following. { "id" : "ce011fdc-8762-4dcb-84eb-a77333e28109", "runId" : "88e2ff94-ede0-45a8-b687-6316fbef529a", "name" : "MyQuery", "timestamp" : "2016-12-14T18:45:24.873Z", "numInputRows" : 10, "inputRowsPerSecond" : 120.0, "processedRowsPerSecond" : 200.0, "durationMs" : { "triggerExecution" : 3, "getOffset" : 2 }, "eventTime" : { "watermark" : "2016-12-14T18:45:24.873Z" }, "stateOperators" : [ ], "sources" : [ { "description" : "KafkaSource[Subscribe[topic-0]]", "startOffset" : { "topic-0" : { "2" : 0, "4" : 1, "1" : 1, "3" : 1, "0" : 1 } }, "endOffset" : { "topic-0" : { "2" : 0, "4" : 115, "1" : 134, "3" : 21, "0" : 534 } }, "numInputRows" : 10, "inputRowsPerSecond" : 120.0, "processedRowsPerSecond" : 200.0 } ], "sink" : { "description" : "MemorySink" } } */ System.out.println(query.status()); /* Will print something like the following. { "message" : "Waiting for data to arrive", "isDataAvailable" : false, "isTriggerActive" : false } */
您還能夠SparkSession
經過附加StreamingQueryListener
(Scala / Java文檔)異步監視與a相關聯的全部查詢 。一旦你附加了你的自定義StreamingQueryListener
對象 sparkSession.streams.attachListener()
,當查詢被啓動和中止時以及在一個活動查詢中進行進度時,你將獲得回調。這是一個例子,
SparkSession spark = ... spark.streams.addListener(new StreamingQueryListener() { @Overrides void onQueryStarted(QueryStartedEvent queryStarted) { System.out.println("Query started: " + queryStarted.id()); } @Overrides void onQueryTerminated(QueryTerminatedEvent queryTerminated) { System.out.println("Query terminated: " + queryTerminated.id()); } @Overrides void onQueryProgress(QueryProgressEvent queryProgress) { System.out.println("Query made progress: " + queryProgress.progress()); } });
若是發生故障或故意關機,您能夠恢復以前的查詢的進度和狀態,並繼續中止。這是使用檢查點和寫入日誌完成的。您可使用檢查點位置配置查詢,而且查詢將將全部進度信息(即每一個觸發器中處理的偏移範圍)和運行聚合(例如快速示例中的字數)保存到檢查點位置。此檢查點位置必須是HDFS兼容文件系統中的路徑,而且能夠在啓動查詢時將其設置爲DataStreamWriter中的選項。
aggDF .writeStream() .outputMode("complete") .option("checkpointLocation", "path/to/HDFS/dir") .format("memory") .start();