一、 結論
從性能和 TTL 兩個維度來描述區別。數據庫
性能數組
- RocksDB 場景,MapState 比 ValueState 中存 Map 性能高不少。
- 生產環境強烈推薦使用 MapState,不推薦 ValueState 中存大對象
- ValueState 中存大對象很容易使 CPU 打滿
- Heap State 場景,二者性能相似。
TTLapp
Flink 中 State 支持設置 TTL:性能
- MapState 的 TTL 是基於 UK 級別的
- ValueState 的 TTL 是基於整個 key 的
觸類旁通優化
能使用 ListState 的場景,不要使用 ValueState 中存 List。大佬們已經把 MapState 和 ListState 性能都作了不少優化,高性能不香嗎?下文會詳細分析 ValueState 和 MapState 底層的實現原理,經過分析原理得出上述結論。阿里雲
二、 State 中要存儲哪些數據
ValueState 會存儲 key、namespace、value,縮寫爲 。MapState 會存儲 key、namespace、userKey、userValue,縮寫爲 。url
解釋一下上述這些名詞。spa
Key.net
ValueState 和 MapState 都是 KeyedState,也就是 keyBy 後才能使用 ValueState 和 MapState。因此 State 中確定要保存 key。code
例如:按照 app 進行 keyBy,總共有兩個 app,分別是:app1 和 app2。那麼狀態存儲引擎中確定要存儲 app1 或 app2,用於區分當前的狀態數據究竟是 app1 的仍是 app2 的。
這裏的 app一、app2 也就是所說的 key。
Namespace
Namespace 用於區分窗口。
假設須要統計 app1 和 app2 每一個小時的 pv 指標,則須要使用小時級別的窗口。狀態引擎爲了區分 app1 在 7 點和 8 點的 pv 值,就必須新增一個維度用來標識窗口。
Flink 用 Namespace 來標識窗口,這樣就能夠在狀態引擎中區分出 app1 在 7 點和 8 點的狀態信息。
Value、UserKey、UserValue
ValueState 中存儲具體的狀態值。也就是上述例子中對應的 pv 值。MapState 相似於 Map 集合,存儲的是一個個 KV 鍵值對。爲了與 keyBy 的 key 進行區分,因此 Flink 中把 MapState 的 key、value 分別叫 UserKey、UserValue。
下面講述狀態引擎是如何存儲這些數據的。
三、StateBackend 如何存儲和讀寫State 數據
Flink 支持三種 StateBackend,分別是:MemoryStateBackend、FsStateBackend 和 RocksDBStateBackend。
其中 MemoryStateBackend、FsStateBackend 兩種 StateBackend 在任務運行期間都會將 State 存儲在內存中,二者在 Checkpoint 時將快照存儲的位置不一樣。RocksDBStateBackend 在任務運行期間將 State 存儲在本地的 RocksDB 數據庫中。
因此下文將 MemoryStateBackend、FsStateBackend 統稱爲 heap 模式,RocksDBStateBackend 稱爲 RocksDB 模式。
3.1 Heap 模式 ValueState 和 MapState 如何存儲
Heap 模式表示全部的狀態數據都存儲在 TM 的堆內存中,全部的狀態都存儲的原始對象,不會作序列化和反序列化。(注:Checkpoint 的時候會涉及到序列化和反序列化,數據的正常讀寫並不會涉及,因此這裏先不討論。)
Heap 模式下,不管是 ValueState 仍是 MapState 都存儲在 CopyOnWriteStateMap 中。
- key 、 Namespace 分別對應 CopyOnWriteStateMap 的 K、N。
- ValueState 的 value 對應 CopyOnWriteStateMap 的 V。
MapState 將會把整個 Map 做爲 CopyOnWriteStateMap 的 V,至關於 Flink 引擎建立了一個 HashMap 用於存儲 MapState 的 KV 鍵值對。
具體 CopyOnWriteStateMap 是如何實現的,能夠參考:萬字長文詳解 Flink 中的 CopyOnWriteStateTable。
回到正題:Heap 模式下,ValueState 中存 Map 與 MapState 有什麼區別?
Heap 模式下沒有區別。
ValueState 中存 Map,至關於用戶手動建立了一個 HashMap 當作 V 放到了狀態引擎中。而 MapState 是 Flink 引擎幫用戶建立了一個 HashMap 當作 V 放到了狀態引擎中。
因此實質上 ValueState 中存 Map 與 MapState 都是同樣的,存儲結構都是 CopyOnWriteStateMap。區別在於 ValueState 是用戶手動建立 HashMap,MapState 是 Flink 引擎建立 HashMap。
3.2 RocksDB 模式 ValueState 和 MapState 如何存儲
RocksDB 模式表示全部的狀態數據存儲在 TM 本地的 RocksDB 數據庫中。RocksDB 是一個 KV 數據庫,且全部的 key 和 value 都是 byte 數組。因此不管是 ValueState 仍是 MapState,存儲到 RocksDB 中都必須將對象序列化成二進制當前 kv 存儲在 RocksDB 中。
3.2.1 ValueState 如何映射成 RocksDB 的 kv
ValueState 有 key、namespace、value 須要存儲,因此最簡單的思路:
一、將 ValueState 的 key 序列化成 byte 數組
二、將 ValueState 的 namespace 序列化成 byte 數組
三、將兩個 byte 數組拼接起來作爲 RocksDB 的 key
四、將 ValueState 的 value 序列化成 byte 數組作爲 RocksDB 的 value
而後就能夠寫入到 RocksDB 中。
查詢數據也用相同的邏輯:將 key 和 namespace 序列化後拼接起來做爲 RocksDB 的 key,去 RocksDB 中進行查詢,查詢到的 byte 數組進行反序列化就獲得了 ValueState 的 value。
這就是 RocksDB 模式下,ValueState 的讀寫流程。
3.2.2 MapState 如何映射成 RocksDB 的 kv
MapState 有 key、namespace、userKey、userValue 須要存儲,因此最簡單的思路:
一、將 MapState 的 key 序列化成 byte 數組
二、將 MapState 的 namespace 序列化成 byte 數組
三、將 MapState 的 userKey 序列化成 byte 數組
四、將三個 byte 數組拼接起來作爲 RocksDB 的 key
五、將 MapState 的 value 序列化成 byte 數組作爲 RocksDB 的 value
而後就能夠寫入到 RocksDB 中。
查詢數據也用相同的邏輯:將 key、namespace、userKey 序列化後拼接起來做爲 RocksDB 的 key,去 RocksDB 中進行查詢,查詢到的 byte 數組進行反序列化就獲得了 MapState 的 userValue。
這就是 RocksDB 模式下,MapState 的讀寫流程。
3.3 RocksDB 模式下,ValueState 中存 Map 與 MapState 有什麼區別?
3.3.1 假設 Map 集合有 100 個 KV 鍵值對,具體兩種方案會如何存儲數據?
ValueState 中存 Map,Flink 引擎會把整個 Map 當作一個大 Value,存儲在 RocksDB 中。對應到 RocksDB 中,100 個 KV 鍵值對的 Map 集合會序列化成一個 byte 數組當作 RocksDB 的 value,存儲在 RocksDB 的 1 行數據中。
MapState 會根據 userKey,將 100 個 KV 鍵值對分別存儲在 RocksDB 的 100 行中。
3.3.2 修改 Map 中的一個 KV 鍵值對的流程
ValueState 的狀況,雖然要修改 Map 中的一個 KV 鍵值對,但須要將整個 Map 集合從 RocksDB 中讀出來。具體流程以下:
一、將 key、namespace 序列化成 byte 數組,生成 RocksDB 的 key
二、從 RocksDB 讀出 key 對應 value 的 byte 數組
三、將 byte 數組反序列化成整個 Map
四、堆內存中修改 Map 集合
五、將 Map 集合寫入到 RocksDB 中,須要將整個 Map 集合序列化成 byte 數組,再寫入
MapState 的狀況,要修改 Map 中的一個 KV 鍵值對,根據 key、namespace、userKey 便可定位到要修改的那一個 KV 鍵值對。具體流程以下:
一、將 key、namespace、userKey 序列化成 byte 數組,生成 RocksDB 的 key
二、從 RocksDB 讀出 key 對應 value 的 byte 數組
三、將 byte 數組反序列化成 userValue
四、堆內存中修改 userValue 的值
五、將 userKey、userValue 寫入到 RocksDB 中,須要先序列化,再寫入
3.3.3 結論
要修改 Map 中的一個 KV 鍵值對:
若是使用 ValueState 中存 Map,則每次修改操做須要序列化反序列化整個 Map 集合,每次序列化反序列大對象會很是耗 CPU,很容易將 CPU 打滿。
若是使用 MapState,每次修改操做只須要序列化反序列化 userKey 那一個 KV 鍵值對的數據,效率較高。
觸類旁通:其餘使用 ValueState、value 是大對象且 value 頻繁更新的場景,都容易將 CPU 打滿。
例如:ValueState 中存儲的位圖,若是每條數據都須要更新位圖,則可能致使 CPU 被打滿。
爲了便於理解,上述忽略了一些實現細節,下面補充一下。
3.4 直接拼接 key 和 namespace 可能致使 RocksDB 的 key 衝突
假設 ValueState 中有兩個數據:
- key1 序列化後的二進制爲 0x112233, namespace1 序列化後的二進制爲0x4455
- key2 序列化後的二進制爲 0x1122, namespace2 序列化後的二進制爲0x334455
這兩個數據對應的 RocksDB key 都是 0x1122334455,這樣的話,兩個不一樣的 key、namespace 映射到 RocksDB 中變成了相同的數據,沒法作區分。
解決方案:
在 key 和 namespace 中間寫入 key 的 byte 數組長度,在 namespace 後寫入 namespace 的 byte 長度。
寫入這兩個長度就不可能出現 key 衝突了,具體爲何,讀者能夠自行思考。
3.5 RocksDB 的 key 中還會存儲 KeyGroupId
對 KeyGroup 不瞭解的同窗能夠參考:Flink 源碼:從 KeyGroup 到 Rescale。
加上 KeyGroupId 也比較簡單。只須要修改 RocksDB key 的拼接方式,在序列化 key 和 namespace 以前,先序列化 KeyGroupId 便可。
4. State TTL 簡述
Flink 中 TTL 的實現,都是將用戶的 value 封裝了一層,具體參考下面的 TtlValue 類:
public class TtlValue<T> implements Serializable { @Nullable private final T userValue; private final long lastAccessTimestamp; }
TtlValue 類中有兩個字段,封裝了用戶的 value 且有一個時間戳字段,這個時間戳記錄了這條數據寫入的時間。
若是開啓了 TTL,則狀態中存儲的 value 就是 TtlValue 對象。時間戳字段也會保存到狀態引擎中,以後查詢數據時,就能夠經過該時間戳判斷數據是否過時。
- ValueState 將 value 封裝爲 TtlValue。
- MapState 將 userValue 封裝成 TtlValue。
- ListState 將 element 封裝成 TtlValue。
ValueState 中存 Map 與 MapState 有什麼區別?
若是 ValueState 中存 Map,則整個 Map 被當作 value,只維護一個時間戳。因此要麼整個 Map 過時,要麼都不過時。
MapState 中若是存儲了 100 個 KV 鍵值對,則 100 個 KV 鍵值對都會存儲各自的時間戳。所以每一個 KV 鍵值對的 TTL 是相互獨立的。
5.總結
本文從實現原理詳細分析了 ValueState 中存 Map 與 MapState 有什麼區別?下面將從性能和 TTL 兩個維度來描述二者的區別。
性能
- RocksDB 場景,MapState 比 ValueState 中存 Map 性能高不少,ValueState 中存大對象很容易使 CPU 打滿
- Heap State 場景,二者性能相似
TTL
Flink 中 State 支持設置 TTL,TTL 只是將時間戳與 userValue 封裝起來。
- MapState 的 TTL 是基於 UK 級別的
- ValueState 的 TTL 是基於整個 key 的
本文爲阿里雲原創內容,未經容許不得轉載。