Flink Table/SQL API 規劃 —— Dynamic Table

動態表的概念是社區很早就提出的但並無所有實現,下文中全部介紹都是基於已有規劃和proposal給出的,可能與以後實現存在出入僅供參考複製代碼

概念

動態表直觀上看是一個相似於數據庫中的Materialized View概念。動態表隨着時間改變;相似靜態的batch table同樣能夠用標準SQL進行查詢而後一個新的動態表;能夠和流無損地互相轉換(對偶的)。對現有的API最大的改進關鍵在表的內容隨着時間改變,而如今的狀態只是append。當前的streaming table能夠認爲是一種動態表,append模式的動態表。javascript

流到 Dynamic Table

流被轉換成Table時決定選擇哪一種模式是依據表的schema是否認義primary key。java

Append模式:

若是表的schema沒有包括key的定義那轉換成表時採用append模式。把流中每條新來的record當作新的row append到表中。一旦數據加到表中就不能再被更新和刪除(指當前表中,不考慮轉換成新表)。sql

Replace模式:

相對應,若是定義了key,那麼對於流中的每條記錄若是key不在表中就insert不然就update。數據庫

Dynamic Table 到 流

表到流的操做是把表的全部change以changelog stream的方式發送到下游。這一步也有兩種模式。windows

Retraction模式:

traction模式中對於Dynamic Table的insert和delete的change分別產生insert或delete event。若是是update的change會產生兩種change event,對於以前發送出去的一樣key的record會產生delete event,對於當前的record是產生insert event。以下圖所示:app

Update模式:

update模式依賴Dynamic Table定義了key。全部的change event是一個kv對。key對應表的key在當前record中的值;對於insert和change value對應新的record。對於delete value是空表示該能夠已經被刪除。以下圖所示:dom

example

表的內容隨着時間改變意味着對錶的query結果也是隨着時間改變的。咱們定義:優化

  • A[t]: 時間t時的表A
  • q(A[t]):時間t時對錶A執行query q

舉個例子來理解動態表的概念:ui

query的限制

因爲流是無限的,相對應 Dynamic Table 也是無界的。當查詢無限的表的時候咱們須要保證query的定時是良好的,有意義可行的。spa

1.在實踐中Flink將查詢轉換成持續的流式應用,執行的query僅針對當前的邏輯時間,因此不支持對於任意時間點的查詢(A[t])。
2.最直觀的原則是query可能的狀態和計算必須是有界的,因此能夠支持可增量計算的查詢:

  • 不斷更新當前結果的查詢:查詢能夠產生insert,update和delete更改。查詢能夠表示爲 Q(t+1) = q'(Q(t), c(T, t, t+1)),其中Q(t)是query q的前一次查詢結果,c(T, t, t_+1) 是表T從t+1到t的變化, q'是q的增量版本。
  • 產生append-only的表,能夠從輸入表的尾端直接計算出新數據。查詢能夠表示爲 Q(t+1) = q''(c(T, t-x, t+1)) ∪ Q(t),q''是不須要時間t時q的結果增量版本query q。c(T, t-x, t+1)是表T尾部的x+1個數據,x取決於語義。例如最後一小時的window aggregation至少須要最後一小時的數據做爲狀態。其餘能支持的查詢類型還有:單獨在每一行上操做的SELECT WHERE;rowtime上的GROUP BY子句(好比基於時間的window aggregate);ORDER BY rowtime的OVER windows(row-windows);ORDER BY rowtime。
    3.當輸入表足夠小時,對錶的每條數據進行訪問。好比對兩個大小固定的流表(好比key的個數固定)進行join。

中間狀態有界

如上文所說的,某些增量查詢須要保留一些數據(部分輸入數據或者中間結果)做爲狀態。爲了保證query不會失敗,保證查詢所須要的空間是有界的不隨着時間無限增加很重要。主要有兩個緣由使得狀態增加:

  1. 不受時間謂詞約束的中間計算狀態的增加(好比 聚合key的膨脹)
  2. 時間有界可是須要遲到的數據(好比 window 的聚合)

雖然第二種狀況可有經過下文提到的"Last Result Offset"參數解決,可是第一種狀況須要優化器檢測。咱們應該拒毫不受時間限制的中間狀態增加的查詢。優化器應該提供如何修復查詢且要求有適當的時間謂詞。好比下面這個查詢:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
GROUP BY user, page複製代碼

隨着用戶數和頁面數的增加,中間狀態會數據隨着時間推移而增加。對於存儲空間的要求能夠經過添加時間謂詞來限制:

SELECT user, page, COUNT(page) AS pCnt
FROM pageviews
WHERE rowtime BETWEEN now() - INTERVAL '1' HOUR AND now() // only last hour
GROUP BY user, page複製代碼

由於不是全部屬性都是不斷增加的, 所以能夠告訴優化器domain的size, 就能夠推斷中間狀態不會隨着時間推移而增加,而後接受沒有時間謂詞的查詢。

val sensorT: Table = sensors
  .toTable('id, 'loc, 'stime, 'temp)
  .attributeDomain('loc, Domain.constant) // domain of 'loc is not growing 
env.registerTable("sensors", sensorT)

SELECT loc, AVG(temp) AS avgTemp
FROM sensors
GROUP BY loc複製代碼

結果的計算和細化時序

一些關係運算符必須等數據到達才能計算最終結果。例如:在10:30關閉的窗口至少要等到10:30才能計算出最終的結果。Flink的logical clock(即 決定什麼時候纔是10:30)取決於使用event time 仍是 processing time。在processing time的狀況下,logical time是每一個機器的wallclock;在event time的狀況下,logical clock time是由源頭提供的watermark決定的。因爲數據的亂序和延遲當在event time模式下時等待一段時間來減少計算結果不完整性。另外一方面某些狀況下但願獲得不斷改進的早期結果。所以對於結果被計算、改進或者作出最終結果時有不一樣的要求、

下圖描繪了不一樣的配置參數如何用於控制早期結果和細化計算結果的。

  • "First Result Offset" 指第一個早期結果被計算的結果的時間。時間是相對於第一次能夠計算完整結果的時間(好比相對於window的結束時間10:30)。若是設置的是-10分鐘,對於結束時間是10:30的window那麼第一個被髮出去的結果是在邏輯時間10:20計算的。這個參數的默認值是0,即在window結束的時候才計算結果。
  • "Complete Result Offset" 指完整的結果被計算的時間。時間是相對於第一次能夠計算完整的時間。若是設置的是+5分鐘,對於結束時間是10:30的window那麼產生完整結果的時間是10:35。這個參數能夠減輕延遲數據形成的影響。默認是0,即在window結束的時候計算的結果就是完整結果。
  • "Update Rate" 指計算完整結果以前一次次更新結果的時間間隔(能夠是時間和次數)。若是設爲5分鐘,窗口大小是30分鐘的tumbling window,開始時間是10:300,"First Result Offset"是-15分鐘, "Complete Result Offset"是2分鐘,那麼將在10:20, 10:25, 10:30更新結果,10:15禪城寄一個結果,10:32產生完整結果。
  • "Last Updates Switch" 指完整結果發出後對於延遲的數據是否計算延遲更新,直到計算狀態被清除。
  • "Last Result Offset" 指可計算的最後一個結果的時間。這是內部狀態被清除的時間,清除狀態後再到達的數據將被丟棄。Last Result Offset 意味着計算的結果是近似值,不能保證精確。
相關文章
相關標籤/搜索