Flink does not ship a ready-made sink for writing to MySQL, but it does provide a class, JDBCOutputFormat, which can be used as a sink as long as you supply a JDBC driver.
JDBCOutputFormat is actually part of Flink's batch API, but it can be used from the streaming API as well, and this is the approach the community recommends.
JDBCOutputFormat is simple to use: all it needs is a prepared statement, a driver, and a database connection.
```java
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .finish();
```
The following SQL statement can serve as the prepared statement:
```java
String query = "INSERT INTO cases (caseid, tracehash) VALUES (?, ?)";
```
The corresponding table structure:
```sql
CREATE TABLE cases
(
    caseid    VARCHAR(255),
    tracehash VARCHAR(255)
);
```
One thing must be clear, though: JDBCOutputFormat can only handle Row, which is a wrapper around the parameters of the prepared statement. This means we need to convert the Case objects in the stream into Rows, which a simple map can do.
```java
DataStream<Case> cases = ...

DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
    Row row = new Row(2); // our prepared statement has 2 parameters
    row.setField(0, aCase.getId()); // first parameter is the case ID
    row.setField(1, aCase.getTraceHash()); // second parameter is the trace hash
    return row;
});
```
With that in place, we can attach the sink:
```java
rows.writeUsingOutputFormat(jdbcOutput);
```
And with that, you can write data to MySQL.
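Putting it all together, a complete job might look like the following minimal sketch. The Case POJO, the sample elements, and the job name are illustrative assumptions, not part of the original setup:

```java
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.io.jdbc.JDBCOutputFormat;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.types.Row;

public class CasesToMySqlJob {

    // Hypothetical POJO standing in for the stream's element type
    public static class Case {
        private final String id;
        private final String traceHash;
        public Case(String id, String traceHash) { this.id = id; this.traceHash = traceHash; }
        public String getId() { return id; }
        public String getTraceHash() { return traceHash; }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        String query = "INSERT INTO cases (caseid, tracehash) VALUES (?, ?)";
        JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
            .setDrivername("com.mysql.jdbc.Driver")
            .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
            .setQuery(query)
            .finish();

        // A toy in-memory source; replace with your real stream
        DataStream<Case> cases = env.fromElements(
            new Case("case-1", "hash-1"),
            new Case("case-2", "hash-2"));

        // Wrap each Case into a Row matching the prepared statement's parameters
        DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
            Row row = new Row(2);
            row.setField(0, aCase.getId());
            row.setField(1, aCase.getTraceHash());
            return row;
        });

        rows.writeUsingOutputFormat(jdbcOutput);
        env.execute("write cases to MySQL");
    }
}
```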
However, once you attach a window to the stream, you may see the following warning in the logs:
1 "Unknown column type for column %s. Best effort approach to set its value: %s."
This happens because the types coming out of the window carry no explicit type definition, so Flink cannot map them to SQL types. Modify the earlier definition to specify the types explicitly (java.sql.Types):
```java
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) // set the types explicitly
    .finish();
```
JDBCOutputFormat also has a very useful parameter, batchInterval, which you can specify on the JDBCOutputFormatBuilder. As the name suggests, it controls how many rows are buffered before a batch is submitted, so writes to the database can be made as efficient as possible. Beware, though: if you specify a batch interval of 5000 on a slow stream, you could potentially never write anything to the database, or wait a very long time until anything is written. There are a few other builder options worth exploring as well.
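As a sketch, the flush threshold goes on the builder like the other options, reusing the definition from above. With this setting, nothing reaches MySQL until 5000 rows have accumulated:

```java
JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
    .setDrivername("com.mysql.jdbc.Driver")
    .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
    .setQuery(query)
    .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR })
    .setBatchInterval(5000) // flush to the database only once 5000 rows are buffered
    .finish();
```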
To get around this flushing behavior, we implement a custom sink by extending RichSinkFunction<IN>:
```java
public class RichCaseSink extends RichSinkFunction<Case> {

    private static final String UPSERT_CASE = "INSERT INTO cases (caseid, tracehash) "
        + "VALUES (?, ?) "
        + "ON DUPLICATE KEY UPDATE "
        + " tracehash=?";

    private PreparedStatement statement;

    @Override
    public void invoke(Case aCase) throws Exception {
        statement.setString(1, aCase.getId());
        statement.setString(2, aCase.getTraceHash());
        statement.setString(3, aCase.getTraceHash());
        statement.addBatch();
        statement.executeBatch();
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        Class.forName("com.mysql.jdbc.Driver");
        Connection connection =
            DriverManager.getConnection("jdbc:mysql://localhost:3306/casedb?user=signavio&password=signavio");

        statement = connection.prepareStatement(UPSERT_CASE);
    }

}
```

Note that ON DUPLICATE KEY UPDATE only takes effect if caseid is covered by a PRIMARY KEY or UNIQUE constraint on the cases table.
Now the sink can be added to the stream:
```java
DataStream<Case> cases = ...
cases.addSink(new RichCaseSink());
```
Of course, the implementation above is very simplistic: there is no batched committing and no timeout-based committing. Both are easy to add, as is closing the connection in close().
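For instance, a minimal close() could flush whatever is still buffered and release the JDBC resources. This is a sketch; it assumes the connection from open() is stored in a field alongside the statement, which the class above does not do yet:

```java
private Connection connection; // assumed to be assigned in open() instead of a local variable

@Override
public void close() throws Exception {
    // Flush anything still sitting in the current batch, then release resources.
    if (statement != null) {
        statement.executeBatch();
        statement.close();
    }
    if (connection != null) {
        connection.close();
    }
    super.close();
}
```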
The biggest problem with the implementation above, however, is that it is not integrated with Flink's state management, and that is the main event.
To stash the data when a checkpoint is taken, implement the CheckpointedFunction interface:
```java
@Override
public void snapshotState(FunctionSnapshotContext context) throws Exception {
    long checkpointId = context.getCheckpointId();
    List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
    if (cases == null) {
        cases = new ArrayList<>();
        pendingCasesPerCheckpoint.put(checkpointId, cases);
    }
    cases.addAll(pendingCases);
    pendingCases.clear();
}
```
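For reference, here is a minimal sketch of the supporting pieces these snippets assume: the two in-memory buffers, plus the initializeState method that CheckpointedFunction also requires. It is left as a no-op here; real fault tolerance would require persisting the pending cases in managed operator state:

```java
// Buffers assumed by the snippets (plain fields, so not fault-tolerant in this sketch):
// cases received since the last checkpoint...
private final List<Case> pendingCases = new ArrayList<>();
// ...and cases frozen per checkpoint, waiting for notifyCheckpointComplete
private final Map<Long, List<Case>> pendingCasesPerCheckpoint = new HashMap<>();

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    // Intentionally empty in this sketch. To survive failures, the pending cases
    // would have to be written to and restored from managed operator state here,
    // e.g. via context.getOperatorStateStore().getListState(...).
}
```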
When a message arrives, don't insert it; just buffer it:
```java
@Override
public void invoke(Case aCase) throws Exception {
    pendingCases.add(aCase);
}
```
Then, by also implementing the CheckpointListener interface, we can insert the data once a checkpoint has completed:
```java
@Override
public void notifyCheckpointComplete(long checkpointId) throws Exception {

    Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
        pendingCasesPerCheckpoint.entrySet().iterator();

    while (pendingCheckpointsIt.hasNext()) {

        Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
        Long pastCheckpointId = entry.getKey();
        List<Case> pendingCases = entry.getValue();

        if (pastCheckpointId <= checkpointId) {

            for (Case pendingCase : pendingCases) {
                statement.setString(1, pendingCase.getId());
                statement.setString(2, pendingCase.getTraceHash());
                statement.setString(3, pendingCase.getTraceHash());
                statement.addBatch();
            }
            pendingCheckpointsIt.remove();
        }
    }
    statement.executeBatch();
}
```
The prerequisite is that checkpointing is enabled, for example:
```java
StreamExecutionEnvironment env = ...
env.enableCheckpointing(10000L);
```
This way, every 10 seconds, whenever a checkpoint completes successfully, the buffered data is inserted.
Of course, while the code above has been verified to work, it is not recommended for production use; a production deployment has many more issues to consider.