Preface
Flink 1.11 adds support for CDC (change data capture), including the Debezium and Canal formats. This post modifies the output of the debezium-json format.
Default output format
1. Insert
(true,1,2,3)
2. Update
(false,1,2,3)
(true,1,2,4)
3. Delete
(false,1,2,3)
(true,1,2,4)
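Desired output format after the change
1. Insert
(true,1,2,3,'c')
Note: 'c' denotes a create.
2. Update
(true,1,2,4,'u')
Note: 'u' denotes an update.
3. Delete
Note: deletes are not processed, so nothing is emitted.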
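The mapping above can be sketched in plain Java, without any Flink dependency. This is only an illustration of the intended behaviour under stated assumptions: the class name `OpTaggingSketch` and the method `toOutput` are hypothetical, rows are modelled as plain lists, and the leading `true` flag of the printed output is left out because every emitted row is an insert.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical illustration only: models the desired op-to-output mapping
// with plain lists instead of Flink's RowData.
public class OpTaggingSketch {

    /**
     * Returns the rows to emit for one change event.
     * before/after are the row images (either may be null), op is the Debezium op code.
     */
    public static List<List<Object>> toOutput(List<Object> before, List<Object> after, String op) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read, handled like a create
            case "u": // update: only the after image is kept
                List<Object> tagged = new ArrayList<>(after);
                tagged.add(op); // append the op code as the last field
                return Collections.singletonList(tagged);
            case "d": // delete: nothing is emitted
                return Collections.emptyList();
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.println(toOutput(null, Arrays.<Object>asList(1, 2, 3), "c"));
        System.out.println(toOutput(Arrays.<Object>asList(1, 2, 3), Arrays.<Object>asList(1, 2, 4), "u"));
        System.out.println(toOutput(Arrays.<Object>asList(1, 2, 3), null, "d"));
    }
}
```

Running `main` prints `[[1, 2, 3, c]]`, `[[1, 2, 4, u]]` and `[]`, which lines up with the three cases listed above.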
Code changes
flink-release-1.11.1/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
    @Override
    public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
        try {
            GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
            GenericRowData payload;
            if (schemaInclude) {
                payload = (GenericRowData) row.getField(0);
            } else {
                payload = row;
            }

            GenericRowData before = (GenericRowData) payload.getField(0);
            GenericRowData after = (GenericRowData) payload.getField(1);
            String op = payload.getField(2).toString();

            if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
                // after.setRowKind(RowKind.INSERT);
                // Write the op code into the last field of the row. The row keeps
                // its default RowKind (INSERT).
                after.setField(after.getArity() - 1, payload.getField(2));
                out.collect(after);
            } else if (OP_UPDATE.equals(op)) {
                // before.setRowKind(RowKind.UPDATE_BEFORE);
                // after.setRowKind(RowKind.UPDATE_AFTER);
                // out.collect(before);
                // Emit only the after image, tagged with the op code.
                after.setField(after.getArity() - 1, payload.getField(2));
                out.collect(after);
            } else if (OP_DELETE.equals(op)) {
                // Deletes produce no output. "after" is null for a delete event,
                // so it must not be dereferenced on this path.
                // before.setRowKind(RowKind.DELETE);
                // before.setField(before.getArity() - 1, payload.getField(2));
                // out.collect(before);
            } else {
                if (!ignoreParseErrors) {
                    throw new IOException(format(
                        "Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'",
                        op, new String(message)));
                }
            }
        } catch (Throwable t) {
            // a big try catch to protect the processing.
            if (!ignoreParseErrors) {
                throw new IOException(format(
                    "Corrupt Debezium JSON message '%s'.", new String(message)), t);
            }
        }
    }
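Because the modified deserializer writes the op code into the last field of each row, the table reading this format presumably needs one extra trailing STRING column to receive it. The sketch below shows what such a DDL might look like. The table name, topic, broker address, group id and column names are all hypothetical; only the option keys are taken from the Flink 1.11 Kafka connector and debezium-json format. It also assumes the JSON deserializer tolerates the extra column being absent from the Debezium message.

```java
// Hypothetical illustration only: a DDL whose last column is reserved for the op code.
public class OpColumnDdlSketch {

    // Table, topic, server, group id and column names are made up for this example.
    // The trailing op_type column is the slot the patched deserializer overwrites.
    public static final String DDL = String.join("\n",
        "CREATE TABLE orders_cdc (",
        "  id INT,",
        "  a INT,",
        "  b INT,",
        "  op_type STRING",
        ") WITH (",
        "  'connector' = 'kafka',",
        "  'topic' = 'orders',",
        "  'properties.bootstrap.servers' = 'localhost:9092',",
        "  'properties.group.id' = 'cdc-demo',",
        "  'scan.startup.mode' = 'earliest-offset',",
        "  'format' = 'debezium-json'",
        ")");

    public static void main(String[] args) {
        System.out.println(DDL);
    }
}
```

In a real job the string would be passed to `TableEnvironment.executeSql`, with the rebuilt flink-json module on the classpath in place of the stock one.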