關係型SQL
與stream processing
對好比下。html
SQL | Stream Processing |
---|---|
有限元組 | 無限元組 |
完整數據集上的查詢 | 沒法基於全部數據查詢 |
查詢會結束 | 查詢不會結束 |
Materialized View被定義爲一條SQL查詢,其會緩存查詢結果。但當所查詢的表(基表)被修改時,緩存的結果將過時。
Eager View Maintenance會更新Materialized View,當基表被更新時,會馬上更新Materialized View中緩存的結果。java
Eager View Maintenance和SQL Query在streams
上的關係以下。數據庫
INSERT、UPDATE、DELETE
等DML
語句流的結果,被流稱爲changelog stream。View
,查詢須要不斷處理changelog stream。streaming SQL
查詢結果。動態表是Flink流上Table Api & SQL
的核心概念,其隨時間動態變化;apache
連續查詢的結果等同在輸入表的快照上以批處理模式執行相同查詢的結果。緩存
流、動態表、連續查詢的關係以下圖所示。session
stream
會被轉化爲動態表。stream
。動態表是一個邏輯概念。 在查詢執行期間動態表不必定(徹底)
materialized
。app
爲理解動態表和連續查詢的概念,假設點擊事件流有以下模式。ide
[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]this
爲在流上使用關係查詢,流須要被轉化爲表。流的每一個記錄被解釋爲結果表(動態表)上的INSERT
修改,咱們從一個只有INSERT
的changelog
流中構建表。以下圖所示,點擊事件流被轉化爲表,表會隨着點擊事件記錄的插入而不斷增加。編碼
連續查詢做用於動態表並又會產生動態表;連續查詢不會終止並會根據其輸入表(動態表)上的更新來更新其結果表(動態表)。
下面顯示在點擊事件流上定義的clicks
表上顯示兩個查詢示例。
首先是GROUP-BY COUNT
聚合查詢示例。
當查詢開始時,clicks
表爲空;當第一行插入到clicks
表中時,查詢開始計算結果表(動態表),如[Mary, ./home]插入後,結果表包含一行結果[Mary, 1];當插入第二行[Bob, ./cart]時,查詢會更新結果表並插入新記錄[Bob, 1]。第三行[Mary, ./prod=id=1]插入時,查詢會更新結果表中的[Mary, 1]記錄,將其更新爲[Mary, 2]。最後一行[Liz, 1]插入clicks
表後,也會更新到結果表(插入新記錄)。
第二個查詢與第一個查詢相似,除了用戶屬性以外,還在小時滾動窗口上對clicks
表進行分組,而後對URL進行計數(基於時間的計算,如窗口基於特殊的時間屬性)。
每一個小時查詢會計算結果並更新結果表。在cTime
在12:00:00 - 12:59:59
之間,clicks
表存在四條記錄,對應的查詢計算出兩條結果;下個時間窗口(13:00:00 - 13:59:59),clicks
表中存在三條記錄,對應的查詢計算出兩條結果添加值結果表中;當記錄插入至clicks
表中後,結果表也會被動態更新。
上述兩個查詢雖然有些相似(均計算統計聚合分組),但二者也有顯著不一樣:第一個查詢會更新結果表的結果,如定義在結果表上的changelog
流包含INSERT
和UPDATE
;第二個查詢僅僅往結果表中添加記錄,如定義在結果表上的changelog
流只包含INSERT
。一個查詢是否生成僅插入表(INSERT
)或更新表(UPDATE
)有一些含義:生成更新表的查詢必需要維護更多狀態,將僅插入表轉化爲流與將更新表轉化爲流不一樣。
不少查詢能夠等同在流上的連續查詢,一些查詢因爲需維護狀態的大小或計算更新代價大致使查詢計算代價太大。
user
的url
的count
以即可以增長count
,使得當輸入表(左側表)接收一行新數據時會產生新的結果(右側表)。若只跟蹤註冊用戶,那麼維護cnt
大小代價不會太大(註冊用戶量不太大)。但若非註冊用戶也分配惟一的用戶名,則隨着時間的增長,維護cnt
大小代價將增大,最終致使查詢失敗。SELECT user, COUNT(url)
FROM clicks
GROUP BY user;
RANK
。一旦clicks
表收到新行,用戶的lastAction
被更新而且應該計算新的RANK
。然而因爲不存在兩行相同RANK
,因此全部較低RANK
的行也須要被更新。SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);
動態表可像傳統表同樣被INSERT、UPDATE、DELETE
修改。可能只有一行的表被持續更新;或者是沒有UPDATE、DELETE
更改的只插入表。當將動態錶轉化爲流或將其寫入外部系統,這些更改(修改)須要被編碼,Flink
的Table API & SQL
支持三種方式編碼動態表上的更改(修改)。
INSERT更改
進行修改的動態表可經過發出插入的行來轉化爲流。Retract流
包含兩種類型消息(add消息和retract消息
),經過將動態表的INSERT更改
做爲add消息
、將DELETE更改
做爲retract消息
、將UPDATE更改
分解爲舊記錄的retract消息
和新記錄的add消息
。下圖展現了從動態錶轉化爲retract流
。Upsert流
包含兩種類型消息(upset消息和delete消息
),動態錶轉化爲upsert流
須要有主鍵(可複合),具備主鍵的動態表經過將INSERT、UPDATE更改
編碼爲upset消息
,將DELETE更改
編碼爲delete消息
。upset流
與retract流
主要區別是UPDATE更改
使用單一消息(主鍵)進行編碼,所以效率更高。下圖展現了將動態表
轉化爲upset流
。Event time
類型)。上述時間能夠在代碼中指明時間特性。
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default // alternatively: // env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); // env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
Table API & SQL
中基於時間的操做(如窗口)須要設置時間概念和及其來源信息。所以,tables
能夠提供邏輯時間屬性
來指示時間並在table
程序中訪問相應時間戳。時間屬性能夠是表模式
的一部分(從DataStream
中建立表時被定義),或在使用TableSource
時被預約義,一旦時間屬性被定義,那麼其能夠做爲一個字段被引用或進行基於時間的操做。只要時間屬性沒有被修改,只是從查詢的一部分轉發到另外一部分,那麼它仍然是一個有效的時間屬性。時間屬性與常規時間戳相同,可被訪問並計算。若是在計算中使用時間屬性,那麼其將被具象化爲常規時間戳,常規時間戳不兼容Flink
的時間和水位系統,所以不能再用於基於時間的操做。
processing time
容許表程序基於本地機器的時間輸出結果,它不須要提取時間戳和生成水位,有多種方式定義processing time
屬性。
processing time
屬性在模式定義時使用.proctime
屬性定義,時間屬性只能經過額外的邏輯字段擴展物理模式,所以,其可被定義在模式定義的末尾,具體以下。
DataStream<Tuple2<String, String>> stream = ...; // declare an additional logical field as a processing time attribute Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime"); WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
processing time
屬性可經過實現DefinedProctimeAttribute
接口定義,邏輯時間屬性被附加到由TableSource
的返回類型定義的物理模式上。
// define a table source with a processing attribute public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute { @Override public TypeInformation<Row> getReturnType() { String[] names = new String[] {"Username" , "Data"}; TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()}; return Types.ROW(names, types); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { // create stream DataStream<Row> stream = ...; return stream; } @Override public String getProctimeAttribute() { // field with this name will be appended as a third field return "UserActionTime"; } } // register table source tEnv.registerTableSource("UserActions", new UserActionSource()); WindowedTable windowedTable = tEnv .scan("UserActions") .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
Event time
容許表程序根據每條記錄中包含的時間輸出結果,這樣即便在無序事件或晚到事件狀況下保持一致結果,當從持久化存儲中讀取記錄時還保證可重放結果。此外,event time
容許批和流環境中的表程序使用統一的語法,流環境中的時間屬性能夠是批環境中的記錄的字段。爲處理亂序事件,並區分流中準時和晚到事件,Flink
須要從事件中提取時間戳信息,並在時間戳上進行處理(水位)。event time
屬性可被定義在流到表的轉化中或者使用TableSource。Table API & SQL
假設在上述兩種狀況下,都在DataStream API
中生成時間戳和水位。
event time
屬性在模式定義時經過.rowtime
屬性定義;時間戳和水位必須在轉換的DataStream中已被分配;將DataStream
轉化爲Table
時有以下兩種定義時間屬性的方式。
// Option 1: // extract timestamp and assign watermarks based on knowledge of the stream DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // declare an additional logical field as an event time attribute Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime"); // Option 2: // extract timestamp from first field, and assign watermarks based on knowledge of the stream DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...); // the first field has been used for timestamp extraction, and is no longer necessary // replace first field with a logical event time attribute Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data"); // Usage: WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
event time
屬性可經過實現DefinedRowtimeAttribute
接口定義,邏輯時間屬性被附加到由TableSource
的返回類型定義的物理模式上。時間戳和水位必定要在getDataStream
方法返回的流中被分配。
// define a table source with a rowtime attribute public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute { @Override public TypeInformation<Row> getReturnType() { String[] names = new String[] {"Username" , "Data"}; TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()}; return Types.ROW(names, types); } @Override public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) { // create stream // ... // extract timestamp and assign watermarks based on knowledge of the stream DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...); return stream; } @Override public String getRowtimeAttribute() { // field with this name will be appended as a third field return "UserActionTime"; } } // register the table source tEnv.registerTableSource("UserActions", new UserActionSource()); WindowedTable windowedTable = tEnv .scan("UserActions") .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));
無論輸入是有界批量輸入仍是無界流輸入,Table API & SQL
查詢都有相同的語義。在不少狀況下,流上的連續查詢與離線計算具備相同準確的結果。然而,在實際狀況下連續查詢必需要限制其所維護狀態的大小以免使用完存儲空間,並可以在長時間處理無限流數據。所以,連續查詢可能只能根據輸入數據的特徵和查詢自己提供近似準確的結果。
Flink Table API & SQL
接口提供參數調整連續查詢的準確性和資源消耗。參數經過QueryConfig
對象定義,QueryConfig
對象可經過TableEnvironment
獲取並在翻譯表時被傳回。
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env); // obtain query configuration from TableEnvironment StreamQueryConfig qConfig = tableEnv.queryConfig(); // set query parameters qConfig.withIdleStateRetentionTime(Time.hours(12)); // define query Table result = ... // create TableSink TableSink<Row> sink = ... // emit result Table via a TableSink result.writeToSink(sink, qConfig); // convert result Table into a DataStream<Row> DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);
下面描述了QueryConfig
的參數如何影響查詢的準確性和資源消耗的。
不少查詢在一個或多個關鍵屬性上聚合或鏈接記錄(如典型的聚合查詢),當在流上執行該查詢時,連續查詢須要維護記錄或保持每一個鍵的部分結果。若涉及到流的關鍵域(活動鍵值隨時間會變化),隨着不一樣鍵被觀察,連續查詢會積累愈來愈多的狀態。然而,在一段時間後鍵將變得不活動時,它們的對應狀態將變得過時和無效。以下查詢示例中計算每一個session
的clicks
數量。
SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
sessionId
被做爲分組鍵,連續查詢會爲每一個sessionId
維護clicks
數量。sessionId
屬性隨着時間推移而變化,sessionId
值僅在session
結束前處於活動狀態(保持一段時間)。然而,因爲不清楚sessionId
屬性,連續查詢指望每一個sessionId
值在任什麼時候間都有效,即會維護全部sessionId
的值。這樣會致使隨着時間的推移,所維護的sessionId
愈來愈多。
空閒狀態保留時間參數定義鍵的狀態不被更新,在刪除以前保留多長時間。在上述查詢中,sessionId
的計數在指定的配置時間內未被更新時將被移除。當鍵會移除後再次被添加,那麼鍵將會被當成新的鍵(如上述示例中又會開始計0)。有兩個參數配置空閒狀態保留時間,最小空閒狀態保留時間和最大空閒狀態保留時間。
StreamQueryConfig qConfig = ... // set idle state retention time: min = 12 hour, max = 16 hours qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16)); // set idle state retention time. min = max = 12 hours qConfig.withIdleStateRetentionTime(Time.hours(12);
配置不一樣的最小和最大空閒狀態保留時間的效率更高,由於它減小了查詢內部簿記什麼時候刪除狀態的次數。
https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html