Spark譯文(三)

Structured Streaming Programming Guide(結構化流編程指南)

Overview(概貌)

·Structured Streaming是一種基於Spark SQL引擎的可擴展且容錯的流處理引擎。
·您能夠像表達靜態數據的批處理計算同樣表達流式計算。
·Spark SQL引擎將負責逐步和連續地運行它,並在流數據繼續到達時更新最終結果。
·您可使用Scala,Java,Python或R中的數據集/數據框架API來表示流聚合,事件時間窗口,流到批處理鏈接等。計算在同一優化的Spark SQL引擎上執行。
·最後,系統經過檢查點和預寫日誌確保端到端的一次性容錯保證。
·簡而言之,Structured Streaming提供快速,可擴展,容錯,端到端的精確一次流處理,而無需用戶推理流式傳輸。
·在內部,默認狀況下,結構化流式查詢使用微批處理引擎進行處理,該引擎將數據流做爲一系列小批量做業處理,從而實現低至100毫秒的端到端延遲和徹底一次的容錯保證。
·可是,自Spark 2.3以來,咱們引入了一種稱爲連續處理的新型低延遲處理模式,它能夠實現低至1毫秒的端到端延遲,而且具備至少一次保證。
·無需更改查詢中的數據集/數據框操做,您就能夠根據應用程序要求選擇模式。
·在本指南中,咱們將引導您完成編程模型和API。
·咱們將主要使用默認的微批處理模型來解釋這些概念,而後討論連續處理模型。
·首先,讓咱們從一個結構化流式查詢的簡單示例開始 - 一個流式字數。

Quick Example(快速示例)

·假設您但願維護從偵聽TCP套接字的數據服務器接收的文本數據的運行字數。
·讓咱們看看如何使用Structured Streaming表達這一點。
·您能夠在Scala / Java / Python / R中看到完整的代碼。
·若是你下載Spark,你能夠直接運行這個例子。
·在任何狀況下,讓咱們一步一步地瞭解示例,並瞭解它是如何工做的。
·首先,咱們必須導入必要的類並建立一個本地SparkSession,這是與Spark相關的全部功能的起點。
from pyspark.sql import SparkSession from pyspark.sql.functions import explode from pyspark.sql.functions import split spark = SparkSession \ .builder \ .appName("StructuredNetworkWordCount") \ .getOrCreate()

接下來,讓咱們建立一個流式DataFrame,它表示從偵聽localhost:9999的服務器接收的文本數據,並轉換DataFrame以計算字數。html

# Create DataFrame representing the stream of input lines from connection to localhost:9999 lines = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() # Split the lines into words words = lines.select( explode( split(lines.value, " ") ).alias("word") ) # Generate running word count wordCounts = words.groupBy("word").count()
·這行DataFrame表示包含流文本數據的無界表。
·此表包含一列名爲「value」的字符串,而且流式文本數據中的每一行都成爲表中的一行。
·請注意,因爲咱們只是設置轉換,而且還沒有啓動它,所以目前沒有接收任何數據。
·接下來,咱們使用了兩個內置的SQL函數 - split和explode,將每行分紅多行,每行包含一個單詞。
·此外,咱們使用函數別名將新列命名爲「word」。
·最後,咱們經過對數據集中的惟一值進行分組並對它們進行計數來定義wordCounts DataFrame。
·請注意,這是一個流式DataFrame,它表示流的運行字數。
·咱們如今已經設置了關於流數據的查詢。
·剩下的就是實際開始接收數據並計算計數。
·爲此,咱們將其設置爲每次更新時將完整的計數集(由outputMode(「complete」)指定)打印到控制檯。
·而後使用start()開始流式計算。
# Start running the query that prints the running counts to the console query = wordCounts \ .writeStream \ .outputMode("complete") \ .format("console") \ .start() query.awaitTermination()
·執行此代碼後,流式計算將在後臺啓動。
·查詢對象是該活動流式查詢的句柄,咱們決定使用awaitTermination()等待查詢終止,以防止進程在查詢處於活動狀態時退出。
·要實際執行此示例代碼,您能夠在本身的Spark應用程序中編譯代碼,或者只需在下載Spark後運行該示例。
·咱們正在展現後者。
·您首先須要使用Netcat(在大多數類Unix系統中找到的小實用程序)做爲數據服務器運行
$ nc -lk 9999

而後,在不一樣的終端中,您可使用啓動示例java

$ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999
·而後,在運行netcat服務器的終端中鍵入的任何行將被計數並每秒在屏幕上打印。
·它看起來像下面這樣。
# TERMINAL 1: # Running Netcat $ nc -lk 9999 apache spark apache hadoop ...
 
# TERMINAL 2: RUNNING structured_network_wordcount.py $ ./bin/spark-submit examples/src/main/python/sql/streaming/structured_network_wordcount.py localhost 9999 ------------------------------------------- Batch: 0 ------------------------------------------- +------+-----+ | value|count| +------+-----+ |apache| 1| | spark| 1| +------+-----+ ------------------------------------------- Batch: 1 ------------------------------------------- +------+-----+ | value|count| +------+-----+ |apache| 2| | spark| 1| |hadoop| 1| +------+-----+ ...

Programming Model(編程模型)

·Structured Streaming中的關鍵思想是將實時數據流視爲連續追加的表。
·這致使新的流處理模型很是相似於批處理模型。
·您將流式計算表示爲靜態表上的標準批處理查詢,Spark將其做爲無界輸入表上的增量查詢運行。
·讓咱們更詳細地瞭解這個模型。

Basic Concepts(基本概念)

·將輸入數據流視爲「輸入表」。
·到達流的每一個數據項都像一個新行被附加到輸入表。

Stream as a Table

·對輸入的查詢將生成「結果表」。
·每一個觸發間隔(例如,每1秒),新行將附加到輸入表,最終更新結果表。
·每當結果表更新時,咱們都但願將更改的結果行寫入外部接收器。

Model

·「輸出」定義爲寫入外部存儲器的內容。
·輸出能夠以不一樣的模式定義:
·完整模式 - 整個更新的結果表將寫入外部存儲器。
·由存儲鏈接器決定如何處理整個表的寫入。
·追加模式 - 自上次觸發後,只有結果表中附加的新行纔會寫入外部存儲器。
·這僅適用於預計結果表中的現有行不會更改的查詢。
·更新模式 - 只有自上次觸發後在結果表中更新的行纔會寫入外部存儲(自Spark 2.1.1起可用)。
·請注意,這與完整模式的不一樣之處在於此模式僅輸出自上次觸發後已更改的行。
·若是查詢不包含聚合,則它將等同於追加模式。
·請注意,每種模式適用於某些類型的查詢。
·稍後將對此進行詳細討論。
·爲了說明此模型的使用,讓咱們在上面的快速示例的上下文中理解模型。
·第一行DataFrame是輸入表,最後一個wordCounts DataFrame是結果表。
·請注意,生成wordCounts的流線DataFrame上的查詢與靜態DataFrame徹底相同。
·可是,當啓動此查詢時,Spark將不斷檢查套接字鏈接中的新數據。
·若是有新數據,Spark將運行「增量」查詢,該查詢將先前運行的計數與新數據相結合,以計算更新的計數,以下所示。

Model

請注意,Structured Streaming不會實現整個表。·它從流數據源讀取最新的可用數據,逐步處理以更新結果,而後丟棄源數據。·它只保留更新結果所需的最小中間狀態數據(例如前面例子中的中間計數)python

·該模型與許多其餘流處理引擎明顯不一樣。
·許多流系統要求用戶本身維護運行聚合,所以必須推斷容錯和數據一致性(至少一次,或至多一次,或徹底一次)。
·在此模型中,Spark負責在有新數據時更新結果表,從而減輕用戶對其的推理。
·做爲一個例子,讓咱們看看這個模型如何處理基於事件時間的處理和遲到的數據。

Handling Event-time and Late Data(處理事件時間和後期數據)

·事件時間是嵌入數據自己的時間。
·對於許多應用程序,您可能但願在此事件時間運行。
·例如,若是您想每分鐘獲取IoT設備生成的事件數,那麼您可能但願使用生成數據的時間(即數據中的事件時間),而不是Spark接收的時間
·此事件時間在此模型中很是天然地表達 - 來自設備的每一個事件都是表中的一行,事件時間是行中的列值。
·這容許基於窗口的聚合(例如,每分鐘的事件數)在事件時間列上只是一種特殊類型的分組和聚合 - 每一個時間窗口是一個組,每行能夠屬於多個窗口/組。
·所以,能夠在靜態數據集(例如,來自收集的設備事件日誌)以及數據流上一致地定義這種基於事件時間窗口的聚合查詢,使得用戶的生活更加容易。
·此外,該模型天然地處理基於其事件時間到達的時間晚於預期的數據。
·因爲Spark正在更新結果表,所以它能夠在存在延遲數據時徹底控制更新舊聚合,以及清理舊聚合以限制中間狀態數據的大小。
·從Spark 2.1開始,咱們支持水印,容許用戶指定後期數據的閾值,並容許引擎相應地清理舊狀態。
·稍後將在「窗口操做」部分中詳細介紹這些內容。

Fault Tolerance Semantics(容錯語義)

·提供端到端的一次性語義是結構化流的設計背後的關鍵目標之一。
·爲實現這一目標,咱們設計告終構化流媒體源,接收器和執行引擎,以可靠地跟蹤處理的確切進度,以便經過從新啓動和/或從新處理來處理任何類型的故障。
·假設每一個流源都具備偏移(相似於Kafka偏移或Kinesis序列號)以跟蹤流中的讀取位置。
·引擎使用檢查點和預寫日誌來記錄每一個觸發器中正在處理的數據的偏移範圍。
·流式接收器設計爲處理從新處理的冪等功能。
·結合使用可重放的源和冪等接收器,結構化流能夠確保在任何失敗的狀況下端到端徹底一次的語義。

API using Datasets and DataFrames(使用數據集和數據框架的API)

·從Spark 2.0開始,DataFrames和Datasets能夠表示靜態的,有界的數據,以及流式無界數據。
·與靜態數據集/數據框相似,您可使用公共入口點SparkSession(Scala / Java / Python / R docs)從流源建立流式DataFrames / Datasets,並對它們應用與靜態DataFrames / Datasets相同的操做。
·若是您不熟悉數據集/數據框架,強烈建議您使用「數據框架/數據集編程指南」熟悉它們。

Creating streaming DataFrames and streaming Datasets(建立流式DataFrame和流式數據集)

·能夠經過SparkSession.readStream()返回的DataStreamReader接口(Scala / Java / Python文檔)建立Streaming DataFrame。
·在R中,使用read.stream()方法。
·與用於建立靜態DataFrame的讀取接口相似,您能夠指定源的詳細信息 - 數據格式,架構,選項等。

Input Sources(輸入源)

·有一些內置源。
·文件來源 - 將目錄中寫入的文件做爲數據流讀取。
·支持的文件格式爲text,csv,json,orc,parquet。
·有關更新的列表,請參閱DataStreamReader接口的文檔,以及每種文件格式支持的選項。
·請注意,文件必須原子地放置在給定目錄中,在大多數文件系統中,能夠經過文件移動操做來實現。
·Kafka來源 - 從Kafka讀取數據。
·它與Kafka經紀人版本0.10.0或更高版本兼容。
·有關更多詳細信息,請參閱Kafka集成指南。
·套接字源(用於測試) - 從套接字鏈接讀取UTF8文本數據。
·偵聽服務器套接字位於驅動程序中。
·請注意,這應僅用於測試,由於這不提供端到端的容錯保證。
·速率源(用於測試) - 以每秒指定的行數生成數據,每一個輸出行包含時間戳和值。
·其中timestamp是包含消息調度時間的Timestamp類型,value是包含消息計數的Long類型,從0開始做爲第一行。
·此源用於測試和基準測試。
·某些源不具備容錯能力,由於它們沒法保證在發生故障後可使用檢查點偏移重放數據。
·請參閱前面的容錯語義部分。
·如下是Spark中全部源代碼的詳細信息。
Source Options Fault-tolerant Notes
File source path: path to the input directory, and common to all file formats. 
maxFilesPerTrigger: maximum number of new files to be considered in every trigger (default: no max) 
latestFirst: whether to process the latest new files first, useful when there is a large backlog of files (default: false) 
fileNameOnly: whether to check new files based on only the filename instead of on the full path (default: false). With this set to `true`, the following files would be considered as the same file, because their filenames, "dataset.txt", are the same: 
"file:///dataset.txt"
"s3://a/dataset.txt"
"s3n://a/b/dataset.txt"
"s3a://a/b/c/dataset.txt"


For file-format-specific options, see the related methods in DataStreamReader(Scala/Java/Python/R). E.g. for "parquet" format options see DataStreamReader.parquet()

In addition, there are session configurations that affect certain file-formats. See the SQL Programming Guide for more details. E.g., for "parquet", see Parquet configuration section.
Yes Supports glob paths, but does not support multiple comma-separated paths/globs.
Socket Source host: host to connect to, must be specified
port: port to connect to, must be specified
No  
Rate Source rowsPerSecond (e.g. 100, default: 1): How many rows should be generated per second.

rampUpTime (e.g. 5s, default: 0s): How long to ramp up before the generating speed becomes rowsPerSecond. Using finer granularities than seconds will be truncated to integer seconds. 

numPartitions (e.g. 10, default: Spark's default parallelism): The partition number for the generated rows. 

The source will try its best to reach rowsPerSecond, but the query may be resource constrained, and numPartitions can be tweaked to help reach the desired speed.
Yes  
Kafka Source See the Kafka Integration Guide. Yes  
       

Here are some examples.sql

spark = SparkSession. ... # Read text from socket socketDF = spark \ .readStream \ .format("socket") \ .option("host", "localhost") \ .option("port", 9999) \ .load() socketDF.isStreaming() # Returns True for DataFrames that have streaming sources socketDF.printSchema() # Read all the csv files written atomically in a directory userSchema = StructType().add("name", "string").add("age", "integer") csvDF = spark \ .readStream \ .option("sep", ";") \ .schema(userSchema) \ .csv("/path/to/directory") # Equivalent to format("csv").load("/path/to/directory")
·這些示例生成無類型的流式DataFrame,這意味着在編譯時不檢查DataFrame的架構,僅在提交查詢時在運行時檢查。
·map,flatMap等一些操做須要在編譯時知道類型。
·要執行這些操做,您可使用與靜態DataFrame相同的方法將這些無類型流式DataFrame轉換爲類型化流式數據集。
·有關更多詳細信息,請參見SQL編程指南。
·此外,有關受支持的流媒體源的更多詳細信息將在本文檔後面討論。

Schema inference and partition of streaming DataFrames/Datasets(流式DataFrames / Datasets的模式推理和分區)

·默認狀況下,基於文件的源的結構化流須要您指定架構,而不是依靠Spark自動推斷它。
·此限制可確保即便在出現故障的狀況下,也將使用一致的架構進行流式查詢。
·對於臨時用例,能夠經過將spark.sql.streaming.schemaInference設置爲true來從新啓用模式推斷。
·當名爲/ key = value /的子目錄存在且列表將自動遞歸到這些目錄中時,會發生分區發現。
·若是這些列出如今用戶提供的模式中,則Spark將根據正在讀取的文件的路徑填充它們。
·構成分區方案的目錄必須在查詢開始時存在,而且必須保持靜態。
·例如,能夠添加/ data / year = 2016 / when / data / year = 2015 /,但更改分區列無效(即經過建立目錄/ data / date = 2016-04-
·17 /)。

Operations on streaming DataFrames/Datasets(流式傳輸DataFrames / Datasets的操做)

·您能夠對流式數據框架/數據集應用各類操做 - 從無類型,相似SQL的操做(例如select,where,groupBy)到類型化RDD類操做(例如map,filter,flatMap)。
·有關更多詳細信息,請參閱SQL編程指南。
·咱們來看看您可使用的一些示例操做。

Basic Operations - Selection, Projection, Aggregation(基本操做 - 選擇,投影,聚合)

·DataFrame / Dataset上的大多數常見操做都支持流式傳輸。
·本節稍後將討論幾個不受支持的操做
df = ... # streaming DataFrame with IOT device data with schema { device: string, deviceType: string, signal: double, time: DateType } # Select the devices which have signal more than 10 df.select("device").where("signal > 10") # Running count of the number of updates for each device type df.groupBy("deviceType").count()

您還能夠將流式DataFrame / Dataset註冊爲臨時視圖,而後在其上應用SQL命令。apache

df.createOrReplaceTempView("updates") spark.sql("select count(*) from updates") # returns another streaming DF

注意,您可使用df.isStreaming來識別DataFrame / Dataset是否具備流數據。編程

df.isStreaming()

Window Operations on Event Time(事件時間的窗口操做)

·使用結構化流式傳輸時,滑動事件時間窗口上的聚合很是簡單,而且與分組聚合很是類似。
·在分組聚合中,爲用戶指定的分組列中的每一個惟一值維護聚合值(例如計數)。
·在基於窗口的聚合的狀況下,爲每一個窗口維護一行的事件時間的聚合值。
·讓咱們經過一個例子來理解這一點。
·想一下,咱們的快速示例已被修改,流如今包含行以及生成行的時間。
·咱們不想運行字數,而是計算10分鐘內的單詞,每5分鐘更新一次。
·也就是說,在10分鐘窗口12:00-12:10,12:05-12:15,12:10-12:20等之間收到的單詞數量。請注意,12:00 - 12:10表示數據
·在12:00以後但在12:10以前到達。
·如今,考慮一下在12:07收到的一個字。
·這個詞應該增長對應於兩個窗口12:00 - 12:10和12:05 - 12:15的計數。
·所以,計數將由分組鍵(即單詞)和窗口(能夠從事件時間計算)二者索引。

結果表看起來以下所示。json

Window Operations

·因爲此窗口相似於分組,所以在代碼中,您可使用groupBy()和window()操做來表示窗口化聚合。
·您能夠在Scala / Java / Python中看到如下示例的完整代碼。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } # Group the data by window and word and compute the count of each group windowedCounts = words.groupBy( window(words.timestamp, "10 minutes", "5 minutes"), words.word ).count()

Handling Late Data and Watermarking(處理延遲數據和水印)

·如今考慮若是其中一個事件到達應用程序的後期會發生什麼。
·例如,應用程序在12:11能夠接收在12:04(即事件時間)生成的單詞。
·應用程序應使用時間12:04而不是12:11來更新窗口12:00 - 12:10的舊計數。
·這在咱們基於窗口的分組中天然發生 - 結構化流能夠長時間維持部分聚合的中間狀態,以便後期數據能夠正確更新舊窗口的聚合,以下所示。

Handling Late Data

·可是,要運行此查詢數天,系統必須限制它累積的中間內存中狀態的數量。
·這意味着系統須要知道什麼時候能夠從內存狀態中刪除舊聚合,由於應用程序再也不接收該聚合的後期數據。
·爲了實現這一點,咱們在Spark 2.1中引入了水印,使引擎可以自動跟蹤數據中的當前事件時間並嘗試相應地清理舊狀態。
·您能夠經過指定事件時間列以及根據事件時間預計數據的延遲時間來定義查詢的水印。
·對於在時間T結束的特定窗口,引擎將保持狀態並容許延遲數據更新狀態直到(引擎看到的最大事件時間 - 晚閾值> T)。
·換句話說,閾值內的後期數據將被聚合,可是晚於閾值的數據將開始被丟棄(參見本節後面的確切保證)。
·讓咱們經過一個例子來理解這一點。
·咱們可使用withWatermark()在上一個示例中輕鬆定義水印,以下所示。
words = ... # streaming DataFrame of schema { timestamp: Timestamp, word: String } # Group the data by window and word and compute the count of each group windowedCounts = words \ .withWatermark("timestamp", "10 minutes") \ .groupBy( window(words.timestamp, "10 minutes", "5 minutes"), words.word) \ .count()
·在這個例子中,咱們在「timestamp」列的值上定義查詢的水印,而且還將「10分鐘」定義爲容許數據延遲的閾值。
·若是此查詢在更新輸出模式下運行(稍後將在「輸出模式」部分中討論),則引擎將繼續更新結果表中窗口的計數,直到窗口早於水印,該水印落後於列中的當前事件時間「
·時間戳「10分鐘。
·這是一個例子。

Watermarking in Update Mode

·如圖所示,引擎跟蹤的最大事件時間是藍色虛線,而且在每一個觸發開始時設置爲(最大事件時間 - '10分鐘')的水印是紅線。
·例如,當引擎觀察數據(12:14,dog)時,它會將下一次觸發的水印設置爲12:04。
·該水印使發動機保持中間狀態另外10分鐘,以容許計算延遲數據。
·例如,數據(12:09,cat)無序且遲到,它在Windows 12:00-12:10和12:05 - 12:15之間。
·由於它仍然在觸發器中的水印12:04以前,因此引擎仍然將中間計數保持爲狀態而且正確地更新相關窗口的計數。
·可是,當水印更新爲12:11時,窗口的中間狀態(12:00-12:10)被清除,全部後續數據(例如(12:04,驢))被認爲「太晚」,所以忽略了.
·注意,在每次觸發以後,更新的計數(即紫色行)被寫入接收器做爲觸發輸出,如更新模式所指示的。
·某些接收器(例如文件)可能不支持更新模式所需的細粒度更新。
·爲了使用它們,咱們還支持附加模式,其中只有最終計數被寫入接收器。
·這以下圖所示。
·請注意,在非流式數據集上使用withWatermark是no-op。
·因爲水印不該以任何方式影響任何批量查詢,咱們將直接忽略它。

Watermarking in Append Mode

·與以前的更新模式相似,引擎維護每一個窗口的中間計數。
·可是,部分計數不會更新到結果表,也不會寫入接收器。
·引擎等待「10分鐘」以計算延遲日期,而後丟棄窗口<水印的中間狀態,並將最終計數附加到結果表/接收器。
·例如,僅在水印更新爲12:11後,窗口12:00 - 12:10的最終計數纔會附加到結果表中。
Conditions for watermarking to clean aggregation state(用於清除聚合狀態的水印的條件)
·值得注意的是,在聚合查詢中,水印清除狀態必須知足如下條件(從Spark 2.1.1開始,未來可能會有變化)。
·輸出模式必須爲Append或Update。
·完整模式要求保留全部聚合數據,所以不能使用水印來下降中間狀態。
·有關每種輸出模式語義的詳細說明,請參見「輸出模式」部分。
·聚合必須具備事件時間列或事件時間列上的窗口。
·必須在與聚合中使用的時間戳列相同的列上調用withWatermark。
·例如,df.withWatermark(「time」,「1 min」)。groupBy(「time2」)。count()在Append輸出模式中無效,由於水印是在與聚合列不一樣的列上定義的。
·必須在聚合以前調用withWatermark才能使用水印細節。
·例如,df.groupBy(「time」)。count()。withWatermark(「time」,「1 min」)在追加輸出模式下無效。
Semantic Guarantees of Aggregation with Watermarking(帶水印聚合的語義保證)
·水印延遲(使用withWatermark設置)爲「2小時」可確保引擎永遠不會丟棄任何延遲小於2小時的數據。
·換句話說,任何不到2小時(在事件時間方面)的數據都保證彙總到那時處理的最新數據。
·可是,保證只在一個方向嚴格。
·延遲2小時以上的數據不能保證被丟棄;
·它可能會也可能不會聚合。
·更多延遲的是數據,發動機進行處理的可能性較小。

Join Operations(流靜態鏈接)

·結構化流式傳輸支持將流式數據集/數據框架與靜態數據集/數據框架以及另外一個流式數據集/數據框架鏈接起來。
·流鏈接的結果以遞增方式生成,相似於上一節中的流聚合的結果。
·在本節中,咱們將探討在上述狀況下支持哪一種類型的鏈接(即內部,外部等)。
·請注意,在全部受支持的鏈接類型中,與流式數據集/數據框架的鏈接結果與使用包含流中相同數據的靜態數據集/數據框架的結果徹底相同。

Stream-static Joins(流靜態鏈接)

·自Spark 2.0引入以來,Structured Streaming支持流和靜態DataFrame / Dataset之間的鏈接(內鏈接和某種類型的外鏈接)。
·這是一個簡單的例子。
staticDf = spark.read. ... streamingDf = spark.readStream. ... streamingDf.join(staticDf, "type") # inner equi-join with a static DF streamingDf.join(staticDf, "type", "right_join") # right outer join with a static DF
·請注意,流靜態鏈接不是有狀態的,所以不須要進行狀態管理。可是,尚不支持幾種類型的流靜態外鏈接。這些列在此加入部分的末尾。

Stream-stream Joins(流靜態鏈接)

In Spark 2.3, we have added support for stream-stream joins, that is, you can join two streaming Datasets/DataFrames. The challenge of generating join results between two data streams is that, at any point of time, the view of the dataset is incomplete for both sides of the join making it much harder to find matches between inputs. Any row received from one input stream can match with any future, yet-to-be-received row from the other input stream. Hence, for both the input streams, we buffer past input as streaming state, so that we can match every future input with past input and accordingly generate joined results. Furthermore, similar to streaming aggregations, we automatically handle late, out-of-order data and can limit the state using watermarks. Let’s discuss the different types of supported stream-stream joins and how to use them.bootstrap

Inner Joins with optional Watermarking(內部聯合可選水印)
·支持任何類型的列上的內鏈接以及任何類型的鏈接條件。
·可是,當流運行時,流狀態的大小將無限增加,由於必須保存全部過去的輸入,由於任何新輸入均可以與過去的任何輸入匹配。
·爲了不無界狀態,您必須定義其餘鏈接條件,以便無限期舊輸入沒法與未來的輸入匹配,所以能夠從狀態清除。
·換句話說,您必須在鏈接中執行如下附加步驟。
·定義兩個輸入上的水印延遲,以便引擎知道輸入的延遲時間(相似於流聚合)
·在兩個輸入上定義事件時間的約束,使得引擎能夠肯定什麼時候不須要一個輸入的舊行(即,將不知足時間約束)與另外一個輸入匹配。
·能夠用兩種方式之必定義該約束。
·時間範圍鏈接條件(例如...... JOE ON leftTime BETWEEN rightTime AND rightTime + INTERVAL 1 HOUR),
·加入事件時間窗口(例如...... JOIN ON leftTimeWindow = rightTimeWindow)。
·讓咱們經過一個例子來理解這一點。
·假設咱們但願加入一系列廣告展現次數(展現廣告時),並在廣告上添加另外一個用戶點擊流,以便在展現次數達到可獲利的點擊時進行關聯。
·要在此流 - 流鏈接中容許狀態清理,您必須指定水印延遲和時間約束,以下所示。
·水印延遲:好比說,展現次數和相應的點擊次數能夠分別在事件時間內延遲/無序,最多2個小時和3個小時。
·事件時間範圍條件:假設,在相應的印象後0秒到1小時的時間範圍內可能發生咔嗒聲。
·代碼看起來像這樣。
from pyspark.sql.functions import expr impressions = spark.readStream. ... clicks = spark.readStream. ... # Apply watermarks on event-time columns impressionsWithWatermark = impressions.withWatermark("impressionTime", "2 hours") clicksWithWatermark = clicks.withWatermark("clickTime", "3 hours") # Join with event-time constraints impressionsWithWatermark.join( clicksWithWatermark, expr("""  clickAdId = impressionAdId AND  clickTime >= impressionTime AND  clickTime <= impressionTime + interval 1 hour  """) )
Semantic Guarantees of Stream-stream Inner Joins with Watermarking(具備水印的流內部鏈接的語義保證)
·這相似於經過聚合水印提供的保證。水印延遲「2小時」可確保發動機永遠不會丟失任何延遲小於2小時的數據。但延遲2小時以上的數據可能會或可能不會獲得處理。
Outer Joins with Watermarking(帶水印的外部鏈接)
·雖然水印+事件時間約束對於內鏈接是可選的,但對於左外鏈接和右外鏈接,必須指定它們。這是由於爲了在外鏈接中生成NULL結果,引擎必須知道輸入行什麼時候不會與未來的任何內容匹配。所以,必須指定水印+事件時間約束以生成正確的結果。
·所以,使用外部聯接的查詢看起來與以前的廣告貨幣化示例很是類似,只是會有一個附加參數將其指定爲外部聯接
impressionsWithWatermark.join( clicksWithWatermark, expr("""  clickAdId = impressionAdId AND  clickTime >= impressionTime AND  clickTime <= impressionTime + interval 1 hour  """), "leftOuter" # can be "inner", "leftOuter", "rightOuter" )
Semantic Guarantees of Stream-stream Outer Joins with Watermarking(具備水印的流 - 流外鏈接的語義保證)

外鏈接與內部鏈接具備相同的保證,關於水印延遲以及數據是否會被丟棄。api

Caveats(注意事項)
·關於如何生成外部結果,有一些重要的特徵須要注意。
·將生成外部NULL結果,延遲取決於指定的水印延遲和時間範圍條件。
·這是由於引擎必須等待那麼長時間以確保沒有匹配,而且未來不會再有匹配。
·在微批量引擎的當前實現中,水印在微批次結束時前進,而且下一個微批次使用更新的水印來清理狀態並輸出外部結果。
·因爲咱們僅在存在要處理的新數據時才觸發微批處理,所以若是在流中沒有接收到新數據,則外部結果的生成可能會延遲。
·簡而言之,若是鏈接的兩個輸入流中的任何一個在一段時間內沒有接收到數據,則外部(兩種狀況,左側或右側)輸出可能會延遲
Support matrix for joins in streaming queries(支持流式查詢中的鏈接矩陣)
Left Input Right Input Join Type  
Static Static All types Supported, since its not on streaming data even though it can be present in a streaming query
Stream Static Inner Supported, not stateful
Left Outer Supported, not stateful
Right Outer Not supported
Full Outer Not supported
Static Stream Inner Supported, not stateful
Left Outer Not supported
Right Outer Supported, not stateful
Full Outer Not supported
Stream Stream Inner Supported, optionally specify watermark on both sides + time constraints for state cleanup
Left Outer Conditionally supported, must specify watermark on right + time constraints for correct results, optionally specify watermark on left for all state cleanup
Right Outer Conditionally supported, must specify watermark on left + time constraints for correct results, optionally specify watermark on right for all state cleanup
Full Outer Not supported
       
·有關支持鏈接的其餘詳細信息
·鏈接能夠級聯,也就是說,你能夠作df1.join(df2,...)。join(df3,...)。join(df4,....)。
·從Spark 2.3開始,只有在查詢處於追加輸出模式時才能使用鏈接。其餘輸出模式尚不支持。
·從Spark 2.3開始,在鏈接以前不能使用其餘非相似地圖的操做。如下是一些不能使用的例子。
·在加入以前沒法使用流聚合。
·在鏈接以前,沒法在更新模式下使用mapGroupsWithState和flatMapGroupsWithState。

Streaming Deduplication(流式重複數據刪除)

·您可使用事件中的惟一標識符對數據流中的記錄進行重複數據刪除。
·這與使用惟一標識符列的靜態重複數據刪除徹底相同。
·該查詢將存儲來自先前記錄的必要數據量,以便它能夠過濾重複記錄。
·與聚合相似,您可使用帶或不帶水印的重複數據刪除。
·使用水印 - 若是重複記錄的到達時間有上限,則能夠在事件時間列上定義水印,並使用guid和事件時間列進行重複數據刪除。
·該查詢將使用水印從過去的記錄中刪除舊的狀態數據,這些記錄不會再被重複。
·這限制了查詢必須維護的狀態量。
·沒有水印 - 因爲重複記錄可能到達時沒有界限,查詢未來自全部過去記錄的數據存儲爲狀態。
streamingDf = spark.readStream. ... # Without watermark using guid column streamingDf.dropDuplicates("guid") # With watermark using guid and eventTime columns streamingDf \ .withWatermark("eventTime", "10 seconds") \ .dropDuplicates("guid", "eventTime")

Policy for handling multiple watermarks(處理多個水印的政策)

·流式查詢能夠具備多個聯合或鏈接在一塊兒的輸入流。
·每一個輸入流能夠具備不一樣的後期數據閾值,這些閾值須要被容忍用於有狀態操做。
·您能夠在每一個輸入流上使用withWatermarks(「eventTime」,delay)指定這些閾值。
·例如,考慮在inputStream1和inputStream2之間使用流 - 流鏈接的查詢。
·inputStream1.withWatermark(「eventTime1」,「1小時」)。join(inputStream2.withWatermark(「eventTime2」,「2小時」),joinCondition)
·在執行查詢時,Structured Streaming單獨跟蹤每一個輸入流中看到的最大事件時間,根據相應的延遲計算水印,並選擇單個全局水印用於有狀態操做。
·默認狀況下,選擇最小值做爲全局水印,由於它確保若是其中一個流落後於其餘流(例如,其中一個流因上游故障而中止接收數據),則不會意外丟棄數據。
·換句話說,全局水印將以最慢流的速度安全地移動,而且查詢輸出將相應地延遲。
·可是,在某些狀況下,您可能但願得到更快的結果,即便這意味着從最慢的流中刪除數據。
·從Spark 2.4開始,您能夠設置多個水印策略,經過將SQL配置spark.sql.streaming.multipleWatermarkPolicy設置爲max(默認爲min)來選擇最大值做爲全局水印。
·這使得全球水印以最快的速度發展。
·可是,做爲反作用,來自較慢流的數據將被積極地丟棄。
·所以,明智地使用此配置。

Arbitrary Stateful Operations(任意有狀態的行動)

·許多用例須要比聚合更高級的有狀態操做。
·例如,在許多用例中,您必須從事件的數據流中跟蹤會話。
·要進行此類會話,您必須將任意類型的數據保存爲狀態,並使用每一個觸發器中的數據流事件對狀態執行任意操做。
·從Spark 2.2開始,這可使用操做mapGroupsWithState和更強大的操做flatMapGroupsWithState來完成。
·這兩個操做都容許您在分組數據集上應用用戶定義的代碼以更新用戶定義的狀態。
·有關更具體的詳細信息,請查看API文檔(Scala / Java)和示例(Scala / Java)。

Unsupported Operations(不支持的操做)

·流式DataFrames / Datasets不支持一些DataFrame / Dataset操做。其中一些以下。
·流數據集上尚不支持多個流聚合(即,流DF上的聚合鏈)。
·流數據集不支持限制和前N行。
·不支持對流數據集進行不一樣的操做。
·僅在聚合和徹底輸出模式以後,流數據集才支持排序操做。
·不支持流數據集上的幾種外鏈接類型。
·有關詳細信息,請參閱「鏈接操做」部分中的支持矩陣。
·此外,有一些數據集方法不適用於流數據集。
·它們是當即運行查詢並返回結果的操做,這對流式數據集沒有意義。
·相反,這些功能能夠經過顯式啓動流式查詢來完成(請參閱下一節)。
·count() - 沒法從流數據集返回單個計數。
·相反,使用ds.groupBy()。count()返回包含運行計數的流數據集。
·foreach() - 而是使用ds.writeStream.foreach(...)(參見下一節)。
·show() - 而是使用控制檯接收器(參見下一節)。
·若是您嘗試這些操做中的任何一個,您將看到一個AnalysisException,例如「流數據框架/數據集不支持操做XYZ」。
·雖然其中一些可能在將來的Spark版本中獲得支持,但還有一些基本上難以有效地實現流數據。
·例如,不支持對輸入流進行排序,由於它須要跟蹤流中接收的全部數據。
·所以,這基本上難以有效執行。

Starting Streaming Queries(啓動流式查詢)

·一旦定義了最終結果DataFrame / Dataset,剩下的就是開始流式計算。
·爲此,您必須使用經過Dataset.writeStream()返回的DataStreamWriter(Scala / Java / Python文檔)。
·您必須在此界面中指定如下一項或多項。
·輸出接收器的詳細信息:數據格式,位置等。
·輸出模式:指定寫入輸出接收器的內容。
·查詢名稱:可選地,指定查詢的惟一名稱以進行標識。
·觸發間隔:可選擇指定觸發間隔。
·若是未指定,則系統將在前一處理完成後當即檢查新數據的可用性。
·若是因爲先前的處理還沒有完成而錯過了觸發時間,則系統將當即觸發處理。
·檢查點位置:對於能夠保證端到端容錯的某些輸出接收器,請指定系統寫入全部檢查點信息的位置。
·這應該是與HDFS兼容的容錯文件系統中的目錄。
·檢查點的語義將在下一節中詳細討論。

Output Modes(輸出模式)

·有幾種類型的輸出模式。
·追加模式(默認) - 這是默認模式,其中只有自上次觸發後添加到結果表的新行纔會輸出到接收器。
·僅支持那些添加到結果表中的行永遠不會更改的查詢。
·所以,此模式保證每行僅輸出一次(假設容錯接收器)。
·例如,僅使用select,where,map,flatMap,filter,join等的查詢將支持Append模式。
·完成模式 - 每次觸發後,整個結果表將輸出到接收器。
·聚合查詢支持此功能。
·更新模式 - (自Spark 2.1.1起可用)僅將結果表中自上次觸發後更新的行輸出到接收器。
·在未來的版本中添加更多信息。
·不一樣類型的流式查詢支持不一樣的輸出模式。
·這是兼容性矩陣。
Query Type   Supported Output Modes Notes
Queries with aggregation Aggregation on event-time with watermark Append, Update, Complete Append mode uses watermark to drop old aggregation state. But the output of a windowed aggregation is delayed the late threshold specified in `withWatermark()` as by the modes semantics, rows can be added to the Result Table only once after they are finalized (i.e. after watermark is crossed). See the Late Data section for more details. 

Update mode uses watermark to drop old aggregation state. 

Complete mode does not drop old aggregation state since by definition this mode preserves all data in the Result Table.
Other aggregations Complete, Update Since no watermark is defined (only defined in other category), old aggregation state is not dropped. 

Append mode is not supported as aggregates can update thus violating the semantics of this mode.
Queries with mapGroupsWithState Update  
Queries with flatMapGroupsWithState Append operation mode Append Aggregations are allowed after flatMapGroupsWithState.
Update operation mode Update Aggregations not allowed after flatMapGroupsWithState.
Queries with joins Append Update and Complete mode not supported yet. See the support matrix in the Join Operations section for more details on what types of joins are supported.
Other queries Append, Update Complete mode not supported as it is infeasible to keep all unaggregated data in the Result Table.
       

Output Sinks(輸出接收器)

有幾種類型的內置輸出接收器數組

  • 文件接收器 - 將輸出存儲到目錄。
writeStream .format("parquet") // can be "orc", "json", "csv", etc. .option("path", "path/to/destination/dir") .start()
  • Kafka sink - 將輸出存儲到Kafka中的一個或多個主題
writeStream .format("kafka") .option("kafka.bootstrap.servers", "host1:port1,host2:port2") .option("topic", "updates") .start()
  • Foreach接收器 - 對輸出中的記錄運行任意計算。有關詳細信息,請參閱本節後面的內容
writeStream .foreach(...) .start()
  • 控制檯接收器(用於調試) - 每次觸發時將輸出打印到控制檯/標準輸出。支持Append和Complete輸出模式。這應該用於低數據量的調試目的,由於在每次觸發後收集整個輸出並將其存儲在驅動程序的內存中
writeStream .format("console") .start()
  • 內存接收器(用於調試) - 輸出做爲內存表存儲在內存中。支持Append和Complete輸出模式。這應該用於低數據量的調試目的,由於整個輸出被收集並存儲在驅動程序的內存中。所以,請謹慎使用。
writeStream .format("memory") .queryName("tableName") .start()
·某些接收器不具備容錯能力,由於它們不保證輸出的持久性,僅用於調試目的。請參閱前面的容錯語義部分。如下是Spark中全部接收器的詳細信息。
Sink Supported Output Modes Options Fault-tolerant Notes
File Sink Append path: path to the output directory, must be specified. 

For file-format-specific options, see the related methods in DataFrameWriter (Scala/Java/Python/R). E.g. for "parquet" format options see DataFrameWriter.parquet()
Yes (exactly-once) Supports writes to partitioned tables. Partitioning by time may be useful.
Kafka Sink Append, Update, Complete See the Kafka Integration Guide Yes (at-least-once) More details in the Kafka Integration Guide
Foreach Sink Append, Update, Complete None Depends on ForeachWriter implementation More details in the next section
ForeachBatch Sink Append, Update, Complete None Depends on the implementation More details in the next section
Console Sink Append, Update, Complete numRows: Number of rows to print every trigger (default: 20) 
truncate: Whether to truncate the output if too long (default: true)
No  
Memory Sink Append, Complete None No. But in Complete Mode, restarted query will recreate the full table. Table name is the query name.
         
·請注意,您必須調用start()來實際開始執行查詢。這將返回一個StreamingQuery對象,該對象是持續運行的執行的句柄。您可使用此對象來管理查詢,咱們將在下一小節中討論。
·如今,讓咱們經過幾個例子來理解這一切。
# ========== DF with no aggregations ========== noAggDF = deviceDataDf.select("device").where("signal > 10") # Print new data to console noAggDF \ .writeStream \ .format("console") \ .start() # Write new data to Parquet files noAggDF \ .writeStream \ .format("parquet") \ .option("checkpointLocation", "path/to/checkpoint/dir") \ .option("path", "path/to/destination/dir") \ .start() # ========== DF with aggregation ========== aggDF = df.groupBy("device").count() # Print updated aggregations to console aggDF \ .writeStream \ .outputMode("complete") \ .format("console") \ .start() # Have all the aggregates in an in-memory table. The query name will be the table name aggDF \ .writeStream \ .queryName("aggregates") \ .outputMode("complete") \ .format("memory") \ .start() spark.sql("select * from aggregates").show() # interactively query in-memory table
Using Foreach and ForeachBatch(使用Foreach和ForeachBatch)
·foreach和foreachBatch操做容許您在流式查詢的輸出上應用任意操做和編寫邏輯。它們的用例略有不一樣 - 雖然foreach容許在每一行上自定義寫入邏輯,foreachBatch容許在每一個微批量的輸出上進行任意操做和自定義邏輯。讓咱們更詳細地瞭解他們的用法。
ForeachBatch
·foreachBatch(...)容許您指定在流式查詢的每一個微批次的輸出數據上執行的函數。從Spark 2.4開始,Scala,Java和Python都支持它。它須要兩個參數:DataFrame或Dataset,它具備微批次的輸出數據和微批次的惟一ID。
def foreach_batch_function(df, epoch_id): # Transform and write batchDF pass streamingDF.writeStream.foreachBatch(foreach_batch_function).start() 

使用foreachBatch,您能夠執行如下操做。

  • 重用現有的批處理數據源 - 對於許多存儲系統,可能尚未可用的流式接收器,但可能已經存在用於批量查詢的數據寫入器。使用foreachBatch,您能夠在每一個微批次的輸出上使用批處理數據編寫器。
  • 寫入多個位置 - 若是要將流式查詢的輸出寫入多個位置,則能夠簡單地屢次寫入輸出DataFrame / Dataset。可是,每次寫入嘗試都會致使從新計算輸出數據(包括可能從新讀取輸入數據)。要避免從新計算,您應該緩存輸出DataFrame / Dataset,將其寫入多個位置,而後將其解除。這是一個大綱。

    streamingDF.writeStream.foreachBatch { (batchDF: DataFrame, batchId: Long) => batchDF.persist() batchDF.write.format(…).save(…) // location 1 batchDF.write.format(…).save(…) // location 2 batchDF.unpersist() }

  • 應用其餘DataFrame操做 - 流式DataFrame中不支持許多DataFrame和Dataset操做,由於Spark不支持在這些狀況下生成增量計劃。使用foreachBatch,您能夠在每一個微批輸出上應用其中一些操做。可是,您必須本身解釋執行該操做的端到端語義。

Note:

  • 默認狀況下,foreachBatch僅提供至少一次寫保證。可是,您可使用提供給該函數的batchId做爲重複數據刪除輸出並得到一次性保證的方法。
  • foreachBatch不適用於連續處理模式,由於它從根本上依賴於流式查詢的微批量執行。若是以連續模式寫入數據,請改用foreach。
Foreach
·若是foreachBatch不是一個選項(例如,相應的批處理數據寫入器不存在,或連續處理模式),那麼您可使用foreach表達自定義編寫器邏輯。具體來講,您能夠經過將數據劃分爲三種方法來表達數據寫入邏輯:打開,處理和關閉。
·從Spark 2.4開始,foreach可用於Scala,Java和Python。
·在Python中,您能夠經過兩種方式調用foreach:在函數中或在對象中。該函數提供了一種表達處理邏輯的簡單方法,可是當故障致使某些輸入數據的從新處理時,不容許您對生成的數據進行重複數據刪除。對於這種狀況,您必須在對象中指定處理邏輯。
  1. 該函數將一行做爲輸入。
def process_row(row): # Write row to storage pass query = streamingDF.writeStream.foreach(process_row).start() 
  1. 該對象有一個處理方法和可選的打開和關閉方法:
class ForeachWriter: def open(self, partition_id, epoch_id): # Open connection. This method is optional in Python. pass def process(self, row): # Write row to connection. This method is NOT optional in Python. pass def close(self, error): # Close the connection. This method in optional in Python. pass query = streamingDF.writeStream.foreach(ForeachWriter()).start() 

執行語義啓動流式查詢時,Spark如下列方式調用函數或對象的方法:

  • 此對象的單個副本負責查詢中單個任務生成的全部數據。換句話說,一個實例負責處理以分佈式方式生成的數據的一個分區。
  • 此對象必須是可序列化的,由於每一個任務都將得到所提供對象的新的序列化反序列化副本。所以,強烈建議在調用open()方法以後完成用於寫入數據的任何初始化(例如,打開鏈接或啓動事務),這表示任務已準備好生成數據
  • 方法的生命週期以下:

    • 對於partition_id的每一個分區:

      • 對於epoch_id的流數據的每一個批次/紀元:

        • 方法open(partitionId,epochId)被調用。

        • 若是open(...)返回true,則對於分區和批處理/紀元中的每一行,將調用方法進程(行)

        • 調用方法close(錯誤),在處理行時看到錯誤(若是有)。

  • 若是open()方法存在而且成功返回(無論返回值),則調用close()方法(若是存在),除非JVM或Python進程在中間崩潰。

  • Note: 當失敗致使某些輸入數據的從新處理時,open()方法中的partitionId和epochId可用於對生成的數據進行重複數據刪除。這取決於查詢的執行模式。若是以微批處理模式執行流式查詢,則保證由惟一元組(partition_id,epoch_id)表示的每一個分區具備相同的數據。所以,(partition_id,epoch_id)可用於對數據進行重複數據刪除和/或事務提交,並實現一次性保證。可是,若是正在以連續模式執行流式查詢,則此保證不成立,所以不該用於重複數據刪除。

Triggers(觸發器)

流式查詢的觸發器設置定義了流式數據處理的時間,查詢是做爲具備固定批處理間隔的微批量查詢仍是做爲連續處理查詢來執行。如下是支持的各類觸發器。
Trigger Type Description
unspecified (default) 若是未明確指定觸發設置,則默認狀況下,查詢將以微批處理模式執行,一旦前一個微批處理完成處理,將當即生成微批處理。
Fixed interval micro-batches 查詢將以微批處理模式執行,其中微批處理將以用戶指定的間隔啓動。
  • 若是前一個微批次在該間隔內完成,那麼引擎將等待該間隔結束,而後開始下一個微批次
  • 若是前一個微批次須要的時間長於完成的間隔(即若是錯過了間隔邊界),則下一個微批次將在前一個完成後當即開始(即,它不會等待下一個間隔邊界)。
  • 若是沒有可用的新數據,則不會啓動微批次
One-time micro-batch
查詢將執行*僅一個*微批處理全部可用數據,而後自行中止。這在您但願按期啓動集羣,處理自上一個時間段以來可用的全部內容,而後關閉集羣的方案中很是有用。在某些狀況下,這可能會顯着節省成本。
Continuous with fixed checkpoint interval
(experimental)
查詢將以新的低延遲,連續處理模式執行。
·在下面的連續處理部分中閱讀更多相關信息

如下是一些代碼示例:

# Default trigger (runs micro-batch as soon as it can) df.writeStream \ .format("console") \ .start() # ProcessingTime trigger with two-seconds micro-batch interval df.writeStream \ .format("console") \ .trigger(processingTime='2 seconds') \ .start() # One-time trigger df.writeStream \ .format("console") \ .trigger(once=True) \ .start() # Continuous trigger with one-second checkpointing interval df.writeStream .format("console") .trigger(continuous='1 second') .start()

Managing Streaming Queries(管理流式查詢)

啓動查詢時建立的StreamingQuery對象可用於監視和管理查詢

query = df.writeStream.format("console").start() # get the query object query.id() # get the unique identifier of the running query that persists across restarts from checkpoint data query.runId() # get the unique id of this run of the query, which will be generated at every start/restart query.name() # get the name of the auto-generated or user-specified name query.explain() # print detailed explanations of the query query.stop() # stop the query query.awaitTermination() # block until query is terminated, with stop() or with error query.exception() # the exception if the query has been terminated with error query.recentProgress() # an array of the most recent progress updates for this query query.lastProgress() # the most recent progress update of this streaming query
·您能夠在單個SparkSession中啓動任意數量的查詢。它們將同時運行,共享羣集資源。您可使用sparkSession.streams()來獲取可用於管理當前活動查詢的StreamingQueryManager(Scala / Java / Python文檔)
spark = ... # spark session spark.streams().active # get the list of currently active streaming queries spark.streams().get(id) # get a query object by its unique id spark.streams().awaitAnyTermination() # block until any one of them terminates

Monitoring Streaming Queries(監視流式查詢)

·有多種方法能夠監控活動的流式查詢。您可使用Spark的Dropwizard Metrics支持將指標推送到外部系統,也能夠經過編程方式訪問它們。

Reading Metrics Interactively(以交互方式閱讀度量標準)

·您可使用streamingQuery.lastProgress()和streamingQuery.status()直接獲取活動查詢的當前狀態和指標。lastProgress()返回Scala和Java中的StreamingQueryProgress對象以及Python中具備相同字段的字典。它包含有關在流的最後一次觸發中所取得進展的所 有信息 - 處理了哪些數據,處理速率,延遲等等。還有streamingQuery.recentProgress,它返回最後幾個進展的數組。此外,streamingQuery.status()返回Scala和Java中的StreamingQueryStatus對象以及Python中具備相同字段的字典。它提供了有關查詢當即執行操做的信息 - 觸發器是否處於活動狀態,是否正在處理數據等。
·這裏有一些例子。
query = ... # a StreamingQuery print(query.lastProgress) ''' Will print something like the following. {u'stateOperators': [], u'eventTime': {u'watermark': u'2016-12-14T18:45:24.873Z'}, u'name': u'MyQuery', u'timestamp': u'2016-12-14T18:45:24.873Z', u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'sources': [{u'description': u'KafkaSource[Subscribe[topic-0]]', u'endOffset': {u'topic-0': {u'1': 134, u'0': 534, u'3': 21, u'2': 0, u'4': 115}}, u'processedRowsPerSecond': 200.0, u'inputRowsPerSecond': 120.0, u'numInputRows': 10, u'startOffset': {u'topic-0': {u'1': 1, u'0': 1, u'3': 1, u'2': 0, u'4': 1}}}], u'durationMs': {u'getOffset': 2, u'triggerExecution': 3}, u'runId': u'88e2ff94-ede0-45a8-b687-6316fbef529a', u'id': u'ce011fdc-8762-4dcb-84eb-a77333e28109', u'sink': {u'description': u'MemorySink'}} ''' print(query.status) ''' Will print something like the following. {u'message': u'Waiting for data to arrive', u'isTriggerActive': False, u'isDataAvailable': False} '''

Reporting Metrics programmatically using Asynchronous APIs(使用異步API以編程方式報告度量標準)

您還能夠經過附加StreamingQueryListener(Scala / Java文檔)異步監視與SparkSession關聯的全部查詢。使用sparkSession.streams.attachListener()附加自定義StreamingQueryListener對象後,將在啓動和中止查詢以及活動查詢中取得進展時得到回調。
這是一個例子,
Not available in Python.

Reporting Metrics using Dropwizard(使用Dropwizard報告指標)

Spark支持使用Dropwizard庫報告指標。要同時報告結構化流式查詢的指標,您必須在SparkSession中顯式啓用配置spark.sql.streaming.metricsEnabled。
spark.conf.set("spark.sql.streaming.metricsEnabled", "true") # or spark.sql("SET spark.sql.streaming.metricsEnabled=true")

All queries started in the SparkSession after this configuration has been enabled will report metrics through Dropwizard to whatever sinks have been configured (e.g. Ganglia, Graphite, JMX, etc.).

Recovering from Failures with Checkpointing(經過檢查點從故障中恢復)

若是發生故障或故意關機,您能夠恢復先前查詢的先前進度和狀態,並從中斷處繼續。這是使用檢查點和預寫日誌完成的。您可使用檢查點位置配置查詢,查詢將保存全部進度信息(即每一個觸發器中處理的偏移範圍)和運行聚合(例如快速示例中的字數)到檢查點位置。此檢查點位置必須是HDFS兼容文件系統中的路徑,而且能夠在啓動查詢時設置爲DataStreamWriter中的選項。
aggDF \ .writeStream \ .outputMode("complete") \ .option("checkpointLocation", "path/to/HDFS/dir") \ .format("memory") \ .start()

Recovery Semantics after Changes in a Streaming Query(流式查詢中更改後的恢復語義)

在從同一檢查點位置從新啓動之間容許對流查詢進行哪些更改存在限制。如下是一些不容許的更改,或者更改的效果未明肯定義。
  • 術語「容許」意味着您能夠執行指定的更改,但其效果的語義是否明肯定義取決於查詢和更改.

  • 術語「不容許」意味着您不該該執行指定的更改,由於從新啓動的查詢可能會因不可預測的錯誤而失敗。sdf表示使用sparkSession.readStream生成的流式DataFrame / Dataset

Types of changes(變化的類型)

  • 輸入源的數量或類型(即不一樣來源)的變化:這是不容許的。

  • 輸入源參數的更改:是否容許此更改以及更改的語義是否明肯定義取決於源和查詢。這裏有一些例子。
    • 容許添加/刪除/修改速率限制: spark.readStream.format("kafka").option("subscribe", "topic") to  spark.readStream.format("kafka").option("subscribe", "topic").option("maxOffsetsPerTrigger", ...)
    • 不容許對訂閱的主題/文件進行更改,由於結果是不可預測:spark.readStream.format("kafka").option("subscribe", "topic") to  spark.readStream.format("kafka").option("subscribe", "newTopic")
  • 輸出接收器類型的變化:容許幾個特定接收器組合之間的變化。這須要根據具體狀況進行驗證。這裏有一些例子。
    • File sink to Kafka sink is allowed. Kafka will see only the new data.

    • Kafka sink to file sink is not allowed.

    • Kafka sink changed to foreach, or vice versa is allowed.

  • Changes in the parameters of output sink: Whether this is allowed and whether the semantics of the change are well-defined depends on the sink and the query. Here are a few examples.

    • Changes to output directory of a file sink is not allowed: sdf.writeStream.format("parquet").option("path", "/somePath") to sdf.writeStream.format("parquet").option("path", "/anotherPath")

    • Changes to output topic is allowed: sdf.writeStream.format("kafka").option("topic", "someTopic") to sdf.writeStream.format("kafka").option("topic", "anotherTopic")

    • Changes to the user-defined foreach sink (that is, the ForeachWriter code) is allowed, but the semantics of the change depends on the code.

  • *Changes in projection / filter / map-like operations**: Some cases are allowed. For example:

    • Addition / deletion of filters is allowed: sdf.selectExpr("a") to sdf.where(...).selectExpr("a").filter(...).

    • Changes in projections with same output schema is allowed: sdf.selectExpr("stringColumn AS json").writeStream to sdf.selectExpr("anotherStringColumn AS json").writeStream

    • Changes in projections with different output schema are conditionally allowed: sdf.selectExpr("a").writeStream to sdf.selectExpr("b").writeStream is allowed only if the output sink allows the schema change from "a" to "b".

  • Changes in stateful operations: Some operations in streaming queries need to maintain state data in order to continuously update the result. Structured Streaming automatically checkpoints the state data to fault-tolerant storage (for example, HDFS, AWS S3, Azure Blob storage) and restores it after restart. However, this assumes that the schema of the state data remains same across restarts. This means that any changes (that is, additions, deletions, or schema modifications) to the stateful operations of a streaming query are not allowed between restarts. Here is the list of stateful operations whose schema should not be changed between restarts in order to ensure state recovery:

    • Streaming aggregation: For example, sdf.groupBy("a").agg(...). Any change in number or type of grouping keys or aggregates is not allowed.

    • Streaming deduplication: For example, sdf.dropDuplicates("a"). Any change in number or type of grouping keys or aggregates is not allowed.

    • Stream-stream join: For example, sdf1.join(sdf2, ...) (i.e. both inputs are generated with sparkSession.readStream). Changes in the schema or equi-joining columns are not allowed. Changes in join type (outer or inner) not allowed. Other changes in the join condition are ill-defined.

    • Arbitrary stateful operation: For example, sdf.groupByKey(...).mapGroupsWithState(...) or sdf.groupByKey(...).flatMapGroupsWithState(...). Any change to the schema of the user-defined state and the type of timeout is not allowed. Any change within the user-defined state-mapping function are allowed, but the semantic effect of the change depends on the user-defined logic. If you really want to support state schema changes, then you can explicitly encode/decode your complex state data structures into bytes using an encoding/decoding scheme that supports schema migration. For example, if you save your state as Avro-encoded bytes, then you are free to change the Avro-state-schema between query restarts as the binary state will always be restored successfully.

Continuous Processing(連續處理)

[Experimental]

連續處理是Spark 2.3中引入的一種新的實驗性流執行模式,可實現低(~1 ms)端到端延遲,而且至少具備一次容錯保證。將其與默認的微批處理引擎相比較,該引擎能夠實現一次性保證,但最多可實現~100ms的延遲。
對於某些類型的查詢(在下面討論),您能夠選擇執行它們的模式而無需修改應用程序邏輯(即不更改DataFrame / Dataset操做)。
要在連續處理模式下運行支持的查詢,您只需指定一個連續觸發器,並將所需的檢查點間隔做爲參數。
例如:
spark \ .readStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("subscribe", "topic1") \ .load() \ .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \ .writeStream \ .format("kafka") \ .option("kafka.bootstrap.servers", "host1:port1,host2:port2") \ .option("topic", "topic1") \ .trigger(continuous="1 second") \ # only change in query .start()

A checkpoint interval of 1 second means that the continuous processing engine will records the progress of the query every second. The resulting checkpoints are in a format compatible with the micro-batch engine, hence any query can be restarted with any trigger. For example, a supported query started with the micro-batch mode can be restarted in continuous mode, and vice versa. Note that any time you switch to continuous mode, you will get at-least-once fault-tolerance guarantees.

Supported Queries(支持的查詢)

從Spark 2.3開始,連續處理模式僅支持如下類型的查詢。

  • 操做:在連續模式下僅支持相似地圖的數據集/數據框操做,即僅投影(select,map,flatMap,mapPartitions等)和選擇(where,filter等)。
    • 除了聚合函數(由於尚不支持聚合),current_timestamp()和current_date()(使用時間的肯定性計算具備挑戰性)以外,支持全部SQL函數。
  • Sources:
    • Kafka來源:支持全部選項。
    • Rate source: Good for testing. Only options that are supported in the continuous mode are numPartitions and rowsPerSecond.
  • Sinks:
    • Kafka sink:支持全部選項。
    • Memory sink: Good for debugging.
    • Console sink: Good for debugging. All options are supported. Note that the console will print every checkpoint interval that you have specified in the continuous trigger.
有關它們的更多詳細信息,請參閱輸入源和輸出接收器部分。雖然控制檯接收器很是適合測試,可是使用Kafka做爲源和接收器能夠最好地觀察到端到端的低延遲處理,由於這容許引擎處理數據並使結果在輸出主題中可用
輸入主題中可用的毫秒輸入數據。

Caveats(注意事項)

  • 連續處理引擎啓動多個長時間運行的任務,這些任務不斷從源中讀取數據,處理數據並連續寫入接收器。查詢所需的任務數取決於查詢能夠並行從源讀取的分區數。所以,在開始連續處理查詢以前,必須確保羣集中有足夠的核心並行執行全部任務。
    例如,若是您正在讀取具備10個分區的Kafka主題,則羣集必須至少具備10個核心才能使查詢取得進展。
  • 中止連續處理流可能會產生虛假的任務終止警告。這些能夠安全地忽略。
  • 目前沒有自動重試失敗的任務。任何失敗都將致使查詢中止,而且須要從檢查點手動從新啓動。

Additional Information(附加信息)

Further Reading(進一步閱讀)

Talks

  • Spark Summit Europe 2017
  • Spark Summit 2016
相關文章
相關標籤/搜索