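Our real-time data warehouse currently needs complex operations such as joining and aggregating multiple tables. I want a deep understanding of aggregation, output modes (complete, append, update), window operations and watermarks in Spark Structured Streaming, and I want the team to use aggregations and windows fluently in the real-time warehouse. So I have put together the following summary. The examples use a basic word count, with aggregation as the baseline, crossed with each output mode, windows and watermarks. The streaming source is a socket.
For a detailed introduction to Structured Streaming, see the official Spark Structured Streaming documentation; this post only gives examples of aggregation, windows and watermarks.
This is purely my own understanding, so there are bound to be mistakes; corrections are very welcome.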
First, plain aggregation: only an aggregation, with no window and no watermark.
Core code (complete output mode):
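```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WordCount").master("local[*]").getOrCreate() // local testing
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split lines into words, one row per word, each with a count column of 1
    val words = lines.as[String]
      .flatMap(_.split(" "))
      .selectExpr("value as word")
      .withColumn("cn", lit(1)) // required by agg("cn" -> "sum") below

    val wordCounts = words.groupBy($"word")
      .agg("cn" -> "sum")
      .withColumnRenamed("sum(cn)", "wc")

    // Start the query and print the results to the console
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()
  }
}
```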
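[Screenshot: console output in complete mode]
As the output shows, complete mode writes out the entire result table on every trigger. This means the stream keeps the state of all batches and updates that old state with each later batch.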
update mode: only the output mode of the query changes:
```scala
val query = wordCounts.writeStream
  .outputMode("update") // emit only new or changed rows
  .format("console")
  .option("truncate", false)
  .start()
```
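[Screenshot: console output in update mode]
As the output shows, update mode writes out only rows that are new or whose state changed. As with complete mode, the stream keeps the state of all batches and updates it with each later batch.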
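The following plain-Scala model of a running word count makes the difference between the two modes concrete. It has no Spark dependency. It is a simplified sketch, not Spark's implementation. `OutputModeModel` and `run` are hypothetical names, and each inner `Seq` stands for the words of one micro-batch.

```scala
object OutputModeModel {
  type Counts = Map[String, Long]

  // Simplified model: replays micro-batches through a running word count and
  // returns what each trigger would emit.
  // "complete" -> the whole result table; "update" -> only keys new or changed in that batch.
  def run(batches: Seq[Seq[String]], mode: String): Seq[Counts] = {
    var state: Counts = Map.empty // kept across batches, like the state store
    batches.toList.map { batch =>
      state = batch.foldLeft(state)((s, w) => s.updated(w, s.getOrElse(w, 0L) + 1))
      mode match {
        case "complete" => state
        case "update"   => state.filter { case (word, _) => batch.contains(word) }
        case other      => throw new IllegalArgumentException(s"unsupported mode: $other")
      }
    }
  }

  def main(args: Array[String]): Unit = {
    val batches = Seq(Seq("hello", "world"), Seq("hello", "spark"))
    println(run(batches, "complete"))
    println(run(batches, "update"))
  }
}
```

With the two batches in `main`, complete mode re-emits `world -> 1` in the second batch. Update mode emits only `hello -> 2` and `spark -> 1`.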
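append mode: this mode cannot be used for an aggregation without a watermark. Spark rejects such a query with an AnalysisException when it is started.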
Next, windowed aggregation. Throughout, the window size is 5 minutes, sliding every 3 minutes.
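Which windows an event lands in follows directly from these two numbers. The sketch below computes them in plain Scala, assuming windows aligned to multiples of the slide with no start offset. Spark aligns to the Unix epoch when `window` gets no `startTime`; this sketch uses seconds since midnight as a simplification. `SlidingWindows`, `windowsFor`, `hm` and `fmt` are hypothetical names.

```scala
object SlidingWindows {
  // Half-open window [start, end); times are seconds since midnight (a simplification).
  final case class Window(start: Long, end: Long) {
    override def toString: String = s"[${SlidingWindows.fmt(start)}, ${SlidingWindows.fmt(end)})"
  }

  // All windows of length `size`, sliding every `slide`, that contain event time t.
  def windowsFor(t: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = t - Math.floorMod(t, slide) // latest slide-aligned start <= t
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(start => start + size > t)     // stop once a window ends at or before t
      .map(start => Window(start, start + size))
      .toList
  }

  def hm(h: Int, m: Int): Long = h * 3600L + m * 60L
  def fmt(s: Long): String = f"${s / 3600}%02d:${s % 3600 / 60}%02d"

  def main(args: Array[String]): Unit = {
    val (size, slide) = (5 * 60L, 3 * 60L)
    println(windowsFor(hm(9, 7), size, slide)) // List([09:06, 09:11), [09:03, 09:08))
    println(windowsFor(hm(9, 8), size, slide)) // List([09:06, 09:11))
  }
}
```

With a 5-minute window sliding every 3 minutes, an event at 09:07 falls into two windows: [09:06, 09:11) and [09:03, 09:08). An event at 09:08 falls only into [09:06, 09:11), because a window excludes its end. So each event lands in one or two windows.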
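Core code:
```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object WindowWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WindowWordCount").master("local[*]").getOrCreate() // local testing
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // Split lines into words; the arrival (processing) time serves as the timestamp
    val words = lines.as[String]
      .flatMap(_.split(" "))
      .selectExpr("value as word")
      .withColumn("timestamp", current_timestamp())
      .withColumn("cn", lit(1))

    // 5-minute windows sliding every 3 minutes, counted per word
    val wordCounts = words
      .groupBy(window($"timestamp", "5 minutes", "3 minutes"), $"word")
      .agg("cn" -> "sum")
      .withColumnRenamed("sum(cn)", "wc")

    // "complete" or "update"; "append" is rejected here because there is no watermark
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()
  }
}
```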
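[Screenshot: console output in complete mode]
In complete mode, every window is output. Each window aggregates only the events that fall into it, and a window that has received no events does not appear in the output.
In update mode, only the windows that the current batch's events fall into are output, and within them only keys that are new or updated. No other windows are output.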
Output in update mode:
```text
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|1  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|world|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|hello|1  |
|[2019-04-17 23:39:00, 2019-04-17 23:44:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|3  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|cat  |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|4  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|sprk |1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|1  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|sprk |1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|hello|5  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|hello|2  |
|[2019-04-17 23:42:00, 2019-04-17 23:47:00]|spark|2  |
|[2019-04-17 23:45:00, 2019-04-17 23:50:00]|spark|1  |
+------------------------------------------+-----+---+
```
append mode: not supported, because no watermark has been added.
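First, what is a watermark? A watermark is the mechanism for handling late data. "Late" means, for example, data whose event time is 12:00 but which only arrives at 12:30; how should it be handled? The watermark supplies a time threshold. Roughly speaking, the engine waits for the watermark duration. In a window operation, the threshold is the maximum event time seen so far minus the watermark delay. It is recomputed at the end of every trigger (micro-batch) as the maximum event time advances. Once the threshold reaches a window's end, that window is treated as final: its state is dropped, and later events for it are ignored. An event that still falls into an open window is aggregated normally, even if it is older than the threshold. I won't go into more detail here; the official Spark guide explains this in detail, with diagrams, in "Handling Late Data and Watermarking".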
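The threshold arithmetic can be written down directly. The plain-Scala sketch below is a simplified model, not Spark's code. It assumes that a window's state is dropped once its end is at or before the watermark, which is consistent with the update-mode run later in this post. `WatermarkModel` and its functions are hypothetical names, and times are seconds since midnight.

```scala
object WatermarkModel {
  // Times are seconds since midnight, a simplification of Spark's event-time timestamps.
  def hm(h: Int, m: Int): Long = h * 3600L + m * 60L

  // The tracked max event time only moves forward, even when old events arrive.
  def advanceMax(currentMax: Long, batchEventTimes: Seq[Long]): Long =
    (currentMax +: batchEventTimes).max

  // Watermark threshold = max event time seen so far minus the allowed delay.
  def watermark(maxEventTime: Long, delay: Long): Long = maxEventTime - delay

  // Assumption: a window stays open (state kept, late events still counted)
  // while its end is after the watermark, i.e. it is dropped once end <= watermark.
  def isWindowOpen(windowEnd: Long, wm: Long): Boolean = windowEnd > wm

  def main(args: Array[String]): Unit = {
    val delay = 3 * 60L
    val max = advanceMax(hm(9, 6), Seq(hm(9, 8), hm(9, 2))) // a late 09:02 event does not pull it back
    val wm = watermark(max, delay)                          // 09:08 - 3 min = 09:05
    println(isWindowOpen(hm(9, 5), wm))                     // [09:00, 09:05): false, state dropped
    println(isWindowOpen(hm(9, 8), wm))                     // [09:03, 09:08): true, still updatable
  }
}
```

So an event older than the threshold is not rejected on its own. Whether it counts depends on whether the window it falls into is still open.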
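Again, the window size is 5 minutes, sliding every 3 minutes, and the watermark is 3 minutes.
Full code below; to try a different mode, just change outputMode.
```scala
import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Input lines look like "word,yyyy-MM-dd HH:mm:ss", e.g. "world,2019-04-19 09:07:00".
// Defined at top level so Spark can derive an Encoder for it.
case class WordBean(word: String, timestamp: Timestamp)

object WindowWatermarkWordCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("WindowWatermarkWordCount").master("local[*]").getOrCreate() // local testing
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    val wordCounts = lines.as[String]
      .map { line =>
        val arr = line.split(",")
        // Timestamp.valueOf stands in for the original DateUtils.parseSqlTimestamp helper, which isn't shown
        WordBean(arr(0).trim, Timestamp.valueOf(arr(1).trim))
      }
      .withColumn("cn", lit(1))
      .withWatermark("timestamp", "3 minutes")               // 3-minute watermark
      .groupBy(window($"timestamp", "5 minutes", "3 minutes"), // 5-minute windows, sliding every 3 minutes
        $"word")
      .agg("cn" -> "sum")
      .withColumnRenamed("sum(cn)", "wc")

    val query = wordCounts.writeStream
      .outputMode("update") // change to "complete" or "append" as needed
      .format("console")
      .option("truncate", false)
      .start()
    query.awaitTermination()
  }
}
```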
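complete mode outputs every window, and the watermark has no effect, because complete mode must keep all aggregation state and so never drops any. Demo omitted.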
In update mode, expired data (the state of windows the watermark has passed) is dropped.
Output:
```text
-------------------------------------------
Batch: 0
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|1  |
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|spark|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 9
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:12:00, 2019-04-19 09:17:00]|spark|1  |
|[2019-04-19 09:09:00, 2019-04-19 09:14:00]|spark|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 10
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 11
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:06:00, 2019-04-19 09:11:00]|world|3  |
+------------------------------------------+-----+---+
```
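In update mode, the output is basically the same as in the window-only case; what deserves attention is the effect of the watermark. In batch 6 we tried to update the window [2019-04-19 09:00:00, 2019-04-19 09:05:00] but got empty output. The preceding batches had pushed the maximum event time to 2019-04-19 09:08:00. With a 3-minute watermark, the threshold was therefore 09:05:00, so the state of [09:00:00, 09:05:00] had already been dropped and could not be updated. Likewise, in batch 10 we tried to update [2019-04-19 09:03:00, 2019-04-19 09:08:00], and that failed too. Note in particular the last record, "world,2019-04-19 09:07:00", sent in batch 11. It falls into both [2019-04-19 09:06:00, 2019-04-19 09:11:00] and [2019-04-19 09:03:00, 2019-04-19 09:08:00], yet only the [09:06:00, 09:11:00] window was output.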
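Append mode is probably the mode that most deserves close study. When you first use it, it can seem puzzling: why is there no data? The source has clearly sent many batches, yet nothing shows up, especially if the window and watermark in the program are set very long. You may start to suspect your own code, or even the official example.
Neither is wrong. In append mode, a window is output only once it is certain that it will no longer be updated, that is, once the watermark (the maximum event time minus the watermark delay) has passed the window's end. So if you have configured windows several hours long, wait patiently for results. If no further events arrive, you may as well shut down and go to sleep: the watermark will not advance, and you will never see the result you are waiting for. A demonstration follows.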
```text
-------------------------------------------
Batch: 0
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 1
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 2
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 3
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 08:57:00, 2019-04-19 09:02:00]|hello|1  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 4
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 5
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 6
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:00:00, 2019-04-19 09:05:00]|hello|2  |
+------------------------------------------+-----+---+

-------------------------------------------
Batch: 7
-------------------------------------------
+------+----+---+
|window|word|wc |
+------+----+---+
+------+----+---+

-------------------------------------------
Batch: 8
-------------------------------------------
+------------------------------------------+-----+---+
|window                                    |word |wc |
+------------------------------------------+-----+---+
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|hello|2  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|world|1  |
|[2019-04-19 09:03:00, 2019-04-19 09:08:00]|spark|1  |
+------------------------------------------+-----+---+
```
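Note that when the window slides and the watermark advances, a window that is certain to receive no more updates is still not output immediately. It is output, safely, only when the next batch is triggered, because the watermark computed at the end of one batch only takes effect from the next batch on. Take [2019-04-19 08:57:00, 2019-04-19 09:02:00] in the output above. Over the first three input batches the window slid twice, and the data for [2019-04-19 08:57:00, 2019-04-19 09:02:00] was output only after two batches had been fed into the second window.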
In addition, once a window has been output its state is dropped, and it will not be updated afterward.
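The append-mode rules above can be condensed into a small plain-Scala model. A window is emitted only after the watermark passes its end. Each batch uses the watermark derived from earlier batches. Emitted windows are dropped. This is a simplified sketch under those assumptions, not Spark's implementation. `AppendModeModel`, `Event` and `run` are hypothetical names, and times are seconds since midnight.

```scala
object AppendModeModel {
  final case class Window(start: Long, end: Long)  // half-open [start, end)
  final case class Event(word: String, time: Long) // time: seconds since midnight (simplification)

  def hm(h: Int, m: Int): Long = h * 3600L + m * 60L

  // Sliding windows of length `size`, every `slide`, aligned to multiples of `slide`.
  def windowsFor(t: Long, size: Long, slide: Long): List[Window] = {
    val lastStart = t - Math.floorMod(t, slide)
    Iterator.iterate(lastStart)(_ - slide)
      .takeWhile(_ + size > t)
      .map(start => Window(start, start + size))
      .toList
  }

  // Replays micro-batches in append mode; returns the rows each trigger emits.
  // Assumptions of this simplified model (not Spark's code):
  //  - batch N uses the watermark computed from the max event time of batches 0..N-1;
  //  - events are ignored for windows whose end is already at or before that watermark;
  //  - a window is emitted once, when its end is at or before the watermark, then its state is dropped.
  def run(batches: Seq[Seq[Event]], size: Long, slide: Long, delay: Long): List[Map[(Window, String), Long]] = {
    var state = Map.empty[(Window, String), Long]
    var maxEventTime: Option[Long] = None
    batches.toList.map { batch =>
      val wm = maxEventTime.map(_ - delay).getOrElse(Long.MinValue) // no watermark before the first event
      for (e <- batch; w <- windowsFor(e.time, size, slide) if w.end > wm) {
        val key = (w, e.word)
        state = state.updated(key, state.getOrElse(key, 0L) + 1L)
      }
      val (finished, open) = state.partition { case ((w, _), _) => w.end <= wm }
      state = open // emitted windows are never updated again
      if (batch.nonEmpty) {
        val batchMax = batch.map(_.time).max
        maxEventTime = Some(maxEventTime.fold(batchMax)(math.max(_, batchMax))) // used from the next batch
      }
      finished
    }
  }

  def main(args: Array[String]): Unit = {
    val out = run(
      Seq(Seq(Event("hello", hm(9, 0))), Seq(Event("hello", hm(9, 6))), Seq.empty, Seq.empty),
      size = 5 * 60L, slide = 3 * 60L, delay = 3 * 60L)
    out.zipWithIndex.foreach { case (rows, i) => println(s"Batch $i: $rows") }
  }
}
```

In `main`, batch 1's event moves the max event time to 09:06, which makes [08:57, 09:02) final. Yet that window only appears in batch 2. With no further events, [09:00, 09:05) is never emitted, because the watermark stays at 09:03. Batches 2 and 3 stand for triggers that carry no new events. Whether the real engine runs such a trigger depends on the Spark version and configuration: Spark 2.4 introduced no-data micro-batches, controlled by `spark.sql.streaming.noDataMicroBatches.enabled`.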
That covers how aggregation, windows, watermarks and each output mode fit together.