Implementing Exactly-Once from Kafka to MySQL with Flink

1. Background

Recently a project needed Flink to consume Kafka messages and store them in MySQL. It looks like a simple requirement, and there are plenty of examples of Flink consuming Kafka online, but after going through them I found none that solved the duplicate-consumption problem. Searching the Flink documentation for this scenario, there is no ready-made example of exactly-once from Flink to MySQL either, but there is a similar example that solves end-to-end exactly-once delivery: FlinkKafkaProducer011. It guarantees that messages sent to Kafka through FlinkKafkaProducer011 are written exactly once, mainly by extending TwoPhaseCommitSinkFunction. For background on TwoPhaseCommitSinkFunction, see the previous article: http://www.javashuo.com/article/p-frrxncnk-bd.html


2. Approach

In short, you override four methods of this class — beginTransaction, preCommit, commit, and abort — to get pre-commit semantics: after a record has been processed it is pre-committed; only if the pre-commit succeeds does the real commit happen, and if the pre-commit fails, abort is called to roll the transaction back. Combined with Flink's checkpoint mechanism, which records the consumed offsets of the topic's partitions, this gives exactly-once delivery.

To illustrate the effect with an example: suppose a checkpoint is taken every 10 s and FlinkKafkaConsumer011 consumes Kafka messages in real time. After a message is consumed and processed, a pre-commit against the database is done. If the pre-commit succeeds, the real database insert happens 10 s later when the checkpoint completes, Flink automatically records the consumed offsets, and the checkpoint data can be stored on HDFS. If the pre-commit fails — say at the 5 s mark — the Flink job goes into restarts (the restart strategy is configurable), the next checkpoint is not taken, and the checkpoint still records the offsets of the last successful consumption. The data consumed during this checkpoint interval was read successfully but failed during pre-commit; note that it was never actually inserted, because when preCommit fails, commit never runs. Once the problematic data has been dealt with and the Flink job is restarted, it automatically resumes from the last successful checkpoint, which gives exactly-once from Kafka to MySQL. A minimal sketch of this lifecycle is given below.
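The sketch below only illustrates the method lifecycle; the class name, JDBC URL and credentials are placeholders, and the actual implementation used in this article is MySqlTwoPhaseCommitSink in section 3.

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;

// Lifecycle sketch only: the transaction handle is a JDBC Connection, records are plain Strings.
public class LifecycleSketchSink extends TwoPhaseCommitSinkFunction<String, Connection, Void> {

    public LifecycleSketchSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    @Override
    protected Connection beginTransaction() throws Exception {
        // Called when a new transaction starts: open a connection with auto-commit disabled.
        Connection conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/test", "root", "123456");
        conn.setAutoCommit(false);
        return conn;
    }

    @Override
    protected void invoke(Connection transaction, String value, Context context) throws Exception {
        // Per-record work: run the INSERT on the open, not-yet-committed transaction.
    }

    @Override
    protected void preCommit(Connection transaction) throws Exception {
        // Phase 1, triggered by a checkpoint: flush so the transaction is ready to be committed.
    }

    @Override
    protected void commit(Connection transaction) {
        // Phase 2, after the checkpoint has completed: make the writes visible.
        try {
            transaction.commit();
            transaction.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    protected void abort(Connection transaction) {
        // Failure path: discard the uncommitted writes.
        try {
            transaction.rollback();
            transaction.close();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}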


3. Implementation code: the three classes

1. StreamDemoKafka2Mysql.java

package com.fwmagic.flink.streaming;

import com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase;
import org.apache.flink.streaming.util.serialization.JSONKeyValueDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerConfig;

import java.util.Properties;

/**
 * Consume Kafka messages and sink them (with a custom sink) into MySQL, guaranteeing exactly-once from Kafka to MySQL.
 */
@SuppressWarnings("all")
public class StreamDemoKafka2Mysql {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //Set the parallelism; for easy testing and to observe message order it is set to 1 here, but it can be increased
        env.setParallelism(1);
        //Checkpoint settings
        //Trigger a checkpoint every 10 s (checkpoint interval)
        env.enableCheckpointing(10000);
        //Set the checkpointing mode to EXACTLY_ONCE (exactly-once semantics)
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
        //Ensure at least 1 s between checkpoints (minimum pause between checkpoints)
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(1000);
        //A checkpoint must complete within 10 s or it is discarded (checkpoint timeout)
        env.getCheckpointConfig().setCheckpointTimeout(10000);
        //Allow only one checkpoint in flight at a time
        env.getCheckpointConfig().setMaxConcurrentCheckpoints(1);
        //Retain checkpoint data after the job is cancelled, so the job can be restored from a specific checkpoint if needed
        //env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        //Set the state backend; checkpoints can be stored on HDFS (by default they live in memory). Here they are stored locally.
        env.setStateBackend(new FsStateBackend("file:///Users/temp/cp/"));
        //Kafka consumer configuration
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "flink-consumer-group1");
        //Interval for automatic Kafka partition discovery
        props.put(FlinkKafkaConsumerBase.KEY_PARTITION_DISCOVERY_INTERVAL_MILLIS, "3000");

        /* SimpleStringSchema yields only the message value; JSONKeyValueDeserializationSchema yields the message key, value, and metadata (topic, partition, offset). */
        // FlinkKafkaConsumer011<String> kafkaConsumer011 = new FlinkKafkaConsumer011<>(topic, new SimpleStringSchema(), props);
        FlinkKafkaConsumer011<ObjectNode> kafkaConsumer011 = new FlinkKafkaConsumer011<>("demo123", new JSONKeyValueDeserializationSchema(true), props);

        //Add the Kafka source
        DataStreamSource<ObjectNode> streamSource = env.addSource(kafkaConsumer011);
        //Send the data to the downstream sink
        streamSource.addSink(new MySqlTwoPhaseCommitSink()).name("MySqlTwoPhaseCommitSink");
        //Trigger execution
        env.execute(StreamDemoKafka2Mysql.class.getName());

    }
}
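
Section 2 mentions that the restart strategy can be configured. As a hedged example (the attempt count and delay are illustrative values, not taken from the original project), a fixed-delay restart strategy could be set right after the environment is created:

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;

import java.util.concurrent.TimeUnit;

// Restart the job at most 3 times, waiting 10 s between attempts (illustrative values).
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, Time.of(10, TimeUnit.SECONDS)));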


2. MySqlTwoPhaseCommitSink.java

package com.fwmagic.flink.sink;

import com.fwmagic.flink.util.DBConnectUtil;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.Timestamp;
import java.text.SimpleDateFormat;
import java.util.Date;

/**
 * Custom Kafka-to-MySQL sink that extends TwoPhaseCommitSinkFunction to implement two-phase commit.
 * Purpose: guarantee exactly-once from Kafka to MySQL.
 */
public class MySqlTwoPhaseCommitSink extends TwoPhaseCommitSinkFunction<ObjectNode, Connection, Void> {

    public MySqlTwoPhaseCommitSink() {
        super(new KryoSerializer<>(Connection.class, new ExecutionConfig()), VoidSerializer.INSTANCE);
    }

    /**
     * Execute the database insert.
     * @param connection
     * @param objectNode
     * @param context
     * @throws Exception
     */
    @Override
    protected void invoke(Connection connection, ObjectNode objectNode, Context context) throws Exception {
        System.err.println("start invoke.......");
        String date = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date());
        System.err.println("===>date:" + date + " " + objectNode);
        String value = objectNode.get("value").toString();
        String sql = "insert into `t_test` (`value`,`insert_time`) values (?,?)";
        PreparedStatement ps = connection.prepareStatement(sql);
        ps.setString(1, value);
        ps.setTimestamp(2, new Timestamp(System.currentTimeMillis()));
        //Execute the INSERT statement
        ps.execute();
        //Manually trigger an exception for testing
        if(Integer.parseInt(value) == 15) System.out.println(1/0);
    }

    /**
     * Obtain a connection; auto-commit is disabled inside getConnection.
     * @return
     * @throws Exception
     */
    @Override
    protected Connection beginTransaction() throws Exception {
        String url = "jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8&zeroDateTimeBehavior=convertToNull&useSSL=false&autoReconnect=true";
        Connection connection = DBConnectUtil.getConnection(url, "root", "123456");
        System.err.println("start beginTransaction......."+connection);
        return connection;
    }

    /**
     * Pre-commit; the pre-commit work here has already been done in invoke.
     * @param connection
     * @throws Exception
     */
    @Override
    protected void preCommit(Connection connection) throws Exception {
        System.err.println("start preCommit......."+connection);

    }

    /**
     * Commit the transaction if invoke completed normally.
     * @param connection
     */
    @Override
    protected void commit(Connection connection) {
        System.err.println("start commit......."+connection);
        DBConnectUtil.commit(connection);

    }
    
    @Override
    protected void recoverAndCommit(Connection connection) {
        System.err.println("start recoverAndCommit......."+connection);

    }


    @Override
    protected void recoverAndAbort(Connection connection) {
        System.err.println("start abort recoverAndAbort......."+connection);
    }

    /**
     * Roll back the transaction if invoke throws an exception; the next checkpoint will not be taken either.
     * @param connection
     */
    @Override
    protected void abort(Connection connection) {
        System.err.println("start abort rollback......."+connection);
        DBConnectUtil.rollback(connection);
    }

}


3. DBConnectUtil.java

package com.fwmagic.flink.util;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;

public class DBConnectUtil {

    /**
     * Obtain a connection.
     *
     * @param url
     * @param user
     * @param password
     * @return
     * @throws SQLException
     */
    public static Connection getConnection(String url, String user, String password) throws SQLException {
        Connection conn = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
        } catch (ClassNotFoundException e) {
            e.printStackTrace();
        }
        conn = DriverManager.getConnection(url, user, password);
        //Disable auto-commit (commit manually)
        conn.setAutoCommit(false);
        return conn;
    }

    /**
     * Commit the transaction.
     */
    public static void commit(Connection conn) {
        if (conn != null) {
            try {
                conn.commit();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Roll back the transaction.
     *
     * @param conn
     */
    public static void rollback(Connection conn) {
        if (conn != null) {
            try {
                conn.rollback();
            } catch (SQLException e) {
                e.printStackTrace();
            } finally {
                close(conn);
            }
        }
    }

    /**
     * Close the connection.
     *
     * @param conn
     */
    public static void close(Connection conn) {
        if (conn != null) {
            try {
                conn.close();
            } catch (SQLException e) {
                e.printStackTrace();
            }
        }
    }
}


4. Testing

To make it easy to send messages, I used a scheduled task that sends one number per second, 1 through 16. Before the number 15 is sent, one checkpoint should already have been taken and the second one should be nearly due, so the data consumed under the first checkpoint has been committed to the database. When the number 15 is consumed, the manually injected exception fires. At that point the database should contain only the data committed after the first checkpoint; the data of the second checkpoint is never inserted, because the pre-commit failed and the real commit never happens. A rough sketch of such a producer is shown below.
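The producer sketch is not part of the original project: the topic name demo123 and the broker list are assumed to match the consumer configuration above, and the message body is just the bare number, which JSONKeyValueDeserializationSchema parses as a JSON number.

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class NumberProducer {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hd1:9092,hd2:9092,hd3:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 1; i <= 16; i++) {
                // The message body is just the number, e.g. "15"; a bare number is valid JSON.
                producer.send(new ProducerRecord<>("demo123", String.valueOf(i)));
                producer.flush();
                Thread.sleep(1000L); // one message per second
            }
        }
    }
}

The log from my run: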

start invoke.......
===>date:2019-05-28 18:36:50 {"value":1,"metadata":{"offset":892,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:51 {"value":2,"metadata":{"offset":887,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:52 {"value":3,"metadata":{"offset":889,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:53 {"value":4,"metadata":{"offset":893,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:54 {"value":5,"metadata":{"offset":888,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:36:55 {"value":6,"metadata":{"offset":890,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:56 {"value":7,"metadata":{"offset":894,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:36:57 {"value":8,"metadata":{"offset":889,"topic":"gaga","partition":2}}
start preCommit.......
start beginTransaction.......
start commit.......com.mysql.jdbc.JDBC4Connection@3c5ad420
start invoke.......
===>date:2019-05-28 18:36:58 {"value":9,"metadata":{"offset":891,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:36:59 {"value":10,"metadata":{"offset":895,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:00 {"value":11,"metadata":{"offset":890,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:01 {"value":12,"metadata":{"offset":892,"topic":"gaga","partition":1}}
start invoke.......
===>date:2019-05-28 18:37:02 {"value":13,"metadata":{"offset":896,"topic":"gaga","partition":0}}
start invoke.......
===>date:2019-05-28 18:37:03 {"value":14,"metadata":{"offset":891,"topic":"gaga","partition":2}}
start invoke.......
===>date:2019-05-28 18:37:04 {"value":15,"metadata":{"offset":893,"topic":"gaga","partition":1}}
start abort rollback.......com.mysql.jdbc.JDBC4Connection@5f2afc1b
start commit.......com.mysql.jdbc.JDBC4Connection@71ed09a
java.lang.ArithmeticException: / by zero
	at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:36)
	at com.fwmagic.flink.sink.MySqlTwoPhaseCommitSink.invoke(MySqlTwoPhaseCommitSink.java:16)
	at org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction.invoke(TwoPhaseCommitSinkFunction.java:228)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:56)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.pushToOperator(OperatorChain.java:579)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:554)
	at org.apache.flink.streaming.runtime.tasks.OperatorChain$CopyingChainingOutput.collect(OperatorChain.java:534)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:718)
	at org.apache.flink.streaming.api.operators.AbstractStreamOperator$CountingOutput.collect(AbstractStreamOperator.java:696)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collect(StreamSourceContexts.java:104)
	at org.apache.flink.streaming.api.operators.StreamSourceContexts$NonTimestampContext.collectWithTimestamp(StreamSourceContexts.java:111)
	at org.apache.flink.streaming.connectors.kafka.internals.AbstractFetcher.emitRecordWithTimestamp(AbstractFetcher.java:398)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka010Fetcher.emitRecord(Kafka010Fetcher.java:91)
	at org.apache.flink.streaming.connectors.kafka.internal.Kafka09Fetcher.runFetchLoop(Kafka09Fetcher.java:156)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:711)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
	at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
	at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:300)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
	at java.lang.Thread.run(Thread.java:748)

The log shows that the first commit happened at 2019-05-28 18:36:57 and the values 1-8 were successfully written to the database. When the number 15 was consumed, the commit failed: the end of the log shows the rollback and the connection being closed, after which the commit attempt failed as well. The data 9-15 is not inserted into the database, no checkpoint is taken for it, and the checkpoint still holds the offsets of the last successful consumption.


Database table: t_test

CREATE TABLE `t_test` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `value` varchar(255) DEFAULT NULL,
  `insert_time` datetime DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4


Data in the table (screenshot aaa.png, not reproduced here).


5. Full code: https://gitee.com/fang_wei/fwmagic-flink
