This article takes a look at Debezium's BinlogReader.
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/Reader.java
public interface Reader {

    public static enum State {
        /**
         * The reader is stopped and static.
         */
        STOPPED,
        /**
         * The reader is running and generating records.
         */
        RUNNING,
        /**
         * The reader has completed its work or been explicitly stopped, but not all of the generated records have been
         * consumed via {@link Reader#poll() polling}.
         */
        STOPPING;
    }

    public String name();

    public State state();

    public void uponCompletion(Runnable handler);

    public default void initialize() {
        // do nothing
    }

    public default void destroy() {
        // do nothing
    }

    public void start();

    public void stop();

    public List<SourceRecord> poll() throws InterruptedException;
}
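The interface defines a small state machine (STOPPED, RUNNING, STOPPING) plus a poll() contract: the caller keeps polling until the reader returns null, meaning all generated records have been consumed. To make that contract concrete, here is a minimal, hypothetical sketch of a driver loop; SimpleReader and its string "records" are illustrative stand-ins, not Debezium types.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class ReaderLifecycleDemo {

    // Simplified stand-in for Reader; poll() returning null signals "all work done".
    interface SimpleReader {
        void start();
        void stop();
        List<String> poll() throws InterruptedException;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(List.of("r1", "r2", "r3"));

        SimpleReader reader = new SimpleReader() {
            private volatile boolean running;

            @Override
            public void start() {
                running = true;
            }

            @Override
            public void stop() {
                running = false;
            }

            @Override
            public List<String> poll() {
                if (!running || queue.isEmpty()) {
                    return null; // mirrors AbstractReader returning null once the work has completed
                }
                List<String> batch = new ArrayList<>();
                queue.drainTo(batch);
                return batch;
            }
        };

        // Lifecycle: start() -> poll() until null -> stop()
        reader.start();
        List<String> batch;
        while ((batch = reader.poll()) != null) {
            batch.forEach(System.out::println); // hand each record to the downstream consumer
        }
        reader.stop();
    }
}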
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/AbstractReader.java
public abstract class AbstractReader implements Reader {

    protected final Logger logger = LoggerFactory.getLogger(getClass());

    private final String name;
    protected final MySqlTaskContext context;
    protected final MySqlJdbcContext connectionContext;
    private final BlockingQueue<SourceRecord> records;
    private final AtomicBoolean running = new AtomicBoolean(false);
    private final AtomicBoolean success = new AtomicBoolean(false);
    private final AtomicReference<ConnectException> failure = new AtomicReference<>();
    private ConnectException failureException;
    private final int maxBatchSize;
    private final Metronome metronome;
    private final AtomicReference<Runnable> uponCompletion = new AtomicReference<>();
    private final Duration pollInterval;
    protected final ChangeEventQueueMetrics changeEventQueueMetrics;
    private final HaltingPredicate acceptAndContinue;

    public AbstractReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue) {
        this.name = name;
        this.context = context;
        this.connectionContext = context.getConnectionContext();
        this.records = new LinkedBlockingDeque<>(context.getConnectorConfig().getMaxQueueSize());
        this.maxBatchSize = context.getConnectorConfig().getMaxBatchSize();
        this.pollInterval = context.getConnectorConfig().getPollInterval();
        this.metronome = Metronome.parker(pollInterval, Clock.SYSTEM);
        this.acceptAndContinue = acceptAndContinue == null ? new AcceptAllPredicate() : acceptAndContinue;
        this.changeEventQueueMetrics = new ChangeEventQueueMetrics() {

            @Override
            public int totalCapacity() {
                return context.getConnectorConfig().getMaxQueueSize();
            }

            @Override
            public int remainingCapacity() {
                return records.remainingCapacity();
            }
        };
    }

    @Override
    public String name() {
        return name;
    }

    @Override
    public void uponCompletion(Runnable handler) {
        assert this.uponCompletion.get() == null;
        this.uponCompletion.set(handler);
    }

    @Override
    public final void initialize() {
        doInitialize();
    }

    @Override
    public final void destroy() {
        doDestroy();
    }

    @Override
    public void start() {
        if (this.running.compareAndSet(false, true)) {
            this.failure.set(null);
            this.success.set(false);
            doStart();
        }
    }

    @Override
    public void stop() {
        try {
            // Emptying the queue so to make sure that enqueue() won't block indefinitely when adding records after
            // poll() isn't called anymore but before the binlog reader is stopped; note there's still a tiny chance for
            // this to happen if enough records are added again between here and the call to disconnect(); protecting
            // against it seems not worth though it as shouldn't happen for any practical queue size
            List<SourceRecord> unsent = new ArrayList<>();
            records.drainTo(unsent);
            logger.info("Discarding {} unsent record(s) due to the connector shutting down", unsent.size());
            doStop();
            running.set(false);
        }
        finally {
            if (failure.get() != null) {
                // We had a failure and it was propagated via poll(), after which Kafka Connect will stop
                // the connector, which will stop the task that will then stop this reader via this method.
                // Since no more records will ever be polled again, we know we can clean up this reader's resources...
                doCleanup();
            }
        }
    }

    @Override
    public State state() {
        if (success.get() || failure.get() != null) {
            // We've either completed successfully or have failed, but either way no more records will be returned ...
            return State.STOPPED;
        }
        if (running.get()) {
            return State.RUNNING;
        }
        // Otherwise, we're in the process of stopping ...
        return State.STOPPING;
    }

    @Override
    public List<SourceRecord> poll() throws InterruptedException {
        // Before we do anything else, determine if there was a failure and throw that exception ...
        failureException = this.failure.get();
        if (failureException != null) {
            // In this case, we'll throw the exception and the Kafka Connect worker or EmbeddedEngine
            // will then explicitly stop the connector task. Most likely, however, the reader that threw
            // the exception will have already stopped itself and will generate no additional records.
            // Regardless, there may be records on the queue that will never be consumed.
            throw failureException;
        }

        // this reader has been stopped before it reached the success or failed end state, so clean up and abort
        if (!running.get()) {
            cleanupResources();
            throw new InterruptedException("Reader was stopped while polling");
        }

        logger.trace("Polling for next batch of records");
        List<SourceRecord> batch = new ArrayList<>(maxBatchSize);
        final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
        while (running.get() && (records.drainTo(batch, maxBatchSize) == 0) && !success.get()) {
            // No records are available even though the snapshot has not yet completed, so sleep for a bit ...
            metronome.pause();

            // Check for failure after waking up ...
            failureException = this.failure.get();
            if (failureException != null) {
                throw failureException;
            }

            if (timeout.expired()) {
                break;
            }
        }

        if (batch.isEmpty() && success.get() && records.isEmpty()) {
            // We found no records but the operation completed successfully, so we're done
            this.running.set(false);
            cleanupResources();
            return null;
        }
        pollComplete(batch);
        logger.trace("Completed batch of {} records", batch.size());
        return batch;
    }

    @Override
    public String toString() {
        return name;
    }

    //......
}
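The core of AbstractReader is the hand-off between the reader thread, which enqueues into the bounded records queue, and poll(), which drains up to maxBatchSize records at a time, pausing between attempts until records arrive, the reader stops, or a timer expires so control is returned to the caller. Below is a minimal sketch of that drain-or-pause loop, using plain java.util.concurrent in place of Debezium's Metronome and Threads.timer helpers; the class and field names are illustrative.

import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.locks.LockSupport;

public class BatchPollSketch {

    // Bounded queue filled by the reader thread (e.g. the binlog client callback).
    private final BlockingQueue<String> records = new LinkedBlockingQueue<>(8192);
    private final int maxBatchSize = 2048;
    // How long to park between drain attempts (the role of pollInterval).
    private final Duration pollInterval = Duration.ofMillis(500);
    // Upper bound before returning control to the caller even with an empty batch
    // (the role of ConfigurationDefaults.RETURN_CONTROL_INTERVAL).
    private final Duration returnControlInterval = Duration.ofSeconds(5);
    private volatile boolean running = true;

    public List<String> poll() {
        List<String> batch = new ArrayList<>(maxBatchSize);
        long deadline = System.nanoTime() + returnControlInterval.toNanos();
        // Drain up to maxBatchSize records; if none are available, pause and retry
        // until records arrive, the reader is stopped, or the deadline passes.
        while (running && records.drainTo(batch, maxBatchSize) == 0) {
            LockSupport.parkNanos(pollInterval.toNanos()); // Metronome.parker() pauses similarly
            if (System.nanoTime() >= deadline) {
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        BatchPollSketch sketch = new BatchPollSketch();
        sketch.records.offer("binlog-event-1");
        sketch.records.offer("binlog-event-2");
        System.out.println(sketch.poll()); // prints [binlog-event-1, binlog-event-2]
    }
}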
debezium-v1.1.1.Final/debezium-connector-mysql/src/main/java/io/debezium/connector/mysql/BinlogReader.java
public class BinlogReader extends AbstractReader {

    private static final long INITIAL_POLL_PERIOD_IN_MILLIS = TimeUnit.SECONDS.toMillis(5);
    private static final long MAX_POLL_PERIOD_IN_MILLIS = TimeUnit.HOURS.toMillis(1);

    private final boolean recordSchemaChangesInSourceRecords;
    private final RecordMakers recordMakers;
    private final SourceInfo source;
    private final EnumMap<EventType, BlockingConsumer<Event>> eventHandlers = new EnumMap<>(EventType.class);
    private final BinaryLogClient client;

    //......

    public BinlogReader(String name, MySqlTaskContext context, HaltingPredicate acceptAndContinue, long serverId) {
        super(name, context, acceptAndContinue);
        connectionContext = context.getConnectionContext();
        source = context.source();
        recordMakers = context.makeRecord();
        recordSchemaChangesInSourceRecords = context.includeSchemaChangeRecords();
        clock = context.getClock();
        eventDeserializationFailureHandlingMode = connectionContext.eventProcessingFailureHandlingMode();
        inconsistentSchemaHandlingMode = connectionContext.inconsistentSchemaHandlingMode();

        // Use exponential delay to log the progress frequently at first, but the quickly tapering off to once an hour...
        pollOutputDelay = ElapsedTimeStrategy.exponential(clock, INITIAL_POLL_PERIOD_IN_MILLIS, MAX_POLL_PERIOD_IN_MILLIS);

        // Set up the log reader ...
        client = new BinaryLogClient(connectionContext.hostname(), connectionContext.port(),
                connectionContext.username(), connectionContext.password());
        // BinaryLogClient will overwrite thread names later
        client.setThreadFactory(
                Threads.threadFactory(MySqlConnector.class, context.getConnectorConfig().getLogicalName(), "binlog-client", false));
        client.setServerId(serverId);
        client.setSSLMode(sslModeFor(connectionContext.sslMode()));
        if (connectionContext.sslModeEnabled()) {
            SSLSocketFactory sslSocketFactory = getBinlogSslSocketFactory(connectionContext);
            if (sslSocketFactory != null) {
                client.setSslSocketFactory(sslSocketFactory);
            }
        }
        client.setKeepAlive(context.config().getBoolean(MySqlConnectorConfig.KEEP_ALIVE));
        final long keepAliveInterval = context.config().getLong(MySqlConnectorConfig.KEEP_ALIVE_INTERVAL_MS);
        client.setKeepAliveInterval(keepAliveInterval);
        // Considering heartbeatInterval should be less than keepAliveInterval, we use the heartbeatIntervalFactor
        // multiply by keepAliveInterval and set the result value to heartbeatInterval. The default value of heartbeatIntervalFactor
        // is 0.8, and we believe the left time (0.2 * keepAliveInterval) is enough to process the packet received from the MySQL server.
        client.setHeartbeatInterval((long) (keepAliveInterval * heartbeatIntervalFactor));
        client.registerEventListener(context.bufferSizeForBinlogReader() == 0
                ? this::handleEvent
                : (new EventBuffer(context.bufferSizeForBinlogReader(), this))::add);
        client.registerLifecycleListener(new ReaderThreadLifecycleListener());
        if (logger.isDebugEnabled()) {
            client.registerEventListener(this::logEvent);
        }

        //......

        client.setEventDeserializer(eventDeserializer);

        // Set up for JMX ...
        metrics = new BinlogReaderMetrics(client, context, name, changeEventQueueMetrics);
        heartbeat = Heartbeat.create(context.config(), context.topicSelector().getHeartbeatTopic(),
                context.getConnectorConfig().getLogicalName());
    }

    @Override
    protected void doStart() {
        context.dbSchema().assureNonEmptySchema();

        // Register our event handlers ...
        eventHandlers.put(EventType.STOP, this::handleServerStop);
        eventHandlers.put(EventType.HEARTBEAT, this::handleServerHeartbeat);
        eventHandlers.put(EventType.INCIDENT, this::handleServerIncident);
        eventHandlers.put(EventType.ROTATE, this::handleRotateLogsEvent);
        eventHandlers.put(EventType.TABLE_MAP, this::handleUpdateTableMetadata);
        eventHandlers.put(EventType.QUERY, this::handleQueryEvent);
        eventHandlers.put(EventType.WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.EXT_WRITE_ROWS, this::handleInsert);
        eventHandlers.put(EventType.EXT_UPDATE_ROWS, this::handleUpdate);
        eventHandlers.put(EventType.EXT_DELETE_ROWS, this::handleDelete);
        eventHandlers.put(EventType.VIEW_CHANGE, this::viewChange);
        eventHandlers.put(EventType.XA_PREPARE, this::prepareTransaction);
        eventHandlers.put(EventType.XID, this::handleTransactionCompletion);

        // Conditionally register ROWS_QUERY handler to parse SQL statements.
        if (context.includeSqlQuery()) {
            eventHandlers.put(EventType.ROWS_QUERY, this::handleRowsQuery);
        }

        final boolean isGtidModeEnabled = connectionContext.isGtidModeEnabled();
        metrics.setIsGtidModeEnabled(isGtidModeEnabled);

        // Get the current GtidSet from MySQL so we can get a filtered/merged GtidSet based off of the last Debezium checkpoint.
        String availableServerGtidStr = connectionContext.knownGtidSet();
        if (isGtidModeEnabled) {
            // The server is using GTIDs, so enable the handler ...
            eventHandlers.put(EventType.GTID, this::handleGtidEvent);

            // Now look at the GTID set from the server and what we've previously seen ...
            GtidSet availableServerGtidSet = new GtidSet(availableServerGtidStr);

            // also take into account purged GTID logs
            GtidSet purgedServerGtidSet = connectionContext.purgedGtidSet();
            logger.info("GTID set purged on server: {}", purgedServerGtidSet);

            GtidSet filteredGtidSet = context.filterGtidSet(availableServerGtidSet, purgedServerGtidSet);
            if (filteredGtidSet != null) {
                // We've seen at least some GTIDs, so start reading from the filtered GTID set ...
                logger.info("Registering binlog reader with GTID set: {}", filteredGtidSet);
                String filteredGtidSetStr = filteredGtidSet.toString();
                client.setGtidSet(filteredGtidSetStr);
                source.setCompletedGtidSet(filteredGtidSetStr);
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet(filteredGtidSetStr);
            }
            else {
                // We've not yet seen any GTIDs, so that means we have to start reading the binlog from the beginning ...
                client.setBinlogFilename(source.binlogFilename());
                client.setBinlogPosition(source.binlogPosition());
                gtidSet = new com.github.shyiko.mysql.binlog.GtidSet("");
            }
        }
        else {
            // The server is not using GTIDs, so start reading the binlog based upon where we last left off ...
            client.setBinlogFilename(source.binlogFilename());
            client.setBinlogPosition(source.binlogPosition());
        }

        // We may be restarting in the middle of a transaction, so see how far into the transaction we have already processed...
        initialEventsToSkip = source.eventsToSkipUponRestart();

        // Set the starting row number, which is the next row number to be read ...
        startingRowNumber = source.rowsToSkipUponRestart();

        // Only when we reach the first BEGIN event will we start to skip events ...
        skipEvent = false;

        // Initial our poll output delay logic ...
        pollOutputDelay.hasElapsed();
        previousOutputMillis = clock.currentTimeInMillis();

        // Start the log reader, which starts background threads ...
        if (isRunning()) {
            long timeout = context.getConnectorConfig().getConnectionTimeout().toMillis();
            long started = context.getClock().currentTimeInMillis();
            try {
                logger.debug("Attempting to establish binlog reader connection with timeout of {} ms", timeout);
                client.connect(timeout);
            }
            catch (TimeoutException e) {
                // If the client thread is interrupted *before* the client could connect, the client throws a timeout exception
                // The only way we can distinguish this is if we get the timeout exception before the specified timeout has
                // elapsed, so we simply check this (within 10%) ...
                long duration = context.getClock().currentTimeInMillis() - started;
                if (duration > (0.9 * timeout)) {
                    double actualSeconds = TimeUnit.MILLISECONDS.toSeconds(duration);
                    throw new ConnectException("Timed out after " + actualSeconds + " seconds while waiting to connect to MySQL at " +
                            connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
                }
                // Otherwise, we were told to shutdown, so we don't care about the timeout exception
            }
            catch (AuthenticationException e) {
                throw new ConnectException("Failed to authenticate to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "'", e);
            }
            catch (Throwable e) {
                throw new ConnectException("Unable to connect to the MySQL database at " +
                        connectionContext.hostname() + ":" + connectionContext.port() + " with user '" + connectionContext.username() + "': " + e.getMessage(), e);
            }
        }
    }

    @Override
    protected void doStop() {
        try {
            if (client.isConnected()) {
                logger.debug("Stopping binlog reader '{}', last recorded offset: {}", this.name(), lastOffset);
                client.disconnect();
            }
            cleanupResources();
        }
        catch (IOException e) {
            logger.error("Unexpected error when disconnecting from the MySQL binary log reader '{}'", this.name(), e);
        }
    }

    @Override
    protected void pollComplete(List<SourceRecord> batch) {
        // Record a bit about this batch ...
        int batchSize = batch.size();
        recordCounter += batchSize;
        totalRecordCounter.addAndGet(batchSize);
        if (batchSize > 0) {
            SourceRecord lastRecord = batch.get(batchSize - 1);
            lastOffset = lastRecord.sourceOffset();
            if (pollOutputDelay.hasElapsed()) {
                // We want to record the status ...
                long millisSinceLastOutput = clock.currentTimeInMillis() - previousOutputMillis;
                try {
                    if (logger.isInfoEnabled()) {
                        context.temporaryLoggingContext("binlog", () -> {
                            logger.info("{} records sent during previous {}, last recorded offset: {}", recordCounter,
                                    Strings.duration(millisSinceLastOutput), lastOffset);
                        });
                    }
                }
                finally {
                    recordCounter = 0;
                    previousOutputMillis += millisSinceLastOutput;
                }
            }
        }
    }

    //......
}
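BinlogReader delegates the replication protocol itself to the shyiko mysql-binlog-connector-java library. As a point of reference, here is a minimal sketch of driving BinaryLogClient directly, outside of Debezium; the hostname, credentials, server id, and binlog position are placeholder values.

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.github.shyiko.mysql.binlog.BinaryLogClient;
import com.github.shyiko.mysql.binlog.event.EventHeaderV4;
import com.github.shyiko.mysql.binlog.event.EventType;

public class BinaryLogClientDemo {

    public static void main(String[] args) throws IOException, TimeoutException {
        // Placeholder connection settings; the MySQL user needs the REPLICATION SLAVE
        // and REPLICATION CLIENT privileges.
        BinaryLogClient client = new BinaryLogClient("localhost", 3306, "repl", "secret");
        client.setServerId(6789); // must be unique within the replication topology

        // Resume from a known offset, as BinlogReader.doStart() does in non-GTID mode.
        client.setBinlogFilename("mysql-bin.000003");
        client.setBinlogPosition(4);

        // A single listener dispatching on the event type, analogous to handleEvent()
        // looking up the registered handler in the eventHandlers map.
        client.registerEventListener(event -> {
            EventHeaderV4 header = event.getHeader();
            EventType type = header.getEventType();
            if (EventType.isRowMutation(type)) {
                System.out.println("row change at position " + header.getNextPosition() + ": " + event.getData());
            }
        });

        // Blocks until connected (or times out after 10 seconds), then streams events
        // on a background thread until disconnect() is called.
        client.connect(10_000);
    }
}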
Summary

BinlogReader extends AbstractReader. Its constructor creates the BinaryLogClient and registers an event listener (handleEvent, or an EventBuffer when buffering is configured) and an eventDeserializer on it. Its doStart method populates the eventHandlers map, then sets either the GTID set or the binlogFilename and binlogPosition, and finally calls client.connect(timeout); its doStop method calls client.disconnect(); its pollComplete method mainly updates metrics such as recordCounter and totalRecordCounter.
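For reference, the GTID set strings that doStart passes to client.setGtidSet() use MySQL's executed-GTID notation, server_uuid:interval[:interval...]. A short sketch with a placeholder server UUID, using the same shyiko GtidSet class that BinlogReader instantiates:

import com.github.shyiko.mysql.binlog.GtidSet;

public class GtidSetDemo {

    public static void main(String[] args) {
        // Placeholder server UUID; a GTID set records, per source server, the
        // ranges of transactions that have already been executed.
        GtidSet completed = new GtidSet("24bc7850-2c16-11e6-a073-0242ac110002:1-100");

        // Marking one more transaction as processed extends the interval; this is
        // effectively what happens as GTID events stream past the reader.
        completed.add("24bc7850-2c16-11e6-a073-0242ac110002:101");

        // Prints 24bc7850-2c16-11e6-a073-0242ac110002:1-101 -- the string form
        // handed to client.setGtidSet() on restart.
        System.out.println(completed);
    }
}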