流計算中一個常見的需求就是爲數據流補齊字段。由於數據採集端採集到的數據每每比較有限,在作數據分析以前,就要先將所需的維度信息補全。好比採集到的交易日誌中只記錄了商品 id,可是在作業務時須要根據店鋪維度或者行業緯度進行聚合,這就須要先將交易日誌與商品維表進行關聯,補全所需的維度信息。這裏所說的維表與數據倉庫中的概念相似,是維度屬性的集合,好比商品維,地點維,用戶維等等。html
在流計算中,這是一個典型的 stream-to-table jon 的問題。本文主要講解在 Flink SQL 中是如何解決這個問題的,用戶如何簡單上手使用這個功能。sql
因爲維表是一張不斷變化的表(靜態表只是動態表的一種特例)。那如何 JOIN 一張不斷變化的表呢?若是用傳統的 JOIN 語法SELECT * FROM T JOIN dim_table on T.id = dim_table.id
來表達維表 JOIN,是不完整的。由於維表是一直在更新變化的,若是用這個語法那麼關聯上的是哪一個時刻的維表呢?咱們是不知道的,結果是不肯定的。因此 Flink SQL 的維表 JOIN 語法引入了 SQL:2011 Temporal Table 的標準語法,用來聲明關聯的是維表哪一個時刻的快照。維表 JOIN 語法/示例以下。數據庫
假設咱們有一個 Orders 訂單數據流,但願根據產品 ID 補全流上的產品維度信息,因此須要跟 Products 維度表進行關聯。Orders 和 Products 的 DDL 聲明語句以下:apache
CREATE TABLE Orders ( orderId VARCHAR, -- 訂單 id productId VARCHAR, -- 產品 id units INT, -- 購買數量 orderTime TIMESTAMP -- 下單時間 ) with ( type = 'tt', -- tt 日誌流 ... ) CREATE TABLE Products ( productId VARCHAR, -- 產品 id name VARCHAR, -- 產品名稱 unitPrice DOUBLE -- 單價 PERIOD FOR SYSTEM_TIME, -- 這是一張隨系統時間而變化的表,用來聲明維表 PRIMARY KEY (productId) -- 維表必須聲明主鍵 ) with ( type = 'alihbase', -- HBase 數據源 ... )
如上聲明瞭一張來自 TT 的 Orders 訂單數據流,和一張存儲於 HBase 中的 Products 產品維表。爲了補齊訂單流的產品信息,須要 JOIN 維表,這裏能夠分爲 JOIN 當前表和 JOIN 歷史表。緩存
SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF PROCTIME() AS p ON o.productId = p.productId
Flink SQL 支持 LEFT JOIN 和 INNER JOIN 的維表關聯。如上語法所示的,維表 JOIN 語法與傳統的 JOIN 語法並沒有二異。只是 Products 維表後面須要跟上 FOR SYSTEM_TIME AS OF PROCTIME()
的關鍵字,其含義是每條到達的數據所關聯上的是到達時刻的維錶快照,也就是說,當數據到達時,咱們會根據數據上的 key 去查詢遠程數據庫,拿到匹配的結果後關聯輸出。這裏的 PROCTIME
即 processing time。使用 JOIN 當前維表功能須要注意的是,若是維表插入了一條數據能匹配上以前左表的數據時,JOIN的結果流,不會發出更新的數據以彌補以前的未匹配。JOIN行爲只發生在處理時間(processing time),即便維表中的數據都被刪了,以前JOIN流已經發出的關聯上的數據也不會被撤回或改變。網絡
SELECT * FROM Orders AS o [LEFT] JOIN Products FOR SYSTEM_TIME AS OF o.orderTime AS p ON o.productId = p.productId
有時候想關聯上的維度數據,並非當前時刻的值,而是某個歷史時刻的值。好比,產品的價格一直在發生變化,訂單流但願補全的是下單時的價格,而不是當前的價格,那就是 JOIN 歷史維表。語法上只須要將上文的 PROCTIME()
改爲 o.orderTime
便可。含義是關聯上的是下單時刻的 Products 維表。併發
Flink 在獲取維度數據時,會根據左流的時間去查對應時刻的快照數據。所以 JOIN 歷史維表須要外部存儲支持多版本存儲,如 HBase,或者存儲的數據中帶有多版本信息。異步
注:JOIN 歷史維表功能目前暫未開放ide
在實際使用的過程當中,會遇到許多性能問題。爲了解決這些性能問題,咱們作了大量的優化,性能獲得大幅提高,在雙11的洪峯下表現特別淡定。性能
咱們的優化主要是爲了解決兩方面的問題:
1. 提升吞吐。維表的IO請求嚴重阻塞了數據流的計算處理。
2. 下降維表數據庫讀壓力。如 HBase 也只能承受單機每秒 20 萬的讀請求。
我在 《Flink 原理與實現:Aysnc I/O》 中介紹了 Async IO 的使用場景和實現原理。原始的維表 JOIN 是同步訪問的方式,來一條數據,去數據庫查詢一次,等待返回後輸出關聯結果。能夠發現網絡等待時間極大地阻礙了吞吐和延遲。爲了解決同步訪問的問題,異步模式能夠併發地處理多個請求和回覆,從而連續的請求之間不須要阻塞等待。
數據庫的維表查詢請求,有大量相同 key 的重複請求。如何減小重複請求?本地緩存是經常使用的方案。Flink SQL 目前提供兩種緩存方案:LRU 和 ALL。(詳見文檔)
經過 cache='LRU'
參數能夠開啓 LRU 緩存優化,Blink 會爲每一個 JoinTable 節點建立一個 LRU 本地緩存。當每一個數據進來的時候,先去緩存中查詢,若是存在則直接關聯輸出,減小了一次 IO 請求。若是不存在,再發起數據庫查詢請求(異步或同步方式),請求返回的結果會先存入緩存中以備下次查詢。
爲了防止緩存無限制增加,因此使用的是 LRU 緩存,而且能夠經過 cacheSize
調整緩存的大小。爲了按期更新維表數據,能夠經過 cacheTTLMs
調整緩存的失效時間。cacheTTLMs
是做用於每條緩存數據上的,也就是某條緩存數據在指定 timeout 時間內沒有被訪問,則會從緩存中移除。
Async 和 LRU-Cache 能極大提升吞吐率並下降數據庫的讀壓力,可是仍然會有大量的 IO 請求存在,尤爲是當 miss key(維表中不存在的 key)不少的時候。若是維表數據不大(一般百萬級之內),那麼其實能夠將整個維表緩存到本地。那麼 miss key 永遠不會去請求數據庫。由於本地緩存就是維表的鏡像,緩存中不存在那麼遠程數據庫中也不存在。
ALL cache 能夠經過 cache='ALL'
參數開啓,經過cacheTTLMs
控制緩存的刷新間隔。Flink SQL 會爲 JoinTable 節點起一個異步線程去同步緩存。在 Job 剛啓動時,會先阻塞主數據流,直到緩存數據加載完畢,保證主數據流流過期緩存就已經 ready。在以後的更新緩存的過程當中,不會阻塞主數據流。由於異步更新線程會將維表數據加載到臨時緩存中,加載完畢後再與主緩存作原子替換。只有替換操做是加了鎖的。
由於幾乎沒有 IO 操做,因此使用 cache ALL 的維表 JOIN 性能能夠很是高。可是因爲內存須要能同時容納下兩份維表拷貝,所以須要加大內存的配置。
在使用 LRU 緩存時,若是存在大量的 invalid key ,或者數據庫中不存在的 key。因爲命中不了緩存,致使緩存的收益較低,仍然會有大量請求打到數據庫。所以咱們將未命中的 key 也加進了緩存,提升了未命中 key 和 invalid key 狀況下的緩存命中率。
默認 JoinTable 節點與上游節點之間的數據是經過 shuffle 傳輸的。這在緩存大小有限、key總量大、熱點不明顯的狀況下, 緩存的收益可能較低。這種狀況下能夠將上游節點與 JoinTable 節點的數據傳輸改爲按 key 分區。這樣一般能夠縮小單個節點的 key 個數,提升緩存的命中率。好比一段時間內 JoinTable 節點總共須要處理100萬個key, 節點併發100, 在數據不傾斜時單節點平均只需處理1萬個key = 100萬/100併發. 若是不作 key 分區, 單節點實際處理的key個數可能遠大於1萬。使用上也很是簡單,在維表的 DDL 參數中加上partitionedJoin='true'
便可。
在使用維表 JOIN 時,若是維表數據不大,或者 miss key (維表中不存在的 key)很是多,則可使用 ALL cache,可是可能須要適當調大節點的內存,由於內存須要能同時容納下兩份維表拷貝。若是用不了 ALL cache,則可使用 Async + LRU 來提升節點的吞吐。
流計算中維表 JOIN 是一個很是常見的需求,遇到的挑戰也很是多。好比超大維表問題,ALL cache 沒法裝下整個維表。將來咱們打算引入 Partitioned-ALL-cache,也就是上游數據到 JoinTable 節點根據 JOIN key 分區,那麼每一個節點只須要加載屬於該分區key的緩存數據,從而作到了緩存的水平擴展。從而遇到超大維表時能夠經過擴併發也可以全量緩存下維表數據。另外,ALL cache 如今每一個節點是都會起一個線程去加載全量維表數據,若是有1000個節點,則會全量讀數據庫1000次。將來打算經過 Side Input功能作到只須要全量讀取一次,維表數據會自動分發到各個節點。
另外,Async 極大地提升了吞吐,可是每一次 IO 請求只取了單 key 的數據,效率比較低。將來計劃使用 Batch Get 來提升每次 IO 請求的吞吐。
目前在 Table API 上已經支持了 Multi-Join 的優化,能極大提升多維表連續 JOIN 時的性能,減小網絡數據的傳輸開銷。將來會在 SQL 上也支持 Multi-Join 的優化。