Flink 1.11.1: Customizing the debezium-json Format for Flink CDC with Debezium

Preface

Flink 1.11 adds support for CDC (change data capture), including the Debezium and Canal formats. This post modifies the debezium-json format.

Default output format

1. Insert

(true,1,2,3)

2. Update

(false,1,2,3)
(true,1,2,4)

3. Delete

(false,1,2,3)

Desired output format after the modification

1. Insert

(true,1,2,3,'c')

Note: 'c' indicates a create.

2. Update

(true,1,2,4,'u')

Note: 'u' indicates an update.

3. Delete

Note: no processing is performed.
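The desired mapping above can be sketched as a small stand-alone function. This is only an illustration under stated assumptions: the class and method names are hypothetical, no Flink classes are used, and the handling of the snapshot op 'r' is inferred from the code below, which treats it like a create.

```java
import java.util.Collections;
import java.util.List;

/** Stand-alone illustration of the desired format; hypothetical names, no Flink classes. */
final class DesiredOutputSketch {

    /**
     * Returns the lines that would be printed for one change event.
     * "row" is the comma-separated new image of the row, e.g. "1,2,3".
     */
    static List<String> emit(String op, String row) {
        switch (op) {
            case "c": // create
            case "r": // snapshot read, assumed to be handled like a create
            case "u": // update: only the new image is emitted, without a retraction
                return Collections.singletonList("(true," + row + ",'" + op + "')");
            case "d": // delete: nothing is emitted
                return Collections.emptyList();
            default:
                throw new IllegalArgumentException("Unknown op: " + op);
        }
    }

    public static void main(String[] args) {
        System.out.println(emit("c", "1,2,3")); // [(true,1,2,3,'c')]
        System.out.println(emit("u", "1,2,4")); // [(true,1,2,4,'u')]
        System.out.println(emit("d", "1,2,4")); // []
    }
}
```

Because every emitted row carries the flag true, a downstream consumer has to look at the trailing op value, not at the retract flag, to tell creates from updates.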

Code changes

flink-release-1.11.1/flink-formats/flink-json/src/main/java/org/apache/flink/formats/json/debezium/DebeziumJsonDeserializationSchema.java
	@Override
	public void deserialize(byte[] message, Collector<RowData> out) throws IOException {
		try {
			GenericRowData row = (GenericRowData) jsonDeserializer.deserialize(message);
			GenericRowData payload;
			if (schemaInclude) {
				payload = (GenericRowData) row.getField(0);
			} else {
				payload = row;
			}

			// Payload layout: 0 = "before" image, 1 = "after" image, 2 = "op" code.
			GenericRowData after = (GenericRowData) payload.getField(1);
			Object opField = payload.getField(2);
			String op = opField.toString();
			if (OP_CREATE.equals(op) || OP_READ.equals(op)) {
				//after.setRowKind(RowKind.INSERT);
				// Write the op code into the last field of the row. This overwrites that
				// field, so the table schema is assumed to reserve a trailing column for it.
				after.setField(after.getArity() - 1, opField);
				out.collect(after);
			} else if (OP_UPDATE.equals(op)) {
				//before.setRowKind(RowKind.UPDATE_BEFORE);
				//after.setRowKind(RowKind.UPDATE_AFTER);
				//out.collect(before);
				// Emit only the new image, tagged with the op code; the old image is dropped.
				after.setField(after.getArity() - 1, opField);
				out.collect(after);
			} else if (OP_DELETE.equals(op)) {
				// Deletes are dropped on purpose, matching the desired output format.
				// "after" is null for a delete event, so it must not be dereferenced here.
				//before.setRowKind(RowKind.DELETE);
				//out.collect(before);
			} else {
				if (!ignoreParseErrors) {
					throw new IOException(format(
						"Unknown \"op\" value \"%s\". The Debezium JSON message is '%s'", op, new String(message)));
				}
			}
		} catch (Throwable t) {
			// a big try catch to protect the processing.
			if (!ignoreParseErrors) {
				throw new IOException(format(
					"Corrupt Debezium JSON message '%s'.", new String(message)), t);
			}
		}
	}
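
The method above writes the op code into the last field of each emitted row, so the table that uses this format presumably needs a spare trailing column to receive it. A minimal sketch of such a table definition follows, assuming a Kafka source. The class name, table name, column names, topic, and broker address are all hypothetical; only the trailing STRING column is essential to the idea.

```java
/** Hypothetical DDL holder; the string would be passed to TableEnvironment#executeSql. */
final class DemoTableDdl {

    // The last column ("op") is a placeholder that the patched deserializer overwrites
    // with the Debezium op code ('c', 'r' or 'u'). All names and addresses are assumptions.
    static final String DDL = String.join("\n",
        "CREATE TABLE demo_table (",
        "  a INT,",
        "  b INT,",
        "  c INT,",
        "  op STRING",
        ") WITH (",
        "  'connector' = 'kafka',",
        "  'topic' = 'demo_topic',",
        "  'properties.bootstrap.servers' = 'localhost:9092',",
        "  'properties.group.id' = 'demo_group',",
        "  'format' = 'debezium-json'",
        ")");

    public static void main(String[] args) {
        // Print the statement so it can be inspected or copied into a SQL client.
        System.out.println(DDL);
    }
}
```

Since the Debezium message itself has no "op" field inside the before/after images, that column would arrive empty from the JSON parser and is then filled in by the modified deserialize method.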