Flink不一樣於其餘實時計算的框架之處是它能夠提供針對不一樣的狀態進行編程和計算。本篇文章的主要思路以下,你們能夠選擇性閱讀。html
1. Flink的狀態分類及不一樣點。java
2. Flink針對不一樣的狀態進行編程。node
3. 檢查點機制和配置。數據庫
4. 狀態的存儲。apache
Flink有兩種不一樣的狀態分類,一種是Keyed State(鍵狀態),一種是Operator State(算子狀態)。編程
主要是針對KeyedStream中使用,當使用keyBy方法的時候進行計算。 咱們都知道在計算的過程當中就是將Flink按照<並行operator, key> 進行計算,每一個key又歸屬於單個Operator,因此咱們能夠簡單的理解爲<operator, key>。也就是說首先按Operator分配到不一樣的實例,而後在不一樣的實例中,相同的Key分配到相同的組中,而後這些狀態就能夠在相同的組中進行獲取和計算。後端
主要針對不一樣的算子的狀態計算。按照不一樣的算子如Map, FlatMap,Reduce等算子去分配不一樣的實例羣。像Kafka Connector的例子就很好的應用了這個功能,根據不一樣的topic去讀取不一樣的狀態,好比計算獲取到topic的paritition分區和 offset偏移量。 每一個算子實例會維護着這個topic的partition及offset的Map狀態,這個例子就是很好的使用了Opertator的state。若是Operator並行度發生改變了的話,那麼狀態也會相應的分配好對應的狀態。api
這兩種狀態又分爲 Managed State (可管理狀態)和 Raw State (原生狀態)緩存
全部的流數據功能均可以使用Managed State,這個也是Flink編程所推薦的。由於要使用Raw State的話比較底層也比較複雜,要實現算子方法時才使用。數據結構
咱們只針對可管理的狀態進行操做,不一樣的管理 Keyed State 和Operator State 狀態原始方法定義可參考官網介紹。
咱們針對Keyed managed state進行編程。來個場景,假如Flink計算某個功能的時間,若是某個功能Key時間超過某個閾值了則進行計數,若是數據超過了設置的次數,那麼直接輸出到控制檯。直接參考以下代碼。
代碼大體的思路是:
繼承RichFlatMapFunction, 定義一個ListState<Long>用於記錄當前的狀態。
定義閾值和錯誤次數值,觸發後直接輸出控制檯下。
open方法實例化ListState。在裏邊設置了一下狀態的TTL,即狀態的生命週期。
flatMap方法按key分配後的value進行判斷和記錄。
最後main方法進行數據準備和輸出。
package myflink.state; import org.apache.commons.compress.utils.Lists; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.StateTtlConfig; import org.apache.flink.api.common.time.Time; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; public class ThresholdWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Long>>> { //經過ListState來存儲非正常數據的狀態 private transient ListState<Long> abnormalData; //須要監控的閾值 private Long threshold; //觸發報警的次數 private Integer numberOfTimes; public ThresholdWarning(Long threshold, Integer numberOfTimes) { this.threshold = threshold; this.numberOfTimes = numberOfTimes; } @Override public void open(Configuration parameters) throws Exception { ListStateDescriptor listStateDescriptor = new ListStateDescriptor<Long>("abnormal-state", TypeInformation.of(Long.class)); //狀態存活生命週期設置TTL Time To Live StateTtlConfig ttlConfig = StateTtlConfig //設置有效期爲10秒 .newBuilder(Time.seconds(10L)) //設置有效的更新規則,當建立和寫入的時候須要從新更新爲10S .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) //設置狀態的可見性,設置狀態若是沒有刪除,那麼就是可見的,另一個值:ReturnExpiredIfNotCleanedUp , // 若是沒有清理的話,狀態會一直可見的 .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) .build(); //設置TTL配置 listStateDescriptor.enableTimeToLive(ttlConfig); //經過狀態名稱(句柄)獲取狀態實例,若是不存在則會自動建 abnormalData = getRuntimeContext().getListState(new ListStateDescriptor<Long>("abnormal-state", TypeInformation.of(Long.class))); } @Override public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Long>>> out) throws Exception { Long inputValue = value.f1; //若是輸入的值超過閾值,則記錄該次不正常的數據信息 if(inputValue >= threshold) { abnormalData.add(inputValue); } ArrayList<Long> list = Lists.newArrayList(abnormalData.get().iterator()); //若是不正常的數據超過了指定的數量,則輸出報警信息 if(list.size() >= numberOfTimes) { out.collect(Tuple2.of(value.f0 + " 超過指定閾值數量", list)); //報警信息輸出後,清空狀態 abnormalData.clear(); } } public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //設置並行度爲1,用於觀察輸出 // env.setParallelism(1); DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements( Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L), Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L), Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L), Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L)); tuple2DataStreamSource .keyBy(0) //超過100的閾值3次後輸出報警信息 .flatMap(new ThresholdWarning(100L, 3)) .printToErr(); env.execute("Managed Keyed State"); } }
輸出的結果以下,大於等於100的出現3次即進行輸出。和咱們想象的都同樣。
1> (b 超過指定閾值數量,[100, 200, 200]) 1> (b 超過指定閾值數量,[500, 600, 700]) 3> (a 超過指定閾值數量,[400, 100, 200])
咱們還在原來基礎的例子上調整一下,不按key,按Operator類型,只要超過期間的次數達到了就要輸出。在其中,把Operator的hashCode進行輸出一下,用於區分是否爲相同的Operator。首先咱們將並行度設置爲1,而後一下子再把並行度調整成2。
代碼的大體思路爲:
繼承RichFlatMapFunction,實現CheckpointedFunction接口,即在觸發檢查點的時候進行操做。
initializeState方法的時候將opertor的狀態和檢查點狀態進行初始化。
snapshotState方法即存儲狀態時將當時的鏡像進行存儲。能夠存儲到外部設備。
flatMap的時候進行閾值判斷和數據收集。
main方法進行檢查點設置,數據準備,執行和輸出。
package myflink.state; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ListState; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.state.FunctionInitializationContext; import org.apache.flink.runtime.state.FunctionSnapshotContext; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; public class ThresholdOperatorWarning extends RichFlatMapFunction<Tuple2<String, Long>, Tuple2<String, List<Tuple2<String, Long>>>> implements CheckpointedFunction { //非正確數據狀態 private List<Tuple2<String, Long>> bufferedData; //檢查點狀態 private transient ListState<Tuple2<String, Long>> checkPointedState; //須要監控的閾值 private Long threshold; //次數 private Integer numberOfTimes; ThresholdOperatorWarning(Long threshold, Integer numberOfTimes) { this.threshold = threshold; this.numberOfTimes = numberOfTimes; this.bufferedData = new ArrayList<>(); } @Override public void flatMap(Tuple2<String, Long> value, Collector<Tuple2<String, List<Tuple2<String, Long>>>> out) throws Exception { Long inputValue = value.f1; //超過閾值則進行記錄 if(inputValue >= threshold) { bufferedData.add(value); } //超過指定次數則進行彙總和彙總輸出 if(bufferedData.size() >= numberOfTimes) { //輸出狀態實例的hashCode out.collect(Tuple2.of(checkPointedState.hashCode() + "閾值報警! " , bufferedData )); //清理緩存 bufferedData.clear(); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { //當數據進行快照時,將數據存儲到checkPointedState checkPointedState.clear(); for (Tuple2<String, Long> element : bufferedData) { checkPointedState.add(element); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { //這裏獲取的是operatorStateStore checkPointedState = context.getOperatorStateStore().getListState(new ListStateDescriptor<Tuple2<String, Long>>( "abnormalData", TypeInformation.of(new TypeHint<Tuple2<String, Long>>() {}) )); //若是發生重啓,則須要從快照中將狀態進行恢復 if(context.isRestored()) { for (Tuple2<String, Long> element : checkPointedState.get()) { bufferedData.add(element); } } } public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //開啓檢查點 env.enableCheckpointing(1000L); // 其餘可選配置以下: // 設置語義,默認是EXACTLY_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置檢查點之間最小停頓時間 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 設置執行Checkpoint操做時的超時時間 env.getCheckpointConfig().setCheckpointTimeout(60000); // 設置最大併發執行的檢查點的數量 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 將檢查點持久化到外部存儲 env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //設置並行度爲1 env.setParallelism(1); //數據源 DataStreamSource<Tuple2<String, Long>> tuple2DataStreamSource = env.fromElements( Tuple2.of("a", 50L), Tuple2.of("a", 80L), Tuple2.of("a", 400L), Tuple2.of("a", 100L), Tuple2.of("a", 200L), Tuple2.of("a", 200L), Tuple2.of("b", 100L), Tuple2.of("b", 200L), Tuple2.of("b", 200L), Tuple2.of("b", 500L), Tuple2.of("b", 600L), Tuple2.of("b", 700L) ); tuple2DataStreamSource.flatMap(new ThresholdOperatorWarning(100L, 3)) .printToErr(); env.execute("managed Operator State"); } }
當前並行度爲1,結果以下,數據沒有按key統計,而是按照裏邊的值進行統計,符合咱們的要求。由於是同一個Operator,因此hashcode是同樣的。
(1629838640閾值報警! ,[(a,400), (a,100), (a,200)]) (1629838640閾值報警! ,[(a,200), (b,100), (b,200)]) (1629838640閾值報警! ,[(b,200), (b,500), (b,600)])
接下來將並行度設置爲2,結果以下。咱們看一下main裏邊的數據,符合大於等於100的數據一共有10個,那麼兩個不一樣的operator分配的時候這10數據的時候,一個operator分5個,那麼知足超過3個的時候才收集並輸出。由於5個裏邊只有一組3個知足,2個不知足因此不會輸出,因此符合咱們的預期。
1> (475161679閾值報警! ,[(a,100), (a,200), (b,200)])
2> (1633355453閾值報警! ,[(a,400), (a,200), (b,100)])
上邊咱們程序裏邊設置了檢查點,檢查點是當數據進行處理的時候將數據的狀態進行記錄,當程序出現問題的時候方便恢復。
能夠像這樣的狀況: 數據源——> 123456789|12345678| 12341234|——>sink。|即檢查點,是一個checkpoint barrier,當算子運行計算的時候會把當前的狀態進行記錄,好比讀取Kafka的數據,假如讀取到offset=6868,而後將這個值進行了記錄, 當這時有機器出現了問題,程序須要進行恢復並執行,那麼須要從新讀取這條數據再計算。引用一張圖片能夠有更清楚的認識。
默認狀況下,檢查點是關閉着的,咱們須要明確開啓。其餘的一些配置可參考以下內容:
//開啓檢查點 env.enableCheckpointing(1000L); // 其餘可選配置以下: // 設置語義,默認是EXACTLY_ONCE env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 設置檢查點之間最小停頓時間 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 設置執行Checkpoint操做時的超時時間 env.getCheckpointConfig().setCheckpointTimeout(60000); // 設置最大併發執行的檢查點的數量 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 將檢查點持久化到外部存儲 env.getCheckpointConfig().enableExternalizedCheckpoints( CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
flink支持手工將檢查點的狀態存儲到外部,也能夠指定存儲到HDFS文件。存儲到外邊是爲了程序出現了問題時進行恢復,好比OOM問題。程序升級和重啓時也須要從新從檢查點進行恢復。
# 觸發指定id的做業的Savepoint,並將結果存儲到指定目錄下
bin/flink savepoint :jobId [:targetDirectory]
Keyed State和Operator State會存儲在內存中,由於數據是持續不斷的輸入的,當數據量很是大的時候,內存會出現不足的狀況,那麼咱們也是須要將當前的狀態進行保存的。官方稱爲狀態後端。
MemoryStateBackend,這種方式是將數據存儲在JVM中,這種方式是用於開發。
FsStateBackend, 即以文件的形式存儲到磁盤中,能夠是HDFS或本地文件。當JobManger把任務發送給Taskmanger進行計算,此時數據會在JVM中,當觸發了checkpoint後纔會將數據存儲到文件中。
RocksDBStateBackend,這種形式是介於前邊兩種的狀況,這個是將狀態數據到KV數據庫中,當觸發狀態的時候會將數據再持久化到文件中。這樣即提升了速度,空間也變得更大了。
默認狀況是MemoryStateBackend,即內存中。
剩下兩種的配置以下,這種方式只對當前Job有效。RocksDB配置的話須要額外引用一下包。
// 配置 FsStateBackend env.setStateBackend(new FsStateBackend("hdfs://namenode:60060/flink/checkpoints")); // 配置 RocksDBStateBackend env.setStateBackend(new RocksDBStateBackend("hdfs://namenode:60060/flink/checkpoints"))
經過修改flink-yaml.conf能夠對該集羣全部做業生效。
state.backend: filesystem state.checkpoints.dir: hdfs://namenode:60060/flink/checkpoints