4.用法sql
結構化流使用Datasets和DataFrames.從Spark2.0開始,Spark-SQL中的Datasets和DataFrames,就已經能很好表示靜態(有界)數據,動態(無界)數據json
4.1 數據源app
結構化流提供了四種不中斷數據源 file-system,kafka,socket.rate-source socket
4.1.1 socketide
從一個socket鏈接中讀取 UTF-8 的文本數據. <=注意這是一種不可容錯的數據源,建議僅在測試環境中使用.函數
配置參數:測試
host 鏈接地址ui
port 端口號spa
4.1.2 rate-source設計
這是結構流內置的模擬數據生成的數據源.這也是一種不可容錯的數據源.只能在測試環境使用
它每秒,以指定的設置生成N行的數據.每行記錄包含一個timestamp(分發時間)
和 value (消息序號(count),從0開始)
配置參數:
屬性 | 描述 |
rowsPerSecond |
每秒生成N行.默認1 |
rampUpTime |
生成速度. 默認 0(秒) |
numPartitions |
生成數據的分區數.默認讀取 Spark's default parallelism |
4.1.3 文件系統
4.1.3.1 概述&配置
從一個文件目錄讀取文件流. <=可容錯數據源
支持的格式包括: text, csv, json, orc, parquet.須要注意的是,文件必須以原子方式放入目錄(例如文件移動進目標目錄).
配置參數以下:
屬性 | 描述 |
path | 輸入路徑.支持文件通配符匹配,但不支持多個逗號分割的文件通配符匹配 |
maxFilesPerTrigger | 每一個觸發器的最大新文件數量.默認不限 |
latestFirst | 是否先處理新文件.當文件大量積壓時比較有用.默認false |
fileNameOnly | 是否只以文件名而忽略完整路徑,來做爲檢查新文件的標準.默認false |
4.1.3.2 文件系統的元數據接口
默認狀況下,基於文件系統的結構化流須要指定文件的元數據(schema),而不是依靠Spark進行類型推斷
但能夠配置 spark.sql.streaming.schemaInference 設爲true(默認false)來從新啓用 schema inference(元數據接口)
4.1.3.3 文件系統的分區發現
結構化流支持自動以目錄爲憑據的分區發現(key=value)
分區發現時,分區列必須出如今用戶提供的schema中.
當分區列出如今查詢中,此時它將會被路徑上發現的分區信息自動填充.
4.1.2 kafka
從kafka中讀取數據.<=可容錯數據源 (只支持 kafka 0.10.0 或以上版本)
kafka 是最推薦使用的方式,後面詳解
4.2 事件時間窗口操做
4.2.1 概述
在不少狀況下,咱們並不但願針對整個流進行聚合.而是但願經過一個時間窗口(每5分鐘內,或每小時),在一個自定義的小時間分片的範圍內數據進行聚合.
而且這種時間分片的依據,是相似事件時間這種業務概念時間,而不是以結構化流自身維護的收到時間爲依據.
這種狀況下,結構化流提供了建立窗口函數(window).經過指定窗口大小(範圍),再滑動窗口位置進行聚合(sliding event-time window),來很是方便的處理這種狀況
4.2.2 窗口函數使用
窗口函數須要如下兩個參數:
窗口範圍(window length) 就是統計的時間範圍
滑動間隔(sliding interval) 就是統計間隔,每多長時間出一次結果
object StructuredSteamingWindowApp extends App { val spark = SparkSession.builder().master("local[2]").appName("Structured-Steaming-Window-App").getOrCreate(); import spark.implicits._; val lines = spark.readStream.format("socket").option("host", "192.168.178.1").option("port", "9999").load(); //格式化分割:abcd|1531060985000 efg|1531060984000 val words = lines.as[String] .flatMap(line => line.split(" ")) .map(word => word.split("\\|")) .map(wordItem => (wordItem(0), new Timestamp(wordItem(1).toLong))) .as[(String, Timestamp)] .toDF("word", "timestamp"); //窗口範圍,就是統計數據的時間範圍=>30秒範圍內 val windowDuration = "30 seconds"; //滑動時間,就是每多長時間計算一個結果=>每10秒計算一次 //以上綜合就是 每10秒出一次最近30秒的結果 val slideDuration = "30 seconds"; //以窗口計算的事件時間+單詞分組 統計計數 val windowedCounts = words.groupBy(window($"timestamp", windowDuration, slideDuration), $"word").count().orderBy("window") //完整統計結果輸出到控制檯 val query = windowedCounts.writeStream .outputMode("complete") .format("console") .option("truncate", "false") .option("checkpointLocation", "D:\\Tmp") .start() query.awaitTermination() }
輸出結果:
4.2.3 數據延遲與水印
事件窗口很好的解決了自定義的小時間分片的範圍內數據進行聚合這種設計.但實際過程還有一個比較常見的問題:因某種緣由,部分數據到達延遲了.
想象一下,一個應該12:05到達的數據,本應該進入12:00-12:10切片的,但實際在12:15才達到,結果被納入了12:10-12:20的分片中被聚合.
針對這種問題,結構化流提供一種解決方案: 水印(watermarking)
水印是指讓引擎自動的跟蹤數據中的事件事件(current-event-time),並根據用戶指定來清理舊狀態.
用戶能夠指定水印的事件時間列(event time column),和數據預期的延遲閾值.
對於一個從T時間開始的窗口,引擎將保持狀態並將延遲到達的數據(記錄事件時間>系統當前時間-延遲閾值)從新更新狀態.(就是水印時間以內的數據併入計算,水印以外拋棄)
使用:
.withWatermark("事件時間列", "水印狀態持續時間")
使用水印時,務必注意如下:
水印時間,是以當前最大的事件時間減去水印延時的時間,切記它跟窗口範圍沒有關係
輸出模式必須是追加或者更新(不支持徹底模式). 由於水印是去除過時數據,與徹底模式保留全部聚合數據衝突了(徹底模式下,水印將會失效)
使用水印的聚合必須具備事件時間列或運行在有事件時間的窗口上.而且水印的時間列必須與事件時間列一致.
使用水印必須在聚合以前使用,不然水印不會起效
追加模式下的水印:
append的核心之處在於append不容許變動.因此必須是能產生最終結果的時候才能輸出計算結果.而水印狀態下,是可能有由於數據延時然後到前插的數據的
因此水印狀態append模式下的最終結果輸出,是在水印時間完全離開一個窗口的範圍後,纔會對窗口數據進行計算
這是官網的圖:
12:14
Row(12:14,dog)到達,它屬於窗口(12:10-12:20).
此時的水印時間是12:04=12:14(水印的基準不是窗口時間12:15,而是當前記錄裏最大事件時間12:14)-10
而水印時間(12:04),還在窗口(12:00-12:10)中,
因此窗口(12:00-12:10)沒有任何結果輸出,而且若是此時有Row(12:09)延遲到如今纔到,是能夠被窗口(12:00-12:10)正確統計的
12:15
Row(12:15,cat) 同上.
12:21
Row(12:21,owi)到達,它屬於窗口(12:20-12:30)
此時的水印時間是12:11(12:21-10),離開窗口(12:10-12:20)
直到此時,當窗口(12:20-12:30)被12:25的觸發器觸發,纔會生成12:10-12:20的最終結果.此時Row(12:08)數據會被忽略
4.3 兩個流數據之間的鏈接
從Spark2.3開始,結構化流提供了兩個流數據之間的join.
兩個流數據鏈接的困難之處在於,兩個無界數據集,其中一個的記錄,可能會與另外一數據集的未來某一條記錄匹配.
因次,流數據之間的鏈接,會將過去的輸入緩衝爲流狀態.以便在收到每一條記錄時都會嘗試與過去的輸入鏈接,並相應的生成匹配結果.
而且這種鏈接依然保持自動處理延遲和無序的數據,並可使用水印限制狀態.下面是一些流數據鏈接的介紹
4.3.1 內鏈接(inner join)和水印
流數據內聯支持任意字段和任意條件內聯.
可是隨着流數據的不斷進入,流狀態數據也會不斷的增長.因此必須定義鏈接的附加條件,能讓流狀態可以將明確不可能再有機會與未來數據匹配的數據清除.
換句話說,必須再內鏈接中定義如下步驟:
i).在兩個流上都定義水印延遲,從而讓流數據能被水印優雅的去除(與流聚合相似)
ii).定義帶事件時間的約束條件,從而能讓引擎可以根據這些條件從流狀態去撿出不符合條件的數據.好比以下兩種方式
鏈接條件上帶事件時間過濾
帶事件時間的窗口
4.3.2 外鏈接(out join)和水印
外鏈接基本與內鏈接基本相同.但外鏈接須要更加註意的是可能大量存在的Null值,而這些Null值在將來又是可能有匹配的,因此在時間附加條件上必需要作的更加嚴格.
對於外鏈接,有一些重要的特徵以下:
i).對於Null或不匹配的值,必須有一個完善的方案(水印或者時間區域),來肯定將來不會有匹配從而優雅的移除
ii).在微批處理的引擎中,水印在微批處理的最後作了改進: 由下一個批次纔會使用更新後的水印清洗結果來輸出外部結果.
因爲只有在出現新數據時纔會觸發微批處理,所以當某(任)一個數據流沒有輸入數據的時候,都會形成外部結果的延遲
另外,對於鏈接還有一些須要知道的:
i).鏈接是級聯.便可以是 df1.join(df2, ...).join(df3, ...).join(df4, ....)
ii).從Spark2.3開始,只有在查詢輸出模式爲 Append output mode 時,才能使用鏈接.其它輸出模式不支持
iii).在join以前,不能有 非map系(non-map-like) 的操做.好比說,如下操做是不容許的
在join以前,不能有任何聚合操做
在Update mode下,join以前不能使用mapGroupsWithState 或者flatMapGroupsWithState 之類的操做
關於鏈接具體支持以下:
左 | 右 | join類型 | 描述 |
Static | Static | 支持全部join類型 | 這是靜態數據集而非流式數據集 |
Stream | Static | Inner | 支持鏈接,但不支持流狀態 |
Left Outer | 支持鏈接,但不支持流狀態 | ||
Right Outer | 不支持 | ||
Full Outer | 不支持 | ||
Static | Stream | Inner | 支持鏈接,但不支持流狀態 |
Left Outer | 不支持 | ||
Right Outer | 支持鏈接,但不支持流狀態 | ||
Full Outer | 不支持 | ||
Stream | Stream | Inner | 支持鏈接,支持流狀態.而且水印和事件時間範圍可選 |
Left Outer | 有條件的支持鏈接和流狀態.對右數據必須使用水印或者事件範圍條件,對左數據是可選水印和事件時間範圍條件 | ||
Right Outer | 有條件的支持鏈接和流狀態.對左數據必須使用水印或者事件範圍條件,對右數據是可選水印和事件時間範圍條件 | ||
Full Outer | 不支持 |
4.4 流數據的去重
結構化流支持以數據中的某一惟一標識符(unique identifier)爲憑據,對流數據內的記錄進行重複刪除(與靜態數據批處理的惟一標識列重複消除用法徹底相同).
重複刪除完成機制是將暫時存儲先前的記錄,以即可以查詢過濾重複的記錄.注意這種暫存不與水印綁定,你能夠選擇水印,也能夠不選擇使用水印來完成暫存查詢過濾重複的功能
i).水印完成
若是數據延遲有其上限(這個上限指超過延遲後能夠直接丟棄),則能夠在事件時間上定義水印,並使用一個Guid和事件時間列完成去重
ii).非水印完成
若是數據延遲沒有界限(指再晚到達都必須接受處理不能丟棄),這將查詢全部過去記錄的存儲狀態
使用:
.dropDuplicates("去除條件列".....) <=去重
4.5 有狀態的操做
???
4.6 streaming DataFrames/Datasets 與 DataFrames/Datasets 的一些不一樣之處
streaming Datasets 不支持 Multiple streaming aggregations (多個流聚合)(即 streaming DF 上的聚合鏈)
streaming Datasets 不支持 Limit and take first N rows
streaming Datasets 上的 Distinct operations 不支持
只有在 aggregation 和 Complete Output Mode 下,streaming Datasets 才支持排序操做
有條件的流數據集鏈接(詳見兩個流數據集鏈接)
此外,一些當即返回結果的操做對流數據也沒有意義,好比:
count(),流數據不能返回計算,只能使用ds.groupBy().count()返回一個流媒體數據集,其中包含一個運行計數
foreach()
- 使用 ds.writeStream.foreach(...) 代替
show()-使用 控制檯接收器 代替
嘗試進行這些不支持的操做,會拋出一些 相似"operation XYZ is not supported with streaming DataFrames/Datasets"的AnalysisException