本文主要研究一下debezium的eventHandlers
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader { //...... protected void handleInsert(Event event) throws InterruptedException { if (skipEvent) { // We can skip this because we should already be at least this far ... logger.debug("Skipping previously processed row event: {}", event); return; } if (ignoreDmlEventByGtidSource) { logger.debug("Skipping DML event because this GTID source is filtered: {}", event); return; } WriteRowsEventData write = unwrapData(event); long tableNumber = write.getTableId(); BitSet includedColumns = write.getIncludedColumns(); RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord); if (recordMaker != null) { List<Serializable[]> rows = write.getRows(); final Instant ts = context.getClock().currentTimeAsInstant(); int count = 0; int numRows = rows.size(); if (startingRowNumber < numRows) { for (int row = startingRowNumber; row != numRows; ++row) { count += recordMaker.create(rows.get(row), ts, row, numRows); } if (logger.isDebugEnabled()) { if (startingRowNumber != 0) { logger.debug("Recorded {} insert record(s) for last {} row(s) in event: {}", count, numRows - startingRowNumber, event); } else { logger.debug("Recorded {} insert record(s) for event: {}", count, event); } } } else { // All rows were previously processed ... logger.debug("Skipping previously processed insert event: {}", event); } } else { informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "insert row"); } startingRowNumber = 0; } //...... }
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader { //...... protected void handleUpdate(Event event) throws InterruptedException { if (skipEvent) { // We can skip this because we should already be at least this far ... logger.debug("Skipping previously processed row event: {}", event); return; } if (ignoreDmlEventByGtidSource) { logger.debug("Skipping DML event because this GTID source is filtered: {}", event); return; } UpdateRowsEventData update = unwrapData(event); long tableNumber = update.getTableId(); BitSet includedColumns = update.getIncludedColumns(); // BitSet includedColumnsBefore = update.getIncludedColumnsBeforeUpdate(); RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord); if (recordMaker != null) { List<Entry<Serializable[], Serializable[]>> rows = update.getRows(); final Instant ts = context.getClock().currentTimeAsInstant(); int count = 0; int numRows = rows.size(); if (startingRowNumber < numRows) { for (int row = startingRowNumber; row != numRows; ++row) { Map.Entry<Serializable[], Serializable[]> changes = rows.get(row); Serializable[] before = changes.getKey(); Serializable[] after = changes.getValue(); count += recordMaker.update(before, after, ts, row, numRows); } if (logger.isDebugEnabled()) { if (startingRowNumber != 0) { logger.debug("Recorded {} update record(s) for last {} row(s) in event: {}", count, numRows - startingRowNumber, event); } else { logger.debug("Recorded {} update record(s) for event: {}", count, event); } } } else { // All rows were previously processed ... logger.debug("Skipping previously processed update event: {}", event); } } else { informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "update row"); } startingRowNumber = 0; } //...... }
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader { //...... protected void handleDelete(Event event) throws InterruptedException { if (skipEvent) { // We can skip this because we should already be at least this far ... logger.debug("Skipping previously processed row event: {}", event); return; } if (ignoreDmlEventByGtidSource) { logger.debug("Skipping DML event because this GTID source is filtered: {}", event); return; } DeleteRowsEventData deleted = unwrapData(event); long tableNumber = deleted.getTableId(); BitSet includedColumns = deleted.getIncludedColumns(); RecordsForTable recordMaker = recordMakers.forTable(tableNumber, includedColumns, super::enqueueRecord); if (recordMaker != null) { List<Serializable[]> rows = deleted.getRows(); final Instant ts = context.getClock().currentTimeAsInstant(); int count = 0; int numRows = rows.size(); if (startingRowNumber < numRows) { for (int row = startingRowNumber; row != numRows; ++row) { count += recordMaker.delete(rows.get(row), ts, row, numRows); } if (logger.isDebugEnabled()) { if (startingRowNumber != 0) { logger.debug("Recorded {} delete record(s) for last {} row(s) in event: {}", count, numRows - startingRowNumber, event); } else { logger.debug("Recorded {} delete record(s) for event: {}", count, event); } } } else { // All rows were previously processed ... logger.debug("Skipping previously processed delete event: {}", event); } } else { informAboutUnknownTableIfRequired(event, recordMakers.getTableIdFromTableNumber(tableNumber), "delete row"); } startingRowNumber = 0; } //...... }
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader { //...... protected void handleQueryEvent(Event event) throws InterruptedException { QueryEventData command = unwrapData(event); logger.debug("Received query command: {}", event); String sql = command.getSql().trim(); if (sql.equalsIgnoreCase("BEGIN")) { // We are starting a new transaction ... source.startNextTransaction(); source.setBinlogThread(command.getThreadId()); if (initialEventsToSkip != 0) { logger.debug("Restarting partially-processed transaction; change events will not be created for the first {} events plus {} more rows in the next event", initialEventsToSkip, startingRowNumber); // We are restarting, so we need to skip the events in this transaction that we processed previously... skipEvent = true; } return; } if (sql.equalsIgnoreCase("COMMIT")) { handleTransactionCompletion(event); return; } String upperCasedStatementBegin = Strings.getBegin(sql, 7).toUpperCase(); if (upperCasedStatementBegin.startsWith("XA ")) { // This is an XA transaction, and we currently ignore these and do nothing ... return; } if (context.ddlFilter().test(sql)) { logger.debug("DDL '{}' was filtered out of processing", sql); return; } if (upperCasedStatementBegin.equals("INSERT ") || upperCasedStatementBegin.equals("UPDATE ") || upperCasedStatementBegin.equals("DELETE ")) { throw new ConnectException( "Received DML '" + sql + "' for processing, binlog probably contains events generated with statement or mixed based replication format"); } if (sql.equalsIgnoreCase("ROLLBACK")) { // We have hit a ROLLBACK which is not supported logger.warn("Rollback statements cannot be handled without binlog buffering, the connector will fail. 
Please check '{}' to see how to enable buffering", MySqlConnectorConfig.BUFFER_SIZE_FOR_BINLOG_READER.name()); } context.dbSchema().applyDdl(context.source(), command.getDatabase(), command.getSql(), (dbName, tables, statements) -> { if (recordSchemaChangesInSourceRecords && recordMakers.schemaChanges(dbName, tables, statements, super::enqueueRecord) > 0) { logger.debug("Recorded DDL statements for database '{}': {}", dbName, statements); } }); } //...... }
BinlogReader註冊了增刪改查的eventHandlers,它們分別將event解析為對應的data:對於增刪改事件則執行recordMakers的對應方法,對於query事件則應用對應的ddl語句