public class RecordMakers { private final Logger logger = LoggerFactory.getLogger(getClass()); private final MySqlSchema schema; private final SourceInfo source; private final TopicSelector<TableId> topicSelector; private final boolean emitTombstoneOnDelete; private final Map<Long, Converter> convertersByTableNumber = new HashMap<>(); private final Map<TableId, Long> tableNumbersByTableId = new HashMap<>(); private final Map<Long, TableId> tableIdsByTableNumber = new HashMap<>(); private final Schema schemaChangeKeySchema; private final Schema schemaChangeValueSchema; private final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(logger); private final Map<String, ?> restartOffset; //...... public RecordMakers(MySqlSchema schema, SourceInfo source, TopicSelector<TableId> topicSelector, boolean emitTombstoneOnDelete, Map<String, ?> restartOffset) { this.schema = schema; this.source = source; this.topicSelector = topicSelector; this.emitTombstoneOnDelete = emitTombstoneOnDelete; this.restartOffset = restartOffset; this.schemaChangeKeySchema = SchemaBuilder.struct() .name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeKey")) .field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) .build(); this.schemaChangeValueSchema = SchemaBuilder.struct() .name(schemaNameAdjuster.adjust("io.debezium.connector.mysql.SchemaChangeValue")) .field(Fields.SOURCE, source.schema()) .field(Fields.DATABASE_NAME, Schema.STRING_SCHEMA) .field(Fields.DDL_STATEMENTS, Schema.STRING_SCHEMA) .build(); } public RecordsForTable forTable(TableId tableId, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) { Long tableNumber = tableNumbersByTableId.get(tableId); return tableNumber != null ? forTable(tableNumber, includedColumns, consumer) : null; } //...... }
/**
 * Binds a {@link Converter} to a fixed set of included columns and a record consumer, so that
 * callers only supply the row data, the timestamp and (optionally) the row position.
 * Every method forwards to the converter together with the enclosing {@code source}.
 */
public final class RecordsForTable {

    private final Converter converter;
    private final BitSet includedColumns;
    private final BlockingConsumer<SourceRecord> consumer;

    /**
     * @param converter the converter every call is forwarded to
     * @param includedColumns the column set handed to the converter on every call
     * @param consumer the consumer handed to the converter on every call
     */
    protected RecordsForTable(Converter converter, BitSet includedColumns, BlockingConsumer<SourceRecord> consumer) {
        this.consumer = consumer;
        this.includedColumns = includedColumns;
        this.converter = converter;
    }

    /**
     * Forwards a single row to {@link Converter#read}, as row 0 of 1.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int read(Object[] row, Instant ts) throws InterruptedException {
        return converter.read(source, row, 0, 1, includedColumns, ts, consumer);
    }

    /**
     * Forwards a row to {@link Converter#read}.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @param rowNumber the row position handed to the converter
     * @param numberOfRows the row total handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int read(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
        return converter.read(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
    }

    /**
     * Forwards a single row to {@link Converter#insert}, as row 0 of 1.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int create(Object[] row, Instant ts) throws InterruptedException {
        return converter.insert(source, row, 0, 1, includedColumns, ts, consumer);
    }

    /**
     * Forwards a row to {@link Converter#insert}.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @param rowNumber the row position handed to the converter
     * @param numberOfRows the row total handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int create(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
        return converter.insert(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
    }

    /**
     * Forwards a single before/after row pair to {@link Converter#update}, as row 0 of 1.
     *
     * @param before the column values before the change
     * @param after the column values after the change
     * @param ts the timestamp handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int update(Object[] before, Object[] after, Instant ts) throws InterruptedException {
        return converter.update(source, before, after, 0, 1, includedColumns, ts, consumer);
    }

    /**
     * Forwards a before/after row pair to {@link Converter#update}.
     *
     * @param before the column values before the change
     * @param after the column values after the change
     * @param ts the timestamp handed to the converter
     * @param rowNumber the row position handed to the converter
     * @param numberOfRows the row total handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int update(Object[] before, Object[] after, Instant ts, int rowNumber, int numberOfRows)
            throws InterruptedException {
        return converter.update(source, before, after, rowNumber, numberOfRows, includedColumns, ts, consumer);
    }

    /**
     * Forwards a single row to {@link Converter#delete}, as row 0 of 1.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int delete(Object[] row, Instant ts) throws InterruptedException {
        return converter.delete(source, row, 0, 1, includedColumns, ts, consumer);
    }

    /**
     * Forwards a row to {@link Converter#delete}.
     *
     * @param row the column values of the row
     * @param ts the timestamp handed to the converter
     * @param rowNumber the row position handed to the converter
     * @param numberOfRows the row total handed to the converter
     * @return the value returned by the converter
     * @throws InterruptedException if the converter call is interrupted
     */
    public int delete(Object[] row, Instant ts, int rowNumber, int numberOfRows) throws InterruptedException {
        return converter.delete(source, row, rowNumber, numberOfRows, includedColumns, ts, consumer);
    }
}
/**
 * Turns row-level data into {@link SourceRecord}s and hands them to a consumer.
 * Each method returns an {@code int}; the implementation visible in this file returns the
 * number of records it passed to the consumer.
 */
protected static interface Converter {

    /**
     * Handles a row that was read.
     *
     * @param source the source information used while building records
     * @param row the column values of the row
     * @param rowNumber the position of this row
     * @param numberOfRows the total number of rows {@code rowNumber} refers to
     * @param includedColumns the included-column set supplied by the caller
     * @param ts the timestamp to associate with the produced records
     * @param consumer the receiver of the produced records
     * @return an implementation-defined count (see the interface description)
     * @throws InterruptedException if the call is interrupted
     */
    int read(SourceInfo source,
             Object[] row,
             int rowNumber,
             int numberOfRows,
             BitSet includedColumns,
             Instant ts,
             BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException;

    /**
     * Handles a row that was inserted.
     *
     * @param source the source information used while building records
     * @param row the column values of the row
     * @param rowNumber the position of this row
     * @param numberOfRows the total number of rows {@code rowNumber} refers to
     * @param includedColumns the included-column set supplied by the caller
     * @param ts the timestamp to associate with the produced records
     * @param consumer the receiver of the produced records
     * @return an implementation-defined count (see the interface description)
     * @throws InterruptedException if the call is interrupted
     */
    int insert(SourceInfo source,
               Object[] row,
               int rowNumber,
               int numberOfRows,
               BitSet includedColumns,
               Instant ts,
               BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException;

    /**
     * Handles a row that was updated.
     *
     * @param source the source information used while building records
     * @param before the column values before the change
     * @param after the column values after the change
     * @param rowNumber the position of this row
     * @param numberOfRows the total number of rows {@code rowNumber} refers to
     * @param includedColumns the included-column set supplied by the caller
     * @param ts the timestamp to associate with the produced records
     * @param consumer the receiver of the produced records
     * @return an implementation-defined count (see the interface description)
     * @throws InterruptedException if the call is interrupted
     */
    int update(SourceInfo source,
               Object[] before,
               Object[] after,
               int rowNumber,
               int numberOfRows,
               BitSet includedColumns,
               Instant ts,
               BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException;

    /**
     * Handles a row that was deleted.
     *
     * @param source the source information used while building records
     * @param row the column values of the row
     * @param rowNumber the position of this row
     * @param numberOfRows the total number of rows {@code rowNumber} refers to
     * @param includedColumns the included-column set supplied by the caller
     * @param ts the timestamp to associate with the produced records
     * @param consumer the receiver of the produced records
     * @return an implementation-defined count (see the interface description)
     * @throws InterruptedException if the call is interrupted
     */
    int delete(SourceInfo source,
               Object[] row,
               int rowNumber,
               int numberOfRows,
               BitSet includedColumns,
               Instant ts,
               BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException;
}
// Converter bound to one table: it builds SourceRecords from row data using the enclosing
// tableSchema, envelope, topicName, partitionNum and id, and passes them to the consumer.
// In every method the calls on 'source' keep the sequence partition() -> offsetForRow() ->
// tableEvent(id) -> struct(); do not reorder them.
Converter converter = new Converter() {

    /**
     * Emits one read record for the row. No column-count validation is performed here.
     *
     * @return 1 when a record was handed to the consumer, 0 when both the key and the value
     *         derived from the row are {@code null}
     */
    @Override
    public int read(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns,
                    Instant ts, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        final Object rowKey = tableSchema.keyFromColumnData(row);
        final Struct rowValue = tableSchema.valueFromColumnData(row);
        if (rowValue == null && rowKey == null) {
            // Nothing can be derived from this row, so nothing is emitted.
            return 0;
        }

        final Schema keySchema = tableSchema.keySchema();
        final Map<String, ?> partition = source.partition();
        final Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
        source.tableEvent(id);
        final Struct origin = source.struct();

        consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                keySchema, rowKey, envelope.schema(), envelope.read(rowValue, origin, ts)));
        return 1;
    }

    /**
     * Validates the column count of the row and emits one create record for it.
     *
     * @return 1 when a record was handed to the consumer, 0 when both the key and the value
     *         derived from the row are {@code null}
     */
    @Override
    public int insert(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns,
                      Instant ts, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        validateColumnCount(tableSchema, row);

        final Object rowKey = tableSchema.keyFromColumnData(row);
        final Struct rowValue = tableSchema.valueFromColumnData(row);
        if (rowValue == null && rowKey == null) {
            return 0;
        }

        final Schema keySchema = tableSchema.keySchema();
        final Map<String, ?> partition = source.partition();
        final Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
        source.tableEvent(id);
        final Struct origin = source.struct();

        consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                keySchema, rowKey, envelope.schema(), envelope.create(rowValue, origin, ts)));
        return 1;
    }

    /**
     * Validates the column count of the {@code after} row and emits the records for an update.
     * With an unchanged (or {@code null}) new key a single update record is emitted. With a changed
     * key the sequence is: delete record for the old key, optionally a tombstone for the old key
     * (when {@code emitTombstoneOnDelete} is set), then a create record for the new key.
     *
     * @return the number of records handed to the consumer (0 when both the key and the value
     *         derived from {@code after} are {@code null})
     */
    @Override
    public int update(SourceInfo source, Object[] before, Object[] after, int rowNumber, int numberOfRows,
                      BitSet includedColumns, Instant ts, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        validateColumnCount(tableSchema, after);

        final Object newKey = tableSchema.keyFromColumnData(after);
        final Struct valueAfter = tableSchema.valueFromColumnData(after);
        if (valueAfter == null && newKey == null) {
            return 0;
        }

        final Object oldKey = tableSchema.keyFromColumnData(before);
        final Struct valueBefore = tableSchema.valueFromColumnData(before);
        final Schema keySchema = tableSchema.keySchema();
        final Map<String, ?> partition = source.partition();
        final Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
        source.tableEvent(id);
        final Struct origin = source.struct();

        final boolean keyChanged = newKey != null && !Objects.equals(newKey, oldKey);
        if (!keyChanged) {
            // Same key on both sides: one plain update record is enough.
            consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                    keySchema, newKey, envelope.schema(), envelope.update(valueBefore, valueAfter, origin, ts)));
            return 1;
        }

        // The key changed. Downstream systems may not tolerate both rows existing at once,
        // so the old key is removed first and only then the new key is created.
        int emitted = 0;
        consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                keySchema, oldKey, envelope.schema(), envelope.delete(valueBefore, origin, ts)));
        emitted++;

        if (emitTombstoneOnDelete) {
            // Tombstone for the old key: same key, no value schema and no value.
            consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                    keySchema, oldKey, null, null));
            emitted++;
        }

        consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                keySchema, newKey, envelope.schema(), envelope.create(valueAfter, origin, ts)));
        emitted++;
        return emitted;
    }

    /**
     * Validates the column count of the row and emits a delete record, followed by a tombstone
     * record for the same key when {@code emitTombstoneOnDelete} is set.
     *
     * @return the number of records handed to the consumer (0 when both the key and the value
     *         derived from the row are {@code null})
     */
    @Override
    public int delete(SourceInfo source, Object[] row, int rowNumber, int numberOfRows, BitSet includedColumns,
                      Instant ts, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        validateColumnCount(tableSchema, row);

        final Object rowKey = tableSchema.keyFromColumnData(row);
        final Struct rowValue = tableSchema.valueFromColumnData(row);
        if (rowValue == null && rowKey == null) {
            return 0;
        }

        final Schema keySchema = tableSchema.keySchema();
        final Map<String, ?> partition = source.partition();
        final Map<String, Object> offset = source.offsetForRow(rowNumber, numberOfRows);
        source.tableEvent(id);
        final Struct origin = source.struct();

        // The delete record itself ...
        consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                keySchema, rowKey, envelope.schema(), envelope.delete(rowValue, origin, ts)));
        int emitted = 1;

        // ... optionally followed by a tombstone (same key, null value schema and value).
        if (emitTombstoneOnDelete) {
            consumer.accept(new SourceRecord(partition, getSourceRecordOffset(offset), topicName, partitionNum,
                    keySchema, rowKey, null, null));
            emitted++;
        }
        return emitted;
    }

    @Override
    public String toString() {
        return "RecordMaker.Converter(" + id + ")";
    }

    /**
     * Checks that the row has as many entries as the table registered in {@code schema} under
     * the given table schema's id has columns; logs an error and throws otherwise.
     *
     * @param checkedSchema the table schema whose id selects the table definition
     * @param row the row values to check
     * @throws ConnectException if the two counts differ
     */
    private void validateColumnCount(TableSchema checkedSchema, Object[] row) {
        final int expectedColumnsCount = schema.tableFor(checkedSchema.id()).columns().size();
        if (expectedColumnsCount == row.length) {
            return;
        }
        logger.error("Invalid number of columns, expected '{}' arrived '{}'", expectedColumnsCount, row.length);
        throw new ConnectException(
                "The binlog event does not contain expected number of columns; the internal schema representation is probably out of sync with the real database schema, or the binlog contains events recorded with binlog_row_image other than FULL or the table in question is an NDB table");
    }
};