public class BinlogEventListener implements BinaryLogClient.EventListener {
protected final AtomicBoolean stopNow = new AtomicBoolean(false);
private static final int QUEUE_OFFER_TIMEOUT_MSEC = 100;
private final BlockingQueue<RawBinlogEvent> queue;
private final BinaryLogClient client;
public BinlogEventListener(BinaryLogClient client, BlockingQueue<RawBinlogEvent> q) {
this.client = client;
this.queue = q;
public void start() {
public void stop() {
public void onEvent(Event event) {
while (!stopNow.get()) {
RawBinlogEvent ep = new RawBinlogEvent(event, client.getBinlogFilename());
try {
if (queue.offer(ep, QUEUE_OFFER_TIMEOUT_MSEC, TimeUnit.MILLISECONDS)) {
} else {
throw new RuntimeException("Unable to add event to the queue");
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted while adding event to the queue");
public class RawBinlogEvent {
private Event event;
private String binlogFilename;
public RawBinlogEvent(Event event, String binlogFilename) {
this.event = event;
this.binlogFilename = binlogFilename;
public Event getEvent() {
return event;
public void setEvent(Event event) {
this.event = event;
public String getBinlogFilename() {
return binlogFilename;
public class CaptureChangeMySQL extends AbstractSessionFactoryProcessor {
/**
 * Polls {@code queue} until it is empty or {@code doStop} is set, and handles each polled event
 * according to the event type found in its header (table map, query, XID, row changes, rotate).
 * Along the way it updates the tracked binlog file/position, transaction state and sequence id,
 * and writes CDC events through the various event writers.
 *
 * NOTE(review): this excerpt has lost most closing braces, several statements and a number of line
 * breaks; some code now sits behind a leading {@code //} and is therefore commented out as written.
 * The markers below point at the places where that is visible. Confirm against the upstream source
 * before relying on this text.
 *
 * @param session      process session; in this excerpt it is referenced only in text that sits
 *                     behind {@code //} on collapsed lines
 * @param stateManager not referenced in the visible code
 * @param log          receives a debug line per event and a warning for row events seen outside a
 *                     transaction
 * @throws IOException if the Distributed Map Cache server cannot be reached, or a BEGIN/COMMIT
 *                     arrives in an unexpected transaction state
 */
public void outputEvents(ProcessSession session, StateManager stateManager, ComponentLog log) throws IOException {
RawBinlogEvent rawBinlogEvent;
// Drain the queue
while ((rawBinlogEvent = queue.poll()) != null && !doStop.get()) {
Event event = rawBinlogEvent.getEvent();
EventHeaderV4 header = event.getHeader();
long timestamp = header.getTimestamp();
EventType eventType = header.getEventType();
// Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume at the event about to be processed.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only // advance the position if it is not that type of event. ROTATE events don't generate output CDC events and have the current binlog position in a special field, which
// is filled in during the ROTATE case
if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) {
currentBinlogPosition = header.getPosition();
log.debug("Got message event type: {} ", new Object[]{header.getEventType().toString()});
switch (eventType) {
// NOTE(review): no case label is visible before the table-map handling below; presumably a
// `case TABLE_MAP:` line was lost from this excerpt - confirm.
// This is sent to inform which table is about to be changed by subsequent events
TableMapEventData data = event.getData();
// Should we skip this table? Yes if we've specified a DB or table name pattern and they don't match
skipTable = (databaseNamePattern != null && !databaseNamePattern.matcher(data.getDatabase()).matches())
|| (tableNamePattern != null && !tableNamePattern.matcher(data.getTable()).matches());
if (!skipTable) {
// The cache key is built from this component's identifier plus database, table and table id.
TableInfoCacheKey key = new TableInfoCacheKey(this.getIdentifier(), data.getDatabase(), data.getTable(), data.getTableId());
if (cacheClient != null) {
try {
currentTable = cacheClient.get(key, cacheKeySerializer, cacheValueDeserializer);
} catch (ConnectException ce) {
throw new IOException("Could not connect to Distributed Map Cache server to get table information", ce);
if (currentTable == null) {
// NOTE(review): the next line starts with `//`, so everything after it on that line (the
// table-info load, the cache put, `case QUERY:` and the BEGIN check) is a comment as written.
// We don't have an entry for this table yet, so fetch the info from the database and populate the cache try { currentTable = loadTableInfo(key); try { cacheClient.put(key, currentTable, cacheKeySerializer, cacheValueSerializer); } catch (ConnectException ce) { throw new IOException("Could not connect to Distributed Map Cache server to put table information", ce); } } catch (SQLException se) { // Propagate the error up, so things like rollback and logging/bulletins can be handled throw new IOException(se.getMessage(), se); } } } } else { // Clear the current table, to force a reload next time we get a TABLE_MAP event we care about currentTable = null; } break; case QUERY: QueryEventData queryEventData = event.getData(); currentDatabase = queryEventData.getDatabase(); String sql = queryEventData.getSql(); // Is this the start of a transaction? if ("BEGIN".equals(sql)) { // If we're already in a transaction, something bad happened, alert the user
if (inTransaction) {
throw new IOException("BEGIN event received while already processing a transaction. This could indicate that your binlog position is invalid.");
// Mark the current binlog position in case we have to rollback the transaction (if the processor is stopped, e.g.)
xactBinlogFile = currentBinlogFile;
xactBinlogPosition = currentBinlogPosition;
xactSequenceId = currentSequenceId.get();
// A begin event is written only when begin/commit output is enabled and the database passes the name filter.
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
BeginTransactionEventInfo beginEvent = new BeginTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(beginEventWriter.writeEvent(currentSession, transitUri, beginEvent, currentSequenceId.get(), REL_SUCCESS));
inTransaction = true;
} else if ("COMMIT".equals(sql)) {
if (!inTransaction) {
throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). "
+ "This could indicate that your binlog position is invalid.");
// InnoDB generates XID events for "commit", but MyISAM generates Query events with "COMMIT", so handle that here
if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition);
currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS));
// NOTE(review): the comment below announces a session commit, but no commit call follows it
// in this excerpt - confirm whether a statement was lost.
// Commit the NiFi session
inTransaction = false;
currentTable = null;
} else {
// Check for DDL events (alter table, e.g.). Normalize the query to do string matching on the type of change
String normalizedQuery = sql.toLowerCase().trim().replaceAll(" {2,}", " ");
if (normalizedQuery.startsWith("alter table")
|| normalizedQuery.startsWith("alter ignore table")
|| normalizedQuery.startsWith("create table")
|| normalizedQuery.startsWith("truncate table")
|| normalizedQuery.startsWith("rename table")
|| normalizedQuery.startsWith("drop table")
|| normalizedQuery.startsWith("drop database")) {
if (includeDDLEvents && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) {
// NOTE(review): the next line starts with `//`, so everything after it on that line (DDL event
// output, cache invalidation, the XID case and the row-event case labels) is a comment as written.
// If we don't have table information, we can still use the database name TableInfo ddlTableInfo = (currentTable != null) ? currentTable : new TableInfo(currentDatabase, null, null, null); DDLEventInfo ddlEvent = new DDLEventInfo(ddlTableInfo, timestamp, currentBinlogFile, currentBinlogPosition, sql); currentSequenceId.set(ddlEventWriter.writeEvent(currentSession, transitUri, ddlEvent, currentSequenceId.get(), REL_SUCCESS)); } // Remove all the keys from the cache that this processor added if (cacheClient != null) { cacheClient.removeByPattern(this.getIdentifier() + ".*"); } // If not in a transaction, commit the session so the DDL event(s) will be transferred if (includeDDLEvents && !inTransaction) { session.commit(); } } } break; case XID: if (!inTransaction) { throw new IOException("COMMIT event received while not processing a transaction (i.e. no corresponding BEGIN event). " + "This could indicate that your binlog position is invalid."); } if (includeBeginCommit && (databaseNamePattern == null || databaseNamePattern.matcher(currentDatabase).matches())) { CommitTransactionEventInfo commitTransactionEvent = new CommitTransactionEventInfo(currentDatabase, timestamp, currentBinlogFile, currentBinlogPosition); currentSequenceId.set(commitEventWriter.writeEvent(currentSession, transitUri, commitTransactionEvent, currentSequenceId.get(), REL_SUCCESS)); } // Commit the NiFi session session.commit(); inTransaction = false; currentTable = null; currentDatabase = null; break; case WRITE_ROWS: case EXT_WRITE_ROWS: case PRE_GA_WRITE_ROWS: case UPDATE_ROWS: case EXT_UPDATE_ROWS: case PRE_GA_UPDATE_ROWS: case DELETE_ROWS: case EXT_DELETE_ROWS: case PRE_GA_DELETE_ROWS: // If we are skipping this table, then don't emit any events related to its modification
// NOTE(review): the skipTable check below has no visible body or closing brace; presumably it
// should leave the case without emitting anything - confirm.
if (skipTable) {
if (!inTransaction) {
// These events should only happen inside a transaction, warn the user otherwise
log.warn("Table modification event occurred outside of a transaction.");
if (currentTable == null && cacheClient != null) {
// No Table Map event was processed prior to this event, which should not happen, so throw an error
throw new RowEventException("No table information is available for this event, cannot process further.");
// Pick the writer that matches the row-event family: insert, delete, otherwise update.
if (eventType == WRITE_ROWS
|| eventType == EXT_WRITE_ROWS
|| eventType == PRE_GA_WRITE_ROWS) {
InsertRowsEventInfo eventInfo = new InsertRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(insertRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
} else if (eventType == DELETE_ROWS
|| eventType == EXT_DELETE_ROWS
|| eventType == PRE_GA_DELETE_ROWS) {
DeleteRowsEventInfo eventInfo = new DeleteRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(deleteRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
} else {
// Update event
UpdateRowsEventInfo eventInfo = new UpdateRowsEventInfo(currentTable, timestamp, currentBinlogFile, currentBinlogPosition, event.getData());
currentSequenceId.set(updateRowsWriter.writeEvent(currentSession, transitUri, eventInfo, currentSequenceId.get(), REL_SUCCESS));
case ROTATE:
// Update current binlog filename
RotateEventData rotateEventData = event.getData();
currentBinlogFile = rotateEventData.getBinlogFilename();
currentBinlogPosition = rotateEventData.getBinlogPosition();
// Advance the current binlog position. This way if no more events are received and the processor is stopped, it will resume after the event that was just processed.
// NOTE(review): the next line starts with `//`, so the getNextPosition() update and the closing
// braces on it are a comment as written.
// We always get ROTATE and FORMAT_DESCRIPTION messages no matter where we start (even from the end), and they won't have the correct "next position" value, so only // advance the position if it is not that type of event. if (eventType != ROTATE && eventType != FORMAT_DESCRIPTION) { currentBinlogPosition = header.getNextPosition(); } } } //...... }