《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL

<!-- more -->java

前言

以前其實在 《從0到1學習Flink》—— 如何自定義 Data Sink ? 文章中其實已經寫了點將數據寫入到 MySQL,可是一些配置化的東西當時是寫死的,不可以通用,最近知識星球裏有朋友叫我: 寫個從 kafka 中讀取數據,通過 Flink 作個預聚合,而後建立數據庫鏈接池將數據批量寫入到 mysql 的例子。mysql

因而纔有了這篇文章,更多提問和想要我寫的文章能夠在知識星球裏像我提問,我會根據提問及時回答和儘量做出文章的修改。git

準備

你須要將這兩個依賴添加到 pom.xml 中github

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>

讀取 kafka 數據

這裏我依舊用的之前的 student 類,本身本地起了 kafka 而後造一些測試數據,這裏咱們測試發送一條數據則 sleep 10s,意味着往 kafka 中一分鐘發 6 條數據。sql

package com.zhisheng.connectors.mysql.utils;

import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Desc: 往kafka中寫數據,可使用這個main函數進行測試
 * Created by zhisheng on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class KafkaUtil {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic 須要和 flink 程序用同一個 topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, GsonUtil.toJson(student));
            producer.send(record);
            System.out.println("發送數據: " + GsonUtil.toJson(student));
            Thread.sleep(10 * 1000); //發送一條數據 sleep 10s,至關於 1 分鐘 6 條
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

從 kafka 中讀取數據,而後序列化成 student 對象。數據庫

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
        "student",   //這個 kafka topic 須要和上面的工具類的 topic 一致
        new SimpleStringSchema(),
        props)).setParallelism(1)
        .map(string -> GsonUtil.fromJson(string, Student.class)); //,解析字符串成 student 對象

由於 RichSinkFunction 中若是 sink 一條數據到 mysql 中就會調用 invoke 方法一次,因此若是要實現批量寫的話,咱們最好在 sink 以前就把數據聚合一下。那這裏咱們開個一分鐘的窗口去聚合 Student 數據。apache

student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
        ArrayList<Student> students = Lists.newArrayList(values);
        if (students.size() > 0) {
            System.out.println("1 分鐘內收集到 student 的數據條數是:" + students.size());
            out.collect(students);
        }
    }
});

寫入數據庫

這裏使用 DBCP 鏈接池鏈接數據庫 mysql,pom.xml 中添加依賴:bootstrap

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.1.1</version>
</dependency>

若是你想使用其餘的數據庫鏈接池請加入對應的依賴。api

這裏將數據寫入到 MySQL 中,依舊是和以前文章同樣繼承 RichSinkFunction 類,重寫裏面的方法:微信

package com.zhisheng.connectors.mysql.sinks;

import com.zhisheng.connectors.mysql.model.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Desc: 數據批量 sink 數據到 mysql
 * Created by zhisheng_tian on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * open() 方法中創建鏈接,這樣不用每次 invoke 的時候都要創建鏈接和釋放鏈接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關閉鏈接和釋放資源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每條數據的插入都要調用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        //遍歷數據集合
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        int[] count = ps.executeBatch();//批量後執行
        System.out.println("成功了插入了" + count.length + "行數據");
    }


    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        //注意,替換成本身本地的 mysql 數據庫地址和用戶名、密碼
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("root123456");
        //設置鏈接池的一些參數
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("建立鏈接池:" + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}

核心類 Main

核心程序以下:

public class Main {
    public static void main(String[] args) throws Exception{
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //這個 kafka topic 須要和上面的工具類的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> GsonUtil.fromJson(string, Student.class)); //
        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                ArrayList<Student> students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    System.out.println("1 分鐘內收集到 student 的數據條數是:" + students.size());
                    out.collect(students);
                }
            }
        }).addSink(new SinkToMySQL());

        env.execute("flink learning connectors kafka");
    }
}

運行項目

運行 Main 類後再運行 KafkaUtils.java 類!

下圖是往 Kafka 中發送的數據:

下圖是運行 Main 類的日誌,會建立 4 個鏈接池是由於默認的 4 個並行度,你若是在 addSink 這個算子設置並行度爲 1 的話就會建立一個鏈接池:

下圖是批量插入數據庫的結果:

總結

本文從知識星球一位朋友的疑問來寫的,應該都知足了他的條件(批量/數據庫鏈接池/寫入mysql),的確網上不少的例子都是簡單的 demo 形式,都是單條數據就建立數據庫鏈接插入 MySQL,若是要寫的數據量很大的話,會對 MySQL 的寫有很大的壓力。這也是我以前在 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch 中,數據寫 ES 強調過的,若是要提升性能一定要批量的寫。就拿咱們如今這篇文章來講,若是數據量大的話,聚合一分鐘數據達萬條,那麼這樣批量寫會比來一條寫一條性能提升不知道有多少。

本文原創地址是: http://www.54tianzhisheng.cn/2019/01/15/Flink-MySQL-sink/ , 未經容許禁止轉載。

關注我

微信公衆號:zhisheng

另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號了。你能夠加個人微信:zhisheng_tian,而後回覆關鍵字:Flink 便可無條件獲取到。

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客。

本文的項目代碼在 https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-connectors/flink-learning-connectors-mysql

相關文章

一、《從0到1學習Flink》—— Apache Flink 介紹

二、《從0到1學習Flink》—— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

三、《從0到1學習Flink》—— Flink 配置文件詳解

四、《從0到1學習Flink》—— Data Source 介紹

五、《從0到1學習Flink》—— 如何自定義 Data Source ?

六、《從0到1學習Flink》—— Data Sink 介紹

七、《從0到1學習Flink》—— 如何自定義 Data Sink ?

八、《從0到1學習Flink》—— Flink Data transformation(轉換)

九、《從0到1學習Flink》—— 介紹Flink中的Stream Windows

十、《從0到1學習Flink》—— Flink 中的幾種 Time 詳解

十一、《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch

十二、《從0到1學習Flink》—— Flink 項目如何運行?

1三、《從0到1學習Flink》—— Flink 寫入數據到 Kafka

1四、《從0到1學習Flink》—— Flink JobManager 高可用性配置

1五、《從0到1學習Flink》—— Flink parallelism 和 Slot 介紹

1六、《從0到1學習Flink》—— Flink 讀取 Kafka 數據批量寫入到 MySQL

相關文章
相關標籤/搜索