有狀態的計算做爲容錯以及數據一致性的保證,是當今實時計算必不可少的特性之一,流行的實時計算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對內置 State 的支持。State 的引入使得實時應用能夠不依賴外部數據庫來存儲元數據及中間數據,部分狀況下甚至能夠直接用 State 存儲結果數據,這讓業界不由思考: State 和 Database 是何種關係?有沒有可能用 State 來代替數據庫呢?數據庫
在這個課題上,Flink 社區是比較早就開始探索的。整體來講,Flink 社區的努力能夠分爲兩條線: 一是在做業運行時經過做業查詢接口訪問 State 的能力,即 QueryableState;二是經過 State 的離線 dump 文件(Savepoint)來離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API。網絡
在 2017 年發佈的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以容許用戶經過特定的 client 查詢做業 State 的內容 [1],這意味着 Flink 應用能夠在徹底不依賴 State 存儲介質之外的外部存儲的狀況下提供實時訪問計算結果的能力。數據結構
只經過 Queryable State 提供實時數據訪問架構
然而,QueryableState 雖然設想上比較理想化,但因爲依賴底層架構的改動較多且功能也比較受限,它一直處於 Beta 版本並不能用於生產環境。針對這個問題,在前段時間騰訊的工程師楊華提出 QueryableState 的改進計劃 [2]。在郵件列表中,社區就 QueryableState 是否能夠用於代替數據庫做了討論並出現了不一樣的觀點。筆者結合我的看法將 State as Database 的主要優缺點整理以下。併發
優勢:運維
缺點:函數
整體來講,目前 State 代替數據庫的缺點仍是遠多於其優勢,不過對於某些對數據可用性要求不高的做業來講,使用 State 做爲數據庫仍是徹底合理的。因爲定位上的不一樣,Flink State 在短期內很難看到能夠徹底替代數據庫的可能性,但在數據訪問特性上 State 往數據庫方向發展是無需質疑的。優化
Savepoint Processor API 是社區最近提出的一個新特性(見 FLIP-42 [3]),用於離線對 State 的 dump 文件 Savepoint 進行分析、修改或者直接根據數據構建出一個初始的 Savepoint。Savepoint Processor API 屬於 Flink State Evolution 的 State Management。若是說 QueryableState 是 DSL 的話,Flink State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最爲重要的部分。spa
Savepoint Processor API 的前身是第三方的 Bravo 項目 [4],主要思路提供 Savepoint 和 DataSet 相互轉換的能力,典型應用是 Savepoint 讀取成 DataSet,在 DataSet 上進行修改,而後再寫爲一個新的 Savepoint。這適合用於如下的場景:blog
修改 Savepoint,好比:
Savepoint 做爲 State 的 dump 文件,經過 Savepoint Processor API 能夠暴露數據查詢和修改功能,相似於一個離線的數據庫,但 State 的概念和典型關係型數據的概念仍是有不少不一樣,FLIP-43 也對這些差別進行了類比和總結。
首先 Savepoint 是多個 operator 的 state 的物理存儲集合,不一樣 operator 的 state 是獨立的,這相似於數據庫下不一樣 namespace 之間的 table。咱們能夠獲得 Savepoint 對應數據庫,單個 operator 對應 Namespace。
Database
Savepoint
Namespace
Uid
Table
State
但就 table 而言,其在 Savepoint 裏對應的概念根據 State 類型的不一樣而有所差異。State 有 Operator State、Keyed State 和 Broadcast State 三種,其中 Operator State 和 Broadcast State 屬於 non-partitioned state,即沒有按 key 分區的 state,而相反地 Keyed State 則屬於 partitioned state。對於 non-partitioned state 來講,state 是一個 table,state 的每一個元素便是 table 裏的一行;而對於 partitioned state 來講,同一個 operator 下的全部 state 對應一個 table。這個 table 像是 HBase 同樣有個 row key,而後每一個具體的 state 對應 table 裏的一個 column。
舉個例子,假設有一個遊戲玩家得分和在線時長的數據流,咱們須要用 Keyed State 來記錄玩家所在組的分數和遊戲時長,用 Operator State 記錄玩家的總得分和總時長。
在一段時間內數據流的輸入以下:
user_id
user_name
user_group
score
1001
Paul
A
5,000
1002
Charlotte
A
3,600
1003
Kate
C
2,000
1004
Robert
B
3,900
user_id
user_name
user_group
time
1001
Paul
A
1,800
1002
Charlotte
A
1,200
1003
Kate
C
600
1004
Robert
B
2,000
用 Keyed State ,咱們分別註冊 group_score 和 group_time 兩個 MapState 表示組總得分和組總時長,並根據 user_group keyby 數據流以後將兩個指標的累積值更新到 State 裏,獲得的表以下:
user_group
group_score
group_time
A
8,600
3,000
C
2,00
600
B
3,900
2,000
相對地,假如用 Operator State 來記錄總得分和總時長(並行度設爲 1),咱們註冊 total_score 和 total_time 兩個 State,獲得的表有兩個:
total_score |
------- |
14,500 |
total_time
5,600
至此 Savepoint 和 Database 的對應關係應該是比較清晰明瞭的。而對於 Savepoint 來講還有不一樣的 StateBackend 來決定 State 具體如何持續化,這顯然對應的是數據庫的存儲引擎。在 MySQL 中,咱們能夠經過簡單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來改變存儲引擎,在背後 MySQL 會自動完成繁瑣的格式轉換工做。而對於 Savepoint 來講,因爲 StateBackend 各自的存儲格式不兼容,目前尚不能方便地切換 StateBackend。爲此,社區在不久前建立 FLIP-41 [5] 來進一步完善 Savepoint 的可操做性。
State as Database 是實時計算髮展的大趨勢,它並非要代替數據庫的使用,而是借鑑數據庫領域的經驗拓展 State 接口使其操做方式更接近咱們熟悉的數據庫。對於 Flink 而言,State 的外部使用能夠分爲在線的實時訪問和離線的訪問和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個特性支持。
本文爲雲棲社區原創內容,未經容許不得轉載。