Back in the article 《從0到1學習Flink》—— 如何自定義 Data Sink?, I already covered writing data to MySQL, but some of the configuration there was hard-coded and not reusable. Recently a member of my Knowledge Planet group asked me to write an example that reads data from Kafka, does a pre-aggregation in Flink, and then creates a database connection pool to batch-write the data into MySQL.
Hence this article. For more questions, or articles you would like me to write, you can ask me in the Knowledge Planet group; I will answer promptly and revise the articles where possible.
You need to add the MySQL JDBC driver dependency to your pom.xml:
```xml
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.34</version>
</dependency>
```
Here I reuse the Student class from before, start Kafka locally, and generate some test data. The producer sleeps 10 seconds after sending each record, which means 6 records are sent to Kafka per minute.
```java
package com.zhisheng.connectors.mysql.utils;

import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

/**
 * Desc: writes data to Kafka; run the main method to generate test data
 * Created by zhisheng on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class KafkaUtil {
    public static final String broker_list = "localhost:9092";
    public static final String topic = "student";  // this Kafka topic must match the one used in the Flink job

    public static void writeToKafka() throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", broker_list);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<>(props);

        for (int i = 1; i <= 100; i++) {
            Student student = new Student(i, "zhisheng" + i, "password" + i, 18 + i);
            ProducerRecord<String, String> record = new ProducerRecord<>(topic, null, null, GsonUtil.toJson(student));
            producer.send(record);
            System.out.println("Sent record: " + GsonUtil.toJson(student));
            Thread.sleep(10 * 1000); // sleep 10s after each record, i.e. 6 records per minute
        }
        producer.flush();
    }

    public static void main(String[] args) throws InterruptedException {
        writeToKafka();
    }
}
```
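The Student class itself isn't shown in this article. Below is a minimal sketch that matches how it is used above (a four-argument constructor plus getters); the field names are an assumption inferred from the SQL insert further down, and the real class lives in the com.zhisheng.connectors.mysql.model package, so treat this only as a reference.

```java
// Minimal sketch of the Student POJO assumed by the code in this article;
// the real class in com.zhisheng.connectors.mysql.model may differ slightly.
public class Student {
    private int id;
    private String name;
    private String password;
    private int age;

    public Student() {
        // no-arg constructor so Gson can deserialize JSON into Student
    }

    public Student(int id, String name, String password, int age) {
        this.id = id;
        this.name = name;
        this.password = password;
        this.age = age;
    }

    public int getId() { return id; }
    public String getName() { return name; }
    public String getPassword() { return password; }
    public int getAge() { return age; }
}
```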
Read the data from Kafka and deserialize it into Student objects.
```java
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("zookeeper.connect", "localhost:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
        "student",   // this Kafka topic must match the topic in the utility class above
        new SimpleStringSchema(),
        props)).setParallelism(1)
        .map(string -> GsonUtil.fromJson(string, Student.class)); // parse the JSON string into a Student object
```
Because a RichSinkFunction calls invoke() once for every record it sinks to MySQL, the best way to implement batch writes is to aggregate the data before the sink. Here we open a one-minute window to collect the Student records.
```java
student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
    @Override
    public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
        ArrayList<Student> students = Lists.newArrayList(values);
        if (students.size() > 0) {
            System.out.println("Number of Student records collected within 1 minute: " + students.size());
            out.collect(students);
        }
    }
});
```
Here we use the DBCP connection pool to connect to MySQL; add this dependency to pom.xml:
```xml
<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-dbcp2</artifactId>
    <version>2.1.1</version>
</dependency>
```
If you want to use a different connection pool, add the corresponding dependency instead.
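For example, if you preferred HikariCP over DBCP, the dependency would look roughly like this (the version is only illustrative, and you would also have to adapt the sink below, which is written against DBCP's BasicDataSource):

```xml
<!-- illustrative only: swap in your preferred pool and version -->
<dependency>
    <groupId>com.zaxxer</groupId>
    <artifactId>HikariCP</artifactId>
    <version>3.2.0</version>
</dependency>
```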
To write the data to MySQL we again extend RichSinkFunction, as in the earlier article, and override its methods:
```java
package com.zhisheng.connectors.mysql.sinks;

import com.zhisheng.connectors.mysql.model.Student;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
 * Desc: batch sink data to MySQL
 * Created by zhisheng_tian on 2019-02-17
 * Blog: http://www.54tianzhisheng.cn/tags/Flink/
 */
public class SinkToMySQL extends RichSinkFunction<List<Student>> {
    PreparedStatement ps;
    BasicDataSource dataSource;
    private Connection connection;

    /**
     * Establish the connection in open() so that we do not have to create
     * and release a connection on every invoke() call.
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        dataSource = new BasicDataSource();
        connection = getConnection(dataSource);
        String sql = "insert into Student(id, name, password, age) values(?, ?, ?, ?);";
        ps = this.connection.prepareStatement(sql);
    }

    @Override
    public void close() throws Exception {
        super.close();
        // close the statement and connection, releasing resources
        if (ps != null) {
            ps.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once per incoming element (here, one List<Student> per window).
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(List<Student> value, Context context) throws Exception {
        // iterate over the batch and add each record to the prepared statement
        for (Student student : value) {
            ps.setInt(1, student.getId());
            ps.setString(2, student.getName());
            ps.setString(3, student.getPassword());
            ps.setInt(4, student.getAge());
            ps.addBatch();
        }
        int[] count = ps.executeBatch(); // execute the whole batch at once
        System.out.println("Successfully inserted " + count.length + " rows");
    }

    private static Connection getConnection(BasicDataSource dataSource) {
        dataSource.setDriverClassName("com.mysql.jdbc.Driver");
        // note: replace with your own MySQL address, username and password
        dataSource.setUrl("jdbc:mysql://localhost:3306/test");
        dataSource.setUsername("root");
        dataSource.setPassword("root123456");
        // connection pool settings
        dataSource.setInitialSize(10);
        dataSource.setMaxTotal(50);
        dataSource.setMinIdle(2);

        Connection con = null;
        try {
            con = dataSource.getConnection();
            System.out.println("Created connection pool: " + con);
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = " + e.getMessage());
        }
        return con;
    }
}
```
The main program is as follows:
```java
import com.zhisheng.common.utils.GsonUtil;
import com.zhisheng.connectors.mysql.model.Student;
import com.zhisheng.connectors.mysql.sinks.SinkToMySQL;
import com.google.common.collect.Lists; // or Flink's shaded Guava, depending on your project setup
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

public class Main {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("zookeeper.connect", "localhost:2181");
        props.put("group.id", "metric-group");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("auto.offset.reset", "latest");

        SingleOutputStreamOperator<Student> student = env.addSource(new FlinkKafkaConsumer011<>(
                "student",   // this Kafka topic must match the topic in the utility class above
                new SimpleStringSchema(),
                props)).setParallelism(1)
                .map(string -> GsonUtil.fromJson(string, Student.class)); // parse the JSON string into a Student object

        student.timeWindowAll(Time.minutes(1)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
            @Override
            public void apply(TimeWindow window, Iterable<Student> values, Collector<List<Student>> out) throws Exception {
                ArrayList<Student> students = Lists.newArrayList(values);
                if (students.size() > 0) {
                    System.out.println("Number of Student records collected within 1 minute: " + students.size());
                    out.collect(students);
                }
            }
        }).addSink(new SinkToMySQL());

        env.execute("flink learning connectors kafka");
    }
}
```
Run the Main class first, then run KafkaUtil!
The screenshot below showed the data being sent to Kafka; each record is the JSON string produced by GsonUtil.toJson(student), e.g. (assuming the field names from the Student sketch above) something like {"id":1,"name":"zhisheng1","password":"password1","age":19}:
The screenshot below showed the log output from running the Main class. Four connection pools are created because the default parallelism is 4; if you set the parallelism of the addSink operator to 1, only one connection pool is created:
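For reference, a minimal sketch of what that looks like; `aggregatedStream` is only a placeholder name for the stream returned by the timeWindowAll(...).apply(...) call in Main:

```java
// Limiting the sink to one parallel instance means a single SinkToMySQL object,
// and therefore a single connection pool, is created.
// `aggregatedStream` is a placeholder for the stream returned by
// student.timeWindowAll(Time.minutes(1)).apply(...) in the Main class above.
aggregatedStream
        .addSink(new SinkToMySQL())
        .setParallelism(1);
```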
The screenshot below showed the result of the batch insert into the database:
This article was written in response to a question from a Knowledge Planet member, and it should satisfy all of his requirements (batching / database connection pool / writing to MySQL). Many examples on the web really are just simple demos that open a database connection and insert into MySQL one record at a time; if the volume of data to write is large, that puts a lot of write pressure on MySQL. This is also something I emphasized about writing data to ES in 《從0到1學習Flink》—— Flink 寫入數據到 ElasticSearch: to get good performance you must write in batches. Taking this article as an example, if the data volume is large and one minute of aggregated data reaches tens of thousands of records, batch writing will outperform writing one record at a time by a wide margin.