經過本文你能 get 到如下幾點:node
Flink 支持多種 StateBackend,當狀態比較大時目前只有 RocksDBStateBackend 可供選擇。算法
RocksDB 是基於 LSM 樹原理實現的 KV 數據庫,LSM 樹讀放大問題比較嚴重,所以對磁盤性能要求比較高,強烈建議生產環境使用 SSD 做爲 RocksDB 的存儲介質。可是有些集羣可能並無配置 SSD,僅僅是普通的機械硬盤,當 Flink 任務比較大,且對狀態訪問比較頻繁時,機械硬盤的磁盤 IO 可能成爲性能瓶頸。在這種狀況下,該如何解決此瓶頸呢?數據庫
RocksDB 使用內存加磁盤的方式存儲數據,當狀態比較大時,磁盤佔用空間會比較大。若是對 RocksDB 有頻繁的讀取請求,那麼磁盤 IO 會成爲 Flink 任務瓶頸。後端
強烈建議在 flink-conf.yaml 中配置 state.backend.rocksdb.localdir 參數來指定 RocksDB 在磁盤中的存儲目錄。當一個 TaskManager 包含 3 個 slot 時,那麼單個服務器上的三個並行度都對磁盤形成頻繁讀寫,從而致使三個並行度的之間相互爭搶同一個磁盤 io,這樣一定致使三個並行度的吞吐量都會降低。api
慶幸的是 Flink 的 state.backend.rocksdb.localdir 參數能夠指定多個目錄,通常大數據服務器都會掛載不少塊硬盤,咱們指望同一個 TaskManager 的三個 slot 使用不一樣的硬盤從而減小資源競爭。具體參數配置以下所示:服務器
state.backend.rocksdb.localdir: /data1/flink/rocksdb,/data2/flink/rocksdb,/data3/flink/rocksdb,/data4/flink/rocksdb,/data5/flink/rocksdb,/data6/flink/rocksdb,/data7/flink/rocksdb,/data8/flink/rocksdb,/data9/flink/rocksdb,/data10/flink/rocksdb,/data11/flink/rocksdb,/data12/flink/rocksdb
注意:務必將目錄配置到多塊不一樣的磁盤上,不要配置單塊磁盤的多個目錄,這裏配置多個目錄是爲了讓多塊磁盤來分擔壓力。負載均衡
以下圖所示是筆者測試過程當中磁盤的 IO 使用率,能夠看出三個大狀態算子的並行度分別對應了三塊磁盤,這三塊磁盤的 IO 平均使用率都保持在 45% 左右,IO 最高使用率幾乎都是 100%,而其餘磁盤的 IO 平均使用率爲 10% 左右,相對低不少。因而可知使用 RocksDB 作爲狀態後端且有大狀態的頻繁讀寫操做時,對磁盤 IO 性能消耗確實比較大。dom
上述屬於理想狀況,當設置多個 RocksDB 本地磁盤目錄時,Flink 會隨機選擇要使用的目錄,因此就可能存在三個並行度共用同一目錄的狀況。分佈式
以下圖所示,其中兩個並行度共用了 sdb 磁盤,一個並行度使用 sdj 磁盤。能夠看到 sdb 磁盤的 IO 平均使用率已經達到了 91.6%,此時 sdb 的磁盤 IO 確定會成爲整個 Flink 任務的瓶頸,會致使 sdb 磁盤對應的兩個並行度吞吐量大大下降,從而使得整個 Flink 任務吞吐量下降。性能
若是服務器掛載的硬盤數量較多,通常不會出現該狀況,可是若是任務重啓後吞吐量較低,能夠檢查是否發生了多個並行度共用同一塊磁盤的狀況。
Flink 可能會出現多個並行度共用同一塊磁盤的問題,那該如何解決呢?
從現象來看,爲 RocksDB 分配了 12 塊磁盤,僅僅有 3 個並行度須要使用 3 塊磁盤,可是有必定概率 2 個並行度共用同一塊磁盤,甚至可能會有很小的概率 3 個並行度共用同一塊磁盤。這樣咱們的 Flink 任務很容易由於磁盤 IO 成爲瓶頸。
上述分配磁盤的策略,實際上就是業界的負載均衡策略。通用的負載均衡策略有 hash、隨機以及輪循等策略。
任務自己通過某種 hash 策略後,將壓力分擔到多個 Worker 上。對應到上述場景,就是將多個 slot 使用的 RocksDB 目錄壓力分擔到多塊磁盤上。可是 hash 可能會有衝突的狀況,hash 衝突表示多個不一樣的 Flink 並行度,通過 hash 後獲得的 hashCode 同樣,或者 hashCode 對硬盤數量求餘後被分配到同一塊硬盤。
隨機策略是每來一個 Flink 任務,生成一個隨機數,將壓力隨機分配到某個 Worker 上,也就是將壓力隨機分配到某塊磁盤。可是隨機數也會存在衝突的狀況。
輪循策略比較容易理解,多個 Worker 輪流接收數據便可,Flink 任務第一次申請 RocksDB 目錄時使用目錄1,第二次申請目錄時使用目錄2,依次申請便可。該策略是分配任務數最均勻的策略,若是使用該策略會保證全部硬盤分配到的任務數相差最大爲 1。
根據 Worker 的響應時間來分配任務,響應時間短說明負載能力強,應該多分配一些任務。對應到上述場景就是檢測各個磁盤的 IO 使用率,使用率低表示磁盤 IO 比較空閒,應該多分配任務。
爲每一個 Worker 分配不一樣的權重值,權重值高的任務分配更多的任務,通常分配的任務數與權重值成正比。
例如 Worker0 權重值爲 2,Worker1 權重爲 1,則分配任務時 Worker0 分配的任務數儘可能分配成 Worker1 任務數的兩倍。該策略可能並不適合當前業務場景,通常相同服務器上每一個硬盤的負載能力相差不會很大,除非 RocksDB 的 local dir 既包含 SSD 也包含 HDD。
筆者線上使用 Flink 1.8.1 版本,出現了有些硬盤分配了多個並行度,有些硬盤一個並行度都沒有分配。能夠大膽的猜想一下,源碼中使用 hash 或者 random 的機率比較高,由於大多數狀況下,每一個硬盤只分到一個任務,小概率分配多個任務(要解決的就是這個小概率分配多個任務的問題)。
若是使用輪循策略,確定會保證每一個硬盤都分配一個並行度之後,纔會出現單硬盤分配兩個任務的狀況。並且輪循策略能夠保證分配的硬盤是連續的。
直接看 RocksDBStateBackend 類的部分源碼:
/** Base paths for RocksDB directory, as initialized. 這裏就是咱們上述設置的 12 個 rocksdb local dir */ private transient File[] initializedDbBasePaths; /** The index of the next directory to be used from {@link #initializedDbBasePaths}. 下一次要使用 dir 的 index,若是 nextDirectory = 2, 則使用 initializedDbBasePaths 中下標爲 2 的那個目錄作爲 rocksdb 的存儲目錄 */ private transient int nextDirectory; // lazyInitializeForJob 方法中, 經過這一行代碼決定下一次要使用 dir 的 index, // 根據 initializedDbBasePaths.length 生成隨機數, // 若是 initializedDbBasePaths.length = 12,生成隨機數的範圍爲 0-11 nextDirectory = new Random().nextInt(initializedDbBasePaths.length);
分析完簡單的源碼後,咱們知道了源碼中使用了 random 的策略來分配 dir,跟咱們所看到的現象可以匹配。隨機分配有小几率會出現衝突。(寫這篇文章時,Flink 最新的 master 分支代碼仍然是上述策略,還沒有作任何改動)
random 和 hash 策略在任務數量比較大時,能夠保證每一個 Worker 承擔的任務量基本同樣,可是若是任務量比較小,例如將 20 個任務經過隨機算法分配給 10 個 Worker 時,就會出現有的 Worker 分配不到任務,有的 Worker 可能分配到 3 或 4 個任務。因此 random 和 hash 策略不能解決 rocksdb 分配磁盤不均的痛點,那輪循策略和最低負載策略呢?
輪循策略能夠解決上述問題,解決方式以下:
// 在 RocksDBStateBackend 類中定義了 private static final AtomicInteger DIR_INDEX = new AtomicInteger(0); // nextDirectory 的分配策略變成了以下代碼,每次將 DIR_INDEX + 1,而後對 dir 的總數求餘 nextDirectory = DIR_INDEX.getAndIncrement() % initializedDbBasePaths.length;
經過上述便可實現輪循策略,申請磁盤時,從 0 號磁盤開始申請,每次使用下一塊磁盤便可。
Java 中靜態變量屬於 JVM 級別的,每一個 TaskManager 屬於單獨的 JVM,因此 TaskManager 內部保證了輪循策略。若是同一臺服務器上運行多個 TaskManager,那麼多個 TaskManager 都會從 index 爲 0 的磁盤開始使用,因此致使 index 較小的磁盤會被常用,而 index 較大的磁盤可能常常不會被使用到。
DIR_INDEX 初始化時,不要每次初始化爲 0,能夠生成一個隨機數,這樣能夠保證不會每次使用 index 較小的磁盤,實現代碼以下所示:
// 在 RocksDBStateBackend 類中定義了 private static final AtomicInteger DIR_INDEX = new AtomicInteger(new Random().nextInt(100));
可是上述方案不能徹底解決磁盤衝突的問題,同一臺機器上 12 塊磁盤,TaskManager0 使用 index 爲 0、一、2 的三塊磁盤,TaskManager1 可能使用 index 爲 一、二、3 的三塊磁盤。結果就是 TaskManager 內部來看,實現了輪循策略保證負載均衡,可是全局來看,負載並不均衡。
爲了全局負載均衡,因此多個 TaskManager 之間必須通訊才能作到絕對的負載均衡,能夠藉助第三方的存儲進行通訊,例如在 Zookeeper 中,爲每一個服務器生成一個 znode,znode 命名能夠是 host 或者 ip。使用 Curator 的 DistributedAtomicInteger 來維護 DIR_INDEX 變量,存儲在當前服務器對應的 znode 中,不管是哪一個 TaskManager 申請磁盤,均可以使用 DistributedAtomicInteger 將當前服務器對應的 DIR_INDEX + 1,從而就能夠實現全局的輪循策略。
DistributedAtomicInteger 的 increment 的思路:先使用 Zookeeper 的 withVersion api 進行 +1 操做(也就是 Zookeeper 提供的 CAS api),若是成功則成功;若是失敗,則使用分佈式互斥鎖進行 +1 操做。
基於上述描述,咱們獲得兩種策略來實現輪循,AtomicInteger 只能保證 TaskManager 內部的輪循,不能保證全局輪循。若是要基於全局輪循,須要藉助 Zookeeper 或其餘組件來實現。若是對輪循策略要求比較苛刻,可使用基於 Zookeeper 的輪循策略,若是不想依賴外部組件則只能使用 AtomicInteger 來實現。
思想就是 TaskManager 啓動時,監測全部 rocksdb local dir 對應的磁盤最近 1 分鐘或 5 分鐘的 IO 平均使用率,篩掉 IO 使用率較高的磁盤,優先選擇 IO 平均使用率較低的磁盤,同時在 IO 平均使用率較低的磁盤中,依然要使用輪循策略來實現。
啓動階段不採集硬盤的負載壓力,使用以前的 DistributedAtomicInteger 基本就能夠保證每一個硬盤負載均衡。可是任務啓動後一段時間,若是由於 Flink 任務致使某個磁盤 IO 的平均使用率相對其餘磁盤而言很是高。咱們能夠選擇遷移高負載硬盤的數據到低負載硬盤。
基於此分析,最低負載策略比較麻煩,筆者目前還沒有實現此策略。
本文分析了目前 Flink 使用大狀態時遇到的問題,並給出了多種解決方案。
目前筆者已經實現了隨機、TaskManager 內輪循、基於 Zookeeper 的全局輪循三種策略,並應用到生產環境,能夠直接在 flink-conf.yaml 文件中配置策略。目前來看基於 Zookeeper 的全局輪循策略很是好。以後儘可能會回饋給社區。