本文主要研究一下debezium的OffsetCommitPolicy
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Strategy that answers a single question: should offsets be committed right now?
 * <p>
 * Being a functional interface, a policy can be supplied as a lambda. Two ready-made
 * policies are reachable through the static factories {@link #always()} and
 * {@link #periodic(Properties)}.
 */
@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {

    /**
     * Decides whether a commit should happen at this point.
     *
     * @param numberOfMessagesSinceLastCommit count of messages since the previous commit, as supplied by the caller
     * @param timeSinceLastCommit time elapsed since the previous commit, as supplied by the caller
     * @return {@code true} when the caller should commit, {@code false} otherwise
     */
    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);

    /**
     * Creates a fresh {@code AlwaysCommitOffsetPolicy}.
     *
     * @return a new policy instance
     */
    static OffsetCommitPolicy always() {
        return new AlwaysCommitOffsetPolicy();
    }

    /**
     * Creates a fresh {@code PeriodicCommitOffsetPolicy} built from the given configuration.
     *
     * @param config properties handed unchanged to the {@code PeriodicCommitOffsetPolicy} constructor
     * @return a new policy instance
     */
    static OffsetCommitPolicy periodic(Properties config) {
        return new PeriodicCommitOffsetPolicy(config);
    }
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Commit policy that unconditionally approves every commit request.
 */
public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {

    /**
     * Approves the commit regardless of the arguments.
     *
     * @param numberOfMessagesSinceLastCommit ignored
     * @param timeSinceLastCommit ignored
     * @return always {@code true}
     */
    @Override
    public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
        return true;
    }
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java
/**
 * Commit policy that approves a commit only once a configured minimum amount of time
 * has elapsed since the previous commit. The message count is not considered.
 */
public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {

    /** Smallest elapsed time for which {@link #performCommit(long, Duration)} answers {@code true}. */
    private final Duration minimumTime;

    /**
     * Reads the minimum interval, in milliseconds, from the property named by
     * {@code DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP}.
     *
     * @param config properties holding the interval; must not be {@code null}
     * @throws NumberFormatException if the property is absent or is not a parsable {@code long}
     */
    public PeriodicCommitOffsetPolicy(Properties config) {
        // parseLong returns the primitive directly; Long.valueOf would allocate a boxed Long
        // only for it to be unboxed again by Duration.ofMillis(long).
        final long intervalMillis = Long.parseLong(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP));
        minimumTime = Duration.ofMillis(intervalMillis);
    }

    /**
     * Approves the commit when the elapsed time is at least the configured minimum.
     *
     * @param numberOfMessagesSinceLastCommit ignored by this policy
     * @param timeSinceLastCommit time elapsed since the previous commit
     * @return {@code true} if {@code timeSinceLastCommit} is greater than or equal to the minimum interval
     */
    @Override
    public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
        return timeSinceLastCommit.compareTo(minimumTime) >= 0;
    }
}
debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java
/**
 * Callback through which a consumer reports its progress on records and batches.
 *
 * @param <R> the record type
 */
public static interface RecordCommitter<R> {

    /**
     * Flags one record as processed. This must be invoked once for every record.
     *
     * @param record the record to commit
     * @throws InterruptedException declared by the contract; raised at the implementation's discretion
     */
    void markProcessed(R record) throws InterruptedException;

    /**
     * Flags the current batch as finished, which may lead to offsets being
     * committed / data being flushed.
     * <p>
     * Call this once processing of a batch of records has completed.
     */
    void markBatchFinished();
}
debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java
@ThreadSafe public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> { //...... protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) { return new RecordCommitter() { @Override public synchronized void markProcessed(SourceRecord record) throws InterruptedException { task.commitRecord(record); recordsSinceLastCommit += 1; offsetWriter.offset(record.sourcePartition(), record.sourceOffset()); } @Override public synchronized void markBatchFinished() { maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task); } }; } protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout, SourceTask task) { // Determine if we need to commit to offset storage ... long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis; if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) { commitOffsets(offsetWriter, commitTimeout, task); } } protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) { long started = clock.currentTimeInMillis(); long timeout = started + commitTimeout.toMillis(); if (!offsetWriter.beginFlush()) { return; } Future<Void> flush = offsetWriter.doFlush(this::completedFlush); if (flush == null) { return; // no offsets to commit ... } // Wait until the offsets are flushed ... 
try { flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS); // if we've gotten this far, the offsets have been committed so notify the task task.commit(); recordsSinceLastCommit = 0; timeOfLastCommitMillis = clock.currentTimeInMillis(); } catch (InterruptedException e) { logger.warn("Flush of {} offsets interrupted, cancelling", this); offsetWriter.cancelFlush(); } catch (ExecutionException e) { logger.error("Flush of {} offsets threw an unexpected exception: ", this, e); offsetWriter.cancelFlush(); } catch (TimeoutException e) { logger.error("Timed out waiting to flush {} offsets to storage", this); offsetWriter.cancelFlush(); } } //...... }
OffsetCommitPolicy定義了performCommit方法,並提供了always靜態方法用於建立AlwaysCommitOffsetPolicy;提供了periodic靜態方法用於建立PeriodicCommitOffsetPolicy。