本文主要研究一下nifi的AbstractBinlogTableEventWriter
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/AbstractBinlogTableEventWriter.java
public abstract class AbstractBinlogTableEventWriter<T extends BinlogTableEventInfo> extends AbstractBinlogEventWriter<T> { protected void writeJson(T event) throws IOException { super.writeJson(event); if (event.getDatabaseName() != null) { jsonGenerator.writeStringField("database", event.getDatabaseName()); } else { jsonGenerator.writeNullField("database"); } if (event.getTableName() != null) { jsonGenerator.writeStringField("table_name", event.getTableName()); } else { jsonGenerator.writeNullField("table_name"); } if (event.getTableId() != null) { jsonGenerator.writeNumberField("table_id", event.getTableId()); } else { jsonGenerator.writeNullField("table_id"); } } // Default implementation for table-related binlog events @Override public long writeEvent(ProcessSession session, String transitUri, T eventInfo, long currentSequenceId, Relationship relationship) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, (outputStream) -> { super.startJson(outputStream, eventInfo); writeJson(eventInfo); // Nothing in the body super.endJson(); }); flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo)); session.transfer(flowFile, relationship); session.getProvenanceReporter().receive(flowFile, transitUri); return currentSequenceId + 1; } }
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DDLEventWriter.java
/**
 * Writer for DDL binlog events (e.g. CREATE/ALTER/DROP statements). Serializes the
 * common table-event fields plus the raw DDL query text into a single JSON flow file.
 */
public class DDLEventWriter extends AbstractBinlogTableEventWriter<DDLEventInfo> {

    /**
     * Writes the DDL event as one flow file, sets the common CDC attributes,
     * transfers it to the given relationship, and reports provenance.
     *
     * @return the next available CDC sequence id (currentSequenceId + 1)
     */
    @Override
    public long writeEvent(ProcessSession session, String transitUri, DDLEventInfo eventInfo, long currentSequenceId, Relationship relationship) {
        FlowFile flowFile = session.create();
        flowFile = session.write(flowFile, (outputStream) -> {
            super.startJson(outputStream, eventInfo);
            super.writeJson(eventInfo);
            // Guard against an absent query, matching the null-handling convention the
            // superclass applies to the database/table_name/table_id fields.
            if (eventInfo.getQuery() != null) {
                jsonGenerator.writeStringField("query", eventInfo.getQuery());
            } else {
                jsonGenerator.writeNullField("query");
            }
            super.endJson();
        });
        flowFile = session.putAllAttributes(flowFile, getCommonAttributes(currentSequenceId, eventInfo));
        session.transfer(flowFile, relationship);
        session.getProvenanceReporter().receive(flowFile, transitUri);
        return currentSequenceId + 1;
    }
}
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/InsertRowsWriter.java
public class InsertRowsWriter extends AbstractBinlogTableEventWriter<InsertRowsEventInfo> { /** * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set * * @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred * @param eventInfo An event whose value will become the contents of the flow file * @return The next available CDC sequence ID for use by the CDC processor */ @Override public long writeEvent(final ProcessSession session, String transitUri, final InsertRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) { final AtomicLong seqId = new AtomicLong(currentSequenceId); for (Serializable[] row : eventInfo.getRows()) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); final BitSet bitSet = eventInfo.getIncludedColumns(); writeRow(eventInfo, row, bitSet); super.endJson(); }); flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo)); session.transfer(flowFile, relationship); session.getProvenanceReporter().receive(flowFile, transitUri); seqId.getAndIncrement(); } return seqId.get(); } protected void writeRow(InsertRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException { jsonGenerator.writeArrayFieldStart("columns"); int i = includedColumns.nextSetBit(0); while (i != -1) { jsonGenerator.writeStartObject(); jsonGenerator.writeNumberField("id", i + 1); ColumnDefinition columnDefinition = event.getColumnByIndex(i); Integer columnType = null; if (columnDefinition != null) { jsonGenerator.writeStringField("name", columnDefinition.getName()); columnType = columnDefinition.getType(); jsonGenerator.writeNumberField("column_type", columnType); } if (row[i] == null) { jsonGenerator.writeNullField("value"); } else { 
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i])); } jsonGenerator.writeEndObject(); i = includedColumns.nextSetBit(i + 1); } jsonGenerator.writeEndArray(); } }
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/UpdateRowsWriter.java
public class UpdateRowsWriter extends AbstractBinlogTableEventWriter<UpdateRowsEventInfo> { /** * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set * * @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred * @param eventInfo An event whose value will become the contents of the flow file * @return The next available CDC sequence ID for use by the CDC processor */ @Override public long writeEvent(final ProcessSession session, String transitUri, final UpdateRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) { final AtomicLong seqId = new AtomicLong(currentSequenceId); for (Map.Entry<Serializable[], Serializable[]> row : eventInfo.getRows()) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); final BitSet bitSet = eventInfo.getIncludedColumns(); writeRow(eventInfo, row, bitSet); super.endJson(); }); flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo)); session.transfer(flowFile, relationship); session.getProvenanceReporter().receive(flowFile, transitUri); seqId.getAndIncrement(); } return seqId.get(); } protected void writeRow(UpdateRowsEventInfo event, Map.Entry<Serializable[], Serializable[]> row, BitSet includedColumns) throws IOException { jsonGenerator.writeArrayFieldStart("columns"); int i = includedColumns.nextSetBit(0); while (i != -1) { jsonGenerator.writeStartObject(); jsonGenerator.writeNumberField("id", i + 1); ColumnDefinition columnDefinition = event.getColumnByIndex(i); Integer columnType = null; if (columnDefinition != null) { jsonGenerator.writeStringField("name", columnDefinition.getName()); columnType = columnDefinition.getType(); jsonGenerator.writeNumberField("column_type", columnType); } Serializable[] oldRow = row.getKey(); 
Serializable[] newRow = row.getValue(); if (oldRow[i] == null) { jsonGenerator.writeNullField("last_value"); } else { jsonGenerator.writeObjectField("last_value", MySQLCDCUtils.getWritableObject(columnType, oldRow[i])); } if (newRow[i] == null) { jsonGenerator.writeNullField("value"); } else { jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, newRow[i])); } jsonGenerator.writeEndObject(); i = includedColumns.nextSetBit(i + 1); } jsonGenerator.writeEndArray(); } }
nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src/main/java/org/apache/nifi/cdc/mysql/event/io/DeleteRowsWriter.java
public class DeleteRowsWriter extends AbstractBinlogTableEventWriter<DeleteRowsEventInfo> { /** * Creates and transfers a new flow file whose contents are the JSON-serialized value of the specified event, and the sequence ID attribute set * * @param session A reference to a ProcessSession from which the flow file(s) will be created and transferred * @param eventInfo An event whose value will become the contents of the flow file * @return The next available CDC sequence ID for use by the CDC processor */ @Override public long writeEvent(final ProcessSession session, String transitUri, final DeleteRowsEventInfo eventInfo, final long currentSequenceId, Relationship relationship) { final AtomicLong seqId = new AtomicLong(currentSequenceId); for (Serializable[] row : eventInfo.getRows()) { FlowFile flowFile = session.create(); flowFile = session.write(flowFile, outputStream -> { super.startJson(outputStream, eventInfo); super.writeJson(eventInfo); final BitSet bitSet = eventInfo.getIncludedColumns(); writeRow(eventInfo, row, bitSet); super.endJson(); }); flowFile = session.putAllAttributes(flowFile, getCommonAttributes(seqId.get(), eventInfo)); session.transfer(flowFile, relationship); session.getProvenanceReporter().receive(flowFile, transitUri); seqId.getAndIncrement(); } return seqId.get(); } protected void writeRow(DeleteRowsEventInfo event, Serializable[] row, BitSet includedColumns) throws IOException { jsonGenerator.writeArrayFieldStart("columns"); int i = includedColumns.nextSetBit(0); while (i != -1) { jsonGenerator.writeStartObject(); jsonGenerator.writeNumberField("id", i + 1); ColumnDefinition columnDefinition = event.getColumnByIndex(i); Integer columnType = null; if (columnDefinition != null) { jsonGenerator.writeStringField("name", columnDefinition.getName()); columnType = columnDefinition.getType(); jsonGenerator.writeNumberField("column_type", columnType); } if (row[i] == null) { jsonGenerator.writeNullField("value"); } else { 
jsonGenerator.writeObjectField("value", MySQLCDCUtils.getWritableObject(columnType, row[i])); } jsonGenerator.writeEndObject(); i = includedColumns.nextSetBit(i + 1); } jsonGenerator.writeEndArray(); } }
AbstractBinlogTableEventWriter繼承了AbstractBinlogEventWriter，其泛型基類爲BinlogTableEventInfo，它有四個子類，分別是DDLEventWriter、InsertRowsWriter、UpdateRowsWriter、DeleteRowsWriter