在批處理過程當中,數據是劃分爲塊分片去完成的,而後每個Task去處理一個分片。當分片執行完成後,把輸出聚合起來就是最終的結果。在這個過程中,對於state的需求仍是比較小的。java
在流計算過程當中,對State有很是高的要求,由於在流系統中輸入是一個無限制的流,會持續運行從不間斷。在這個過程中,就須要將狀態數據很好的管理起來。node
Flink的失敗恢復依賴於「檢查點機制+可部分重發的數據源」。算法
檢查點機制:檢查點按期觸發,產生快照,快照中記錄了(1)當前檢查點開始時數據源(例如Kafka)中消息的offset,(2)記錄了全部有狀態的operator當前的狀態信息(例如sum中的數值)。apache
可部分重發的數據源:Flink選擇最近完成的檢查點K。而後系統重放整個分佈式的數據流,而後給予每一個operator他們在檢查點k快照中的狀態。數據源被設置爲從位置Sk開始從新讀取流。例如在Apache Kafka中,那意味着告訴消費者從偏移量Sk開始從新消費。後端
Flink中有兩種基本類型的State,即Keyed State和Operator State。api
State能夠被記錄,在失敗的狀況下數據還能夠恢復數據結構
爲了保證state的容錯性,Flink須要對state進行checkpoint。app
Checkpoint是Flink實現容錯機制最核心的功能,它可以根據配置週期性地基於Stream中各個Operator/task的狀態來生成快照,從而將這些狀態數據按期持久化存儲下來,當Flink程序一旦意外崩潰時,從新運行程序時能夠有選擇地從這些快照進行恢復,從而修正由於故障帶來的程序數據異常框架
Flink的checkpoint機制能夠與(stream和state)的持久化存儲交互的前提是: 持久化的source(如kafka),它須要支持在必定時間內重放事件。這種sources的典型例子是持久化的消息隊列(好比Apache Kafka,RabbitMQ等)或文件系統(好比HDFS,S3,GFS等) 用於state的持久化存儲,例如分佈式文件系統(好比HDFS,S3,GFS等)socket
Flink的檢查點機制實現了標準的Chandy-Lamport算法,並用來實現分佈式快照。在分佈式快照當中,有一個核心的元素:Barrier。
單流的barrier:
1: 屏障做爲數據流的一部分隨着記錄被注入到數據流中。屏障永遠不會趕超一般的流記錄,它會嚴格遵循順序。
2: 屏障將數據流中的記錄隔離成一系列的記錄集合,並將一些集合中的數據加入到當前的快照中,而另外一些數據加入到下一個快照中。
3: 每個屏障攜帶着快照的ID,快照記錄着ID而且將其放在快照數據的前面。
4: 屏障不會中斷流處理,所以很是輕量級。
並行barrier
1:不止一個輸入流的時的operator,須要在快照屏障上對齊(align)輸入流,纔會發射出去。 2:能夠看到1,2,3會一直放在Input buffer,直到另外一個輸入流的快照到達Operator。
Stateful Flink applications are optimized for local state access. Task state is always maintained in memory or, if the state size exceeds the available memory, in access-efficient on-disk data structures. Hence, tasks perform all computations by accessing local, often in-memory, state yielding very low processing latencies. Flink guarantees exactly-once state consistency in case of failures by periodically and asynchronously checkpointing the local state to durable storage. 複製代碼
Keyed State和Operator State,能夠以兩種形式存在:
原始狀態(raw state)
託管狀態(managed state)
託管狀態是由Flink框架管理的狀態
原始狀態,由用戶自行管理狀態具體的數據結構,框架在作checkpoint的時候,使用byte[]來讀寫狀態內容,對其內部數據結構一無所知。
一般在DataStream上的狀態推薦使用託管的狀態。
當實現一個用戶自定義的operator時,會使用到原始狀態
顧名思義,就是基於KeyedStream上的狀態。這個狀態是跟特定的key綁定的,對KeyedStream流上的每個key,都對應一個state。 stream.keyBy(…)
state的數據結構;
(1) ValueState:即類型爲T的單值狀態。這個狀態與對應的key綁定,是最簡單的狀態了。它能夠經過update方法更新狀態值,經過value()方法獲取狀態值
(2) ListState:即key上的狀態值爲一個列表。能夠經過add方法往列表中附加值;也能夠經過get()方法返回一個Iterable來遍歷狀態值
(3) ReducingState:這種狀態經過用戶傳入的reduceFunction,每次調用add方法添加值的時候,會調用reduceFunction,最後合併到一個單一的狀態值
(4) MapState<UK, UV>:即狀態值爲一個map。用戶經過put或putAll方法添加元素
須要注意的是,以上所述的State對象,僅僅用於與狀態進行交互(更新、刪除、清空等),而真正的狀態值,有多是存在內存、磁盤、或者其餘分佈式存儲系統中。至關於咱們只是持有了這個狀態的句柄。實際上:這些狀態有三種存儲方式:
MemoryStateBackend: FsStateBackend RockDBStateBackend 複製代碼
MemoryStateBackend
state數據保存在java堆內存中,執行checkpoint的時候,會把state的快照數據保存到jobmanager的內存中 基於內存的state backend在生產環境下不建議使用。
FsStateBackend
state數據保存在taskmanager的內存中,執行checkpoint的時候,會把state的快照數據保存到配置的文件系統中,可使用hdfs等分佈式文件系統。
RocksDBStateBackend
RocksDB跟上面的都略有不一樣,它會在本地文件系統中維護狀態,state會直接寫入本地rocksdb中。同時RocksDB須要配置一個遠端的filesystem。
uri(通常是HDFS),在作checkpoint的時候,會把本地的數據直接複製到filesystem中。fail over的時候從filesystem中恢復到本地。
RocksDB克服了state受內存限制的缺點,同時又可以持久化到遠端文件系統中,比較適合在生產中使用
package xuwei.tech.streaming; import org.apache.flink.api.common.functions.RichFlatMapFunction; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.configuration.Configuration; import org.apache.flink.util.Collector; /** * qinkaixin 2018 11 24 */ public class CountWindowAverage extends RichFlatMapFunction<Tuple2<Long, Long>, Tuple2<Long, Long>> { /** * The ValueState handle. The first field is the count, the second field a running sum. */ private transient ValueState<Tuple2<Long, Long>> sum; @Override public void flatMap(Tuple2<Long, Long> input, Collector<Tuple2<Long, Long>> out) throws Exception { // access the state value Tuple2<Long, Long> currentSum = sum.value(); // update the count currentSum.f0 += 1; // add the second field of the input value currentSum.f1 += input.f1; // update the state sum.update(currentSum); // if the count reaches 2, emit the average and clear the state if (currentSum.f0 >= 2) { out.collect(new Tuple2<>(input.f0, currentSum.f1 / currentSum.f0)); sum.clear(); } } @Override public void open(Configuration config) { ValueStateDescriptor<Tuple2<Long, Long>> descriptor = new ValueStateDescriptor<>( "average", TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}), Tuple2.of(0L, 0L)); // default value of the state, if nothing was set sum = getRuntimeContext().getState(descriptor); } } 複製代碼
public static void main(String[] args) throws Exception{ //獲取Flink的運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.fromElements(Tuple2.of(1L, 3L), Tuple2.of(1L, 5L), Tuple2.of(1L, 7L), Tuple2.of(1L, 4L), Tuple2.of(1L, 2L)) .keyBy(0) .flatMap(new CountWindowAverage()) .print(); env.execute("StafulOperator"); System.out.println("***********"); } 複製代碼
與Key無關的State,與Operator綁定的state,整個operator只對應一個state
保存Operator state的數據結構爲ListState
舉例來講,Flink中的Kafka Connector,就使用了operator state。它會在每一個connector實例中,保存該實例中消費topic的全部(partition, offset)映射
繼承CheckpointedFunction,實現snapshotState和restoreState。
To use managed operator state, a stateful function can implement either the more general CheckpointedFunction interface, or the ListCheckpointed<T extends Serializable> interface. Whenever a checkpoint has to be performed, snapshotState() is called. The counterpart,initializeState(), is called every time the user-defined function is initialized, be that when the function is first initialized or be that when the function is actuallyrecovering from an earlier checkpoint. Given this, initializeState() is not only the place where different types of state are initialized, but also where state recovery logic is included. 複製代碼
public class BufferingSink implements SinkFunction<Tuple2<String, Integer>>, CheckpointedFunction { private final int threshold; private transient ListState<Tuple2<String, Integer>> checkpointedState; private List<Tuple2<String, Integer>> bufferedElements; public BufferingSink(int threshold) { this.threshold = threshold; this.bufferedElements = new ArrayList<>(); } @Override public void invoke(Tuple2<String, Integer> value) throws Exception { bufferedElements.add(value); if (bufferedElements.size() == threshold) { for (Tuple2<String, Integer> element: bufferedElements) { // send it to the sink } bufferedElements.clear(); } } @Override public void snapshotState(FunctionSnapshotContext context) throws Exception { checkpointedState.clear(); for (Tuple2<String, Integer> element : bufferedElements) { checkpointedState.add(element); } } @Override public void initializeState(FunctionInitializationContext context) throws Exception { ListStateDescriptor<Tuple2<String, Integer>> descriptor = new ListStateDescriptor<>( "buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {})); checkpointedState = context.getOperatorStateStore().getListState(descriptor); if (context.isRestored()) { for (Tuple2<String, Integer> element : checkpointedState.get()) { bufferedElements.add(element); } } } } 複製代碼
public static class CounterSource extends RichParallelSourceFunction<Long> implements ListCheckpointed<Long> { /** current offset for exactly once semantics */ private Long offset; /** flag for job cancellation */ private volatile boolean isRunning = true; @Override public void run(SourceContext<Long> ctx) { final Object lock = ctx.getCheckpointLock(); while (isRunning) { // output and state update are atomic synchronized (lock) { ctx.collect(offset); offset += 1; } } } @Override public void cancel() { isRunning = false; } @Override public List<Long> snapshotState(long checkpointId, long checkpointTimestamp) { return Collections.singletonList(offset); } @Override public void restoreState(List<Long> state) { for (Long s : state) offset = s; } } 複製代碼
默認checkpoint功能是disabled的,想要使用的時候須要先啓用
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啓動一個檢查點【設置checkpoint的週期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式爲exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只容許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定的Checkpoint【詳細解釋見備註】 env.getCheckpointConfig().enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); cancel處理選項: (1)ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION: 表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定 的Checkpoint (2)ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel後,會刪除Checkpoint數據,只有job執行失敗的時候纔會 保存checkpoint 複製代碼
修改State Backend的兩種方式
第一種:單任務調整
修改當前任務代碼 env.setStateBackend(new FsStateBackend("hdfs://namenode:9000/flink/checkpoints")); 或者new MemoryStateBackend() 或者new RocksDBStateBackend( hdfs->url, true);【須要添加第三方依賴】 複製代碼
第二種:全局調整
修改flink-conf.yaml state.backend: filesystem state.checkpoints.dir: hdfs://namenode:9000/flink/checkpoints 注意:state.backend的值能夠是下面幾種:jobmanager(MemoryStateBackend), filesystem(FsStateBackend), rocksdb(RocksDBStateBackend) 複製代碼
import org.apache.flink.api.common.functions.FlatMapFunction; import org.apache.flink.api.java.utils.ParameterTool; import org.apache.flink.contrib.streaming.state.RocksDBStateBackend; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.runtime.state.memory.MemoryStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.util.Collector; public class SocketWindowWordCountJavaCheckPoint { public static void main(String[] args) throws Exception{ //獲取須要的端口號 int port; try { ParameterTool parameterTool = ParameterTool.fromArgs(args); port = parameterTool.getInt("port"); }catch (Exception e){ System.err.println("No port set. use default port 9000--java"); port = 9010; } //獲取flink的運行環境 StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // 每隔1000 ms進行啓動一個檢查點【設置checkpoint的週期】 env.enableCheckpointing(1000); // 高級選項: // 設置模式爲exactly-once (這是默認值) env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); // 確保檢查點之間有至少500 ms的間隔【checkpoint最小間隔】 env.getCheckpointConfig().setMinPauseBetweenCheckpoints(500); // 檢查點必須在一分鐘內完成,或者被丟棄【checkpoint的超時時間】 env.getCheckpointConfig().setCheckpointTimeout(60000); // 同一時間只容許進行一個檢查點 env.getCheckpointConfig().setMaxConcurrentCheckpoints(1); // 表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定的Checkpoint【詳細解釋見備註】 //ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION:表示一旦Flink處理程序被cancel後,會保留Checkpoint數據,以便根據實際須要恢復到指定的Checkpoint //ExternalizedCheckpointCleanup.DELETE_ON_CANCELLATION: 表示一旦Flink處理程序被cancel後,會刪除Checkpoint數據,只有job執行失敗的時候纔會保存checkpoint env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); //設置statebackend //env.setStateBackend(new MemoryStateBackend()); //env.setStateBackend(new FsStateBackend("hdfs://hadoop100:9000/flink/checkpoints")); //env.setStateBackend(new RocksDBStateBackend("hdfs://hadoop100:9000/flink/checkpoints",true)); String hostname = "SparkMaster"; String delimiter = "\n"; //鏈接socket獲取輸入的數據 DataStreamSource<String> text = env.socketTextStream(hostname, port, delimiter); // a a c // a 1 // a 1 // c 1 DataStream<WordWithCount> windowCounts = text.flatMap(new FlatMapFunction<String, WordWithCount>() { public void flatMap(String value, Collector<WordWithCount> out) throws Exception { String[] splits = value.split("\\s"); for (String word : splits) { out.collect(new WordWithCount(word, 1L)); } } }).keyBy("word") .timeWindow(Time.seconds(2), Time.seconds(1))//指定時間窗口大小爲2秒,指定時間間隔爲1秒 .sum("count");//在這裏使用sum或者reduce均可以 /*.reduce(new ReduceFunction<WordWithCount>() { public WordWithCount reduce(WordWithCount a, WordWithCount b) throws Exception { return new WordWithCount(a.word,a.count+b.count); } })*/ //把數據打印到控制檯而且設置並行度 windowCounts.print().setParallelism(1); //這一行代碼必定要實現,不然程序不執行 env.execute("Socket window count"); } public static class WordWithCount{ public String word; public long count; public WordWithCount(){} public WordWithCount(String word,long count){ this.word = word; this.count = count; } @Override public String toString() { return "做者 : 秦凱新 , 窗大小2秒,滑動1秒 {" + " word='" + word + '\'' + ", count=" + count + '}'; } } } 複製代碼