Flink is hailed as a fourth-generation big data compute engine: it can be used for offline (batch) distributed computation as well as for real-time computation. At its core, Flink turns everything into a stream and computes on it. Flink has three core concepts: Source, Transformation, and Sink. The Source is the data input of a Flink job; Transformations are the operators that perform the distributed stream computation and form the heart of a job; the Sink is where the computed data is written out. Flink's native sources cover common message queues and high-performance non-relational stores such as Kafka, Elasticsearch, and RabbitMQ, and its native sinks likewise cover components such as Redis, Kafka, Elasticsearch, and RabbitMQ. To read from or write to a relational database, or any component Flink does not support natively, you have to roll your own RichSourceFunction or RichSinkFunction. This performs worse than the native connectors, and Flink does not recommend it, but in big data work the complexity of the business or of the technical architecture sometimes leaves no other option. This post shows how to read a relational database with a Flink RichSourceFunction and write one with a RichSinkFunction, using MySQL as the example.
This example needs the Flink core packages, the flink-jdbc package, the MySQL JDBC driver (mysql-jdbc), and a reachable MySQL instance.
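A minimal Maven setup for these dependencies might look like the following; the artifact names and version numbers here are assumptions, not taken from the original post, so match them to your own Flink and MySQL versions:

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-java</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-jdbc_2.11</artifactId>
    <version>1.7.2</version>
</dependency>
<!-- Connector/J 5.x provides the com.mysql.jdbc.Driver class used below -->
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>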
package com.run;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;

import org.apache.flink.api.java.tuple.Tuple10;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;

public class Flink2JdbcReader extends
        RichSourceFunction<Tuple10<String, String, String, String, String, String, String, String, String, String>> {

    private static final long serialVersionUID = 3334654984018091675L;

    private Connection connect = null;
    private PreparedStatement ps = null;

    // Open the JDBC connection and prepare the query before the source starts running.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        connect = DriverManager.getConnection("jdbc:mysql://192.168.21.11:3306", "root", "flink");
        ps = connect.prepareStatement(
                "select col1,col2,col3,col4,col5,col6,col7,col8,col9,col10 from flink.test_tb");
    }

    // Execute the query and emit each row into the stream as a Tuple10.
    @Override
    public void run(
            SourceContext<Tuple10<String, String, String, String, String, String, String, String, String, String>> collect)
            throws Exception {
        ResultSet resultSet = ps.executeQuery();
        while (resultSet.next()) {
            Tuple10<String, String, String, String, String, String, String, String, String, String> tuple =
                    new Tuple10<>();
            tuple.setFields(resultSet.getString(1), resultSet.getString(2), resultSet.getString(3),
                    resultSet.getString(4), resultSet.getString(5), resultSet.getString(6),
                    resultSet.getString(7), resultSet.getString(8), resultSet.getString(9),
                    resultSet.getString(10));
            collect.collect(tuple);
        }
    }

    // Close the statement first, then the connection, when the source is cancelled.
    @Override
    public void cancel() {
        try {
            if (ps != null) {
                ps.close();
            }
            if (connect != null) {
                connect.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
package com.run;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

import org.apache.flink.api.java.tuple.Tuple10;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class Flink2JdbcWriter extends
        RichSinkFunction<Tuple10<String, String, String, String, String, String, String, String, String, String>> {

    private static final long serialVersionUID = -8930276689109741501L;

    private Connection connect = null;
    private PreparedStatement ps = null;

    // Open the JDBC connection and prepare the insert statement.
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        Class.forName("com.mysql.jdbc.Driver");
        connect = DriverManager.getConnection("jdbc:mysql://192.168.21.11:3306", "root", "flink");
        ps = connect.prepareStatement("insert into flink.test_tb1 values (?,?,?,?,?,?,?,?,?,?)");
    }

    // Called once per record: bind the ten tuple fields and execute the insert.
    @Override
    public void invoke(Tuple10<String, String, String, String, String, String, String, String, String, String> value,
            Context context) throws Exception {
        ps.setString(1, value.f0);
        ps.setString(2, value.f1);
        ps.setString(3, value.f2);
        ps.setString(4, value.f3);
        ps.setString(5, value.f4);
        ps.setString(6, value.f5);
        ps.setString(7, value.f6);
        ps.setString(8, value.f7);
        ps.setString(9, value.f8);
        ps.setString(10, value.f9);
        ps.executeUpdate();
    }

    // Close the statement first, then the connection, when the sink shuts down.
    @Override
    public void close() throws Exception {
        super.close();
        try {
            if (ps != null) {
                ps.close();
            }
            if (connect != null) {
                connect.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
Reading with Flink2JdbcReader
Flink2JdbcReader has three methods: open, run, and cancel. open establishes the connection to the relational database; it is ordinary JDBC, configured with the MySQL host, port, database, and credentials. run reads the MySQL rows and turns them into Flink's Tuple collection type; the pattern is visible in the code, and the digit in Tuple8, Tuple9, Tuple10 is simply the arity, i.e. how many fields each record carries. cancel just closes the database connection.
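One detail worth noting: a long-running or looping source should honour cancellation. A common pattern, shown here as a minimal sketch rather than part of the original post, is a volatile isRunning flag that run checks and cancel flips:

// Cancellation-aware variant of run()/cancel() for Flink2JdbcReader
// (sketch; the row-to-Tuple10 code is the same as above).
private volatile boolean isRunning = true;

@Override
public void run(SourceContext<Tuple10<String, String, String, String, String,
        String, String, String, String, String>> ctx) throws Exception {
    ResultSet resultSet = ps.executeQuery();
    // Stop emitting as soon as cancel() has been called.
    while (isRunning && resultSet.next()) {
        // build the Tuple10 from the current row exactly as above
        // and emit it with ctx.collect(tuple)
    }
}

@Override
public void cancel() {
    isRunning = false; // makes run() leave its loop promptly
    // then close ps and connect as shown earlier
}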
Writing with Flink2JdbcWriter
Flink2JdbcWriter also has three methods: open, invoke, and close. open establishes the JDBC connection exactly as in the reader. invoke writes one Flink record into MySQL; it looks a little different from the JDBC insert you would write in a web application because the input is Flink's own Tuple type, and again the digit in Tuple8, Tuple9, Tuple10 is just the field count. close closes the database connection. Since invoke fires one executeUpdate per record, every element costs a round trip to MySQL, which is part of the performance gap against the native connectors; the sketch below shows one common mitigation.
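A hedged sketch of JDBC batching inside Flink2JdbcWriter; the batch size of 500 is an arbitrary assumption, and a real implementation would also flush the remaining batch in close():

// Hypothetical batched variant of invoke(): queue rows and flush them in groups.
private int batchCount = 0;
private static final int BATCH_SIZE = 500; // assumed value; tune for your workload

@Override
public void invoke(Tuple10<String, String, String, String, String,
        String, String, String, String, String> value, Context context) throws Exception {
    for (int i = 0; i < 10; i++) {
        ps.setString(i + 1, value.getField(i)); // all ten columns are strings here
    }
    ps.addBatch(); // queue the row instead of sending it immediately
    if (++batchCount >= BATCH_SIZE) {
        ps.executeBatch(); // one round trip for the whole batch
        batchCount = 0;
    }
}
// close() should call ps.executeBatch() once more to flush the tail before closing.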
package com.run;

import org.apache.flink.api.java.tuple.Tuple10;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkReadDbWriterDb {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<Tuple10<String, String, String, String, String, String, String, String, String, String>> dataStream =
                env.addSource(new Flink2JdbcReader());
        // transformations would go here
        dataStream.addSink(new Flink2JdbcWriter());
        env.execute("Flink cost DB data to write Database");
    }
}
The test code shows Flink's logic clearly: Source -> Transformation -> Sink. Our own business-logic operators can be inserted between addSource and addSink, as in the sketch after this paragraph. Also note env.execute("Flink cost DB data to write Database"): this call is mandatory and must come last, otherwise nothing in the job runs at all. Why that is will be covered in a later post.
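For illustration, here is a hypothetical transformation between source and sink, a map that upper-cases the first column; the operator is invented for this example and needs an extra import of org.apache.flink.api.common.functions.MapFunction:

// Hypothetical business-logic operator between addSource and addSink:
// upper-case the first column of every row before it is written to MySQL.
dataStream
        .map(new MapFunction<
                Tuple10<String, String, String, String, String, String, String, String, String, String>,
                Tuple10<String, String, String, String, String, String, String, String, String, String>>() {
            @Override
            public Tuple10<String, String, String, String, String, String, String, String, String, String> map(
                    Tuple10<String, String, String, String, String, String, String, String, String, String> row) {
                row.f0 = row.f0.toUpperCase();
                return row;
            }
        })
        .addSink(new Flink2JdbcWriter());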