1. Background
In a recent project I used Flink to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there is no shortage of Flink-reads-Kafka examples online, but after a round of searching I found none that solved the duplicate-consumption problem. Searching the Flink documentation for this scenario, I found no ready-made Flink-to-MySQL Exactly-Once example either; the docs do, however, cover a very similar end-to-end exactly-once case. That existing example is the FlinkKafkaProducer011 class, which guarantees that messages sent to Kafka through it are delivered Exactly-Once, chiefly by extending TwoPhaseCommitSinkFunction. For what TwoPhaseCommitSinkFunction does, see the previous article: http://www.javashuo.com/article/p-frrxncnk-bd.html
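That producer-side example is worth a glance before building the MySQL variant. Below is a minimal sketch of how FlinkKafkaProducer011 is typically wired up for exactly-once; it is not from the original article, and the topic name, broker list, and timeout are placeholders (the topic and brokers simply reuse the values from the job further down).

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

import java.util.Properties;

public class KafkaExactlyOnceSinkSketch {

    public static void addExactlyOnceSink(DataStream<String> stream) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        // The producer's transaction timeout must not exceed the broker's
        // transaction.max.timeout.ms (15 minutes by default), so lower it here.
        props.setProperty("transaction.timeout.ms", "300000");
        // Semantic.EXACTLY_ONCE makes the sink write through Kafka transactions,
        // driven by the same TwoPhaseCommitSinkFunction hooks discussed below.
        stream.addSink(new FlinkKafkaProducer011<>(
                "demo123",
                new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
                props,
                FlinkKafkaProducer011.Semantic.EXACTLY_ONCE));
    }
}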
2. Implementation approach
In short, using this class means implementing four methods: beginTransaction, preCommit, commit, and abort. Together they give pre-commit semantics: after the sink has done its own processing it pre-commits (preCommit); only if the pre-commit succeeds does the real commit (commit) run; if the pre-commit fails, abort is called to roll the transaction back. Combined with Flink's checkpoint mechanism, this is also what persists the offsets of the topic's partitions. The call order is sketched below.
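To make that call order concrete before the real sink later in this post, here is a minimal, hypothetical logging-only sink (LoggingTwoPhaseCommitSink is my name, not part of the original code) that uses a plain String as the transaction handle:

import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Hypothetical logging-only sink: shows when Flink invokes each 2PC hook.
public class LoggingTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public LoggingTwoPhaseCommitSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // 1. Called at startup and again right after each checkpoint: open a new transaction.
        return "txn-" + System.nanoTime();
    }

    @Override
    protected void invoke(String txn, String value, Context context) {
        // 2. Called once per record: do the work inside the open transaction.
        System.out.println(txn + " buffering " + value);
    }

    @Override
    protected void preCommit(String txn) {
        // 3. Called when the checkpoint barrier arrives: flush, but keep the work invisible.
        System.out.println(txn + " preCommit");
    }

    @Override
    protected void commit(String txn) {
        // 4. Called once the checkpoint has completed everywhere: make the work visible.
        System.out.println(txn + " commit");
    }

    @Override
    protected void abort(String txn) {
        // Called on failure instead of commit: undo the work.
        System.out.println(txn + " abort");
    }
}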
Here is an example of the effect this achieves. Say checkpoints run every 10s, and FlinkKafkaConsumer011 is consuming Kafka messages in real time. After the messages of an interval have been consumed and processed, the database work is pre-committed. If the pre-commit succeeds, the real insert into the database follows 10s later; if that insert succeeds, a checkpoint completes, and Flink automatically records the consumed offsets (the checkpoint data can be stored in HDFS). If the pre-commit fails instead, say at the 5s mark, the Flink job goes into a restart loop; the restart strategy is configurable (a sketch follows this paragraph). The next checkpoint is then not taken, so the checkpoint still records the offsets of the last successful consumption. The records of the current interval were consumed, but the pre-commit failed during the checkpoint window, and note that nothing was actually inserted at that point: because preCommit failed, the commit phase never runs. Once you have dealt with the bad data and restart the Flink job, it automatically resumes consuming from the last successful checkpoint, which yields Exactly-Once from Kafka to MySQL.
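The restart strategy itself is not shown in the original code; a minimal sketch of one way to set it, using Flink's built-in RestartStrategies (the attempt count and delay are illustrative values, not from the article):

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RestartStrategyDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Restart up to 3 times, waiting 5 seconds between attempts;
        // after the attempts are exhausted the job fails for good.
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.seconds(5)));
    }
}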
3. The implementation: three classes
1. StreamDemoKafka2Mysql.java
package com.fwmagic.flink.streaming;

import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Consumes Kafka messages and sinks them (via a custom sink) to MySQL,
 * guaranteeing Exactly-Once from Kafka to MySQL.
 */
@SuppressWarnings("all")
public class StreamDemoKafka2Mysql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Parallelism 1 makes the message order easy to follow while testing;
        // it can be raised for real workloads.
        env.setParallelism(1);
        // Checkpoint settings
        // Trigger a checkpoint every 10s (checkpoint interval)
        env.enableCheckpointing(10000);
        // Use exactly-once checkpointing semantics
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        // Require at least 1s between checkpoints (minimum pause)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        // A checkpoint must complete within 10s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        // Allow only one checkpoint at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        // Keep checkpoint data after the job is cancelled, so it can be restored on demand
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // Set the state backend. Checkpoints live in memory by default and can go to
        // HDFS in production; here they are written to the local filesystem.
        env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));
        // Kafka consumer properties
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        // Partition discovery interval
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");
        /* SimpleStringSchema yields only the message payload; JSONKeyValueDeserializationSchema
           also exposes the key, value, and metadata (topic, partition, offset). */
        //FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props);
        // Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        // Send the stream to the two-phase-commit sink
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        // Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());
    }
}
2. MySqlTwoPhaseCommitSink.java
package com.fwmagic.flink.sink;

import com.fwmagic.flink.util.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom Kafka-to-MySQL sink. Extends TwoPhaseCommitSinkFunction to implement
 * a two-phase commit and guarantee Exactly-Once from Kafka to MySQL.
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Writes one record to the database.
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        String value = objectNode.get("value").toString();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        PreparedStatement ps = connection.prepareStatement(sql);
        ps.setString(1, value);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        // Execute the insert
        ps.execute();
        // Manufacture an exception when the value 15 arrives
        if (Integer.parseInt(value) == 15) System.out.println(1 / 0);
    }

    /**
     * Obtains a connection with auto-commit disabled (done inside getConnection),
     * i.e. begins the transaction.
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        System.err.println("start beginTransaction......." + connection);
        return connection;
    }

    /**
     * Pre-commit. The pre-commit work here has already happened in invoke().
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......." + connection);
    }

    /**
     * Commits the transaction if invoke() completed normally.
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......." + connection);
        DBConnectUtil.commit(connection);
    }

    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......." + connection);
    }

    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......." + connection);
    }

    /**
     * Rolls back the transaction if invoke() threw; the next checkpoint will not run either.
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......." + connection);
        DBConnectUtil.rollback(connection);
    }
}
3. DBConnectUtil.java
package com.fwmagic.flink.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    /**
     * Obtains a connection with auto-commit disabled.
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        conn = DriverManager.getConnection(url, user, password);
        // Switch to manual commit
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Commits the transaction.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Rolls back the transaction.
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Closes the connection.
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}
4. Testing
To make it easy to send messages, I used a scheduled task that publishes one number per second, 1 through 16 (a sketch of such a producer appears after the log analysis below). Before the number 15 arrives, one checkpoint should already have completed and the second should be about to fire; the data consumed under the first checkpoint has been committed to the database. When the number 15 is consumed, the manufactured exception fires. At that point the database should hold only the rows committed after the first checkpoint; the second batch is never inserted, because its pre-commit failed and the real commit therefore never runs. The log from my run:
start invoke.......
===>date:2019-05-28 18:36:50 {"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:51 {"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:52 {"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:53 {"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:54 {"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:55 {"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:56 {"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:57 {"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}
start preCommit.......
start beginTransaction.......
start commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420
start invoke.......
===>date:2019-05-28 18:36:58 {"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:59 {"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:00 {"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:01 {"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:37:02 {"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:03 {"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:04 {"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}
start abort rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1b
start commit.......com.mysql.jdbc.JDBC4Connection@71ed09a
java.lang.ArithmeticException: / by zero
	at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)
	at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)
The log shows that the first commit happened at 2019-05-28 18:36:57 and the values 1 through 8 were successfully written to the database. On the second cycle, when the value 15 was consumed, the commit failed: the tail of the log shows the rollback, which also closed the connection, so the subsequent commit attempt failed as well. The consumed values 9 through 15 were never inserted, no checkpoint was taken for that interval, and the checkpoint still holds the offsets from the last successful consumption.
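For reference, the scheduled sender mentioned above is not included in the original post. Here is a minimal sketch of such a test producer, assuming the plain kafka-clients API; the broker list and topic name follow the job code above and are assumptions of this sketch:

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class NumberProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "hd1:9092,hd2:9092,hd3:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // One number per second, 1 through 16; the bare digits parse as JSON numbers,
            // so JSONKeyValueDeserializationSchema accepts them, and the value 15
            // triggers the sink's manufactured exception.
            for (int i = 1; i <= 16; i++) {
                producer.send(new ProducerRecord<>("demo123", String.valueOf(i)));
                producer.flush();
                Thread.sleep(1000);
            }
        }
    }
}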
Database table: t_test
CREATE TABLE `t_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
Data in the table (screenshot from the original post omitted): only the values 1 through 8 are present, matching the first successful commit.