17-Flink消費Kafka寫入Mysql

戳更多文章:

1-Flink入門java

2-本地環境搭建&構建第一個Flink應用mysql

3-DataSet API面試

4-DataSteam APIsql

5-集羣部署json

6-分佈式緩存bootstrap

7-重啓策略c#

8-Flink中的窗口緩存

9-Flink中的Time ...bash

本文介紹消費Kafka的消息實時寫入Mysql。maven

  1. maven新增依賴:
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.39</version>
</dependency>

複製代碼

2.重寫RichSinkFunction,實現一個Mysql Sink

public class MysqlSink extends
    RichSinkFunction<Tuple3<Integer, String, Integer>> {
private Connection connection;
private PreparedStatement preparedStatement;
String username = "";
String password = "";
String drivername = "";   //配置改爲本身的配置
String dburl = "";

@Override
public void invoke(Tuple3<Integer, String, Integer> value) throws Exception {
    Class.forName(drivername);
    connection = DriverManager.getConnection(dburl, username, password);
    String sql = "replace into table(id,num,price) values(?,?,?)"; //假設mysql 有3列 id,num,price
    preparedStatement = connection.prepareStatement(sql);
    preparedStatement.setInt(1, value.f0);
    preparedStatement.setString(2, value.f1);
    preparedStatement.setInt(3, value.f2);
    preparedStatement.executeUpdate();
    if (preparedStatement != null) {
        preparedStatement.close();
    }
    if (connection != null) {
        connection.close();
    }
}
}
複製代碼
  1. Flink主類
public class MysqlSinkTest {

public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "localhost:9092");

// 1,abc,100  相似這樣的數據,固然也能夠是很複雜的json數據,去作解析
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("test", new SimpleStringSchema(), properties);
env.getConfig().disableSysoutLogging();  //設置此能夠屏蔽掉日記打印狀況
env.getConfig().setRestartStrategy(
        RestartStrategies.fixedDelayRestart(5, 5000));
env.enableCheckpointing(2000);
DataStream<String> stream = env
        .addSource(consumer);

DataStream<Tuple3<Integer, String, Integer>> sourceStream = stream.filter((FilterFunction<String>) value -> StringUtils.isNotBlank(value))
                                                                        .map((MapFunction<String, Tuple3<Integer, String, Integer>>) value -> {
    String[] args1 = value.split(",");
    return new Tuple3<Integer, String, Integer>(Integer
            .valueOf(args1[0]), args1[1],Integer
            .valueOf(args1[2]));
});

sourceStream.addSink(new MysqlSink());
env.execute("data to mysql start");
}
}

複製代碼

全部代碼,我放在了個人公衆號,回覆Flink能夠下載

  • 海量【java和大數據的面試題+視頻資料】整理在公衆號,關注後能夠下載~
  • 更多大數據技術歡迎和做者一塊兒探討~

相關文章
相關標籤/搜索