動態表的概念是社區很早就提出的但並無所有實現,下文中全部介紹都是基於已有規劃和proposal給出的,可能與以後實現存在出入僅供參考複製代碼
動態表直觀上看是一個相似於數據庫中的Materialized View
概念。動態表隨着時間改變;相似靜態的batch table同樣能夠用標準SQL進行查詢而後一個新的動態表;能夠和流無損地互相轉換(對偶的)。對現有的API最大的改進關鍵在表的內容隨着時間改變,而如今的狀態只是append。當前的streaming table能夠認爲是一種動態表,append模式的動態表。javascript
流被轉換成Table時決定選擇哪一種模式是依據表的schema是否認義primary key。java
若是表的schema沒有包括key的定義那轉換成表時採用append模式。把流中每條新來的record當作新的row append到表中。一旦數據加到表中就不能再被更新和刪除(指當前表中,不考慮轉換成新表)。sql
相對應,若是定義了key,那麼對於流中的每條記錄若是key不在表中就insert不然就update。數據庫
表到流的操做是把表的全部change以changelog stream的方式發送到下游。這一步也有兩種模式。windows
traction模式中對於Dynamic Table的insert和delete的change分別產生insert或delete event。若是是update的change會產生兩種change event,對於以前發送出去的一樣key的record會產生delete event,對於當前的record是產生insert event。以下圖所示:app
update模式依賴Dynamic Table定義了key。全部的change event是一個kv對。key對應表的key在當前record中的值;對於insert和change value對應新的record。對於delete value是空表示該能夠已經被刪除。以下圖所示:dom
表的內容隨着時間改變意味着對錶的query結果也是隨着時間改變的。咱們定義:優化
舉個例子來理解動態表的概念:ui
因爲流是無限的,相對應 Dynamic Table 也是無界的。當查詢無限的表的時候咱們須要保證query的定時是良好的,有意義可行的。spa
1.在實踐中Flink將查詢轉換成持續的流式應用,執行的query僅針對當前的邏輯時間,因此不支持對於任意時間點的查詢(A[t])。
2.最直觀的原則是query可能的狀態和計算必須是有界的,因此能夠支持可增量計算的查詢:
Q(t+1) = q'(Q(t), c(T, t, t+1))
,其中Q(t)是query q的前一次查詢結果,c(T, t, t_+1) 是表T從t+1到t的變化, q'是q的增量版本。Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t)
,q''是不須要時間t時q的結果增量版本query q。c(T, t-x, t+1)是表T尾部的x+1個數據,x取決於語義。例如最後一小時的window aggregation至少須要最後一小時的數據做爲狀態。其餘能支持的查詢類型還有:單獨在每一行上操做的SELECT WHERE;rowtime上的GROUP BY子句(好比基於時間的window aggregate);ORDER BY rowtime的OVER windows(row-windows);ORDER BY rowtime。如上文所說的,某些增量查詢須要保留一些數據(部分輸入數據或者中間結果)做爲狀態。爲了保證query不會失敗,保證查詢所須要的空間是有界的不隨着時間無限增加很重要。主要有兩個緣由使得狀態增加:
雖然第二種狀況可有經過下文提到的"Last Result Offset"參數解決,可是第一種狀況須要優化器檢測。咱們應該拒毫不受時間限制的中間狀態增加的查詢。優化器應該提供如何修復查詢且要求有適當的時間謂詞。好比下面這個查詢:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page複製代碼
隨着用戶數和頁面數的增加,中間狀態會數據隨着時間推移而增加。對於存儲空間的要求能夠經過添加時間謂詞來限制:
SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour
GROUP BY user, page複製代碼
由於不是全部屬性都是不斷增加的, 所以能夠告訴優化器domain的size, 就能夠推斷中間狀態不會隨着時間推移而增加,而後接受沒有時間謂詞的查詢。
val sensorT: Table = sensors
.toTable('id, 'loc, 'stime, 'temp)
.attributeDomain('loc, Domain.constant) // domain of 'loc is not growing
env.registerTable("sensors", sensorT)
SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc複製代碼
一些關係運算符必須等數據到達才能計算最終結果。例如:在10:30關閉的窗口至少要等到10:30才能計算出最終的結果。Flink的logical clock(即 決定什麼時候纔是10:30)取決於使用event time 仍是 processing time。在processing time的狀況下,logical time是每一個機器的wallclock;在event time的狀況下,logical clock time是由源頭提供的watermark決定的。因爲數據的亂序和延遲當在event time模式下時等待一段時間來減少計算結果不完整性。另外一方面某些狀況下但願獲得不斷改進的早期結果。所以對於結果被計算、改進或者作出最終結果時有不一樣的要求、
下圖描繪了不一樣的配置參數如何用於控制早期結果和細化計算結果的。