Spark Structured Streaming Framework (4): Window Management in Detail

1. Structure

1.1 Overview

  The sliding-window functionality of the Structured Streaming component is determined by three parameters: the window duration, the slide duration, and the trigger interval.

  • Window duration: the length of time over which data is operated on;
  • Slide duration: the length of time the window moves forward on each step;
  • Trigger interval: the interval at which Structured Streaming writes data out to the external DataStreamWriter.

Figure 1-1
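To see how the three parameters fit together before diving into the API, the following is a minimal sketch that wires all three into one query. It assumes a socket source on localhost:9999 and the Spark 2.x ProcessingTime trigger; the object name WindowParamsSketch and the durations are illustrative only:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object WindowParamsSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WindowParamsSketch").getOrCreate()
    import spark.implicits._

    // Assumed socket source; any streaming source with a timestamp column works
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .option("includeTimestamp", true)   // adds a "timestamp" column
      .load()

    val windowed = lines
      .groupBy(window($"timestamp", "10 minutes", "5 minutes")) // window duration + slide duration
      .count()

    windowed.writeStream
      .outputMode("complete")
      .format("console")
      .trigger(ProcessingTime("1 minute"))                      // trigger interval
      .start()
      .awaitTermination()
  }
}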

1.2 API

  Managing the window functionality of Structured Streaming takes two steps:

1) Define the window and the slide duration

  The API is a global window method. Its implementation in the Spark source is shown below:

def window(timeColumn: Column, windowDuration: String, slideDuration: String): Column = {
  window(timeColumn, windowDuration, slideDuration, "0 second")
}

  • timeColumn: the column carrying the timestamp;
  • windowDuration: the duration of the window;
  • slideDuration: the duration of the slide step;
  • return: the return type is Column.
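For example, a call such as the following produces the window Column (a minimal sketch; the column name "timestamp" and the durations are illustrative):

import org.apache.spark.sql.functions.{col, window}

// 10-minute windows that advance every 5 minutes; the result is a Column
val windowCol = window(col("timestamp"), "10 minutes", "5 minutes")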
2) Apply the window

  After Structured Streaming loads data through the readStream object's load method, it returns a DataFrame (an alias for Dataset[Row]). The user then passes the Column defined above to that DataFrame, which applies the window.

  Because the window method returns a Column, it can be passed to any DataFrame method that takes Column-typed parameters, such as Dataset's select and groupBy methods. Below are the implementation details of select and groupBy from the Spark source:

def select(cols: Column*): DataFrame = withPlan {
  Project(cols.map(_.named), logicalPlan)
}

def groupBy(cols: Column*): RelationalGroupedDataset = {
  RelationalGroupedDataset(toDF(), cols.map(_.expr), RelationalGroupedDataset.GroupByType)
}
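Since both methods take Column arguments, the same window Column works in either one. A minimal sketch, assuming a streaming DataFrame df with "timestamp" and "value" columns:

import org.apache.spark.sql.functions.{col, window}

// Non-aggregated: attaches the window to every row
val tagged = df.select(window(col("timestamp"), "10 minutes", "5 minutes"), col("value"))

// Aggregated: groups the rows by window before counting
val counted = df.groupBy(window(col("timestamp"), "10 minutes", "5 minutes")).count()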

1.3 Categories

  Given the Structured Streaming API described above and the methods Dataset provides, we can divide the operations into two categories:

  1. 聚合操做:是指具備對數據進行組合操做的方法,如groupBy方法;
  2. 非聚合操做:是指普通的數據操做方法,如select方法

PS:

    The two categories require different output modes (outputMode) and must not be mixed.

2. 聚合操做

2.1 操做方法

  聚合操做是指接收到的數據DataFrame先進行groupBy等操做,器操做的特徵是返回RelationGroupedDataset類型的數據。若Structured Streaming存在的聚合操做,那麼輸出形式必須爲"complete",不然程序會出現異常。

以下所示的聚合操做示例:

import spark.implicits._

val words = ... // streaming DataFrame of schema { timestamp: Timestamp, word: String }

val windowedCounts = words.groupBy(
  window($"timestamp", "10 minutes", "5 minutes"),
  $"word"
).count()
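For reference, calling windowedCounts.printSchema() on a query like this prints roughly the following; the window column is a struct carrying the start and end timestamps of each window:

root
 |-- window: struct
 |    |-- start: timestamp
 |    |-- end: timestamp
 |-- word: string
 |-- count: long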

2.2 Example

  This example ships with Spark. It receives socket data, splits each received line on the space character " ", counts the occurrences of each word, and finally outputs the counts sorted by window.

The full program is shown below:

package org.apache.spark.examples.sql.streaming

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Counts words in UTF8 encoded, '\n' delimited text received from the network over a
 * sliding window of configurable duration. Each line from the network is tagged
 * with a timestamp that is used to determine the windows into which it falls.
 *
 * Usage: StructuredNetworkWordCountWindowed <hostname> <port> <window duration>
 *   [<slide duration>]
 * <hostname> and <port> describe the TCP server that Structured Streaming
 * would connect to receive data.
 * <window duration> gives the size of window, specified as integer number of seconds
 * <slide duration> gives the amount of time successive windows are offset from one another,
 * given in the same units as above. <slide duration> should be less than or equal to
 * <window duration>. If the two are equal, successive windows have no overlap. If
 * <slide duration> is not provided, it defaults to <window duration>.
 *
 * To run this on your local machine, you need to first run a Netcat server
 *    `$ nc -lk 9999`
 * and then run the example
 *    `$ bin/run-example sql.streaming.StructuredNetworkWordCountWindowed
 *    localhost 9999 <window duration in seconds> [<slide duration in seconds>]`
 *
 * One recommended <window duration>, <slide duration> pair is 10, 5
 */
object StructuredNetworkWordCountWindowed {

  def main(args: Array[String]) {
    if (args.length < 3) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> [<slide duration in seconds>]")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = if (args.length == 3) windowSize else args(3).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true) // tag each line with a timestamp
      .load()

    // Split the lines into words, retaining timestamps
    val words = lines.as[(String, Timestamp)].flatMap(line =>
      line._1.split(" ").map(word => (word, line._2))
    ).toDF("word", "timestamp")

    // Group the data by window and word and compute the count of each group
    // (this is where the window duration and slide duration are set)
    val windowedCounts = words.groupBy(
      window($"timestamp", windowDuration, slideDuration), $"word"
    ).count().orderBy("window")

    // Start running the query that prints the windowed word counts to the console.
    // The query aggregates, so the "complete" output mode is required; disabling
    // "truncate" keeps the console from shortening wide columns.
    val query = windowedCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}
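Note that the orderBy("window") step is only legal here because the query aggregates and runs in "complete" output mode; Structured Streaming supports sorting a streaming Dataset only after an aggregation in complete mode.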

 

3. 非聚合操做

3.1 操做方法

  非聚合操做是指接收到的數據DataFrame進行select等操做,其操做的特徵是返回Dataset類型的數據。若Structured Streaming進行非聚合操做,那麼輸出形式必須爲"append",不然程序會出現異常。若spark 2.1.1 版本則輸出形式開能夠是"update"。

3.2 Example

  This example simply writes the received data out unchanged, with no other processing, purely to observe the window behaviour of Structured Streaming. It is shown below:

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.ProcessingTime

object StructuredNetworkWordCountWindowed {

  def main(args: Array[String]) {
    if (args.length < 5) {
      System.err.println("Usage: StructuredNetworkWordCountWindowed <hostname> <port>" +
        " <window duration in seconds> <slide duration in seconds>" +
        " <trigger interval in seconds>")
      System.exit(1)
    }

    val host = args(0)
    val port = args(1).toInt
    val windowSize = args(2).toInt
    val slideSize = args(3).toInt
    val triggerTime = args(4).toInt
    if (slideSize > windowSize) {
      System.err.println("<slide duration> must be less than or equal to <window duration>")
    }
    val windowDuration = s"$windowSize seconds"
    val slideDuration = s"$slideSize seconds"

    val spark = SparkSession
      .builder
      .appName("StructuredNetworkWordCountWindowed")
      .getOrCreate()

    import spark.implicits._

    // Create DataFrame representing the stream of input lines from connection to host:port
    val lines = spark.readStream
      .format("socket")
      .option("host", host)
      .option("port", port)
      .option("includeTimestamp", true)
      .load()

    // Attach a window column to every input line; no aggregation is performed
    val wordCounts: DataFrame = lines.select(
      window($"timestamp", windowDuration, slideDuration), $"value")

    // Start running the query that prints each windowed line to the console
    val query = wordCounts.writeStream
      .outputMode("append")
      .format("console")
      .trigger(ProcessingTime(s"$triggerTime seconds"))
      .option("truncate", "false")
      .start()

    query.awaitTermination()
  }
}

# nc -lk 9999

1

2

3

4

5

6

# spark-submit --class structuredNetWordCount ./sparkStreaming.jar localhost 9999 3 2 1

Output:

Batch: 0
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|1    |
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|2    |
+---------------------------------------------+-----+

Batch: 1
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:15.0,2017-05-16 11:14:19.0]|3    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|3    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|4    |
+---------------------------------------------+-----+

Batch: 2
+---------------------------------------------+-----+
|window                                       |value|
+---------------------------------------------+-----+
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|5    |
|[2017-05-16 11:14:18.0,2017-05-16 11:14:22.0]|6    |
|[2017-05-16 11:14:21.0,2017-05-16 11:14:25.0]|6    |
+---------------------------------------------+-----+
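Because the slide duration is shorter than the window duration, successive windows overlap and a single input line can fall into more than one window; this is why, for example, the value 3 is listed under two windows in Batch 1.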

 
