Apache-Flink深度解析-State

轉載自:https://dwz.cn/xrMCqbk5

本系列文章來自雲棲社區,對Flink的解析兼具廣度和深度,適合對Flink有必定研究的同窗學習。

摘要:
實際問題 在流計算場景中,數據會源源不斷的流入Apache Flink系統,每條數據進入Apache Flink系統都會觸發計算。若是咱們想進行一個Count聚合計算,那麼每次觸發計算是將歷史上全部流入的數據從新新計算一次,仍是每次計算都是在上一次計算結果之上進行增量計算呢?答案是確定的,Apache Flink是基於上一次的計算結果進行增量計算的。


實際問題

在流計算場景中,數據會源源不斷的流入Apache Flink系統,每條數據進入Apache Flink系統都會觸發計算。若是咱們想進行一個Count聚合計算,那麼每次觸發計算是將歷史上全部流入的數據從新新計算一次,仍是每次計算都是在上一次計算結果之上進行增量計算呢?答案是確定的,Apache Flink是基於上一次的計算結果進行增量計算的。那麼問題來了: "上一次的計算結果保存在哪裏,保存在內存能夠嗎?",答案是否認的,若是保存在內存,在因爲網絡,硬件等緣由形成某個計算節點失敗的狀況下,上一次計算結果會丟失,在節點恢復的時候,就須要將歷史上全部數據(可能十幾天,上百天的數據)從新計算一次,因此爲了不這種災難性的問題發生,Apache Flink 會利用State存儲計算結果。本篇將會爲你們介紹Apache Flink State的相關內容。算法

什麼是State

這個問題彷佛有些"弱智"?無論問題的答案是否顯而易見,但我仍是想簡單說一下在Apache Flink裏面什麼是State?State是指流計算過程當中計算節點的中間計算結果或元數據屬性,好比 在aggregation過程當中要在state中記錄中間聚合結果,好比 Apache Kafka 做爲數據源時候,咱們也要記錄已經讀取記錄的offset,這些State數據在計算過程當中會進行持久化(插入或更新)。因此Apache Flink中的State就是與時間相關的,Apache Flink任務的內部數據(計算數據和元數據屬性)的快照。
數組

爲何須要State

與批計算相比,State是流計算特有的,批計算沒有failover機制,要麼成功,要麼從新計算。流計算在 大多數場景 下是增量計算,數據逐條處理(大多數場景),每次計算是在上一次計算結果之上進行處理的,這樣的機制勢必要將上一次的計算結果進行存儲(生產模式要持久化),另外因爲 機器,網絡,髒數據等緣由致使的程序錯誤,在重啓job時候須要從成功的檢查點(checkpoint,後面篇章會專門介紹)進行state的恢復。增量計算,Failover這些機制都須要state的支撐。
bash

State 實現

Apache Flink內部有四種state的存儲實現,具體以下:
網絡


  • 基於內存的HeapStateBackend - 在debug模式使用,不 建議在生產模式下應用;數據結構

  • 基於HDFS的FsStateBackend - 分佈式文件持久化,每次讀寫都產生網絡IO,總體性能不佳;併發

  • 基於RocksDB的RocksDBStateBackend - 本地文件+異步HDFS持久化;異步

  • 還有一個是基於Niagara(Alibaba內部實現)NiagaraStateBackend - 分佈式持久化- 在Alibaba生產環境應用;分佈式

State 持久化邏輯

Apache Flink版本選擇用RocksDB+HDFS的方式進行State的存儲,State存儲分兩個階段,首先本地存儲到RocksDB,而後異步的同步到遠程的HDFS。 這樣而設計既消除了HeapStateBackend的侷限(內存大小,機器壞掉丟失等),也減小了純分佈式存儲的網絡IO開銷。

性能

State 分類

Apache Flink 內部按照算子和數據分組角度將State劃分爲以下兩類:
學習


  • KeyedState - 這裏面的key是咱們在SQL語句中對應的GroupBy/PartitioneBy裏面的字段,key的值就是groupby/PartitionBy字段組成的Row的字節數組,每個key都有一個屬於本身的State,key與key之間的State是不可見的;

  • OperatorState - Apache Flink內部的Source Connector的實現中就會用OperatorState來記錄source數據讀取的offset。

State 擴容從新分配

Apache Flink是一個大規模並行分佈式系統,容許大規模的有狀態流處理。 爲了可伸縮性,Apache Flink做業在邏輯上被分解成operator graph,而且每一個operator的執行被物理地分解成多個並行運算符實例。 從概念上講,Apache Flink中的每一個並行運算符實例都是一個獨立的任務,能夠在本身的機器上調度到網絡鏈接的其餘機器運行。


Apache Flink的DAG圖中只有邊相連的節點有網絡通訊,也就是整個DAG在垂直方向有網絡IO,在水平方向以下圖的stateful節點之間沒有網絡通訊,這種模型也保證了每一個operator實例維護一份本身的state,而且保存在本地磁盤(遠程異步同步)。經過這種設計,任務的全部狀態數據都是本地的,而且狀態訪問不須要任務之間的網絡通訊。 避免這種流量對於像Apache Flink這樣的大規模並行分佈式系統的可擴展性相當重要。


如上咱們知道Apache Flink中State有OperatorState和KeyedState,那麼在進行擴容時候(增長併發)State如何分配呢?好比:外部Source有5個partition,在Apache Flink上面由Srouce的1個併發擴容到2個併發,中間Stateful Operation 節點由2個併發並擴容的3個併發,以下圖所示:

在Apache Flink中對不一樣類型的State有不一樣的擴容方法,接下來咱們分別介紹。

OperatorState對擴容的處理

咱們選取Apache Flink中某個具體Connector實現實例進行介紹,以MetaQ爲例,MetaQ以topic方式訂閱數據,每一個topic會有N>0個分區,以上圖爲例,加上咱們訂閱的MetaQ的topic有5個分區,那麼當咱們source由1個併發調整爲2個併發時候,State是怎麼恢復的呢?
state 恢復的方式與Source中OperatorState的存儲結構有必然關係,咱們先看MetaQSource的實現是如何存儲State的。首先MetaQSource 實現了ListCheckpointed<T extends Serializable>,其中的T是Tuple2<InputSplit,Long>,咱們在看ListCheckpointed接口的內部定義以下:


複製代碼
public interface ListCheckpointed<T extends Serializable>; {
List<T> snapshotState(long var1, long var3) throws Exception;

void restoreState(List&lt;T&gt; var1) throws Exception;
}複製代碼

咱們發現 snapshotState方法的返回值是一個List<T>,T是Tuple2<InputSplit,Long>,也就是snapshotState方法返回List<Tuple2<InputSplit,Long>>,這個類型說明state的存儲是一個包含partiton和offset信息的列表,InputSplit表明一個分區,Long表明當前partition讀取的offset。InputSplit有一個方法以下:

public interface InputSplit extends Serializable {
int getSplitNumber();
}複製代碼


也就是說,InputSplit咱們能夠理解爲是一個Partition索引,有了這個數據結構咱們在看看上面圖所示的case是如何工做的?當Source的並行度是1的時候,全部打partition數據都在同一個線程中讀取,全部partition的state也在同一個state中維護,State存儲信息格式以下:

若是咱們如今將併發調整爲2,那麼咱們5個分區的State將會在2個獨立的任務(線程)中進行維護,在內部實現中咱們有以下算法進行分配每一個Task所處理和維護partition的State信息,以下:

List<Integer> assignedPartitions = new LinkedList<>();
for (int i = 0; i < partitions; i++) {
if (i % consumerCount == consumerIndex) {
assignedPartitions.add(i);
}
}複製代碼


這個求mod的算法,決定了每一個併發所處理和維護partition的State信息,針對咱們當前的case具體的存儲狀況以下:

那麼到如今咱們發現上面擴容後State得以很好的分配得益於OperatorState採用了List<T>的數據結構的設計。另外你們注意一個問題,相信你們已經發現上面分配partition的算法有一個限制,那就是Source的擴容(併發數)是否能夠超過Source物理存儲的partition數量呢?答案是否認的,不能。目前Apache Flink的作法是提早報錯,即便不報錯也是資源的浪費,由於超過partition數量的併發永遠分配不到待管理的partition。

KeyedState對擴容的處理

對於KeyedState最容易想到的是hash(key) mod parallelism(operator) 方式分配state,就和OperatorState同樣,這種分配方式大多數狀況是恢復的state不是本地已有的state,須要一次網絡拷貝,這種效率比較低,OperatorState採用這種簡單的方式進行處理是由於OperatorState的state通常都比較小,網絡拉取的成本很小,對於KeyedState每每很大,咱們會有更好的選擇,在Apache Flink中採用的是Key-Groups方式進行分配。

什麼是Key-Groups

Key-Groups 是Apache Flink中對keyed state按照key進行分組的方式,每一個key-group中會包含N>0個key,一個key-group是State分配的原子單位。在Apache Flink中關於Key-Group的對象是 KeyGroupRange, 以下:

public class KeyGroupRange implements KeyGroupsList, Serializable {
...
...
private final int startKeyGroup;
private final int endKeyGroup;
...
...
}複製代碼

KeyGroupRange兩個重要的屬性就是 startKeyGroup和endKeyGroup,定義了startKeyGroup和endKeyGroup屬性後Operator上面的Key-Group的個數也就肯定了。

什麼決定Key-Groups的個數

key-group的數量在job啓動前必須是肯定的且運行中不能改變。因爲key-group是state分配的原子單位,而每一個operator並行實例至少包含一個key-group,所以operator的最大並行度不能超過設定的key-group的個數,那麼在Apache Flink的內部實現上key-group的數量就是最大並行度的值。

GroupRange.of(0, maxParallelism)如何決定key屬於哪一個Key-Group
肯定好GroupRange以後,如何決定每一個Key屬於哪一個Key-Group呢?咱們採起的是取mod的方式,在KeyGroupRangeAssignment中的assignToKeyGroup方法會將key劃分到指定的key-group中,以下:



如上實現咱們瞭解到分配Key到指定的key-group的邏輯是利用key的hashCode和maxParallelism進行取餘操做來分配的。以下圖當parallelism=2,maxParallelism=10的狀況下流上key與key-group的對應關係以下圖所示:

如上圖key(a)的hashCode是97,與最大併發10取餘後是7,被分配到了KG-7中,流上每一個event都會分配到KG-0至KG-9其中一個Key-Group中。
每一個Operator實例如何獲取Key-Groups
瞭解了Key-Groups概念和如何分配每一個Key到指定的Key-Groups以後,咱們看看如何計算每一個Operator實例所處理的Key-Groups。 在KeyGroupRangeAssignment的computeKeyGroupRangeForOperatorIndex方法描述了分配算法:

public static KeyGroupRange computeKeyGroupRangeForOperatorIndex(
int maxParallelism,
int parallelism,
int operatorIndex) {
GroupRange splitRange = GroupRange.of(0, maxParallelism).getSplitRange(parallelism, operatorIndex);
int startGroup = splitRange.getStartGroup();
int endGroup = splitRange.getEndGroup();
return new KeyGroupRange(startGroup, endGroup - 1);
}

public GroupRange getSplitRange(int numSplits, int splitIndex) {
...
final int numGroupsPerSplit = getNumGroups() / numSplits;
final int numFatSplits = getNumGroups() % numSplits;

int startGroupForThisSplit;
int endGroupForThisSplit;
if (splitIndex &lt; numFatSplits) {
startGroupForThisSplit = getStartGroup() + splitIndex * (numGroupsPerSplit + 1);
endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit + 1;
} else {
startGroupForThisSplit = getStartGroup() + splitIndex * numGroupsPerSplit + numFatSplits;
endGroupForThisSplit = startGroupForThisSplit + numGroupsPerSplit;
}
if (startGroupForThisSplit &gt;= endGroupForThisSplit) {
return GroupRange.emptyGroupRange();
} else {
return new GroupRange(startGroupForThisSplit, endGroupForThisSplit);
}
}複製代碼


上面代碼的核心邏輯是先計算每一個Operator實例至少分配的Key-Group個數,將不能整除的部分N個,平均分給前N個實例。最終每一個Operator實例管理的Key-Groups會在GroupRange中表示,本質是一個區間值;下面咱們就上圖的case,說明一下如何進行分配以及擴容後如何從新分配。
假設上面的Stateful Operation節點的最大並行度maxParallelism的值是10,也就是咱們一共有10個Key-Group,當咱們併發是2的時候和併發是3的時候分配的狀況以下圖:

如上算法咱們發如今進行擴容時候,大部分state仍是落到本地的,如Task0只有KG-4被分出去,其餘的仍是保持在本地。同時咱們也發現,一個job若是修改了maxParallelism的值那麼會直接影響到Key-Groups的數量和key的分配,也會打亂全部的Key-Group的分配,目前在Apache Flink系統中統一將maxParallelism的默認值調整到4096,最大程度的避免沒法擴容的狀況發生。


本篇簡單介紹了Apache Flink中State的概念,並重點介紹了OperatorState和KeyedState在擴容時候的處理方式。Apache Flink State是支撐Apache Flink中failover,增量計算,Window等重要機制和功能的核心設施。後續介紹failover,增量計算,Window等相關篇章中也會涉及State的利用,當涉及到本篇沒有覆蓋的內容時候再補充介紹。

相關文章
相關標籤/搜索