Flink 從 0 到 1 學習 —— 如何自定義 Data Source ?

前言

《從0到1學習Flink》—— Data Source 介紹 文章中,我給你們介紹了 Flink Data Source 以及簡短的介紹了一下自定義 Data Source,這篇文章更詳細的介紹下,並寫一個 demo 出來讓你們理解。java

Flink Kafka source

準備工做

咱們先來看下 Flink 從 Kafka topic 中獲取數據的 demo,首先你須要安裝好了 FLink 和 Kafka 。mysql

運行啓動 Flink、Zookepeer、Kafka,git

好了,都啓動了!github

maven 依賴

<!--flink java-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
    <scope>provided</scope>
</dependency>
<!--日誌-->
<dependency>
    <groupId>org.slf4j</groupId>
    <artifactId>slf4j-log4j12</artifactId>
    <version>1.7.7</version>
    <scope>runtime</scope>
</dependency>
<dependency>
    <groupId>log4j</groupId>
    <artifactId>log4j</artifactId>
    <version>1.2.17</version>
    <scope>runtime</scope>
</dependency>
<!--flink kafka connector-->
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka-0.11_${scala.binary.version}</artifactId>
    <version>${flink.version}</version>
</dependency>
<!--alibaba fastjson-->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.51</version>
</dependency>複製代碼

測試發送數據到 kafka topic

實體類,Metric.javasql

package com.zhisheng.flink.model;

import java.util.Map;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Metric {
    public String name;
    public long timestamp;
    public Map<String, Object> fields;
    public Map<String, String> tags;

    public Metric() {
    }

    public Metric(String name, long timestamp, Map<String, Object> fields, Map<String, String> tags) {
        this.name = name;
        this.timestamp = timestamp;
        this.fields = fields;
        this.tags = tags;
    }

    @Override
    public String toString() {
        return "Metric{" +
                "name='" + name + '\'' +
                ", timestamp='" + timestamp + '\'' +
                ", fields=" + fields +
                ", tags=" + tags +
                '}';
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public long getTimestamp() {
        return timestamp;
    }

    public void setTimestamp(long timestamp) {
        this.timestamp = timestamp;
    }

    public Map<String, Object> getFields() {
        return fields;
    }

    public void setFields(Map<String, Object> fields) {
        this.fields = fields;
    }

    public Map<String, String> getTags() {
        return tags;
    }

    public void setTags(Map<String, String> tags) {
        this.tags = tags;
    }
}複製代碼

往 kafka 中寫數據工具類:KafkaUtils.java數據庫

import com.alibaba.fastjson.JSON;
import com.zhisheng.flink.model.Metric;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

/**
 * 往kafka中寫數據
 * 可使用這個main函數進行測試一下
 * weixin: zhisheng_tian 
 * blog: http://www.54tianzhisheng.cn/
 */
public class KafkaUtils {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "metric";  // kafka topic,Flink 程序中須要和這個統一 

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //key 序列化
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //value 序列化
        KafkaProducer producer = new KafkaProducer<String, String>(props);

        Metric metric = new Metric();
        metric.setTimestamp(System.currentTimeMillis());
        metric.setName("mem");
        Map<String, String> tags = new HashMap<>();
        Map<String, Object> fields = new HashMap<>();

        tags.put("cluster", "zhisheng");
        tags.put("host_ip", "101.147.022.106");

        fields.put("used_percent", 90d);
        fields.put("max", 27244873d);
        fields.put("used", 17244873d);
        fields.put("init", 27244873d);

        metric.setTags(tags);
        metric.setFields(fields);

        ProducerRecord record = new ProducerRecord<String, String>(topic, null, null, JSON.toJSONString(metric));
        producer.send(record);
        System.out.println("發送數據: " + JSON.toJSONString(metric));

        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        while (true) {
            Thread.sleep(300);
            writeToKafka();
        }
    }
}複製代碼

運行:apache

若是出現如上圖標記的,即表明可以不斷的往 kafka 發送數據的。json

Flink 程序

Main.javabootstrap

package com.zhisheng.flink;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

import java.util.Properties;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");  //key 反序列化
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest"); //value 反序列化

        DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer011<>(
                "metric",  //kafka topic
                new SimpleStringSchema(),  // String 序列化
                props)).setParallelism(1);

        dataStreamSource.print(); //把從 kafka 讀取到的數據打印在控制檯

        env.execute("Flink add data source");
    }
}複製代碼

運行起來:api

看到沒程序,Flink 程序控制臺可以源源不斷的打印數據呢。

自定義 Source

上面就是 Flink 自帶的 Kafka source,那麼接下來就模仿着寫一個從 MySQL 中讀取數據的 Source。

首先 pom.xml 中添加 MySQL 依賴

<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>複製代碼

數據庫建表以下:

DROP TABLE IF EXISTS `student`;
CREATE TABLE `student` (
  `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
  `name` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `password` varchar(25) COLLATE utf8_bin DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;複製代碼

插入數據

INSERT INTO `student` VALUES ('1', 'zhisheng01', '123456', '18'), ('2', 'zhisheng02', '123', '17'), ('3', 'zhisheng03', '1234', '18'), ('4', 'zhisheng04', '12345', '16');
COMMIT;複製代碼

新建實體類:Student.java

package com.zhisheng.flink.model;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Student {
    public int id;
    public String name;
    public String password;
    public int age;

    public Student() {
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    @Override
    public String toString() {
        return "Student{" +
                "id=" + id +
                ", name='" + name + '\'' +
                ", password='" + password + '\'' +
                ", age=" + age +
                '}';
    }

    public int getId() {
        return id;
    }

    public void setId(int id) {
        this.id = id;
    }

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getPassword() {
        return password;
    }

    public void setPassword(String password) {
        this.password = password;
    }

    public int getAge() {
        return age;
    }

    public void setAge(int age) {
        this.age = age;
    }
}複製代碼

新建 Source 類 SourceFromMySQL.java,該類繼承 RichSourceFunction ,實現裏面的 open、close、run、cancel 方法:

package com.zhisheng.flink.source;

import com.zhisheng.flink.model.Student;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;


/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class SourceFromMySQL extends RichSourceFunction<Student> {

    PreparedStatement ps;
    private Connection connection;

    /**
     * open() 方法中創建鏈接,這樣不用每次 invoke 的時候都要創建鏈接和釋放鏈接。
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();
        String sql = "select * from Student;";
        ps = this.connection.prepareStatement(sql);
    }

    /**
     * 程序執行完畢就能夠進行,關閉鏈接和釋放資源的動做了
     *
     * @throws Exception
     */
    @Override
    public void close() throws Exception {
        super.close();
        if (connection != null) { //關閉鏈接和釋放資源
            connection.close();
        }
        if (ps != null) {
            ps.close();
        }
    }

    /**
     * DataStream 調用一次 run() 方法用來獲取數據
     *
     * @param ctx
     * @throws Exception
     */
    @Override
    public void run(SourceContext<Student> ctx) throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Student student = new Student(
                    resultSet.getInt("id"),
                    resultSet.getString("name").trim(),
                    resultSet.getString("password").trim(),
                    resultSet.getInt("age"));
            ctx.collect(student);
        }
    }

    @Override
    public void cancel() {
    }

    private static Connection getConnection() {
        Connection con = null;
            try {
                Class.forName("com.mysql.jdbc.Driver");
                con = DriverManager.getConnection("jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=UTF-8", "root", "root123456");
            } catch (Exception e) {
                System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
            }
        return con;
    }
}複製代碼

Flink 程序

package com.zhisheng.flink;

import com.zhisheng.flink.source.SourceFromMySQL;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * Desc:
 * weixi: zhisheng_tian
 * blog: http://www.54tianzhisheng.cn/
 */
public class Main2 {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.addSource(new SourceFromMySQL()).print();

        env.execute("Flink add data sourc");
    }
}複製代碼

運行 Flink 程序,控制檯日誌中能夠看見打印的 student 信息。

RichSourceFunction

從上面自定義的 Source 能夠看到咱們繼承的就是這個 RichSourceFunction 類,那麼來了解一下:

一個抽象類,繼承自 AbstractRichFunction。爲實現一個 Rich SourceFunction 提供基礎能力。該類的子類有三個,兩個是抽象類,在此基礎上提供了更具體的實現,另外一個是 ContinuousFileMonitoringFunction。

  • MessageAcknowledgingSourceBase :它針對的是數據源是消息隊列的場景而且提供了基於 ID 的應答機制。
  • MultipleIdsMessageAcknowledgingSourceBase : 在 MessageAcknowledgingSourceBase 的基礎上針對 ID 應答機制進行了更爲細分的處理,支持兩種 ID 應答模型:session id 和 unique message id。
  • ContinuousFileMonitoringFunction:這是單個(非並行)監視任務,它接受 FileInputFormat,而且根據 FileProcessingMode 和 FilePathFilter,它負責監視用戶提供的路徑;決定應該進一步讀取和處理哪些文件;建立與這些文件對應的 FileInputSplit 拆分,將它們分配給下游任務以進行進一步處理。

最後

本文主要講了下 Flink 使用 Kafka Source 的使用,並提供了一個 demo 教你們如何自定義 Source,從 MySQL 中讀取數據,固然你也能夠從其餘地方讀取,實現本身的數據源 source。可能平時工做會比這個更復雜,須要你們靈活應對!

關注我

轉載請務必註明原創地址爲:www.54tianzhisheng.cn/2018/10/30/…微信公衆號:zhisheng

另外我本身整理了些 Flink 的學習資料,目前已經所有放到微信公衆號(zhisheng)了,你能夠回覆關鍵字:Flink 便可無條件獲取到。另外也能夠加我微信 你能夠加個人微信:yuanblog_tzs,探討技術!

更多私密資料請加入知識星球!

Github 代碼倉庫

github.com/zhisheng17/…

之後這個項目的全部代碼都將放在這個倉庫裏,包含了本身學習 flink 的一些 demo 和博客

博客

一、Flink 從0到1學習 —— Apache Flink 介紹

二、Flink 從0到1學習 —— Mac 上搭建 Flink 1.6.0 環境並構建運行簡單程序入門

三、Flink 從0到1學習 —— Flink 配置文件詳解

四、Flink 從0到1學習 —— Data Source 介紹

五、Flink 從0到1學習 —— 如何自定義 Data Source ?

六、Flink 從0到1學習 —— Data Sink 介紹

七、Flink 從0到1學習 —— 如何自定義 Data Sink ?

八、Flink 從0到1學習 —— Flink Data transformation(轉換)

九、Flink 從0到1學習 —— 介紹 Flink 中的 Stream Windows

十、Flink 從0到1學習 —— Flink 中的幾種 Time 詳解

十一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 ElasticSearch

十二、Flink 從0到1學習 —— Flink 項目如何運行?

1三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Kafka

1四、Flink 從0到1學習 —— Flink JobManager 高可用性配置

1五、Flink 從0到1學習 —— Flink parallelism 和 Slot 介紹

1六、Flink 從0到1學習 —— Flink 讀取 Kafka 數據批量寫入到 MySQL

1七、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RabbitMQ

1八、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HBase

1九、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 HDFS

20、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Redis

2一、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Cassandra

2二、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 Flume

2三、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 InfluxDB

2四、Flink 從0到1學習 —— Flink 讀取 Kafka 數據寫入到 RocketMQ

2五、Flink 從0到1學習 —— 你上傳的 jar 包藏到哪裏去了

2六、Flink 從0到1學習 —— 你的 Flink job 日誌跑到哪裏去了

2七、阿里巴巴開源的 Blink 實時計算框架真香

2八、Flink 從0到1學習 —— Flink 中如何管理配置?

2九、Flink 從0到1學習—— Flink 不能夠連續 Split(分流)?

30、Flink 從0到1學習—— 分享四本 Flink 國外的書和二十多篇 Paper 論文

3一、Flink 架構、原理與部署測試

3二、爲何說流處理即將來?

3三、OPPO 數據中臺之基石:基於 Flink SQL 構建實時數據倉庫

3四、流計算框架 Flink 與 Storm 的性能對比

3五、Flink狀態管理和容錯機制介紹

3六、Apache Flink 結合 Kafka 構建端到端的 Exactly-Once 處理

3七、360深度實踐:Flink與Storm協議級對比

3八、如何基於Flink+TensorFlow打造實時智能異常檢測平臺?只看這一篇就夠了

3九、Apache Flink 1.9 重大特性提早解讀

40、Flink 全網最全資源(視頻、博客、PPT、入門、實戰、源碼解析、問答等持續更新)

4一、Flink 靈魂兩百問,這誰頂得住?

4二、Flink 從0到1學習 —— 如何使用 Side Output 來分流?

4三、你公司到底需不須要引入實時計算引擎?

4四、一文讓你完全瞭解大數據實時計算引擎 Flink

源碼解析

一、Flink 源碼解析 —— 源碼編譯運行

二、Flink 源碼解析 —— 項目結構一覽

三、Flink 源碼解析—— local 模式啓動流程

四、Flink 源碼解析 —— standalone session 模式啓動流程

五、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Job Manager 啓動

六、Flink 源碼解析 —— Standalone Session Cluster 啓動流程深度分析之 Task Manager 啓動

七、Flink 源碼解析 —— 分析 Batch WordCount 程序的執行過程

八、Flink 源碼解析 —— 分析 Streaming WordCount 程序的執行過程

九、Flink 源碼解析 —— 如何獲取 JobGraph?

十、Flink 源碼解析 —— 如何獲取 StreamGraph?

十一、Flink 源碼解析 —— Flink JobManager 有什麼做用?

十二、Flink 源碼解析 —— Flink TaskManager 有什麼做用?

1三、Flink 源碼解析 —— JobManager 處理 SubmitJob 的過程

1四、Flink 源碼解析 —— TaskManager 處理 SubmitJob 的過程

1五、Flink 源碼解析 —— 深度解析 Flink Checkpoint 機制

1六、Flink 源碼解析 —— 深度解析 Flink 序列化機制

1七、Flink 源碼解析 —— 深度解析 Flink 是如何管理好內存的?

1八、Flink Metrics 源碼解析 —— Flink-metrics-core

1九、Flink Metrics 源碼解析 —— Flink-metrics-datadog

20、Flink Metrics 源碼解析 —— Flink-metrics-dropwizard

2一、Flink Metrics 源碼解析 —— Flink-metrics-graphite

2二、Flink Metrics 源碼解析 —— Flink-metrics-influxdb

2三、Flink Metrics 源碼解析 —— Flink-metrics-jmx

2四、Flink Metrics 源碼解析 —— Flink-metrics-slf4j

2五、Flink Metrics 源碼解析 —— Flink-metrics-statsd

2六、Flink Metrics 源碼解析 —— Flink-metrics-prometheus

2六、Flink Annotations 源碼解析

2七、Flink 源碼解析 —— 如何獲取 ExecutionGraph ?

2八、大數據重磅炸彈——實時計算框架 Flink

2九、Flink Checkpoint-輕量級分佈式快照

30、Flink Clients 源碼解析原文出處:zhisheng的博客,歡迎關注個人公衆號:zhisheng

相關文章
相關標籤/搜索