https://github.com/zq2599/blog_demosjava
內容:全部原創文章分類彙總及配套源碼,涉及Java、Docker、Kubernetes、DevOPS等;mysql
Flink官方提供的sink服務可能知足不了咱們的須要,此時能夠開發自定義的sink,文本就來一塊兒實戰;git
@Override public void invoke(IN record) { writer.write(record); }
本次實戰很簡單:自定義sink,用於將數據寫入MySQL,涉及的版本信息以下:程序員
若是您不想寫代碼,整個系列的源碼可在GitHub下載到,地址和連接信息以下表所示(https://github.com/zq2599/blo...:github
名稱 | 連接 | 備註 |
---|---|---|
項目主頁 | https://github.com/zq2599/blo... | 該項目在GitHub上的主頁 |
git倉庫地址(https) | https://github.com/zq2599/blo... | 該項目源碼的倉庫地址,https協議 |
git倉庫地址(ssh) | git@github.com:zq2599/blog_demos.git | 該項目源碼的倉庫地址,ssh協議 |
這個git項目中有多個文件夾,本章的應用在<font color="blue">flinksinkdemo</font>文件夾下,以下圖紅框所示:web
請您將MySQL準備好,並執行如下sql,用於建立數據庫flinkdemo和表student:sql
create database if not exists flinkdemo; USE flinkdemo; DROP TABLE IF EXISTS `student`; CREATE TABLE `student` ( `id` int(11) unsigned NOT NULL AUTO_INCREMENT, `name` varchar(25) COLLATE utf8_bin DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
<dependency> <groupId>mysql</groupId> <artifactId>mysql-connector-java</artifactId> <version>8.0.11</version> </dependency>
package com.bolingcavalry.customize; public class Student { private int id; private String name; private int age; public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } public int getAge() { return age; } public void setAge(int age) { this.age = age; } public Student(String name, int age) { this.name = name; this.age = age; } }
package com.bolingcavalry.customize; import org.apache.flink.configuration.Configuration; import org.apache.flink.streaming.api.functions.sink.RichSinkFunction; import java.sql.Connection; import java.sql.DriverManager; import java.sql.PreparedStatement; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; public class MySQLSinkFunction extends RichSinkFunction<Student> { PreparedStatement preparedStatement; private Connection connection; private ReentrantLock reentrantLock = new ReentrantLock(); @Override public void open(Configuration parameters) throws Exception { super.open(parameters); //準備數據庫相關實例 buildPreparedStatement(); } @Override public void close() throws Exception { super.close(); try{ if(null!=preparedStatement) { preparedStatement.close(); preparedStatement = null; } } catch(Exception e) { e.printStackTrace(); } try{ if(null!=connection) { connection.close(); connection = null; } } catch(Exception e) { e.printStackTrace(); } } @Override public void invoke(Student value, Context context) throws Exception { preparedStatement.setString(1, value.getName()); preparedStatement.setInt(2, value.getAge()); preparedStatement.executeUpdate(); } /** * 準備好connection和preparedStatement * 獲取mysql鏈接實例,考慮多線程同步, * 不用synchronize是由於獲取數據庫鏈接是遠程操做,耗時不肯定 * @return */ private void buildPreparedStatement() { if(null==connection) { boolean hasLock = false; try { hasLock = reentrantLock.tryLock(10, TimeUnit.SECONDS); if(hasLock) { Class.forName("com.mysql.cj.jdbc.Driver"); connection = DriverManager.getConnection("jdbc:mysql://192.168.50.43:3306/flinkdemo?useUnicode=true&characterEncoding=UTF-8&useSSL=false&serverTimezone=UTC", "root", "123456"); } if(null!=connection) { preparedStatement = connection.prepareStatement("insert into student (name, age) values (?, ?)"); } } catch (Exception e) { //生產環境慎用 e.printStackTrace(); } finally { if(hasLock) { reentrantLock.unlock(); } } } } }
package com.bolingcavalry.customize; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import java.util.ArrayList; import java.util.List; public class StudentSink { public static void main(String[] args) throws Exception { final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); //並行度爲1 env.setParallelism(1); List<Student> list = new ArrayList<>(); list.add(new Student("aaa", 11)); list.add(new Student("bbb", 12)); list.add(new Student("ccc", 13)); list.add(new Student("ddd", 14)); list.add(new Student("eee", 15)); list.add(new Student("fff", 16)); env.fromCollection(list) .addSink(new MySQLSinkFunction()) .disableChaining(); env.execute("sink demo : customize mysql obj"); } }
至此,自定義sink的實戰已經完成,但願本文能給您一些參考;數據庫
微信搜索「程序員欣宸」,我是欣宸,期待與您一同暢遊Java世界...
https://github.com/zq2599/blog_demos