Flink State 有可能代替數據庫嗎?

有狀態的計算做爲容錯以及數據一致性的保證,是當今實時計算必不可少的特性之一,流行的實時計算引擎包括 Google Dataflow、Flink、Spark (Structure) Streaming、Kafka Streams 都分別提供對內置 State 的支持。State 的引入使得實時應用能夠不依賴外部數據庫來存儲元數據及中間數據,部分狀況下甚至能夠直接用 State 存儲結果數據,這讓業界不由思考: State 和 Database 是何種關係?有沒有可能用 State 來代替數據庫呢?數據庫

在這個課題上,Flink 社區是比較早就開始探索的。整體來講,Flink 社區的努力能夠分爲兩條線: 一是在做業運行時經過做業查詢接口訪問 State 的能力,即 QueryableState;二是經過 State 的離線 dump 文件(Savepoint)來離線查詢和修改 State 的能力,即即將引入的 Savepoint Processor API。網絡

QueryableState

在 2017 年發佈的 Flink 1.2 版本,Flink 引入了 QueryableState 的特性以容許用戶經過特定的 client 查詢做業 State 的內容 [1],這意味着 Flink 應用能夠在徹底不依賴 State 存儲介質之外的外部存儲的狀況下提供實時訪問計算結果的能力。數據結構

數據庫.jpg

只經過 Queryable State 提供實時數據訪問架構

然而,QueryableState 雖然設想上比較理想化,但因爲依賴底層架構的改動較多且功能也比較受限,它一直處於 Beta 版本並不能用於生產環境。針對這個問題,在前段時間騰訊的工程師楊華提出 QueryableState 的改進計劃 [2]。在郵件列表中,社區就 QueryableState 是否能夠用於代替數據庫做了討論並出現了不一樣的觀點。筆者結合我的看法將 State as Database 的主要優缺點整理以下。併發

優勢:運維

  • 更低的數據延遲。通常狀況下 Flink 應用的計算結果須要同步到外部的數據庫,好比定時觸發輸出窗口計算結果,而這種同步一般是定時的會帶來必定的延遲,致使計算是實時的而查詢卻不是實時的尷尬局面,而直接 State 則能夠避免這個問題。
  • 更強的數據一致性保證。根據外部存儲的特性不一樣,Flink Connector 或者自定義的 SinkFunction 提供的一致性保障也有所差異。好比對於不支持多行事務的 HBase,Flink 只能經過業務邏輯的冪等性來保障 Exactly-Once 投遞。相比之下 State 則有妥妥的 Exactly-Once 投遞保證。
  • 節省資源。由於減小了同步數據到外部存儲的須要,咱們能夠節省序列化和網絡傳輸的成本,另外固然還能夠節省數據庫成本。

缺點:函數

  • SLA 保障不足。數據庫技術已經很是成熟,在可用性、容錯性和運維上都不少的積累,在這點上 State 還至關因而處於原始人時期。另外從定位上來看,Flink 做業有版本迭代維護或者遇到錯誤自動重啓帶來的 down time,並不能達到數據庫在數據訪問上的高可用性。
  • 可能致使做業的不穩定。未通過考慮的 Ad-hoc Query 可能會要求掃描並返回誇張量級的數據,這會系統帶來很大的負荷,極可能影響做業的正常執行。即便是合理的 Query,在併發數較多的狀況下也可能影響做業的執行效率。
  • 存儲數據量不能太大。State 運行時主要存儲在 TaskManager 本地內存和磁盤,State 過大會形成 TaskManager OOM 或者磁盤空間不足。另外 State 大意味着 checkpoint 大,致使 checkpoint 可能會超時並顯著延長做業恢復時長。
  • 只支持最基礎的查詢。State 只能進行最簡單的數據結構查詢,不能像關係型數據庫同樣提供函數等計算能力,也不支持謂詞下推等優化技術。
  • 只能夠讀取,不能修改。State 在運行時只能夠被做業自己修改,若是實在要修改 State 只能經過下文的 Savepoint Processor API 來實現。

整體來講,目前 State 代替數據庫的缺點仍是遠多於其優勢,不過對於某些對數據可用性要求不高的做業來講,使用 State 做爲數據庫仍是徹底合理的。因爲定位上的不一樣,Flink State 在短期內很難看到能夠徹底替代數據庫的可能性,但在數據訪問特性上 State 往數據庫方向發展是無需質疑的。優化

Savepoint Processor API

Savepoint Processor API 是社區最近提出的一個新特性(見 FLIP-42 [3]),用於離線對 State 的 dump 文件 Savepoint 進行分析、修改或者直接根據數據構建出一個初始的 Savepoint。Savepoint Processor API 屬於 Flink State Evolution 的 State Management。若是說 QueryableState 是 DSL 的話,Flink State Evolution 就是 DML,而 Savepoint Processor API 就是 DML 中最爲重要的部分。spa

Savepoint Processor API 的前身是第三方的 Bravo 項目 [4],主要思路提供 Savepoint 和 DataSet 相互轉換的能力,典型應用是 Savepoint 讀取成 DataSet,在 DataSet 上進行修改,而後再寫爲一個新的 Savepoint。這適合用於如下的場景:blog

  • 分析做業 State 以研究其模式和規律
  • 排查問題或者審計
  • 爲新的應用構建的初始 State
  • 修改 Savepoint,好比:

    • 改變做業最大並行度
    • 進行巨大的 Schema 改動
    • 修正有問題的 State

Savepoint 做爲 State 的 dump 文件,經過 Savepoint Processor API 能夠暴露數據查詢和修改功能,相似於一個離線的數據庫,但 State 的概念和典型關係型數據的概念仍是有不少不一樣,FLIP-43 也對這些差別進行了類比和總結。

首先 Savepoint 是多個 operator 的 state 的物理存儲集合,不一樣 operator 的 state 是獨立的,這相似於數據庫下不一樣 namespace 之間的 table。咱們能夠獲得 Savepoint 對應數據庫,單個 operator 對應 Namespace。

Database

Savepoint

Namespace

Uid

Table

State

但就 table 而言,其在 Savepoint 裏對應的概念根據 State 類型的不一樣而有所差異。State 有 Operator State、Keyed State 和 Broadcast State 三種,其中 Operator State 和 Broadcast State 屬於 non-partitioned state,即沒有按 key 分區的 state,而相反地 Keyed State 則屬於 partitioned state。對於 non-partitioned state 來講,state 是一個 table,state 的每一個元素便是 table 裏的一行;而對於 partitioned state 來講,同一個 operator 下的全部 state 對應一個 table。這個 table 像是 HBase 同樣有個 row key,而後每一個具體的 state 對應 table 裏的一個 column。

舉個例子,假設有一個遊戲玩家得分和在線時長的數據流,咱們須要用 Keyed State 來記錄玩家所在組的分數和遊戲時長,用 Operator State 記錄玩家的總得分和總時長。

在一段時間內數據流的輸入以下:

user_id

user_name

user_group

score

1001

Paul

A

5,000

1002

Charlotte

A

3,600

1003

Kate

C

2,000

1004

Robert

B

3,900

user_id

user_name

user_group

time

1001

Paul

A

1,800

1002

Charlotte

A

1,200

1003

Kate

C

600

1004

Robert

B

2,000

用 Keyed State ,咱們分別註冊 group_score 和 group_time 兩個 MapState 表示組總得分和組總時長,並根據 user_group keyby 數據流以後將兩個指標的累積值更新到 State 裏,獲得的表以下:

user_group

group_score

group_time

A

8,600

3,000

C

2,00

600

B

3,900

2,000

相對地,假如用 Operator State 來記錄總得分和總時長(並行度設爲 1),咱們註冊 total_score 和 total_time 兩個 State,獲得的表有兩個:

total_score | 
------- | 
14,500 |

total_time

5,600

至此 Savepoint 和 Database 的對應關係應該是比較清晰明瞭的。而對於 Savepoint 來講還有不一樣的 StateBackend 來決定 State 具體如何持續化,這顯然對應的是數據庫的存儲引擎。在 MySQL 中,咱們能夠經過簡單的一行命令 ALTER TABLE xxx ENGINE = InnoDB; 來改變存儲引擎,在背後 MySQL 會自動完成繁瑣的格式轉換工做。而對於 Savepoint 來講,因爲 StateBackend 各自的存儲格式不兼容,目前尚不能方便地切換 StateBackend。爲此,社區在不久前建立 FLIP-41 [5] 來進一步完善 Savepoint 的可操做性。

總結

State as Database 是實時計算髮展的大趨勢,它並非要代替數據庫的使用,而是借鑑數據庫領域的經驗拓展 State 接口使其操做方式更接近咱們熟悉的數據庫。對於 Flink 而言,State 的外部使用能夠分爲在線的實時訪問和離線的訪問和修改,分別將由 Queryable State 和 Savepoint Processor API 兩個特性支持。


原文連接

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索