Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees.
However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
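As a rough sketch of how the two modes are selected (this is not part of the guide's example: it assumes an existing SparkSession named spark, and uses the built-in rate source and console sink purely for illustration), the choice comes down to the trigger set on the sink:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// A toy input stream from the built-in rate source (for illustration only).
Dataset<Row> rateStream = spark.readStream()
  .format("rate")
  .option("rowsPerSecond", "10")
  .load();

// Default micro-batch engine: plan a small batch job every trigger interval.
rateStream.writeStream()
  .format("console")
  .trigger(Trigger.ProcessingTime("1 second"))
  .start();

// Continuous Processing (Spark 2.3+): process records as they arrive.
// The "1 second" here is a checkpoint interval, not a batch interval; in Spark 2.3
// this mode supports only map-like operations and a limited set of sources and sinks.
rateStream.writeStream()
  .format("console")
  .trigger(Trigger.Continuous("1 second"))
  .start();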
In this guide, we are going to walk you through the programming model and the APIs. We are going to explain the concepts mostly using the default micro-batch processing model, and then later discuss the Continuous Processing model. First, let’s start with a simple example of a Structured Streaming query - a streaming word count.
Let’s say you want to maintain a running word count of text data received from a data server listening on a TCP socket. Let’s see how you can express this using Structured Streaming. You can see the full code in Scala/Java/Python/R. And if you download Spark, you can directly run the example. In any case, let’s walk through the example step-by-step and understand how it works. First, we have to import the necessary classes and create a local SparkSession, the starting point of all functionalities related to Spark.
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;

import java.util.Arrays;
import java.util.Iterator;

SparkSession spark = SparkSession
  .builder()
  .appName("JavaStructuredNetworkWordCount")
  .getOrCreate();
Next, let’s create a streaming DataFrame that represents text data received from a server listening on localhost:9999, and transform the DataFrame to calculate word counts.
// Create DataFrame representing the stream of input lines from connection to localhost:9999
Dataset<Row> lines = spark
  .readStream()
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load();

// Split the lines into words
Dataset<String> words = lines
  .as(Encoders.STRING())
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING());

// Generate running word count
Dataset<Row> wordCounts = words.groupBy("value").count();
This lines DataFrame represents an unbounded table containing the streaming text data. This table contains one column of strings named "value", and each line in the streaming text data becomes a row in the table. Note that this is not currently receiving any data as we are just setting up the transformation, and have not yet started it. Next, we have converted the DataFrame to a Dataset of String using .as(Encoders.STRING()), so that we can apply the flatMap operation to split each line into multiple words. The resultant words Dataset contains all the words. Finally, we have defined the wordCounts DataFrame by grouping by the unique values in the Dataset and counting them. Note that this is a streaming DataFrame which represents the running word counts of the stream.
We have now set up the query on the streaming data. All that is left is to actually start receiving data and computing the counts. To do this, we set it up to print the complete set of counts (specified by outputMode("complete")) to the console every time they are updated. And then start the streaming computation using start().
// Start running the query that prints the running counts to the console
StreamingQuery query = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

query.awaitTermination();
After this code is executed, the streaming computation will have started in the background. The query object is a handle to that active streaming query, and we have decided to wait for the termination of the query using awaitTermination() to prevent the process from exiting while the query is active.
To actually execute this example code, you can either compile the code in your own Spark application, or simply run the example once you have downloaded Spark. We are showing the latter. You will first need to run Netcat (a small utility found in most Unix-like systems) as a data server by using
$ nc -lk 9999
Then, in a different terminal, you can start the example by using
$ ./bin/run-example org.apache.spark.examples.sql.streaming.JavaStructuredNetworkWordCount localhost 9999
Then, any lines typed in the terminal running the netcat server will be counted and printed on screen every second. It will look something like the following.
In an actual run, the netcat terminal simply echoes the lines you type, while the terminal where the Spark job was submitted prints the updated counts.
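The exact output depends on what you type; as a rough sketch, if the line "apache spark" were entered into the netcat terminal, the terminal running the Spark example would print the running counts roughly as follows (batch numbers and timing will vary):

# TERMINAL 1: running the netcat server
$ nc -lk 9999
apache spark

# TERMINAL 2: running JavaStructuredNetworkWordCount
-------------------------------------------
Batch: 0
-------------------------------------------
+------+-----+
| value|count|
+------+-----+
|apache|    1|
| spark|    1|
+------+-----+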
The key idea in Structured Streaming is to treat a live data stream as a table that is being continuously appended. This leads to a new stream processing model that is very similar to a batch processing model. You will express your streaming computation as standard batch-like query as on a static table, and Spark runs it as an incremental query on the unbounded input table. Let’s understand this model in more detail.
Consider the input data stream as the "Input Table". Every data item that is arriving on the stream is like a new row being appended to the Input Table.
A query on the input will generate the "Result Table". Every trigger interval (say, every 1 second), new rows get appended to the Input Table, which eventually updates the Result Table. Whenever the result table gets updated, we would want to write the changed result rows to an external sink.
The "Output" is defined as what gets written out to the external storage. The output can be defined in a different mode:
Complete Mode - The entire updated Result Table will be written to the external storage. It is up to the storage connector to decide how to handle writing of the entire table.
Append Mode - Only the new rows appended in the Result Table since the last trigger will be written to the external storage. This is applicable only on the queries where existing rows in the Result Table are not expected to change.
Update Mode - Only the rows that were updated in the Result Table since the last trigger will be written to the external storage (available since Spark 2.1.1). Note that this is different from the Complete Mode in that this mode only outputs the rows that have changed since the last trigger. If the query doesn’t contain aggregations, it will be equivalent to Append mode.
Note that each mode is applicable on certain types of queries. This is discussed in detail later.
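As a small sketch (reusing the wordCounts and lines DataFrames from the example above, with the console sink purely for illustration), the mode is passed to outputMode() on the sink:

// Complete mode: the entire updated result table is written on every trigger.
StreamingQuery completeQuery = wordCounts.writeStream()
  .outputMode("complete")
  .format("console")
  .start();

// Update mode (Spark 2.1.1+): only the rows updated since the last trigger are written.
StreamingQuery updateQuery = wordCounts.writeStream()
  .outputMode("update")
  .format("console")
  .start();

// Append mode: only new rows are written; suited to queries whose existing result
// rows never change, such as the raw (non-aggregated) lines stream.
StreamingQuery appendQuery = lines.writeStream()
  .outputMode("append")
  .format("console")
  .start();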
"Output"是用來定義寫入外部存儲器的內容。輸出能夠被定義爲不一樣模型:
「Complete 模型」----整個更新後的結果表將會被寫入到外部存儲器。取決於存儲鏈接器來決定如何處理整個表的寫入。
「Append 模型」 ----只有最後一個觸發器中附加的新行將被寫入外部存儲。這僅僅適用於預期結果表中現有行不發生更改的查詢。
「Update 模型」 ----只有最後一個觸發器中在結果表中更新的行被寫入外部存儲(從spark2.1.1纔可使用)。請注意,這與Complete模式不一樣,由於該模式只輸出自上次觸發器以來已經改變的行。若是查詢不包含聚合,那麼等同於Append模式。
注意,每種模型都適用於某些類型的查詢。這將在後面詳細討論。
To illustrate the use of this model, let’s understand the model in context of the Quick Example above. The first lines DataFrame is the input table, and the final wordCounts DataFrame is the result table. Note that the query on the streaming lines DataFrame to generate wordCounts is exactly the same as it would be with a static DataFrame. However, when this query is started, Spark will continuously check for new data from the socket connection. If there is new data, Spark will run an "incremental" query that combines the previous running counts with the new data to compute updated counts, as shown below.
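To make the "batch-like query" point concrete, here is a sketch of the same word count written as a plain batch job over a static text file (the file path is a placeholder, and the imports from the earlier snippet are assumed); the transformation itself is written identically, only read()/show() replace readStream()/writeStream():

// The same word count as a batch query on static data.
Dataset<String> staticLines = spark.read().textFile("/path/to/some/logs.txt");  // placeholder path

Dataset<Row> staticWordCounts = staticLines
  .flatMap((FlatMapFunction<String, String>) x -> Arrays.asList(x.split(" ")).iterator(), Encoders.STRING())
  .groupBy("value")
  .count();

staticWordCounts.show();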
Note that Structured Streaming does not materialize the entire table. It reads the latest available data from the streaming data source, processes it incrementally to update the result, and then discards the source data. It only keeps around the minimal intermediate state data as required to update the result (e.g. intermediate counts in the earlier example).
This model is significantly different from many other stream processing engines. Many streaming systems require the user to maintain running aggregations themselves, thus having to reason about fault-tolerance, and data consistency (at-least-once, or at-most-once, or exactly-once). In this model, Spark is responsible for updating the Result Table when there is new data, thus relieving the users from reasoning about it. As an example, let’s see how this model handles event-time based processing and late arriving data.
Event-time is the time embedded in the data itself. For many applications, you may want to operate on this event-time. For example, if you want to get the number of events generated by IoT devices every minute, then you probably want to use the time when the data was generated (that is, event-time in the data), rather than the time Spark receives them. This event-time is very naturally expressed in this model – each event from the devices is a row in the table, and event-time is a column value in the row. This allows window-based aggregations (e.g. number of events every minute) to be just a special type of grouping and aggregation on the event-time column – each time window is a group and each row can belong to multiple windows/groups. Therefore, such event-time-window-based aggregation queries can be defined consistently on both a static dataset (e.g. from collected device events logs) as well as on a data stream, making the life of the user much easier.
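As a brief sketch (the events DataFrame below is hypothetical, assumed to be a streaming DataFrame with an event-time column named "timestamp" and a "word" column; it is not part of the guide's example), such a windowed aggregation is just a groupBy on a window over the event-time column:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.window;

// Count words per 10-minute window, sliding every 5 minutes, keyed by event time.
Dataset<Row> windowedCounts = events
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("word"))
  .count();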
Furthermore, this model naturally handles data that has arrived later than expected based on its event-time. Since Spark is updating the Result Table, it has full control over updating old aggregates when there is late data, as well as cleaning up old aggregates to limit the size of intermediate state data. Since Spark 2.1, we have support for watermarking which allows the user to specify the threshold of late data, and allows the engine to accordingly clean up old state. These are explained later in more detail in the Window Operations section.
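Continuing the same hypothetical events stream, a minimal sketch of watermarking looks like this: the watermark declares how late data may arrive relative to the latest event time seen, letting the engine drop state for windows that can no longer change.

// Declare that event-time data may arrive up to 10 minutes late; older state can be discarded.
Dataset<Row> watermarkedCounts = events
  .withWatermark("timestamp", "10 minutes")
  .groupBy(
    window(col("timestamp"), "10 minutes", "5 minutes"),
    col("word"))
  .count();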