【翻譯】Flink Table Api & SQL —Streaming 概念 —— 時態表

本文翻譯自官網: Temporal Tables https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/temporal_tables.htmlhtml

Flink Table Api & SQL 翻譯目錄sql

時態表(注:Temporal Table , 我翻譯爲時態表,能夠訪問表在不一樣時間的內容)表示一直在修改的表上的(參數化)視圖的概念,該視圖返回表在特定時間點的內容。數據庫

更改表能夠是跟蹤表的修改歷史(例如,數據庫更改日誌),也能夠是維表的具體修改(例如,數據庫表)。apache

對於錶的歷史修改,Flink能夠跟蹤修改,並容許在查詢中訪問表的特定時間點的內容。 在Flink中,這種表由Temporal Table Function表示app

對於變化的維表,Flink容許在查詢中的處理時訪問表的內容。在Flink中,這種表由Temporal Table 表示ide

設計初衷

與表的修改歷史相關

假設咱們有下表 RatesHistory函數

SELECT * FROM RatesHistory; rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1
10:45   Euro        116
11:15   Euro        119
11:49   Pounds      108

RatesHistory表示一個不斷增加的關於日元的貨幣匯率的附加表(匯率爲1)。例如,匯率期間從 09:0010:45的歐元日元的匯率爲 114從 10:45 到 11:15 是 116this

假設咱們要在10:58的時間輸出全部當前匯率,則須要如下SQL查詢來計算結果表:spa

SELECT * FROM RatesHistory AS r WHERE r.rowtime = ( SELECT MAX(rowtime) FROM RatesHistory AS r2 WHERE r2.currency = r.currency AND r2.rowtime <= TIME '10:58');

子查詢肯定對應貨幣的最大時間小於或等於所需時間。外部查詢列出具備最大時間戳的匯率。 翻譯

下表顯示了這種計算的結果。 在咱們的示例中,考慮了10:45 時歐元的更新,可是 10:58 時表的版本中未考慮 11:15 時歐元的更新和新的英鎊輸入。

rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Yen           1
10:45   Euro        116

時態表的概念旨在簡化此類查詢,加快其執行速度,並減小Flink的狀態使用率。時態表是 append-only 表上的參數化視圖,該視圖將 append-only 表的行解釋爲表的變動日誌,並在特定時間點提供該表的版本將 append-only 表解釋爲變動日誌須要指定主鍵屬性和時間戳屬性。主鍵肯定覆蓋哪些行,時間戳肯定行有效的時間。

在上面的示例中,currency RatesHistory的主鍵,而且rowtime是timestamp屬性。

在Flink中,這由時態表函數表示

與維表變化相關

另外一方面,某些用例須要鏈接變化的維表,該表是外部數據庫表。

假設 LatestRates 是一個以最新匯率實現的表(例如,存儲在其中)。LatestRates 是物化的 RatesHistory 歷史那麼時間 10:58 的 LatestRates 表的內容將是

10:58> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 116

12:00 時 LatestRates的內容將是: 

12:00> SELECT * FROM LatestRates; currency rate ======== ====== US Dollar 102 Yen 1 Euro 119 Pounds 108

在Flink中,這由Temporal Table表示

時態表函數

爲了訪問時態表中的數據,必須傳遞一個時間屬性,該屬性肯定將要返回的表的版本。Flink使用表函數的SQL語法提供一種表達它的方法。

定義後,時態表函數將使用單個時間參數timeAttribute並返回一組行。該集合包含相對於給定時間屬性的全部現有主鍵的行的最新版本。

假設咱們Rates(timeAttribute)基於RatesHistory定義了一個時態表函數,咱們能夠經過如下方式查詢該函數: 

SELECT * FROM Rates('10:15'); rowtime currency rate ======= ======== ======
09:00   US Dollar   102
09:00   Euro        114
09:00   Yen           1 SELECT * FROM Rates('11:00'); rowtime currency rate ======= ======== ======
09:00   US Dollar   102
10:45   Euro        116
09:00   Yen           1

對Rates(timeAttribute)的每一個查詢都將返回給定timeAttribute的Rates狀態。

注意:當前 Flink 不支持使用常量時間屬性參數直接查詢時態表函數。目前,時態表函數只能在 join 中使用。上面的示例用於提供有關函數 Rates(timeAttribute)返回內容的直觀信息 

另請參閱有關用於持續查詢的 join 的頁面,以獲取有關如何與時態表 join 的更多信息。 

定義時態表函數

如下代碼段說明了如何從 append-only 表中建立時態表函數。

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env) // Provide a static data set of the rates history table.
val ratesHistoryData = new mutable.MutableList[(String, Long)] ratesHistoryData.+=(("US Dollar", 102L)) ratesHistoryData.+=(("Euro", 114L)) ratesHistoryData.+=(("Yen", 1L)) ratesHistoryData.+=(("Euro", 116L)) ratesHistoryData.+=(("Euro", 119L)) // Create and register an example table using above data set. // In the real setup, you should replace this with your own table.
val ratesHistory = env .fromCollection(ratesHistoryData) .toTable(tEnv, 'r_currency, 'r_rate, 'r_proctime.proctime)
 tEnv.registerTable("RatesHistory", ratesHistory) // Create and register TemporalTableFunction. // Define "r_proctime" as the time attribute and "r_currency" as the primary key.
val rates = ratesHistory.createTemporalTableFunction('r_proctime, 'r_currency) // <==== (1)
tEnv.registerFunction("Rates", rates)                                          // <==== (2)

Line (1)建立了一個 時態表函數 rates ,使咱們能夠在 Table API 中使用 rates 函數  

Line (2) 在表環境中以Rates名稱註冊此函數,這使咱們能夠在SQL中使用Rates函數

時態表

注意:僅 Blink planner 支持此功能。

爲了訪問時態表中的數據,當前必須使用LookupableTableSource定義一個TableSource。 Flink 使用FOR SYSTEM_TIME AS OF 的SQL語法查詢時態表,這在SQL:2011中提出。

假設咱們定義了一個時態表 LatestRates,咱們能夠經過如下方式查詢此類表: 

SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '10:15'; currency rate ======== ====== US Dollar 102 Euro 114 Yen 1 SELECT * FROM LatestRates FOR SYSTEM_TIME AS OF TIME '11:00'; currency rate ======== ====== US Dollar 102 Euro 116 Yen 1

注意:當前,Flink不支持以固定時間直接查詢時態表。目前,時態表只能在 join 中使用。上面的示例用於提供有關時態表LatestRates返回內容的直覺

另請參閱有關用於持續查詢的 join 的頁面,獲取有關如何與時態表 join 更多信息。

定義時態表

// Get the stream and table environments.
val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = TableEnvironment.getTableEnvironment(env) // Create an HBaseTableSource as a temporal table which implements LookableTableSource // In the real setup, you should replace this with your own table.
val rates = new HBaseTableSource(conf, "Rates") rates.setRowKey("currency", String.class)   // currency as the primary key
rates.addColumn("fam1", "rate", Double.class) // register the temporal table into environment, then we can query it in sql
tEnv.registerTableSource("Rates", rates)

另請參閱有關如何定義LookupableTableSource的頁面 

 

歡迎關注Flink菜鳥公衆號,會不按期更新Flink(開發技術)相關的推文

相關文章
相關標籤/搜索