前言
回顧第一講:咱們瞭解了流(有界,無界),窗口(翻轉,滑動,會話,全局)等基本概念。
知道了DataStreamAPI的基本用法:
source->transformation->sink的基本步驟
其中
source可使用flink原生的(如kafka,rabbitmq...)還能夠繼承RichSourceFunction
transformation 經常使用的有window keyby map reduce 聚合函數...
sink可使用flink原生的(如kafka,rabbitmq...)還能夠繼承RichSinkFunction
第一講詳見:https://segmentfault.com/a/11...html
可是咱們知道,Flink api 有三大類,分別是DataStreamAPI DataSetAPI tableAPI(sql)
DataSetAPI 和 DataStreamAPI 區別是一個是批處理,一個是流處理。而今天將介紹 table api用法。
另外 咱們知道,flink是有狀態的流式處理計算。那麼它的狀態是桌面管理的?有哪些類型?咱們該怎麼樣去使用?我這一講會以kafka消費位點 爲切入點,一步一步的分析出kafka爲什麼是能夠保證exactly-once 。瞭解了 flink 對 kafka消費位點原理後。咱們怎麼使用flink的state存儲咱們想要的數據呢?除此以外,今天還會介紹flink的水印,觸發器,清除器,吃到生存週期,還會簡單介紹一下flink的processfunction.好比咱們想合併兩個數據源,就會用到coprocessfunction..java
即這一講的內容包含:mysql
一 flink 關係型API (table API sql)
批處理和流式傳輸的全部Table API和SQL程序都遵循相同的模式。如下代碼示例顯示了Table API和SQL程序的常見結構。git
// for batch programs use ExecutionEnvironment instead of StreamExecutionEnvironment StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // create a TableEnvironment // for batch programs use BatchTableEnvironment instead of StreamTableEnvironment StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); // register a Table tableEnv.registerTable("table1", ...) // or tableEnv.registerTableSource("table2", ...); // or tableEnv.registerExternalCatalog("extCat", ...); // register an output Table tableEnv.registerTableSink("outputTable", ...); // create a Table from a Table API query Table tapiResult = tableEnv.scan("table1").select(...); // create a Table from a SQL query Table sqlResult = tableEnv.sqlQuery("SELECT ... FROM table2 ... "); // emit a Table API result Table to a TableSink, same for SQL result tapiResult.insertInto("outputTable"); // execute env.execute();
注意:表API和SQL查詢能夠輕鬆集成並嵌入到DataStream或DataSet程序中github
下面舉一個例子來講明:sql
package cn.crawler.mft_seconed.demo2; import cn.crawler.mft_seconed.KafkaEntity; import cn.crawler.mft_seconed.KafkaEntityTableSource; import com.alibaba.fastjson.JSON; import org.apache.flink.api.common.functions.MapFunction; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.api.common.typeinfo.Types; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import org.apache.flink.table.api.Table; import org.apache.flink.table.api.Tumble; import org.apache.flink.table.api.WindowGroupedTable; import org.apache.flink.table.api.java.StreamTableEnvironment; import java.lang.reflect.Type; import java.util.Properties; /** * Created by liuliang * on 2019/7/13 */ public class TableAPIDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env); //1.簡單的kafka->flink sql->mysql //配置正好執行一次策略 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); //1S執行一次checkpoint env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "172.19.141.XX:31090"); prop.setProperty("group.id", "flink-group"); FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("fan_or_jia", new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//默認消費策略 DataStreamSource<String> kafkaSource = env.addSource(myConsumer); //將kafka中取出的數據流映射爲operator SingleOutputStreamOperator<KafkaEntity> map = kafkaSource.map(new MapFunction<String, KafkaEntity>() { private static final long serialVersionUID = 1471936326697828381L; @Override public KafkaEntity map(String value) { KafkaEntity kafkaEntity = JSON.parseObject(value, KafkaEntity.class); return kafkaEntity; } }); map.print(); //打印operator //註冊爲mft_flink_kafka 表 從map裏面取值,字段分別是id,message,name,time //數據流無縫轉換爲table tableEnv.registerDataStream("mft_flink_kafka",map,"id,message,name,create_time"); // Table table = tableEnv.scan("mft_flink_kafka"); // Table table1 = table.filter("id%2!=0") // .window(Tumble.over("1000.millis").on("rowtime").as("total")) // .groupBy("total") // .select("id,name,total.end as second"); // table1.printSchema(); Table sqlQuery = tableEnv.sqlQuery("select id,message,name,create_time from mft_flink_kafka"); //sink to mysql DataStream<Tuple4<Integer,String,String,Long>> appendStream = tableEnv.toAppendStream(sqlQuery, Types.TUPLE(Types.INT, Types.STRING,Types.STRING, Types.LONG)); appendStream.print(); appendStream.map(new MapFunction<Tuple4<Integer,String,String,Long>, KafkaEntity>() { @Override public KafkaEntity map(Tuple4<Integer, String,String,Long> stringStringTuple4) throws Exception { return new KafkaEntity(stringStringTuple4.f0,stringStringTuple4.f1,stringStringTuple4.f2,stringStringTuple4.f3); } }).addSink(new SinkKafkaEntity2Mysql()); // 2.table api 帶窗口函數的,處理時間屬性由KafkaEntityTableSource實現DefinedProctimeAttribute接口的定義。邏輯時間屬性附加到由返回類型定義的物理模式TableSource。 // tableEnv.registerDataStream("kafka_demo",new KafkaEntityTableSource()); env.execute("kafkaEntity from Kafka"); } }
package cn.crawler.mft_seconed.demo2; import cn.crawler.mft_seconed.KafkaEntity; import com.alibaba.fastjson.JSON; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; import java.util.UUID; /** * Created by liuliang * on 2019/7/13 */ public class SendDataToKafkaSql { public static void main(String[] args){ SendDataToKafkaSql sendDataToKafka = new SendDataToKafkaSql(); for(int i=100;i<200;i++){ String name = ""; if(i%2==0){ name = "范冰冰"; }else { name = "賈玲"; } KafkaEntity build = KafkaEntity.builder() .message("meaasge" + i) .id(i) .name(name+i) .create_time(System.currentTimeMillis()) .build(); System.out.println(build.toString()); sendDataToKafka.send("fan_or_jia", "123", JSON.toJSONString(build)); } } public void send(String topic,String key,String data){ Properties props = new Properties(); props.put("bootstrap.servers", "172.19.141.60:31090"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=1;i<2;i++){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(new ProducerRecord<String, String>(topic, key+i, data)); } prod ucer.close(); } }
package cn.crawler.mft_seconed.demo2; import cn.crawler.mft_seconed.KafkaEntity; import cn.crawler.util.MysqlUtil; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; /** * Created by liuliang * on 2019/7/15 */ public class SinkKafkaEntity2Mysql extends RichSinkFunction<KafkaEntity> { /** * 每條數據得插入都要掉一次invoke方法 * @param kafkaEntity * @param context * @throws Exception */ @Override public void invoke(KafkaEntity kafkaEntity, Context context) throws Exception { MysqlUtil mysqlUtil = new MysqlUtil(); mysqlUtil.insertKafkaEntity(kafkaEntity); } }
注:table api 還能夠有流字段的邏輯時間屬性... 也很簡單,建立一個接受類,繼承StreamTableSource<Row>就好,有機會能夠詳細介紹,詳見官網:https://ci.apache.org/project...apache
二 flink狀態管理(state)
什麼是State(狀態)?
能夠回憶一下http是有狀態的傳輸協議嗎?json
咱們能夠這樣理解: state是flink在某task/operator在某時刻的一箇中間結果 快照(shapshot) 在flink中狀態能夠理解爲一種數據結構
State存在哪裏呢?(這裏能夠畫一下flink使用state時的結構圖引出此問題)
官網爲咱們推薦了state的三種存儲方式(https://ci.apache.org/project...):bootstrap
(說明:因爲目前沒有相似HDFS給我存儲,本博客都是使用的第一種,也就是flink默認的)
總的來講,state分爲兩種,operator state和key state,key state專門對keystream使用,所包含的Sate種類也更多,可理解爲dataStream.keyBy()以後的Operator State,Operator State是對每個Operator的狀態進行記錄,而key State則是在dataSteam進行keyBy()後,記錄相同keyId的keyStream上的狀態key State提供的數據類型:ValueState<T>、ListState<T>、ReducingState<T>、MapState<T>。
operator state種類只有一種就是ListState<T> ,flink官方文檔用kafka的消費者舉例,認爲kafka消費者的partitionId和offset相似flink的operator statesegmentfault
說了這麼多,有些概念第一次見到,會很懵逼,我仍是重複之前的方式,對於難以理解的東西,先舉一個實際例子看效果。而後根據咱們看到的效果簡單分析一下他是怎麼作到的。而後知道state能夠幹什麼之後,再來細細的分析state的基礎知識。
演示程序:
package cn.crawler.mft_seconed.demo1; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import java.util.Properties; /** * Created by liuliang * on 2019/7/12 */ public class SendDataToKafkaDemo { public static void main(String[] args){ SendDataToKafkaDemo sendDataToKafka = new SendDataToKafkaDemo(); for(int i=100;i<200;i++){ sendDataToKafka.send("demo", "123", "這是測試的數據"+i); } } public void send(String topic,String key,String data){ Properties props = new Properties(); props.put("bootstrap.servers", "172.19.141.60:31090"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String,String>(props); for(int i=1;i<2;i++){ try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } producer.send(new ProducerRecord<String, String>(topic, key+i, data)); } producer.close(); } }
package cn.crawler.mft_seconed.demo1; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction; import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011; import java.util.Properties; /** * Created by liuliang * on 2019/6/19 */ public class KafkaDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //配置正好執行一次策略 env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); env.enableCheckpointing(1000); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); Properties prop = new Properties(); prop.setProperty("bootstrap.servers", "172.19.141.60:31090"); prop.setProperty("group.id", "flink-group"); FlinkKafkaConsumer011 myConsumer = new FlinkKafkaConsumer011("mysqltest", new SimpleStringSchema(), prop); myConsumer.setStartFromGroupOffsets();//默認消費策略 DataStreamSource<String> source = env.addSource(myConsumer); source.addSink(new PrintSinkFunction()); env.execute("StreamingFromCollection"); } }
咱們先將KafkaDemo(前者)啓動,而後啓動SendDataToKafkaDemo(後者) 。讓後者像前者發送數據,正常狀況下,前者會接收到後者的全部數據。如今,咱們在後者發送數據的過程當中,將前者程序中止(模擬程序crash) 這時候根據輸出,觀察前者消費了多少數據。記住數字1.而後再啓動後者,觀察開始輸出的數字2是不是接着數字1的。咱們能夠看到,數字2就是接着數字1的。前者在crash的時候保留了狀態!
接下來就是揭祕環節:
咱們以flink消費kafka消費位點爲例。咱們知道flink結合checkpoint能夠將kafka實現僅僅執行一次的原理。
那麼flink是如何管理實現的呢?接下來的知識點可能須要知道kafka的工做原理(不瞭解的須要自行百度一下,我就不介紹了)真的不懂,我就簡單介紹一下,並配合rabbitmq ack後 刪除消息 以示區別。
總之:
kafka:kafka的consumer會記錄當前消費的offset,下次會從offset開始繼續消費
rabbitmq:rabbitmq的consumer在確認消費後,會向Queque發送一個ACK應答,queque收到ACK應答後,會將消息刪除
哦..那必定是flink將offset存起來了!而後根據保存的offset繼續日後消費!那麼flink將offset存在哪裏了呢?固然是state了
Flink 中實現的 Kafka 消費者是一個有狀態的算子(operator),它集成了 Flink 的檢查點機制,它的狀態是全部 Kafka 分區的讀取偏移量。當一個檢查點被觸發時,每個分區的偏移量都被存到了這個檢查點中。Flink 的檢查點機制保證了全部 operator task 的存儲狀態都是一致的。
第一步:
以下所示,一個 Kafka topic,有兩個partition,每一個partition都含有 「A」, 「B」, 「C」, 」D」, 「E」 5條消息。咱們將兩個partition的偏移量(offset)都設置爲0.
第二步:
Kafka comsumer(消費者)開始從 partition 0 讀取消息。消息「A」正在被處理,第一個 consumer 的 offset 變成了1。
第三步:
消息「A」到達了 Flink Map Task。兩個 consumer 都開始讀取他們下一條消息(partition 0 讀取「B」,partition 1 讀取「A」)。各自將 offset 更新成 2 和 1 。同時,Flink 的 JobManager 開始在 source 觸發了一個檢查點。
第四步:
接下來,因爲 source 觸發了檢查點,Kafka consumer 建立了它們狀態的第一個快照(」offset = 2, 1」),並將快照存到了 Flink 的 JobManager 中。Source 在消息「B」和「A」從partition 0 和 1 發出後,發了一個 checkpoint barrier。Checkopint barrier 用於各個 operator task 之間對齊檢查點,保證了整個檢查點的一致性。消息「A」到達了 Flink Map Task,而上面的 consumer 繼續讀取下一條消息(消息「C」)。
第五步:
Flink Map Task 收齊了同一版本的所有 checkpoint barrier 後,那麼就會將它本身的狀態也存儲到 JobManager。同時,consumer 會繼續從 Kafka 讀取消息。
第六步:
Flink Map Task 完成了它本身狀態的快照流程後,會向 Flink JobManager 彙報它已經完成了這個 checkpoint。當全部的 task 都報告完成了它們的狀態 checkpoint 後,JobManager 就會將這個 checkpoint 標記爲成功。今後刻開始,這個 checkpoint 就能夠用於故障恢復了。值得一提的是,Flink 並不依賴 Kafka offset 從系統故障中恢復。
故障恢復
在發生故障時(好比,某個 worker 掛了),全部的 operator task 會被重啓,而他們的狀態會被重置到最近一次成功的 checkpoint。Kafka source 分別從 offset 2 和 1 從新開始讀取消息(由於這是完成的 checkpoint 中存的 offset)。看成業重啓後,咱們能夠期待正常的系統操做,就好像以前沒有發生故障同樣。以下圖所示:
既然flink-kafka是這樣實現的,那麼咱們怎麼自定義去使用state呢?下面也用程序來舉例說明一下:
CheckPointing
(1)介紹,實現方式分類
checkpoint能夠保存窗口和算子的執行狀態,在出現異常以後重啓計算任務,並保證已經執行和不會再重複執行,檢查點能夠分爲兩種,託管的和自定義的,託管檢查點會自動的進行存儲到指定位置:內存、磁盤和分佈式存儲中,自定義就須要自行實現保存相關,實現checkpoint有以下兩種方式:
使用託管State變量
使用自定義State變量實現CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口
下面將會給出兩種方式的使用代碼
(2) 使用Manage State,Flink自動實現state保存和恢復
下面先給出託管狀態變量(manage stata)使用代碼,後面給出了代碼執行的打印日誌。
代碼分析:
Console日誌輸出分析:
總結:source沒有使用manage state狀態丟失,windows使用manage state,異常狀態不丟失
package cn.crawler.mft_seconed.demo3; import org.apache.flink.api.common.state.ValueState; import org.apache.flink.api.common.state.ValueStateDescriptor; import org.apache.flink.api.common.typeinfo.TypeHint; import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple3; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.SourceFunction; import org.apache.flink.streaming.api.functions.windowing.RichWindowFunction; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.util.Collector; /** * Created by liuliang * on 2019/7/15 */ public class StateCheckPoint { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //打開並設置checkpoint // 1.設置checkpoint目錄,這裏我用的是本地路徑,記得本地路徑要file://開頭 // 2.設置checkpoint類型,at lease onece or EXACTLY_ONCE // 3.設置間隔時間,同時打開checkpoint功能 // env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint")); // env.setStateBackend(new FsStateBackend("file://D:\\softs\\flink\\state")); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(1000); //添加source 每一個2s 發送10條數據,key=1,達到100條時候拋出異常 //source發送記錄到達100拋出異常 //source拋出異常以後,count發送統計數丟失,從新從0開始 //windows函數,重啓後調用open函數,獲取state數據,處理記錄數從checkpoint中獲取恢復,因此從100開始 //總結:source沒有使用manage state狀態丟失,windows使用manage state,異常狀態不丟失 //問: 1. state.value()在open()方法中調用的時候,會拋出null異常,而在apply中使用就不會拋出異常。爲何? // 2. 爲何source裏面沒有open方法?source想使用state該桌面操做? env.addSource(new SourceFunction<Tuple3<Integer,String,Integer>>() { private Boolean isRunning = true; private int count = 0; @Override public void run(SourceContext<Tuple3<Integer, String, Integer>> sourceContext) throws Exception { while(isRunning){ for (int i = 0; i < 10; i++) { sourceContext.collect(Tuple3.of(1,"ahah",count)); count++; } if(count>100){ System.out.println("err_________________"); throw new Exception("123"); } System.out.println("source:"+count); Thread.sleep(2000); } } @Override public void cancel() { } }).keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) //窗口函數,好比是richwindowsfunction 否側沒法使用manage state .apply(new RichWindowFunction<Tuple3<Integer,String,Integer>, Integer, Tuple, TimeWindow>() { private transient ValueState<Integer> state; private int count = 0; @Override public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple3<Integer, String, Integer>> iterable, Collector<Integer> collector) throws Exception { //從state中獲取值 count=state.value(); for(Tuple3<Integer, String, Integer> item : iterable){ count++; } //更新state值 state.update(count); System.out.println("windows:"+tuple.toString()+" "+count+" state count:"+state.value()); collector.collect(count); } //獲取state @Override public void open(Configuration parameters) throws Exception { System.out.println("##open"); ValueStateDescriptor<Integer> descriptor = new ValueStateDescriptor<Integer>( "average", // the state name TypeInformation.of(new TypeHint<Integer>() {}), // type information 0); state = getRuntimeContext().getState(descriptor); } }).print(); env.execute(); } }
咱們能夠看一下我這邊打印的日誌:
source:10 ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open windows:(1) 10 state count:10 9> 10 source:20 windows:(1) 20 state count:20 9> 20 source:30 windows:(1) 30 state count:30 9> 30 source:40 windows:(1) 40 state count:40 9> 40 source:50 windows:(1) 50 state count:50 9> 50 source:60 windows:(1) 60 state count:60 9> 60 source:70 windows:(1) 70 state count:70 9> 70 source:80 windows:(1) 80 state count:80 9> 80 source:90 windows:(1) 90 state count:90 9> 90 source:100 windows:(1) 100 state count:100 9> 100 err_________________ //拋出異常 source:10 //沒管理,從頭開始(無狀態) ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open ##open windows:(1) 110 state count:110 //flink管理了,哪裏跌倒的,從哪裏開始(有狀態) 9> 110 source:20 windows:(1) 120 state count:120 9> 120 source:30 windows:(1) 130 state count:130 9> 130 source:40 windows:(1) 140 state count:140 9> 140 source:50 windows:(1) 150 state count:150 9> 150 source:60 Disconnected from the target VM, address: '127.0.0.1:53266', transport: 'socket'
(3) 自定義state 自行實現實現checkpoint接口
實現CheckpointedFunction接口或者ListCheckpoint<T extends Serializable>接口
分析說明:
由於須要實現ListCheckpoint接口,因此source和windows處理代碼,單獨寫成了JAVA類的形似,實現邏輯和驗證方法跟manage state類似,可是在以下代碼中,Source和Window都實現了ListCheckpoint接口,也就是說,Source拋出異常的時候,Source和Window都將能夠從checkpoint中獲取歷史狀態,從而達到不丟失狀態的能力。
代碼列表:
AutoSourceWithCp.java Source代碼
WindowStatisticWithChk.java windows apply函數代碼
CheckPointMain.java 主程序,調用
package cn.crawler.mft_seconed.demo3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.runtime.state.filesystem.FsStateBackend; import org.apache.flink.streaming.api.CheckpointingMode; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows; import org.apache.flink.streaming.api.windowing.time.Time; /** * 基於上面的提問,自定義一個state實現checkpoint接口 * Created by liuliang * on 2019/7/15 */ public class CheckPointMain { public static void main(String[] args) throws Exception{ StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setStateBackend(new FsStateBackend("file:///Users/liuliang/Documents/others/flinkdata/state_checkpoint")); env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE); env.getCheckpointConfig().setCheckpointInterval(1000); /** * 說明: * 由於須要實現ListCheckpoint接口,因此source和windows處理代碼,單獨寫成了JAVA類的形似, * 實現邏輯和驗證方法跟manage state類似,可是在以下代碼中,Source和Window都實現了ListCheckpoint接口, * 也就是說,Source拋出異常的時候,Source和Window都將能夠從checkpoint中獲取歷史狀態,從而達到不丟失狀態的能力。 */ DataStream<Tuple4<Integer,String,String,Integer>> data = env.setParallelism(1).addSource(new AutoSourceWithCp()); env.setParallelism(1); data.keyBy(0) .window(TumblingProcessingTimeWindows.of(Time.seconds(2))) .apply(new WindowStatisticWithChk()) .print(); env.execute(); } }
package cn.crawler.mft_seconed.demo3; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.RichSourceFunction; import java.util.ArrayList; import java.util.List; /** * Created by liuliang * on 2019/7/15 */ public class AutoSourceWithCp extends RichSourceFunction<Tuple4<Integer,String,String,Integer>> implements ListCheckpointed<UserState> { private int count = 0; private boolean is_running = true; @Override public void run(SourceContext sourceContext) throws Exception { while(is_running){ for (int i = 0; i < 10; i++) { sourceContext.collect(Tuple4.of(1, "hello-" + count, "alphabet", count)); count++; } System.out.println("source:"+count); Thread.sleep(2000); if(count>100){ System.out.println("準備異常啦....."); throw new Exception("exception made by ourself!"); } } } @Override public void cancel() { is_running = false; } @Override public List<UserState> snapshotState(long l, long l1) throws Exception { List<UserState> listState= new ArrayList<>(); UserState state = new UserState(count); listState.add(state); System.out.println(System.currentTimeMillis()+" ############# check point :"+listState.get(0).getCount()); return listState; } @Override public void restoreState(List<UserState> list) throws Exception { count = list.get(0).getCount(); System.out.println("AutoSourceWithCp restoreState:"+count); } }
package cn.crawler.mft_seconed.demo3; import org.apache.flink.api.java.tuple.Tuple; import org.apache.flink.api.java.tuple.Tuple4; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.windowing.WindowFunction; import org.apache.flink.streaming.api.windowing.windows.TimeWindow; import org.apache.flink.streaming.api.windowing.windows.Window; import org.apache.flink.util.Collector; import java.util.ArrayList; import java.util.List; import java.util.ArrayList; /** * Created by liuliang * on 2019/7/15 */ public class WindowStatisticWithChk implements WindowFunction<Tuple4<Integer,String,String,Integer>,Integer,Tuple,TimeWindow> ,ListCheckpointed<UserState> { private int total = 0; @Override public List<UserState> snapshotState(long l, long l1) throws Exception { List<UserState> listState= new ArrayList<>(); UserState state = new UserState(total); listState.add(state); return listState; } @Override public void restoreState(List<UserState> list) throws Exception { total = list.get(0).getCount(); } @Override public void apply(Tuple tuple, TimeWindow timeWindow, Iterable<Tuple4<Integer, String, String, Integer>> iterable, Collector<Integer> collector) throws Exception { int count = 0; for(Tuple4<Integer, String, String, Integer> data : iterable){ count++; System.out.println("apply key"+tuple.toString()+" count:"+data.f3+" "+data.f0); } total = total+count; System.out.println("windows total:"+total+" count:"+count+" "); collector.collect(count); } }
package cn.crawler.mft_seconed.demo3; import lombok.AllArgsConstructor; import lombok.Builder; import lombok.Data; import lombok.NoArgsConstructor; import java.io.Serializable; /** * Created by liuliang * on 2019/7/15 */ @Data @AllArgsConstructor @NoArgsConstructor @Builder public class UserState implements Serializable { private Integer count; }
特別說明:
1.碼字不易。
2.個人代碼都放在了github上:歡迎start。https://github.com/iamcrawler...
3.本文參照了flink官方文檔以及一下文檔:
https://www.jianshu.com/p/efa...
https://blog.csdn.net/u013560...