Spark Structured Streaming: aggregation, watermarks and window operations combined with output modes, a study summary


1、 Background

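Our real-time data warehouse currently needs to join and aggregate several tables. This summary has two goals: to understand in depth how Spark Structured Streaming handles aggregation, the output modes (complete, append, update), window operations (window) and watermarks, and to help the team use aggregations and windows confidently in the real-time warehouse. The running example is the basic word count. Aggregation is the baseline, and it is combined in turn with the output modes, windows and watermarks. The streaming source is a socket.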

For a detailed introduction to Structured Streaming, see the official Spark Structured Streaming documentation. This article covers only aggregation, windows and watermarks, with examples.

Everything here reflects my own understanding, so there are bound to be mistakes. Corrections are very welcome.

2、 Aggregation

This section uses plain aggregation only, with no window and no watermark.

1. complete mode

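Core code:

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.sql.functions._

    // Local session for trying the example (an assumption; the article's own setup is not shown)
    val spark = SparkSession.builder
      .appName("StructuredWordCount")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split each line into words and add a constant column cn = 1 to be summed
    val words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")
      .withColumn("cn", lit(1))
    val wordCounts = words.groupBy(
      $"word"
    ).agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    // Start the query and print the results to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()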

Output:

As you can see, complete mode outputs the whole result table on every trigger. This means the stream keeps the state of all batches and updates that state with each later batch.
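The behaviour above can be modelled in a few lines of plain Scala. The model does not use Spark at all, and `CompleteModeModel` and `run` are hypothetical names. A word-count map is kept as state across batches, and in complete mode every trigger emits the whole map. This sketches the semantics only, not how Spark stores state.

```scala
// Hypothetical plain-Scala model of complete output mode (no Spark involved).
object CompleteModeModel {
  // For each batch of input lines, update a running word count and return the
  // table complete mode would print: every key seen so far, changed or not.
  def run(batches: Seq[Seq[String]]): Seq[Map[String, Int]] = {
    var state = Map.empty[String, Int] // aggregation state kept across batches
    batches.map { lines =>
      val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
      for (word <- words) state = state.updated(word, state.getOrElse(word, 0) + 1)
      state // the whole result table
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq(Seq("hello world"), Seq("hello spark"))).zipWithIndex.foreach {
      case (table, i) => println(s"Batch $i: $table")
    }
}
```

In the second batch, `world` is emitted again even though that batch did not touch it. That is the defining trait of complete mode.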

2. update mode

Core code:

    val query = wordCounts.writeStream
      .outputMode("update")
      .format("console")
      .option("truncate", false)
      .start()

Output:

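As you can see, update mode outputs only the rows that are new or whose aggregate changed in the current batch. As in complete mode, the stream keeps the state of all batches and updates it with each later batch.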
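Here is the same hypothetical plain-Scala model switched to update mode (`UpdateModeModel` is an illustrative name, not Spark code). State is still kept across batches, but each batch emits only the keys it inserted or changed. This is a sketch of the semantics under that simplification.

```scala
// Hypothetical plain-Scala model of update output mode (no Spark involved).
object UpdateModeModel {
  // Same running word count, but each batch returns only the keys whose
  // aggregate was inserted or changed by that batch.
  def run(batches: Seq[Seq[String]]): Seq[Map[String, Int]] = {
    var state = Map.empty[String, Int] // aggregation state kept across batches
    batches.map { lines =>
      val words = lines.flatMap(_.split(" ")).filter(_.nonEmpty)
      for (word <- words) state = state.updated(word, state.getOrElse(word, 0) + 1)
      val touched = words.toSet
      state.filter { case (word, _) => touched.contains(word) } // changed rows only
    }
  }

  def main(args: Array[String]): Unit =
    run(Seq(Seq("hello world"), Seq("hello spark"))).zipWithIndex.foreach {
      case (table, i) => println(s"Batch $i: $table")
    }
}
```

With the same two batches as before, the second batch now prints only `hello` (count 2) and `spark`. Note that `hello` still carries its accumulated count, because the state is never discarded.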

3. append mode

This mode cannot be used with an aggregation that has no watermark: Spark refuses to start such a query.

3、Aggregation + window

Throughout this section, windows are 5 minutes long and slide every 3 minutes.
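To see which windows an event falls into with a 5-minute window sliding every 3 minutes, here is a hypothetical plain-Scala helper (`SlidingWindows.windowsFor` is not a Spark API). It makes three assumptions:

- Times have minute granularity and are expressed as minutes since midnight.
- Windows are half-open, [start, end).
- Window starts are aligned to multiples of the slide.

Spark aligns windows to the epoch by default, which yields the same boundaries as the outputs in this article (…:39, …:42, …:45).

```scala
// Hypothetical helper (not a Spark API): lists the sliding windows containing an
// event time. Times are minutes since midnight; windows are half-open [start, end)
// and start at multiples of the slide.
object SlidingWindows {
  def hm(s: String): Int = {
    val parts = s.split(":")
    parts(0).toInt * 60 + parts(1).toInt
  }

  def fmt(min: Int): String = f"${min / 60}%02d:${min % 60}%02d"

  def windowsFor(t: Int, size: Int = 5, slide: Int = 3): List[(Int, Int)] = {
    val lastStart = t - Math.floorMod(t, slide) // latest window start at or before t
    Iterator.iterate(lastStart)(_ - slide)       // step back one slide at a time
      .takeWhile(start => start + size > t)     // keep windows that still cover t
      .map(start => (start, start + size))
      .toList
  }

  def main(args: Array[String]): Unit =
    println(windowsFor(hm("23:43")).map { case (s, e) => s"[${fmt(s)}, ${fmt(e)})" })
}
```

An event at 23:43 lands in [23:42, 23:47) and [23:39, 23:44), the same pair of windows that batch 0 of the update-mode output shows. Because 5 is not a multiple of 3, an event covers either one or two windows. For example, 09:08 falls only into [09:06, 09:11).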

1. complete mode

Core code:

    import org.apache.spark.sql.functions._
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    // Split lines into words, stamp each word with a timestamp and add a constant count column
    val words = lines.as[String].flatMap(_.split(" ")).selectExpr("value as word")
      .withColumn("timestamp", current_timestamp()) // processing time, not an event time carried in the data
      .withColumn("cn", lit(1))
    val wordCounts = words.groupBy(
      window($"timestamp", "5 minutes", "3 minutes"), // 5-minute windows sliding every 3 minutes
      $"word"
    ).agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    // Start the query and print the results to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()

Output:

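In complete mode, every window is output on every trigger. Each window aggregates only the events that fall inside it, and a window that has received no events does not appear in the output.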

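2. update mode

In update mode, only the windows that the current batch's events fall into are output. Within those windows, only new or updated keys appear; all other windows are left out.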

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat  |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1  |
+------------------------------------------+-----+---+

3. append mode

Not supported without a watermark.

4、Aggregation + window + watermark

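First, what is a watermark? A watermark is a mechanism for handling late data. "Late" means, for example, data with an event time of 12:00 that only arrives at 12:30. How should it be handled?

The watermark provides a time threshold. Put simply, Spark waits for the watermark delay: data before the resulting cutoff time is discarded, while data after it is processed normally. For windowed aggregations, the cutoff is the maximum event time seen so far minus the watermark threshold. It is recomputed at the end of every micro-batch (trigger), not every time a window slides. A window counts as expired once its end is at or before the cutoff.

This article does not go into more detail. The official Spark guide has diagrams in its "Handling Late Data and Watermarking" section.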
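The two rules above can be written down as a hypothetical plain-Scala model (`WatermarkModel` is illustrative, not Spark's source):

- The watermark only moves forward.
- A window expires once the watermark reaches its end.

Times are minutes since midnight. The example assumes that events up to 12:30 have already been seen when the late 12:00 record shows up.

```scala
// Hypothetical plain-Scala model of the watermark rules (not Spark's source).
// Times are minutes since midnight to keep the arithmetic readable.
object WatermarkModel {
  def hm(s: String): Int = {
    val parts = s.split(":")
    parts(0).toInt * 60 + parts(1).toInt
  }

  // Recomputed at the end of each micro-batch: max event time seen minus the
  // delay threshold. It never moves backwards.
  def nextWatermark(current: Int, maxEventTimeInBatch: Int, delay: Int): Int =
    math.max(current, maxEventTimeInBatch - delay)

  // A window still accepts data only while its end is after the watermark;
  // once end <= watermark its state is dropped and late rows for it are ignored.
  def windowIsOpen(windowEnd: Int, watermark: Int): Boolean = windowEnd > watermark

  def main(args: Array[String]): Unit = {
    val wm = nextWatermark(Int.MinValue, hm("12:30"), delay = 3)
    println(s"watermark = $wm, window ending 12:05 open? ${windowIsOpen(hm("12:05"), wm)}")
  }
}
```

With the 3-minute delay used in this section, the watermark becomes 12:27. Both windows containing 12:00, [12:00, 12:05) and [11:57, 12:02), are then already closed, so the late record is ignored. A later batch whose maximum event time is only 12:20 leaves the watermark at 12:27.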

Windows are 5 minutes long and slide every 3 minutes; the watermark delay is 3 minutes.

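The code follows. For the other modes, only outputMode needs to change. Each input line has the form word,yyyy-MM-dd HH:mm:ss.

    import java.sql.Timestamp
    import org.apache.spark.sql.functions._

    // Defined at top level (outside the method) so Spark can derive an encoder for it
    case class WordBean(word: String, timestamp: Timestamp)

    import spark.implicits._
    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()
    val wordCounts = lines.as[String]
      .map { line =>
        val arr = line.split(",")
        // java.sql.Timestamp.valueOf stands in for the article's DateUtils.parseSqlTimestamp helper (not shown)
        WordBean(arr(0).trim, Timestamp.valueOf(arr(1).trim))
      }
      .withColumn("cn", lit(1))
      .withWatermark("timestamp", "3 minutes") // watermark = max event time seen - 3 minutes
      .groupBy(
        window($"timestamp", "5 minutes", "3 minutes"),
        $"word"
      )
      .agg("cn" -> "sum").withColumnRenamed("sum(cn)", "wc")
    val query = wordCounts.writeStream
      .outputMode("update") // change to "append" or "complete" for the other modes
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()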

1. complete mode

In complete mode every window is output and the watermark has no effect, because no state is ever dropped. No demo is given.

2. update mode

In update mode, state for expired windows is dropped.

Output:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1  |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1  |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+
-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3  |
+------------------------------------------+-----+---+

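In update mode the output is basically the same as with the window alone. What deserves attention is the effect of the watermark:

In batch 6 we tried to update the window [2019-04-19 09:00:00, 2019-04-19 09:05:00] but got empty output. Earlier batches had pushed the maximum event time to 2019-04-19 09:08:00. With a 3-minute delay, the watermark became 09:05:00, which is not before the window's end. The state of [2019-04-19 09:00:00, 2019-04-19 09:05:00] had therefore already been dropped and could no longer be updated.

Likewise, in batch 10 we tried to update [2019-04-19 09:03:00, 2019-04-19 09:08:00] and failed. Batch 9 contained an event at 09:12:00 or later, so the watermark had reached at least 09:09:00.

Look especially at the last record, in batch 11: "world,2019-04-19 09:07:00". It falls into both [2019-04-19 09:06:00, 2019-04-19 09:11:00] and [2019-04-19 09:03:00, 2019-04-19 09:08:00]. Yet only the [2019-04-19 09:06:00, 2019-04-19 09:11:00] window is output, because the other one has already expired.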
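The reasoning above can be re-traced with a hypothetical plain-Scala sketch that combines window assignment and the watermark rule (`UpdateModeTrace` is illustrative, not Spark code). The article shows the record of batch 11 but not the raw input of the other batches, so two values are assumptions:

- The maximum event time before batch 11 is taken as 09:12. Batch 9's windows imply an event between 09:12 and 09:14.
- 09:01 stands in for the unseen record of batch 6.

```scala
// Hypothetical re-trace of the update-mode run in plain Scala (not Spark code).
// Times are minutes since midnight; 5-minute windows sliding every 3 minutes,
// aligned to multiples of 3 minutes; 3-minute watermark delay.
object UpdateModeTrace {
  def hm(s: String): Int = {
    val parts = s.split(":")
    parts(0).toInt * 60 + parts(1).toInt
  }

  // All half-open windows [start, end) that contain event time t.
  def windowsFor(t: Int, size: Int = 5, slide: Int = 3): List[(Int, Int)] = {
    val lastStart = t - Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start + size > t)
      .map(start => (start, start + size))
      .toList
  }

  // Windows that still accept a record: only those ending after the watermark.
  def acceptedWindows(eventTime: Int, maxEventTimeSoFar: Int, delay: Int = 3): List[(Int, Int)] = {
    val watermark = maxEventTimeSoFar - delay
    windowsFor(eventTime).filter { case (_, end) => end > watermark }
  }

  def main(args: Array[String]): Unit = {
    // Batch 11: "world,2019-04-19 09:07:00", assuming an event at 09:12 was already seen
    println(acceptedWindows(hm("09:07"), hm("09:12")))
    // Batch 6 stand-in: a record at 09:01 after the max event time reached 09:08
    println(acceptedWindows(hm("09:01"), hm("09:08")))
  }
}
```

The first call keeps only [09:06, 09:11), matching batch 11. The second returns no windows, consistent with the empty output of batch 6.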


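3. append mode

Append mode is probably the one most worth studying. When you first use it, it can seem baffling: why is there no output? The source has clearly sent many batches, yet nothing appears. This is especially likely if the program sets long windows and a long watermark. You may start to suspect your own code, or even the example in the official guide.

In fact, neither is wrong. In append mode, a window is output only once Spark is sure it will no longer change, that is, once the watermark has passed the window's end. The watermark advances only with event time. A window is therefore emitted only after an event arrives whose time is at least the window end plus the watermark delay.

So if you configured windows of several hours, be patient. If no further events arrive, you might as well shut down and go to bed, because you will never see the result you are waiting for. A demo follows.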

-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1  |
+------------------------------------------+-----+---+

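Note that when the watermark advances as the windows slide, an earlier window that is now certain not to change is still not output immediately. It is output, safely, only when the next batch is triggered, because a watermark computed at the end of one batch only takes effect in the following batch.

In the output above, take the window [2019-04-19 08:57:00, 2019-04-19 09:02:00]. The first three input batches made the window slide twice, and the data of [2019-04-19 08:57:00, 2019-04-19 09:02:00] was output only after two batches had been fed into the second window.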

Also, once a window has been output it is dropped from the state and is never updated afterwards.
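The append-mode behaviour can be reproduced with a hypothetical single-process model, including the one-batch delay and the eviction of emitted windows (`AppendModeModel` is illustrative and deliberately simplified, not Spark's implementation). Its key assumption is the ordering inside one micro-batch:

1. Aggregate using the current watermark.
2. Emit and drop windows whose end is at or before the watermark.
3. Only then advance the watermark for the next batch.

The input in `main` is made up; it is not the input behind the output above.

```scala
// Hypothetical, simplified single-process model of a windowed word count in append
// mode (not Spark's implementation). Times are minutes since midnight; 5-minute
// windows sliding every 3 minutes, aligned to multiples of 3; 3-minute delay.
object AppendModeModel {
  type Window = (Int, Int)

  def hm(s: String): Int = {
    val parts = s.split(":")
    parts(0).toInt * 60 + parts(1).toInt
  }

  def windowsFor(t: Int, size: Int = 5, slide: Int = 3): List[Window] = {
    val lastStart = t - Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start + size > t)
      .map(start => (start, start + size))
      .toList
  }

  final class Query(delay: Int = 3) {
    private var state = Map.empty[(Window, String), Int]
    private var watermark = Int.MinValue // the watermark applied to the current batch

    // Runs one micro-batch of (word, eventTime) rows; returns what append mode prints.
    def runBatch(rows: Seq[(String, Int)]): List[(Window, String, Int)] = {
      // 1. Aggregate, skipping windows that already ended at or before the watermark.
      for ((word, t) <- rows; w <- windowsFor(t) if w._2 > watermark)
        state = state.updated((w, word), state.getOrElse((w, word), 0) + 1)
      // 2. Emit and drop every window whose end <= watermark: it can no longer change.
      val (done, open) = state.partition { case ((w, _), _) => w._2 <= watermark }
      state = open
      // 3. Only now advance the watermark, so it takes effect in the next batch.
      if (rows.nonEmpty) watermark = math.max(watermark, rows.map(_._2).max - delay)
      done.toList.map { case ((w, word), n) => (w, word, n) }.sorted
    }
  }

  def main(args: Array[String]): Unit = {
    val q = new Query()
    // Made-up input: one (word, event time) record per micro-batch
    val input = Seq("hello" -> "09:01", "hello" -> "09:04", "world" -> "09:06",
      "spark" -> "09:07", "spark" -> "09:13", "hello" -> "09:13", "cat" -> "09:04")
    for (((word, time), i) <- input.zipWithIndex)
      println(s"Batch $i: ${q.runBatch(Seq(word -> hm(time)))}")
  }
}
```

With that input, the watermark reaches 09:03 at the end of batch 2, yet [08:57, 09:02) is printed only in batch 3. The late `cat` record at 09:04 in the last batch is ignored, because both of its windows were already emitted and dropped in batch 5.

Newer Spark versions (2.4 and later) can also run an extra micro-batch without new data when the watermark advances (`spark.sql.streaming.noDataMicroBatches.enabled`). This model ignores that.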

5、Summary

The sections above show how aggregation, windows, watermarks and each output mode work together.
