Flink 從 0 到 1 學習 —— 如何自定義 Data Sink ?

前言

前篇文章 《從0到1學習Flink》—— Data Sink 介紹 介紹了 Flink Data Sink,也介紹了 Flink 自帶的 Sink,那麼如何自定義本身的 Sink 呢?這篇文章將寫一個 demo 教你們將從 Kafka Source 的數據 Sink 到 MySQL 中去。html

<!--more-->java

準備工做

咱們先來看下 Flink 從 Kafka topic 中獲取數據的 demo,首先你須要安裝好了 FLink 和 Kafka 。mysql

運行啓動 Flink、Zookepeer、Kafka,git

好了,都啓動了!github

數據庫建表

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

實體類

Student.javasql

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}

工具類

工具類往 kafka topic student 發送數據數據庫

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import com.zhisheng.flink.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往kafka中寫數據
 * 可使用這個main函數進行測試一下
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils2 {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  //kafka topic 須要和 flink 程序用同一個 topic

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(student));
            producer.send(record);
            System.out.println("發送數據: " + JSON.toJSONString(student));
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}

SinkToMySQL

該類就是 Sink Function,繼承了 RichSinkFunction ,而後重寫了裏面的方法。在 invoke 方法中將數據插入到 MySQL 中。apache

package com.zhisheng.flink.sink;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SinkToMySQL extends RichSinkFunction<Student> {
    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中創建鏈接,這樣不用每次 invoke 的時候都要創建鏈接和釋放鏈接
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        //關閉鏈接和釋放資源
        if (connection != null) {
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * 每條數據的插入都要調用一次 invoke() 方法
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(Student value, Context context) throws Exception {
        //組裝數據,執行插入操做
        ps.setInt(1, value.getId());
        ps.setString(2, value.getName());
        ps.setString(3, value.getPassword());
        ps.setInt(4, value.getAge());
        ps.executeUpdate();
    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}

Flink 程序

這裏的 source 是從 kafka 讀取數據的,而後 Flink 從 Kafka 讀取到數據(JSON)後用阿里 fastjson 來解析成 student 對象,而後在 addSink 中使用咱們建立的 SinkToMySQL,這樣就能夠把數據存儲到 MySQL 了。json

package com.zhisheng.flink;

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Student;
import com.zhisheng.flink.sink.SinkToMySQL;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.PrintSinkFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;

import java.util.Properties;

/**
 * Desc:
 * weixin: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main3 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   //這個 kafka topic 須要和上面的工具類的 topic 一致
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> JSON.parseObject(string, Student.class)); //Fastjson 解析字符串成 student 對象

        student.addSink(new SinkToMySQL()); //數據 sink 到 mysql

        env.execute("Flink add sink");
    }
}

結果

運行 Flink 程序,而後再運行 KafkaUtils2.java 工具類,這樣就能夠了。bootstrap

若是數據插入成功了,那麼咱們查看下咱們的數據庫:

數據庫中已經插入了 100 條咱們從 Kafka 發送的數據了。證實咱們的 SinkToMySQL 起做用了。是否是很簡單?

項目結構

怕你們不知道個人項目結構,這裏發個截圖看下:

最後

本文主要利用一個 demo,告訴你們如何自定義 Sink Function,將從 Kafka 的數據 Sink 到 MySQL 中,若是你項目中有其餘的數據來源,你也能夠換成對應的 Source,也有可能你的 Sink 是到其餘的地方或者其餘不一樣的方式,那麼依舊是這個套路:繼承 RichSinkFunction 抽象類,重寫 invoke 方法。

關注我

轉載請務必註明原創地址爲:http://www.54tianzhisheng.cn/2018/10/31/flink-create-sink/

微信公衆號:zhisheng

另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號(zhisheng)了,你能夠回覆關鍵字:Flink 便可無條件獲取到。另外也能夠加我微信 你能夠加個人微信:yuanblog_tzs,探討技術!

更多私密資料請加入知識星球!

Github 代碼倉庫

https://github.com/zhisheng17/flink-learning/

之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客

博客

一、Flink 從0到1學習 —— Apache Flink 介紹

二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

三、Flink 從0到1學習 —— Flink 配置文件詳解

四、Flink 從0到1學習 —— Data Source 介紹

五、Flink 從0到1學習 —— 如何自定義 Data Source ?

六、Flink 從0到1學習 —— Data Sink 介紹

七、Flink 從0到1學習 —— 如何自定義 Data Sink ?

八、Flink 從0到1學習 —— Flink Data transformation(轉換)

九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch

十二、Flink 從0到1學習 —— Flink 項目如何運行?

1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka

1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置

1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL

1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ

1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase

1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis

2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra

2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume

2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB

2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ

2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了

2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了

2七、阿里巴巴開源的 Blink 實時計算框架真香

2八、Flink 從0到1學習 —— Flink 中如何管理配置?

2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

3一、Flink 架構、原理與部署測試

3二、爲何說流處理即將來?

3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫

3四、流計算框架 Flink 與 Storm 的性能對比

3五、Flink狀態管理和容錯機制介紹

3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

3七、360深度實踐:Flink與Storm協議級對比

3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了

3九、Apache Flink 1.9 重大特性提早解讀

40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)

4一、Flink 靈魂兩百問,這誰頂得住?

4二、Flink 從0到1學習 —— 如何使用 Side Output 來分流?

4三、你公司到底需不須要引入實時計算引擎?

4四、一文讓你完全瞭解大數據實時計算引擎 Flink

源碼解析

一、Flink 源碼解析 —— 源碼編譯運行

二、Flink 源碼解析 —— 項目結構一覽

三、Flink 源碼解析—— local 模式啓動流程

四、Flink 源碼解析 —— standalone session 模式啓動流程

五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動

六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動

七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程

八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程

九、Flink 源碼解析 —— 如何獲取 JobGraph?

十、Flink 源碼解析 —— 如何獲取 StreamGraph?

十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?

十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?

1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程

1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程

1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制

1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制

1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?

1八、Flink Metrics 源碼解析 —— Flink-metrics-core

1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog

20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite

2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx

2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd

2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源碼解析

2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?

2八、大數據重磅炸彈——實時計算框架 Flink

2九、Flink Checkpoint-輕量級分佈式快照

30、Flink Clients 源碼解析 原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng

原文出處:https://www.cnblogs.com/zhisheng/p/11562566.html

相關文章
相關標籤/搜索