Flink SQL 核心解密 —— 提高吞吐的利器 MicroBatch

以前咱們在 Flink SQL 中支持了 MiniBatch, 在支持高吞吐場景發揮了重要做用。今年咱們在 Flink SQL 性能優化中一項重要的改進就是升級了微批模型,咱們稱之爲 MicroBatch,也叫 MiniBatch2.0。前端

在設計和實現 Flink 的流計算算子時,咱們通常會把「面向狀態編程」做爲第一準則。由於在流計算中,爲了保證狀態(State)的一致性,須要將狀態數據存儲在狀態後端(StateBackend),由框架來作分佈式快照。而目前主要使用的RocksDB,Niagara狀態後端都會在每次read和write操做時發生序列化和反序列化操做,甚至是磁盤的 I/O 操做。所以狀態的相關操做一般都會成爲整個任務的性能瓶頸,狀態的數據結構設計以及對狀態的每一次訪問都須要特別注意。數據庫

微批的核心思想就是緩存一小批數據,在訪問狀態狀態時,多個同 key 的數據就只須要發生一次狀態的操做。當批次內數據的 key 重複率較大時,能顯著下降對狀態的訪問頻次,從而大幅提升吞吐。MicroBatch 和 MiniBatch 的核心機制是同樣的,就是攢批,而後觸發計算。只是攢批策略不太同樣。咱們先講解觸發計算時是如何節省狀態訪問頻次的。編程

微批計算

MicroBatch 的一個典型應用場景就是 Group Aggregate。例如簡單的求和例子:後端

SELECT key, SUM(value) FROM T GROUP BY key

image

如上圖所示,當未開啓 MicroBatch 時,Aggregate 的處理模式是每來一條數據,查詢一次狀態,進行聚合計算,而後寫入一次狀態。當有 N 條數據時,須要操做 2*N 次狀態。緩存

當開啓 MicroBatch 時,對於緩存下來的 N 條數據一塊兒觸發,同 key 的數據只會讀寫狀態一次。例如上圖緩存的 4 條 A 的記錄,只會對狀態讀寫各一次。因此當數據的 key 的重複率越大,攢批的大小越大,那麼對狀態的訪問會越少,獲得的吞吐量越高。性能優化

攢批策略

攢批策略通常分紅兩個維度,一個是延時,一個是內存。延時即控制多久攢一次批,這也是用來權衡吞吐和延遲的重要參數。內存即爲了不瞬間 TPS 太大致使內存沒法存下緩存的數據,避免形成 Full GC 和 OOM。下面會分別介紹舊版 MiniBatch 和 新版 MicroBatch 在這兩個維度上的區別。網絡

MiniBatch 攢批策略

MiniBatch 攢批策略的延時維度是經過在每一個聚合節點註冊單獨的定時器來實現,時間分配策略採用簡單的均分。好比有4個 aggregate 節點,用戶配置 10s 的 MiniBatch,那麼每一個節點會分配2.5s,例以下圖所示:數據結構

可是這種策略有如下幾個問題:框架

  1. 用戶能容忍 10s 的延時,可是真正用來攢批的只有2.5秒,攢批效率低。拓撲越複雜,差別越明顯。
  2. 因爲上下游的定時器的觸發是純異步的,可能致使上游觸發微批的時候,下游也正好觸發微批,而處理微批時會一段時間不消費網絡數據,致使上游很容易被反壓。
  3. 計時器會引入額外的線程,增長了線程調度和搶鎖上的開銷。

MiniBatch 攢批策略在內存維度是經過統計輸入條數,當輸入的條數超過用戶配置的 blink.miniBatch.size 時,就會觸發批次以防止 OOM。可是 size 參數並非很好評估,一方面當 size 配的過大,可能會失去保護內存的做用;而當 size 配的過小,又會致使攢批效率下降。異步

MicroBatch 攢批策略

MicroBatch 的提出就是爲了解決 MiniBatch 遇到的上述問題。MicroBatch 引入了 watermark 來控制聚合節點的定時觸發功能,用 watermark 做爲特殊事件插入數據流中將數據流切分紅相等時間間隔的一個個批次。實現原理以下所示:

MicroBatch 會在數據源以後插入一個 MicroBatchAssigner 的節點,用來定時發送 watermark,其間隔是用戶配置的延時參數,如10s。那麼每隔10s,無論數據源有沒有數據,都會發一個當前系統時間戳的 watermark 下去。一個節點的當前 watermark 取自全部 channel 的最小 watermark 值,因此當聚合節點的 watermark 值前進時,也就意味着攢齊了上游的一個批次,咱們就能夠觸發這個批次了。處理完這個批次後,須要將當前 watermark 廣播給下游全部 task。當下遊 task 收齊上游 watermark 時,也會觸發批次。這樣批次的觸發會從上游到下游逐級觸發。

這裏將 watermark 做爲劃分批次的特殊事件是頗有意思的一點。Watermark 是一個很是強大的工具,通常咱們用來衡量業務時間的進度,解決業務時間亂序的問題。但其實換一個維度,它也能夠用來衡量全局系統時間的進度,從而很是巧妙地解決數據劃批的問題。

所以與 MiniBatch 策略相比,MicroBatch 具備如下優勢:

  1. 相同延時下,MicroBatch 的攢批效率更高,能攢更多的數據。
  2. 因爲 MicroBatch 的批次觸發是靠事件的,當上遊觸發時,下游不會同時觸發,因此不像 MiniBatch 那麼容易引發反壓。
  3. 解決數據抖動問題(下一小節分析)

咱們利用一個 DAU 做業進行了性能測試對比,在相同的 allowLatency(6秒)配置的狀況下,MicroBatch 能獲得更高的吞吐,並且還能獲得與 MiniBatch 相同的端到端延遲!

另外,仍然是上述的性能測試對比,能夠發現運行穩定後 MicroBatch 的隊列使用率平均值在 50% 如下,而 MiniBatch 基本是一直處於隊列滿載下。說明 MicroBatch 比 MiniBatch 更加穩定,更不容易引發反壓。

MicroBatch 在內存維度目前仍然與 MiniBatch 同樣,使用 size 參數來控制條數。可是未來會基於內存管理,將緩存的數據存於管理好的內存塊中(BytesHashMap),從而減小 Java 對象的空間成本,減小 GC 的壓力和防止 OOM。

防止數據抖動

所謂數據抖動問題是指,兩層 AGG 時,第一層 AGG 發出的更新消息會拆成兩條獨立的消息被下游消費,分別是retract 消息和 accumulate 消息。而當第二層 AGG 消費這兩條消息時也會發出兩條消息。從前端看到就是數據會有抖動的現象。例以下面的例子,統計買家數,這裏作了兩層打散,第一層先作 UV 統計,第二級作SUM。

SELECT day, SUM(cnt) total
FROM (
  SELECT day, MOD(buy_id, 1024), COUNT(DISTINCT buy_id) as cnt
  FROM T GROUP BY day, MOD(buy_id, 1024))
GROUP BY day

當第一層count distinct的結果從100上升到101時,它會發出 -100, +101 的兩條消息。當第二層的 SUM 會依次收到這兩條消息並處理,假設此時 SUM 值是 900,那麼在處理 -100 時,會先發出 800 的結果值,而後處理 +101 時,再發出 901 的結果值。從用戶端的感覺就是買家數從 900 降到了 800 又上升到了 901,咱們稱之爲數據抖動。而理論上買家數只應該只增不減的,因此咱們也一直在思考如何解決這個問題。

數據抖動的本質緣由是 retract 和 accumulate 消息是一個事務中的兩個操做,可是這兩個操做的中間結果被用戶看到了,也就是傳統數據庫 ACID 中的隔離性(I) 中最弱的 READ UNCOMMITTED 的事務保障。要從根本上解決這個問題的思路是,如何原子地處理 retract & accumulate 的消息。如上文所述的 MicroBatch 策略,藉助 watermark 劃批,watermark 不會插在 retract & accumulate 中間,那麼 watermark 就是事務的自然分界。按照 watermark 來處理批次能夠達到原子處理 retract & accumulate 的目的。從而解決抖動問題。

適用場景與使用方式

MicroBatch 是使用必定的延遲來換取大量吞吐的策略,若是用戶有超低延遲的要求的話,不建議開啓微批處理。MicroBatch 目前對於無限流的聚合、Join 都有顯著的性能提高,因此建議開啓。若是遇到了上述的數據抖動問題,也建議開啓。

MicroBatch默認關閉,開啓方式:

# 攢批的間隔時間,使用 microbatch 策略時須要加上該配置,且建議和 blink.miniBatch.allowLatencyMs 保持一致
blink.microBatch.allowLatencyMs=5000
# 使用 microbatch 時須要保留如下兩個 minibatch 配置
blink.miniBatch.allowLatencyMs=5000
# 防止OOM,每一個批次最多緩存多少條數據
blink.miniBatch.size=20000

後續優化

MicroBatch 目前只支持無限流的聚合和 Join,暫不支持 Window Aggregate。因此後續 Window Aggregate 會重點支持 MicroBatch 策略,以提高吞吐性能。另外一方面,MicroBatch 的內存會考慮使用二進制的數據結構管理起來,提高內存的利用率和減輕 GC 的影響。



本文做者:jark

閱讀原文

本文爲雲棲社區原創內容,未經容許不得轉載。

相關文章
相關標籤/搜索