Join 的實現依賴於緩存整個數據集,而 Streaming SQL Join 的對象倒是無限的數據流,內存壓力和計算效率在長期運行來講都是不可避免的問題。下文將結合 SQL 的發展解析 Flink SQL 是如何解決這些問題並實現兩個數據流的 Join。數據庫
不管在 OLAP 仍是 OLTP 領域,Join 都是業務常會涉及到且優化規則比較複雜的 SQL 語句。對於離線計算而言,通過數據庫領域多年的積累,Join 語義以及實現已經十分紅熟,然而對於近年來剛興起的 Streaming SQL 來講 Join 卻處於剛起步的狀態。編程
其中最爲關鍵的問題在於 Join 的實現依賴於緩存整個數據集,而 Streaming SQL Join 的對象倒是無限的數據流,內存壓力和計算效率在長期運行來講都是不可避免的問題。下文將結合 SQL 的發展解析 Flink SQL 是如何解決這些問題並實現兩個數據流的 Join。緩存
傳統的離線 Batch SQL (面向有界數據集的 SQL)有三種基礎的實現方式,分別是 Nested-loop Join、Sort-Merge Join 和 Hash Join。安全
相對於離線的 Join,實時 Streaming SQL(面向無界數據集的 SQL)沒法緩存全部數據,所以 Sort-Merge Join 要求的對數據集進行排序基本是沒法作到的,而 Nested-loop Join 和 Hash Join 通過必定的改良則能夠知足實時 SQL 的要求。
咱們經過例子來看基本的 Nested Join 在實時 Streaming SQL 的基礎實現(案例及圖來自 Piotr Nowojski 在 Flink Forward San Francisco 的分享[2])。app
圖1. Join-in-continuous-query-1運維
Table A 有 一、42 兩個元素,Table B 有 42 一個元素,因此此時的 Join 結果會輸出 42。ide
圖2. Join-in-continuous-query-2oop
接着 Table B 依次接受到三個新的元素,分別是 七、三、1。由於 1 匹配到 Table A 的元素,所以結果表再輸出一個元素 1。優化
圖3. Join-in-continuous-query-3ui
隨後 Table A 出現新的輸入 二、三、6,3 匹配到 Table B 的元素,所以再輸出 3 到結果表。
能夠看到在 Nested-Loop Join 中咱們須要保存兩個輸入表的內容,而隨着時間的增加 Table A 和 Table B 須要保存的歷史數據無止境地增加,致使很不合理的內存磁盤資源佔用,並且單個元素的匹配效率也會愈來愈低。相似的問題也存在於 Hash Join 中。
那麼有沒有可能設置一個緩存剔除策略,將沒必要要的歷史數據及時清理呢?答案是確定的,關鍵在於緩存剔除策略如何實現,這也是 Flink SQL 提供的三種 Join 的主要區別。
Regular Join 是最爲基礎的沒有緩存剔除策略的 Join。Regular Join 中兩個表的輸入和更新都會對全局可見,影響以後全部的 Join 結果。舉例,在一個以下的 Join 查詢裏,Orders 表的新紀錄會和 Product 表全部歷史紀錄以及將來的紀錄進行匹配。
SELECT * FROM Orders INNER JOIN Product ON Orders.productId = Product.id
由於歷史數據不會被清理,因此 Regular Join 容許對輸入表進行任意種類的更新操做(insert、update、delete)。然而由於資源問題 Regular Join 一般是不可持續的,通常只用作有界數據流的 Join。
Time-Windowed Join 利用窗口給兩個輸入表設定一個 Join 的時間界限,超出時間範圍的數據則對 JOIN 不可見並能夠被清理掉。值得注意的是,這裏涉及到的一個問題是時間的語義,時間能夠指計算髮生的系統時間(即 Processing Time),也能夠指從數據自己的時間字段提取的 Event Time。若是是 Processing Time,Flink 根據系統時間自動劃分 Join 的時間窗口並定時清理數據;若是是 Event Time,Flink 分配 Event Time 窗口並依據 Watermark 來清理數據。
以更經常使用的 Event Time Windowed Join 爲例,一個將 Orders 訂單表和 Shipments 運輸單表依據訂單時間和運輸時間 Join 的查詢以下:
SELECT * FROM Orders o, Shipments s WHERE o.id = s.orderId AND s.shiptime BETWEEN o.ordertime AND o.ordertime + INTERVAL '4' HOUR
這個查詢會爲 Orders 表設置了 o.ordertime > s.shiptime- INTERVAL ‘4’ HOUR 的時間下界(圖4)。
圖4. Time-Windowed Join 的時間下界 - Orders 表
併爲 Shipmenets 表設置了 s.shiptime >= o.ordertime 的時間下界(圖5)。
圖5. Time-Windowed Join 的時間下界 - Shipment 表
所以兩個輸入表都只須要緩存在時間下界以上的數據,將空間佔用維持在合理的範圍。
不過雖然底層實現上沒有問題,但如何經過 SQL 語法定義時間還是難點。儘管在實時計算領域 Event Time、Processing Time、Watermark 這些概念已經成爲業界共識,但在 SQL 領域對時間數據類型的支持仍比較弱[4]。所以,定義 Watermark 和時間語義都須要經過編程 API 的方式完成,好比從 DataStream 轉換至 Table ,不能單純靠 SQL 完成。這方面的支持 Flink 社區計劃經過拓展 SQL 方言來完成,感興趣的讀者能夠經過 FLIP-66[7] 來追蹤進度。
雖然 Timed-Windowed Join 解決了資源問題,但也限制了使用場景: Join 兩個輸入流都必須有時間下界,超過以後則不可訪問。這對於不少 Join 維表的業務來講是不適用的,由於不少狀況下維表並無時間界限。針對這個問題,Flink 提供了 Temporal Table Join 來知足用戶需求。
Temporal Table Join 相似於 Hash Join,將輸入分爲 Build Table 和 Probe Table。前者通常是緯度表的 changelog,後者通常是業務數據流,典型狀況下後者的數據量應該遠大於前者。在 Temporal Table Join 中,Build Table 是一個基於 append-only 數據流的帶時間版本的視圖,因此又稱爲 Temporal Table。Temporal Table 要求定義一個主鍵和用於版本化的字段(一般就是 Event Time 時間字段),以反映記錄在不一樣時間的內容。
好比典型的一個例子是對商業訂單金額進行匯率轉換。假設有一個 Orders 流記錄訂單金額,須要和 RatesHistory 匯率流進行 Join。RatesHistory 表明不一樣貨幣轉爲日元的匯率,每當匯率有變化時就會有一條更新記錄。兩個表在某一時間節點內容以下:
圖6. Temporal Table Join Example]
咱們將 RatesHistory 註冊爲一個名爲 Rates 的 Temporal Table,設定主鍵爲 currency,版本字段爲 time。
圖7. Temporal Table Registration]
此後給 Rates 指定時間版本,Rates 則會基於 RatesHistory 來計算符合時間版本的匯率轉換內容。
圖8. Temporal Table Content]
在 Rates 的幫助下,咱們能夠將業務邏輯用如下的查詢來表達:
SELECT o.amount * r.rate FROM Orders o, LATERAL Table(Rates(o.time)) r WHERE o.currency = r.currency
值得注意的是,不一樣於在 Regular Join 和 Time-Windowed Join 中兩個表是平等的,任意一個表的新記錄均可以與另外一表的歷史記錄進行匹配,在 Temporal Table Join 中,Temoparal Table 的更新對另外一表在該時間節點之前的記錄是不可見的。這意味着咱們只須要保存 Build Side 的記錄直到 Watermark 超過記錄的版本字段。由於 Probe Side 的輸入理論上不會再有早於 Watermark 的記錄,這些版本的數據能夠安全地被清理掉。
實時領域 Streaming SQL 中的 Join 與離線 Batch SQL 中的 Join 最大不一樣點在於沒法緩存完整數據集,而是要給緩存設定基於時間的清理條件以限制 Join 涉及的數據範圍。根據清理策略的不一樣,Flink SQL 分別提供了 Regular Join、Time-Windowed Join 和 Temporal Table Join 來應對不一樣業務場景。
另外,儘管在實時計算領域 Join 能夠靈活地用底層編程 API 來實現,但在 Streaming SQL 中 Join 的發展仍處於比較初級的階段,其中關鍵點在於如何將時間屬性合適地融入 SQL 中,這點 ISO SQL 委員會制定的 SQL 標準並無給出完整的答案。或者從另一個角度來說,做爲 Streaming SQL 最先的開拓者之一,Flink 社區很適合探索出一套合理的 SQL 語法反過來貢獻給 ISO。
做者介紹:
林小鉑,網易遊戲高級開發工程師,負責遊戲數據中心實時平臺的開發及運維工做,目前專一於 Apache Flink 的開發及應用。探究問題原本就是一種樂趣。