Flink原理(七)——動態表(Dynamic tables)

前言

  本文是結合Flink官網,我的理解所得,如果有誤歡迎留言指出,謝謝!文中圖皆來自官網(連接[1])。html

  本文將隨着下面這個問題展開,針對該問題更爲生動的解釋能夠參見金竹老師的分享(連接[2])。數據庫

  SQL適合流計算場景嗎?apache

  對於流計算,每一條數據的到來都會觸發一次查詢產生一個結果,併發射出去。咱們發現對於相同的數據源,使用相同的SQL查詢時,批、流的結果是相同的,即在不一樣模式下,SQL的語意是一致的(One Query One Result),最終的結果是一致。緩存

一、動態表與連續查詢(Dynamic Table&Continuous Query)

  和動態表對應的是靜態表——常規的數據庫中的表或批處理中的表等,其在查詢時數據再也不變化。動態表是隨時間變化的,即便是在查詢的時候。怎麼理解了?流上的數據是源源不斷的,一條數據的到來會觸發一次查詢,此次查詢在執行時還有下一條數據到來,對錶自己數據是在變化的。併發

  對動態表的查詢是連續的,即連續查詢(Continuous Query)。實質上, 動態表上的連續查詢與定義物理視圖(Materialized View)的查詢很類似。物理視圖定義爲SQL查詢,就像常規的虛擬視圖同樣,不一樣的是物理視圖會緩存查詢結果,這樣在訪問時不須要從新計算,而緩存帶來的挑戰是有可能提供過期的結果,Eager View Maintenance則是用於及時跟新物理視圖的技術,這裏就不展開了。app

  流、動態表、連續查詢三者的關係以下圖所示:編碼

  用一句話歸納是:流被轉換爲動態表,對動態表的連續查詢生成新的動態表(結果表),而後結果表被轉換爲流。url

二、流上定義表

  2.1 定義表

  爲了在流上使用關係型查詢,須要將流轉換成表。下面的分析過程均採用官網(Ref[2])中的例子進行說明。spa

  1)點擊事件流的schema以下:3d

1 [
2   user:  VARCHAR,   // the name of the user
3   cTime: TIMESTAMP, // the time when the URL was accessed
4   url:   VARCHAR    // the URL that was accessed by the user
5 ]    

  2)從概念上來講,流上的天天記錄都是動態表進行INSERT修改。從本質上講,是從一個INSERT-Only(僅插入)的ChangeLog流上構建一個表。點擊事件流上構建表以下圖所示,且隨着更多點擊流記錄的插入,生成的表不斷增加:

  Note:定義在流上的表在內部是沒有實現的。

  2.2 連續查詢

  連續查詢不會停止,會根據輸入表來更新結果表,下面介紹兩種查詢的例子。

  1)簡單的GROUP-BY Count聚合查詢

  下圖中,左邊是輸入表click,是隨着時間updata增長的,右邊是查詢的結果表。開始clicks表中只有一條數據[Mary, ./home]時其結果表是表-1,當clicks表中新增一條數據[Bob, ./cart]時,其結果表是表-2,依次下推。每一條新數據的到來會對以前錶行進update或INSERT操做,SQL語句就會根據現有數據更新的結果表。

  2)帶有窗口(window)的聚合查詢

  窗口的時間間隔是1個小時,窗口-1對應的時表-1,窗口-2對應的時表-2,依次類推。和第一種查詢不同的是,每一張時表只是統計對應窗口的數據,以前窗口的數據對其沒有影響,對不一樣窗口的查詢結果是以追加的形式寫入result表中的。

   2.3 Update和Append查詢

  2.2中的兩種例子分別對應的兩種查詢方式,

  1)例子1對應着Update查詢,這種方式須要更新以前已經發出的結果,包括INSERT和UPDATE兩種改變。改變以前已經發出的結果意味着,這種查詢須要維護更多的狀態(state)數據;

  2)例子2對應着Append查詢,這種方式查詢的結果都是以追加的形式加入到result表中,僅包含INSERT操做。這種方式生成的表和update生成的錶轉換成流的方式不同(見下文)。

  2.4 Restrictions查詢

  對於有些SQL查詢會因須要保留的state多大或從新計算已發出的記錄用來更新的代價太大而得不償失。

  1)state size:例以下面的SQL,在連續查詢中,當一條新的消息到來時,爲了更新以前已發出的結果(聯想2.2中例1),須要保存以前的計算結果即state。當時當連續查詢持續很長時間時,須要保存的state的容量會很大,且隨着時間的遞增會愈來愈大,更糟糕的是若不斷有新用戶(分配不一樣的username)加入,其要保存的count會隨着時間更加恐怖,最後有可能致使任務失敗。

1 SELECT user, COUNT(url) FROM clicks GROUP BY user;

  2)computing updates:例以下面的SQL,當clicks表新增一條記錄,爲計算rank,須要對以前全部的從新計算和更新已發出結果的中很大一部分,一條記錄的的增長,有可能致使不少user的rank變化。

1 SELECT user, RANK() OVER (ORDER BY lastLogin)
2 FROM (
3   SELECT user, MAX(cTime) AS lastAction FROM clicks GROUP BY user
4 );

   3)查詢配置(連接[3])

  在常見的場景中,對長期的運行的job作連續查詢,爲了防止保存的state過大超出存儲而任務失敗,可能會對state的大小作必定限制即刪除state。但這種方式可能引起另外一個問題——查詢出來的結果可能不許確。Flink Table API和SQL中提供查詢參數試圖在準確性和資源消耗中找到一個平衡點。

   Idle State Retention Time含義是state的key在被刪除以前多長時間沒有被更新,即沒有被更新state的保存時間。使用方式以下:

1 StreamQueryConfig qConfig = ...
2 
3 // set idle state retention time: min = 12 hours, max = 24 hours
4 qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

  三、Table到流的轉換

  能夠經過INSERT、UPDATE、DELETE像修改常規表同樣去改變更態表。將動態錶轉換爲流或將其寫入外部系統時,須要對這些更改進行encode。 Flink的Table API和SQL支持三種encode改變更態表的方法:

  1)Append-only Stream(僅追加流):僅經過INSERT操做獲得的動態表能夠發射插入行來轉換爲流(聯想2.2中例2),這種方式轉換的流中數據都是片斷性的,一個片斷表明一個窗口;

  2)Retract Stream(回溯流):restract stream有兩種消息:添加(add)消息和回溯(retract)消息。將動態錶轉換爲回溯(retract)流,經過將INSERT更改encode爲添加消息,將DELETE更改encode爲回溯消息,將UPDATE更改endcode爲更新(上一個)行的回溯消息以及添加消息更新新的行 。 下圖顯示了動態表到回溯流的轉換。

  流上每條消息都有一個標識位,其中+標識INSERT操做,-標識DELETE操做。在clicks表中第1、二行消息[Mary, ./home]和[Bob, ./cat]被轉換爲流中1第、2條消息,當clicks表中第三行[Mary, ./prod?id=1]轉換時,會先將已發出的第1條信息標記爲DELETE告訴下游,而後第4條消息從新插入user爲Mary的消息,依次類推,這樣能夠保證輸出結果的正確性。

  3)Upsert Stream(上插流):Upsert流包括upsert消息和刪除消息。 動態表要轉換爲upsert流須要(多是複合的)惟一鍵。 經過將INSERT和UPDATE 操做encode爲upsert消息,並將DELETE更改encode爲刪除消息,能夠是具備惟一鍵的動態錶轉換爲流。 流運算須要知道惟一鍵屬性才能正確應用消息。 與回溯流的主要區別在於UPDATE使用單個消息((主鍵))進行編碼,所以更有效。

  (我的理解待驗證)Upsert流和Retract流的區別在於數據存在第三方系統中時,前者可能存在重複數據,後者沒有。

 

   NOTE:在將動態表(dynamic table)轉換爲數據流(Data Stream)時,僅支持append和retract兩種方式

  

Ref:

  [1]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/dynamic_tables.html#table-to-stream-conversion

  [2]http://www.itdks.com/Course/detail?id=13213&from=search

  [3]https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/table/streaming/query_configuration.html

相關文章
相關標籤/搜索