做者:Piotr Nowojsk正則表達式
整理:崔星燦算法
爲什麼選擇 SQL數據庫
流式 SQL 面臨的挑戰編程
Flink 流式 SQL 提供的多種 Join安全
模式識別編程語言
其餘近期成果學習
SQL 最大的優點在於普及率高,用戶基數大。和其餘編程語言相比,它的入門相對簡單。做爲一類聲明式語言,它容許用戶只告訴引擎想要作什麼,而無需指定如何作。所以即便是小白用戶,也可輕鬆(講道理有時候也不過輕鬆)利用 SQL 表達本身所需的邏輯。SQL 優化器會最大限度幫助你提升效率。優化
雖然 SQL 很優秀,但想要在數據流上執行它卻並不容易。咱們經過一個實際例子看一下:網站
上圖展現了一個簡單的雙流 Join,其語義很簡單,找出表 A 和表 B 中相同的 id。爲了執行 Join ,須要用到相關算法,咱們首先來回顧一下兩個經典的 Join 算法:ui
歸併鏈接算法的思路很簡單。拿到兩表後,咱們首先將它們 id 由小到大排好序,而後從前日後同時遍歷兩表,一旦遇到相等的 id 就輸出一條 Join 結果。
在執行哈希鏈接算法以前,咱們首先要對兩表的規模有一個估計,而後選取較小的一張表(示例中的表 B),以 id 值爲 key,以數據行爲[MOU1] value,創建哈希索引。隨後就能夠遍歷另外一側大表,對每個 id 值到創建好的哈希索引中查找有沒有 id 相等的數據行,若是有則輸出 Join 結果。
上述兩種算法有一個區別:在哈希鏈接算法中咱們只須要將較小的一張表加載到內存;而在歸併鏈接算法中,咱們須要將兩張表都加載到內存。這個區別很是關鍵,由於在現實世界中兩表一般會有大小之分。
回顧完經典算法,咱們再來看一下流式場景下的持續查詢該怎樣執行。所謂持續查詢指的是在沒有外界干預的狀況下,查詢會一直輸出結果,不會中止。因爲輸入數據流自己無窮無盡,所以在上面的查詢一般都是持續查詢,即每當新數據到來,可能都會有新的查詢結果產生。
仍是以以前的 Join 爲例,假設最開始只有表B中有一條數據,因爲表 A 爲空,這時候顯然沒有結果輸出。隨後表A中插入一條數據1,但因爲表 B 中沒有與之相等的 id,所以依然沒有結果。直到表 A 中42到來,纔有和表 B 中已有數據相等的 id,並生成相應的 Join 結果。以後表 B 中插入數據7,但不會對結果產生影響。
在傳統數據庫中容許針對一張表進行全表掃描,但在流式場景下卻作不到,其緣由在於:一、咱們沒法控制下一條到來的數據屬於表 A 仍是表 B;二、流數據具備無盡性。既然沒法進行全表掃描,傳統的歸併鏈接算法和哈希鏈接算法都沒法簡單應用。
普通鏈接
那 Flink SQL 裏面是怎麼實現 Join 的呢?簡單來講,內部的 Join 算子會同時維持 A、B 兩張表的哈希索引。當表 A 中插入一條新數據時,會去表 B 的哈希索引中嘗試尋找知足 Join 條件的數據,同時將新到來的數據加入表 A 的哈希索引中。表 B 亦然。但這種思路有一個問題:隨着數據的到來,哈希表可能會無限增加下去。這時候能夠經過狀態 TTL 等手段加以限制,也能夠考慮選用其餘種類的 Join。
時間窗口鏈接
在介紹時間窗口鏈接以前須要首先普及一下 Watermark(水位線)的概念。
現實世界中,因爲數據來源多種多樣且傳輸過程充滿不肯定性,所以數據亂序時有發生。爲了在必定程度上緩解該問題,Flink 引入了 Watermark 機制。所謂 Watermark 就是安插在數據流中的一些特殊控制數據。既然數據流存在「亂序」的概念,那表明着每條數據都會有相應的事件時間戳(也多是其餘的次序標記),Watermark 也有本身的時間戳。每當算子遇到時間戳爲 t 的 Watermark,均可以認爲將不會再收到事件時間戳小於或等於 t 的數據。
瞭解完 Watermark 的概念後,咱們回到時間窗口鏈接。
上圖是一個簡單的時間窗口鏈接查詢示例。和普通 equi-join 相比,它多出來一個查詢條件,即限制一側表的時間須要在另外一側表的時間所定義的一個窗口內(示例查詢中,限制運輸時間須要在訂單記錄產生後的4小時內)。有了這個時間窗口條件,就能夠幫助咱們清理無用的哈希表(狀態)。由於對任意一條流中的數據 x ,在另外一條流上都有一個知足 Join 條件的時間下限(最遲時間),一旦 Watermark 超過這個時間,就表示另一條流上後面到來的數據將不會再和 x 產生 Join 結果,此時就能夠安全地將 x 從狀態中清除。
時間窗口鏈接只適合自然存在窗口條件的場景,由於某條數據一旦過時就會被永久刪除,再也沒法產生包含它的 Join 結果。爲了應對更多場景,Flink SQL 在近期版本新加入了歷史表(Temporal Table)和相應的 Join 功能。
歷史錶鏈接
在介紹此類 Join 以前,咱們須要理解歷史表的概念。它是在 ANSI SQL 2011中新引入的特性,能夠簡單理解爲,對於一個隨時間不斷變化的表 T ,每給一個時間 t,都會有一個對應該時間(或版本)的表格快照 T(t)。下面咱們來看一個示例。
上圖展現了一個貨幣匯率隨時間變化的 changelog (不是表自己!),每條記錄表示某貨幣在對應時間點的匯率值。爲了在 Flink SQL 中爲它註冊一個歷史表,須要用到 TemporalTableFunction(沒錯,歷史表能夠在必定程度上理解爲一個 Time-Versioned TableFunction)。具體註冊過程以下圖所示,其中第一個參數「time」是時間字段,第二個參數「currency」表示主鍵。
如此,每給一個時間,就會返回全部幣種在那個時間的最新匯率狀況。 具體怎麼用歷史表作 Join 呢?仍是經過示例說明,假設咱們還有一個訂單表,裏面每行記錄表示在對應時間(time)利用必定數量(amount)的某種貨幣(currency)所生成的訂單。一個很容易想到的操做就是根據匯率,將每一個訂單的貨幣量都轉化爲本地貨幣的量。因爲貨幣匯率不斷變化,咱們須要用一些複雜的 Join 條件來完成上述任務,但若是用歷史錶鏈接,就變得很直觀:
這裏除了以前提到的歷史表,還引入了一個 LATERAL 關鍵字,它表示針對訂單表中的每一條數據,都須要生成一個新的匯率表。
Join 的執行過程也比較直觀,每到來一條新的訂單數據,根據對應時間的最新匯率計算便可(這裏面其實還有一些沒講到的細節問題,留給讀者思考吧)。當訂單流的 Watermark 超過必定數值後,能夠安全地將過時的匯率記錄刪除,從而限制狀態的無限增加。
簡單總結一下不一樣 Join 的特色。
OK,接下來咱們再看一看 Flink SQL 近期新加的模式識別功能。這裏的模式指的是一些能用正則表達式描述的某些數據特徵序列。
SQL 中有專門的「MATCH_RECOGNIZE」語句來作模式檢測,以下圖所示:
這個查詢的大體要作的事情是:從 Ticker 表內按照不一樣 symbol 分組,找出一段時間內價格均值小於15的連續事件序列。雖然看上去很嚇人,但語句其實不難理解,咱們一步一步來看:
最上方的「SELECT * FROM Ticker」表示要針對 Ticker 表作模式識別。
接下來的「PARTITION BY symbol」和傳統 SQL 的「GROUP BY」相似,都是將數據按照某些列進行分組。
隨後的「ORDER BY rowtime」用於指定每一個分組內的數據順序(時間升序)。
進入匹配環節,首先看最下面的「PATTERN...DEFINE…」子句,它用來定義咱們想要識別的模式。在示例中表示出現包含1個以上連續事件的模式 A 和一個緊跟的模式 B。其中模式 A 中事件序列的價格均值須要小於15,而模式 B 因爲未提供定義,所以能夠是任意事件。[Office2]
回到上方的「MEASURES...」子句,它定義在發現模式後咱們但願輸出的具體結果。在示例中,一旦匹配成功將會返回模式 B 以及模式 A 中的事件數。
下方的「ONE ROW PER MATCH」表示針對每一個匹配成功的模式,輸出一條結果。除了ONE ROW PER MATCH,SQL標準中還支持ALL ROWS PER MATCH,它表示對於每一個知足模式的數據流中的每條數據,產生一條輸出結果。Flink SQL目前還不支持ALL ROWS PER MATCH語句。
最後「AFTER MATCH TO FIRST B」是一個匹配選項,示例中表示當產生一個成功的匹配串以後,下一個匹配串的查找,從此次匹配成功的匹配串中的模式B開始。
總結一下利用 MATCH_RECOGNIZE 子句進行模式識別的兩個點。其一是它和 GROUP BY 子句相似,能夠理解爲執行一個特殊的聚合;其二是這個子句已經成爲 SQL 2016 標準的一部分。
最後咱們來看一下2018年其餘已經完成或正在進行的 Flink SQL 相關工做。
2018年 Flink 新添加了一個 SQL Client 模塊,容許用戶在配置好數據源等信息的前提下,直接經過命令行調用 Flink SQL,無須用 Java、Scala、Python 等語言進行編碼。現階段 SQL Client 的功能還有些侷限,比較適合快速開發原型等場合,你們若是有興趣能夠多參與貢獻。
社區一直在竭盡全力地拓展 Flink 與其餘項目之間的鏈接及適配工做,近期正在添加外部 Catalog 功能。該功能完成後 Flink 將能夠直接訪問 Cassandra、Hive 等系統的元數據。
除了上述兩點,Flink SQL 在2018年還有不少新功能及改進,你們能夠去閱讀官方文檔或源碼學習。
[MOU1]容易誤讀,建議改成以數據爲value
[Office2]在示例中所定義的模式表示「事件A出現1次以上,在事件A後面跟着一個事件B」。其中事件A定義爲當前全部匹配A的事件(包括當前待匹配的事件)的價格均值小於15,因爲未提供事件B的定義,能夠認爲任意事件均可以匹配B。
更多資訊請訪問 Apache Flink 中文社區網站