【Flink】流-表概念

流概念

Data Streams上的關係查詢

關係型SQLstream processing對好比下。html

SQL Stream Processing
有限元組 無限元組
完整數據集上的查詢 沒法基於全部數據查詢
查詢會結束 查詢不會結束

Materialized View被定義爲一條SQL查詢,其會緩存查詢結果。但當所查詢的表(基表)被修改時,緩存的結果將過時。
Eager View Maintenance會更新Materialized View,當基表被更新時,會馬上更新Materialized View中緩存的結果。java

Eager View MaintenanceSQL Querystreams上的關係以下。數據庫

  • 數據庫表是INSERT、UPDATE、DELETEDML語句流的結果,被流稱爲changelog stream
  • Materialized View被定義爲一條SQL查詢。爲更新View,查詢須要不斷處理changelog stream
  • Materialized Viewstreaming SQL查詢結果。

動態表 & 連續查詢

動態表是Flink流上Table Api & SQL的核心概念,其隨時間動態變化;apache

  • 查詢動態表會產生一個連續查詢;
  • 連續查詢永不中止,其會產生一個動態表;
  • 當所查詢的動態表發生變化時,查詢會更新結果動態表。

連續查詢的結果等同在輸入表的快照上以批處理模式執行相同查詢的結果。緩存

流、動態表、連續查詢的關係以下圖所示。session

  • stream會被轉化爲動態表。
  • 在動態表上進行連續查詢,產生新的動態表。
  • 動態表會被轉化爲stream

動態表是一個邏輯概念。 在查詢執行期間動態表不必定(徹底)materializedapp

爲理解動態表和連續查詢的概念,假設點擊事件流有以下模式。ide

[
user: VARCHAR, // the name of the user
cTime: TIMESTAMP, // the time when the URL was accessed
url: VARCHAR // the URL that was accessed by the user
]this

流上定義表

爲在流上使用關係查詢,流須要被轉化爲表。流的每一個記錄被解釋爲結果表(動態表)上的INSERT修改,咱們從一個只有INSERTchangelog流中構建表。以下圖所示,點擊事件流被轉化爲表,表會隨着點擊事件記錄的插入而不斷增加。編碼

連續查詢

連續查詢做用於動態表並又會產生動態表;連續查詢不會終止並會根據其輸入表(動態表)上的更新來更新其結果表(動態表)。
下面顯示在點擊事件流上定義的clicks表上顯示兩個查詢示例。

首先是GROUP-BY COUNT聚合查詢示例。

當查詢開始時,clicks表爲空;當第一行插入到clicks表中時,查詢開始計算結果表(動態表),如[Mary, ./home]插入後,結果表包含一行結果[Mary, 1];當插入第二行[Bob, ./cart]時,查詢會更新結果表並插入新記錄[Bob, 1]。第三行[Mary, ./prod=id=1]插入時,查詢會更新結果表中的[Mary, 1]記錄,將其更新爲[Mary, 2]。最後一行[Liz, 1]插入clicks表後,也會更新到結果表(插入新記錄)。

第二個查詢與第一個查詢相似,除了用戶屬性以外,還在小時滾動窗口上對clicks表進行分組,而後對URL進行計數(基於時間的計算,如窗口基於特殊的時間屬性)。

每一個小時查詢會計算結果並更新結果表。在cTime12:00:00 - 12:59:59之間,clicks表存在四條記錄,對應的查詢計算出兩條結果;下個時間窗口(13:00:00 - 13:59:59),clicks表中存在三條記錄,對應的查詢計算出兩條結果添加值結果表中;當記錄插入至clicks表中後,結果表也會被動態更新。

更新和附加查詢

上述兩個查詢雖然有些相似(均計算統計聚合分組),但二者也有顯著不一樣:第一個查詢會更新結果表的結果,如定義在結果表上的changelog流包含INSERTUPDATE;第二個查詢僅僅往結果表中添加記錄,如定義在結果表上的changelog流只包含INSERT。一個查詢是否生成僅插入表(INSERT)或更新表(UPDATE)有一些含義:生成更新表的查詢必需要維護更多狀態,將僅插入表轉化爲流與將更新表轉化爲流不一樣。

查詢限制

不少查詢能夠等同在流上的連續查詢,一些查詢因爲需維護狀態的大小或計算更新代價大致使查詢計算代價太大。

  • 狀態大小:無界限流上的連續查詢常常會運行數週或數月。所以,連續查詢處理的數據總量能夠很大,須要之前結果(結果表)的連續查詢須要維護全部行以便進行更新。例如,第一個查詢示例中須要保存每一個userurlcount以即可以增長count,使得當輸入表(左側表)接收一行新數據時會產生新的結果(右側表)。若只跟蹤註冊用戶,那麼維護cnt大小代價不會太大(註冊用戶量不太大)。但若非註冊用戶也分配惟一的用戶名,則隨着時間的增長,維護cnt大小代價將增大,最終致使查詢失敗。

SELECT user, COUNT(url)
FROM clicks
GROUP BY user;

  • 計算更新:即便只添加或更新單行記錄,一些查詢須要從新計算和更新大部分結果行,一般這樣的查詢不適合做爲連續查詢。以下查詢示例中,會根據最後一次點擊的時間爲每一個用戶計算RANK。一旦clicks表收到新行,用戶的lastAction被更新而且應該計算新的RANK。然而因爲不存在兩行相同RANK,因此全部較低RANK的行也須要被更新。

SELECT user, RANK() OVER (ORDER BY lastLogin)
FROM (
SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
);

表到流的轉化

動態表可像傳統表同樣被INSERT、UPDATE、DELETE修改。可能只有一行的表被持續更新;或者是沒有UPDATE、DELETE更改的只插入表。當將動態錶轉化爲流或將其寫入外部系統,這些更改(修改)須要被編碼,FlinkTable API & SQL支持三種方式編碼動態表上的更改(修改)。

  • Append-only流:僅使用INSERT更改進行修改的動態表可經過發出插入的行來轉化爲流。
  • Retract流:Retract流包含兩種類型消息(add消息和retract消息),經過將動態表的INSERT更改做爲add消息、將DELETE更改做爲retract消息、將UPDATE更改分解爲舊記錄的retract消息和新記錄的add消息。下圖展現了從動態錶轉化爲retract流

  • Upsert流:Upsert流包含兩種類型消息(upset消息和delete消息),動態錶轉化爲upsert流須要有主鍵(可複合),具備主鍵的動態表經過將INSERT、UPDATE更改編碼爲upset消息,將DELETE更改編碼爲delete消息upset流retract流主要區別是UPDATE更改使用單一消息(主鍵)進行編碼,所以效率更高。下圖展現了將動態表轉化爲upset流

時間屬性

  • Processing time(處理時間):表示事件被處理的系統時間。
  • Event time(事件時間):表示事件發生時的時間。
  • Ingestion time(攝入時間):表示事件進入流處理系統的時間(在內部其與Event time類型)。

上述時間能夠在代碼中指明時間特性。

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

Table API & SQL中基於時間的操做(如窗口)須要設置時間概念和及其來源信息。所以,tables能夠提供邏輯時間屬性來指示時間並在table程序中訪問相應時間戳。時間屬性能夠是表模式的一部分(從DataStream中建立表時被定義),或在使用TableSource時被預約義,一旦時間屬性被定義,那麼其能夠做爲一個字段被引用或進行基於時間的操做。只要時間屬性沒有被修改,只是從查詢的一部分轉發到另外一部分,那麼它仍然是一個有效的時間屬性。時間屬性與常規時間戳相同,可被訪問並計算。若是在計算中使用時間屬性,那麼其將被具象化爲常規時間戳,常規時間戳不兼容Flink的時間和水位系統,所以不能再用於基於時間的操做。

處理時間

processing time容許表程序基於本地機器的時間輸出結果,它不須要提取時間戳和生成水位,有多種方式定義processing time屬性。

流轉化爲表過程

processing time屬性在模式定義時使用.proctime屬性定義,時間屬性只能經過額外的邏輯字段擴展物理模式,所以,其可被定義在模式定義的末尾,具體以下。

DataStream<Tuple2<String, String>> stream = ...;

// declare an additional logical field as a processing time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

使用TableSource

processing time屬性可經過實現DefinedProctimeAttribute接口定義,邏輯時間屬性被附加到由TableSource的返回類型定義的物理模式上。

// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username" , "Data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream 
        DataStream<Row> stream = ...;
        return stream;
    }

    @Override
    public String getProctimeAttribute() {
        // field with this name will be appended as a third field 
        return "UserActionTime";
    }
}

// register table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

事件時間

Event time容許表程序根據每條記錄中包含的時間輸出結果,這樣即便在無序事件或晚到事件狀況下保持一致結果,當從持久化存儲中讀取記錄時還保證可重放結果。此外,event time容許批和流環境中的表程序使用統一的語法,流環境中的時間屬性能夠是批環境中的記錄的字段。爲處理亂序事件,並區分流中準時和晚到事件,Flink須要從事件中提取時間戳信息,並在時間戳上進行處理(水位)。event time屬性可被定義在流到表的轉化中或者使用TableSourceTable API & SQL假設在上述兩種狀況下,都在DataStream API中生成時間戳和水位。

流轉化爲表的過程當中

event time屬性在模式定義時經過.rowtime屬性定義;時間戳和水位必須在轉換的DataStream中已被分配;將DataStream轉化爲Table時有以下兩種定義時間屬性的方式。

  • 經過附加邏輯字段擴展物理表模式。
  • 用邏輯字段替換物理字段(如提取時間戳後再也不須要)。
// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

使用TableSource

event time屬性可經過實現DefinedRowtimeAttribute接口定義,邏輯時間屬性被附加到由TableSource的返回類型定義的物理模式上。時間戳和水位必定要在getDataStream方法返回的流中被分配。

// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttribute {

    @Override
    public TypeInformation<Row> getReturnType() {
        String[] names = new String[] {"Username" , "Data"};
        TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
        return Types.ROW(names, types);
    }

    @Override
    public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
        // create stream 
        // ...
        // extract timestamp and assign watermarks based on knowledge of the stream
        DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
        return stream;
    }

    @Override
    public String getRowtimeAttribute() {
        // field with this name will be appended as a third field 
        return "UserActionTime";
    }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
    .scan("UserActions")
    .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));

查詢配置

無論輸入是有界批量輸入仍是無界流輸入,Table API & SQL查詢都有相同的語義。在不少狀況下,流上的連續查詢與離線計算具備相同準確的結果。然而,在實際狀況下連續查詢必需要限制其所維護狀態的大小以免使用完存儲空間,並可以在長時間處理無限流數據。所以,連續查詢可能只能根據輸入數據的特徵和查詢自己提供近似準確的結果。

Flink Table API & SQL接口提供參數調整連續查詢的準確性和資源消耗。參數經過QueryConfig對象定義,QueryConfig對象可經過TableEnvironment獲取並在翻譯表時被傳回。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// emit result Table via a TableSink
result.writeToSink(sink, qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);

下面描述了QueryConfig的參數如何影響查詢的準確性和資源消耗的。

空閒狀態保留時間

不少查詢在一個或多個關鍵屬性上聚合或鏈接記錄(如典型的聚合查詢),當在流上執行該查詢時,連續查詢須要維護記錄或保持每一個鍵的部分結果。若涉及到流的關鍵域(活動鍵值隨時間會變化),隨着不一樣鍵被觀察,連續查詢會積累愈來愈多的狀態。然而,在一段時間後鍵將變得不活動時,它們的對應狀態將變得過時和無效。以下查詢示例中計算每一個sessionclicks數量。

SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;

sessionId被做爲分組鍵,連續查詢會爲每一個sessionId維護clicks數量。sessionId屬性隨着時間推移而變化,sessionId值僅在session結束前處於活動狀態(保持一段時間)。然而,因爲不清楚sessionId屬性,連續查詢指望每一個sessionId值在任什麼時候間都有效,即會維護全部sessionId的值。這樣會致使隨着時間的推移,所維護的sessionId愈來愈多。

空閒狀態保留時間參數定義鍵的狀態不被更新,在刪除以前保留多長時間。在上述查詢中,sessionId的計數在指定的配置時間內未被更新時將被移除。當鍵會移除後再次被添加,那麼鍵將會被當成新的鍵(如上述示例中又會開始計0)。有兩個參數配置空閒狀態保留時間最小空閒狀態保留時間最大空閒狀態保留時間

StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hour, max = 16 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(16));
// set idle state retention time. min = max = 12 hours
qConfig.withIdleStateRetentionTime(Time.hours(12);

配置不一樣的最小和最大空閒狀態保留時間的效率更高,由於它減小了查詢內部簿記什麼時候刪除狀態的次數。

參考連接

https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/table/streaming.html

相關文章
相關標籤/搜索