flink寫入mysql的兩種方式

方式一 經過JDBCOutputFormat

在flink中沒有現成的用來寫入MySQL的sink,可是flink提供了一個類,JDBCOutputFormat,經過這個類,若是你提供了jdbc的driver,則能夠當作sink使用。java

JDBCOutputFormat實際上是flink的batch api,但也能夠用來做爲stream的api使用,社區也推薦經過這種方式來進行。mysql

JDBCOutputFormat用起來很簡單,只須要一個prepared statement,driver和database connection,就能夠開始使用了。git

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .finish();

以下的sql語句能夠做爲prepared statement:github

String query = "INSERT INTO public.cases (caseid, tracehash) VALUES (?, ?)";

對應的表的結構:sql

1 CREATE TABLE cases
2 (
3  caseid VARCHAR(255),
4  tracehash VARCHAR(255)
5 );

但有一點要明確,JDBCOutputFormat只能處理Row,而Row是對prepared statement的參數的一個包裝類。這意味着咱們須要將流中的case轉換爲row,經過map就能作的。數據庫

1 DataStream<Case> cases = ...
2 
3   DataStream<Row> rows = cases.map((MapFunction<Case, Row>) aCase -> {
4    Row row = new Row(2); // our prepared statement has 2 parameters
5    row.setField(0, aCase.getId()); //first parameter is case ID
6    row.setField(1, aCase.getTraceHash()); //second paramater is tracehash
7    return row;
8   });

這樣,咱們就能添加sink了:apache

1 rows.writeUsingOutputFormat(jdbcOutput);

這樣,你就能夠將數據寫入mysql了。api

可是在你在流上附加了窗口以後,可能會獲得下面的報錯:app

1 "Unknown column type for column %s. Best effort approach to set its value: %s."

由於窗口處理的類型,沒有明確的類型定義,以下修改以前的定義,顯式的指定類型:ide

1 JDBCOutputFormat jdbcOutput = JDBCOutputFormat.buildJDBCOutputFormat()
2  .setDrivername("com.mysql.jdbc.Driver")
3  .setDBUrl("jdbc:mysql://localhost:1234/test?user=xxx&password=xxx")
4  .setQuery(query)
5  .setSqlTypes(new int[] { Types.VARCHAR, Types.VARCHAR }) //set the types
6  .finish();

JDBCOutputFormat has a batchInterval, which you can specify on the JDBCOutputFormatBuilder. If, however, I specify a batch interval of 5000, I would potentially never write anything to the database, or wait a very long time until anything was written.

JDBCOutputFormat 還有一個頗有用的參數,batchInterval,見名知意,就是多少數據提交一次,儘可能高效率的向數據庫提交數據。固然還有好比timeout等其餘參數,能夠探索。

方式二 經過自定義sink提交

咱們經過繼承RichSinkFunction<IN>來實現自定義sink:

 1 public class RichCaseSink extends RichSinkFunction<Case> {
 2 
 3   private static final String UPSERT_CASE = "INSERT INTO public.cases (caseid, tracehash) "
 4       + "VALUES (?, ?) "
 5       + "ON CONFLICT (caseid) DO UPDATE SET "
 6       + "  tracehash=?";
 7 
 8   private PreparedStatement statement;
 9 
10 
11   @Override
12   public void invoke(Case aCase) throws Exception {
13 
14     statement.setString(1, aCase.getId());
15     statement.setString(2, aCase.getTraceHash());
16     statement.setString(3, aCase.getTraceHash());
17     statement.addBatch();
18     statement.executeBatch();
19   }
20 
21   @Override
22   public void open(Configuration parameters) throws Exception {
23     Class.forName("com.mysql.jdbc.Driver");
24     Connection connection =
25         DriverManager.getConnection("jdbc:mysql://localhost:5432/casedb?user=signavio&password=signavio");
26 
27     statement = connection.prepareStatement(UPSERT_CASE);
28   }
29 
30 }

這樣,就能夠在流上添加sink 了:

1 DataStream<Case> cases = ...
2 cases.addSink(new RichCaseSink());

固然,上面的實現很簡略,沒有給出批量提交或者超時提交,這個均可以很容易的添加,好比close()中關閉鏈接。

可是上面的實現中,最大的問題仍是沒有跟flink的狀態管理相結合,這個纔是重頭戲。

方式二 增強版的自定義sink

在checkpoint的時候保存數據,繼承接口CheckpointedFunction :

 1 @Override
 2 public void snapshotState(FunctionSnapshotContext context) throws Exception {
 3   long checkpointId = context.getCheckpointId();
 4   List<Case> cases = pendingCasesPerCheckpoint.get(checkpointId);
 5   if(cases == null){
 6     cases = new ArrayList<>();
 7     pendingCasesPerCheckpoint.put(checkpointId, cases);
 8   }
 9   cases.addAll(pendingCases);
10   pendingCases.clear();
11 }

在消息到達的時候不插入數據,只是留存數據:

1 @Override
2 public void invoke(Case aCase) throws Exception {
3   pendingCases.add(aCase);
4 }

這樣,經過繼承CheckpointListener,咱們就能在某個checkpoint完成的時候插入數據:

 1 @Override
 2 public void notifyCheckpointComplete(long checkpointId) throws Exception {
 3 
 4  Iterator<Map.Entry<Long, List<Case>>> pendingCheckpointsIt =
 5    pendingCasesPerCheckpoint.entrySet().iterator();
 6 
 7  while (pendingCheckpointsIt.hasNext()) {
 8 
 9   Map.Entry<Long, List<Case>> entry = pendingCheckpointsIt.next();
10   Long pastCheckpointId = entry.getKey();
11   List<Case> pendingCases = entry.getValue();
12 
13   if (pastCheckpointId <= checkpointId) {
14 
15    for (Case pendingCase : pendingCases) {
16     statement.setString(1, pendingCase.getId());
17     statement.setString(2, pendingCase.getTraceHash());
18     statement.setString(3, pendingCase.getTraceHash());
19     statement.addBatch();
20    }
21    pendingCheckpointsIt.remove();
22   }
23  }
24  statement.executeBatch();
25 
26 }

前提,是須要設置checkpoint,好比:

ExecutionEnvironment env = ...
env.enableCheckpointing(10000L);

這樣,每隔10s,當一個checkpoint作成功,就會插入一次數據。

固然,上面的代碼驗證可用,但不建議在生產環境使用,生產環境須要考慮更多的問題。

相關文章
相關標籤/搜索