本文翻譯自官網: Temporal Tables https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.htmlhtml
時態表(注:Temporal Table , 我翻譯爲時態表,能夠訪問表在不一樣時間的內容)表示一直在修改的表上的(參數化)視圖的概念,該視圖返回表在特定時間點的內容。數據庫
更改表能夠是跟蹤表的修改歷史(例如,數據庫更改日誌),也能夠是維表的具體修改(例如,數據庫表)。apache
對於錶的歷史修改,Flink能夠跟蹤修改,並容許在查詢中訪問表的特定時間點的內容。 在Flink中,這種表由Temporal Table Function表示。app
對於變化的維表,Flink容許在查詢中的處理時訪問表的內容。在Flink中,這種表由Temporal Table 表示。ide
假設咱們有下表 RatesHistory
。函數
SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ======
09:00 US Dollar 102
09:00 Euro 114
09:00 Yen 1
10:45 Euro 116
11:15 Euro 119
11:49 Pounds 108
RatesHistory
表示一個不斷增加的關於日元的貨幣匯率的附加表(匯率爲
1
)。例如,匯率期間從 09:00
到10:45
的歐元到
日元
的匯率爲 114
。從 10:45
到 11:15
是 116
。this
假設咱們要在10:58的時間輸出全部當前匯率,則須要如下SQL查詢來計算結果表:spa
SELECT * FROM RatesHistory AS r WHERE r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = r.currency AND r2.rowtime <= TIME '10:58');
子查詢肯定對應貨幣的最大時間小於或等於所需時間。外部查詢列出具備最大時間戳的匯率。 翻譯
下表顯示了這種計算的結果。 在咱們的示例中,考慮了10:45 時歐元的更新,可是 10:58 時表的版本中未考慮 11:15 時歐元的更新和新的英鎊輸入。
rowtime currency rate ======= ======== ======
09:00 US Dollar 102
09:00 Yen 1
10:45 Euro 116
時態表的概念旨在簡化此類查詢,加快其執行速度,並減小Flink的狀態使用率。時態表是 append-only 表上的參數化視圖,該視圖將 append-only 表的行解釋爲表的變動日誌,並在特定時間點提供該表的版本。將 append-only 表解釋爲變動日誌須要指定主鍵屬性和時間戳屬性。主鍵肯定覆蓋哪些行,時間戳肯定行有效的時間。
在上面的示例中,currency
是RatesHistory
表的主鍵,而且rowtime
是timestamp屬性。
在Flink中,這由時態表函數表示。
另外一方面,某些用例須要鏈接變化的維表,該表是外部數據庫表。
假設 LatestRates
是一個以最新匯率實現的表(例如,存儲在其中)。LatestRates
是物化的 RatesHistory 歷史。那麼時間 10:58 的 LatestRates 表的內容將是:
10:58> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 116
12:00 時 LatestRates
表的內容將是:
12:00> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 119 Pounds 108
在Flink中,這由Temporal Table表示。
爲了訪問時態表中的數據,必須傳遞一個時間屬性,該屬性肯定將要返回的表的版本。Flink使用表函數的SQL語法提供一種表達它的方法。
定義後,時態表函數將使用單個時間參數timeAttribute
並返回一組行。該集合包含相對於給定時間屬性的全部現有主鍵的行的最新版本。
假設咱們Rates(timeAttribute)
基於RatesHistory
表定義了一個時態表函數,咱們能夠經過如下方式查詢該函數:
SELECT * FROM Rates('10:15'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 09:00 Euro 114 09:00 Yen 1 SELECT * FROM Rates('11:00'); rowtime currency rate ======= ======== ====== 09:00 US Dollar 102 10:45 Euro 116 09:00 Yen 1
對Rates(timeAttribute)的每一個查詢都將返回給定timeAttribute的Rates狀態。
注意:當前 Flink 不支持使用常量時間屬性參數直接查詢時態表函數。目前,時態表函數只能在 join 中使用。上面的示例用於提供有關函數 Rates(timeAttribute)
返回內容的直觀信息。
另請參閱有關用於持續查詢的 join 的頁面,以獲取有關如何與時態表 join 的更多信息。
如下代碼段說明了如何從 append-only 表中建立時態表函數。
// Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) // Provide a static data set of the rates history table. val ratesHistoryData = new mutable.MutableList[(String, Long)] ratesHistoryData.+=(("US Dollar", 102L)) ratesHistoryData.+=(("Euro", 114L)) ratesHistoryData.+=(("Yen", 1L)) ratesHistoryData.+=(("Euro", 116L)) ratesHistoryData.+=(("Euro", 119L)) // Create and register an example table using above data set. // In the real setup, you should replace this with your own table. val ratesHistory = env .fromCollection(ratesHistoryData) .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime) tEnv.registerTable("RatesHistory", ratesHistory) // Create and register TemporalTableFunction. // Define "r_proctime" as the time attribute and "r_currency" as the primary key. val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1) tEnv.registerFunction("Rates", rates) // <==== (2)
Line (1)
建立了一個 時態表函數 rates
,使咱們能夠在 Table API 中使用 rates 函數 。
Line (2) 在表環境中以Rates名稱註冊此函數,這使咱們能夠在SQL中使用Rates函數。
注意:僅 Blink planner 支持此功能。
爲了訪問時態表中的數據,當前必須使用LookupableTableSource定義一個TableSource。 Flink 使用FOR SYSTEM_TIME AS OF 的SQL語法查詢時態表,這在SQL:2011中提出。
假設咱們定義了一個時態表 LatestRates
,咱們能夠經過如下方式查詢此類表:
SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15'; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00'; currency rate ======== ====== US Dollar 102 Euro 116 Yen 1
注意:當前,Flink不支持以固定時間直接查詢時態表。目前,時態表只能在 join 中使用。上面的示例用於提供有關時態表LatestRates
返回內容的直覺。
另請參閱有關用於持續查詢的 join 的頁面,以獲取有關如何與時態表 join 的更多信息。
// Get the stream and table environments. val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) // Create an HBaseTableSource as a temporal table which implements LookableTableSource // In the real setup, you should replace this with your own table. val rates = new HBaseTableSource(conf, "Rates") rates.setRowKey("currency", String.class) // currency as the primary key rates.addColumn("fam1", "rate", Double.class) // register the temporal table into environment, then we can query it in sql tEnv.registerTableSource("Rates", rates)
另請參閱有關如何定義LookupableTableSource的頁面。
歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文