A look at debezium's OffsetCommitPolicy

This article takes a look at debezium's OffsetCommitPolicy.

OffsetCommitPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

@Incubating
@FunctionalInterface
public interface OffsetCommitPolicy {

    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);

    static OffsetCommitPolicy always() {
        return new AlwaysCommitOffsetPolicy();
    }

    static OffsetCommitPolicy periodic(Properties config) {
        return new PeriodicCommitOffsetPolicy(config);
    }

}
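  • OffsetCommitPolicy defines the performCommit method, which receives the number of messages and the time elapsed since the last commit and returns whether offsets should be committed now. It also provides the static always method, which creates an AlwaysCommitOffsetPolicy, and the static periodic method, which creates a PeriodicCommitOffsetPolicy.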
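
Since the interface is annotated with @FunctionalInterface, a custom policy can in principle be written as a lambda. The following is a minimal sketch, not debezium code: CommitPolicy is a local stand-in that copies the performCommit signature so the example compiles without debezium on the classpath, and countOrTime is a hypothetical policy that commits once a message threshold or a time threshold is reached.

```java
import java.time.Duration;

// Local stand-in (assumption) mirroring the single abstract method of
// io.debezium.engine.spi.OffsetCommitPolicy shown above.
@FunctionalInterface
interface CommitPolicy {
    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);
}

class CustomPolicySketch {

    // Hypothetical policy: commit when maxMessages records are pending
    // or when at least maxDelay has elapsed since the last commit.
    static CommitPolicy countOrTime(long maxMessages, Duration maxDelay) {
        return (count, elapsed) -> count >= maxMessages || elapsed.compareTo(maxDelay) >= 0;
    }

    public static void main(String[] args) {
        CommitPolicy policy = countOrTime(100, Duration.ofSeconds(5));
        System.out.println(policy.performCommit(10, Duration.ofSeconds(1)));  // false
        System.out.println(policy.performCommit(100, Duration.ofSeconds(1))); // true
        System.out.println(policy.performCommit(10, Duration.ofSeconds(5)));  // true
    }
}
```

With the real interface the same lambda shape should apply, since performCommit is its only abstract method.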

AlwaysCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

public static class AlwaysCommitOffsetPolicy implements OffsetCommitPolicy {

        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return true;
        }
    }
  • AlwaysCommitOffsetPolicy implements the OffsetCommitPolicy interface; its performCommit always returns true, so every check results in a commit.

PeriodicCommitOffsetPolicy

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/spi/OffsetCommitPolicy.java

public static class PeriodicCommitOffsetPolicy implements OffsetCommitPolicy {

        private final Duration minimumTime;

        public PeriodicCommitOffsetPolicy(Properties config) {
            minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP)));
        }

        @Override
        public boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
            return timeSinceLastCommit.compareTo(minimumTime) >= 0;
        }
    }
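  • PeriodicCommitOffsetPolicy implements the OffsetCommitPolicy interface. Its constructor reads minimumTime from the DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP property, and its performCommit evaluates timeSinceLastCommit.compareTo(minimumTime), returning true when the result is greater than or equal to 0, i.e. when at least minimumTime has elapsed since the last commit.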
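
To make the threshold behaviour concrete, here is a small self-contained sketch that repeats the same parsing and comparison outside of debezium. PeriodicPolicySketch is a hypothetical class, and the property name "offset.flush.interval.ms" is an assumption about the value of DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP, which the excerpt does not show. One thing the sketch makes visible: because the value goes through Long.valueOf without a default, a missing property leads to a NumberFormatException.

```java
import java.time.Duration;
import java.util.Properties;

class PeriodicPolicySketch {

    // Assumed value of DebeziumEngine.OFFSET_FLUSH_INTERVAL_MS_PROP (not shown in the excerpt).
    static final String FLUSH_INTERVAL_PROP = "offset.flush.interval.ms";

    private final Duration minimumTime;

    PeriodicPolicySketch(Properties config) {
        // Same parsing as the excerpt: getProperty returns null for a missing key,
        // and Long.valueOf(null) throws NumberFormatException.
        minimumTime = Duration.ofMillis(Long.valueOf(config.getProperty(FLUSH_INTERVAL_PROP)));
    }

    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) {
        // The message count is ignored; only the elapsed time matters.
        return timeSinceLastCommit.compareTo(minimumTime) >= 0;
    }

    public static void main(String[] args) {
        Properties config = new Properties();
        config.setProperty(FLUSH_INTERVAL_PROP, "60000");
        PeriodicPolicySketch policy = new PeriodicPolicySketch(config);

        System.out.println(policy.performCommit(500, Duration.ofMillis(59_999))); // false
        System.out.println(policy.performCommit(0, Duration.ofMillis(60_000)));   // true

        try {
            new PeriodicPolicySketch(new Properties()); // property not set
        }
        catch (NumberFormatException e) {
            System.out.println("missing interval property is rejected");
        }
    }
}
```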

RecordCommitter

debezium-v1.1.1.Final/debezium-api/src/main/java/io/debezium/engine/DebeziumEngine.java

public static interface RecordCommitter<R> {

        /**
         * Marks a single record as processed, must be called for each
         * record.
         *
         * @param record the record to commit
         */
        void markProcessed(R record) throws InterruptedException;

        /**
         * Marks a batch as finished, this may result in committing offsets/flushing
         * data.
         * <p>
         * Should be called when a batch of records is finished being processed.
         */
        void markBatchFinished();
    }
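  • The RecordCommitter interface defines two methods: markProcessed, which must be called for each record, and markBatchFinished, which should be called once a batch has been processed and may trigger an offset commit.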
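
The calling contract described in the Javadoc can be sketched as follows. This is an illustration only: Committer is a local stand-in for RecordCommitter, and handleBatch and trace are hypothetical helpers that simply record the order of calls.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in (assumption) with the same two methods as DebeziumEngine.RecordCommitter.
interface Committer<R> {
    void markProcessed(R record) throws InterruptedException;

    void markBatchFinished();
}

class BatchHandlerSketch {

    // Hypothetical batch handler: mark every record, then mark the batch once.
    static <R> void handleBatch(List<R> records, Committer<R> committer) throws InterruptedException {
        for (R record : records) {
            // application-specific processing of the record would go here
            committer.markProcessed(record);
        }
        committer.markBatchFinished();
    }

    // Runs handleBatch against a recording committer and returns the calls in order.
    static List<String> trace(List<String> records) {
        List<String> calls = new ArrayList<>();
        try {
            handleBatch(records, new Committer<String>() {
                @Override
                public void markProcessed(String record) {
                    calls.add("processed:" + record);
                }

                @Override
                public void markBatchFinished() {
                    calls.add("batchFinished");
                }
            });
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return calls;
    }

    public static void main(String[] args) {
        // prints [processed:a, processed:b, batchFinished]
        System.out.println(trace(Arrays.asList("a", "b")));
    }
}
```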

EmbeddedEngine

debezium-v1.1.1.Final/debezium-embedded/src/main/java/io/debezium/embedded/EmbeddedEngine.java

@ThreadSafe
public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {

    //......

    protected RecordCommitter buildRecordCommitter(OffsetStorageWriter offsetWriter, SourceTask task, Duration commitTimeout) {
        return new RecordCommitter() {

            @Override
            public synchronized void markProcessed(SourceRecord record) throws InterruptedException {
                task.commitRecord(record);
                recordsSinceLastCommit += 1;
                offsetWriter.offset(record.sourcePartition(), record.sourceOffset());
            }

            @Override
            public synchronized void markBatchFinished() {
                maybeFlush(offsetWriter, offsetCommitPolicy, commitTimeout, task);
            }
        };
    }

    protected void maybeFlush(OffsetStorageWriter offsetWriter, OffsetCommitPolicy policy, Duration commitTimeout,
                              SourceTask task) {
        // Determine if we need to commit to offset storage ...
        long timeSinceLastCommitMillis = clock.currentTimeInMillis() - timeOfLastCommitMillis;
        if (policy.performCommit(recordsSinceLastCommit, Duration.ofMillis(timeSinceLastCommitMillis))) {
            commitOffsets(offsetWriter, commitTimeout, task);
        }
    }

    protected void commitOffsets(OffsetStorageWriter offsetWriter, Duration commitTimeout, SourceTask task) {
        long started = clock.currentTimeInMillis();
        long timeout = started + commitTimeout.toMillis();
        if (!offsetWriter.beginFlush()) {
            return;
        }
        Future<Void> flush = offsetWriter.doFlush(this::completedFlush);
        if (flush == null) {
            return; // no offsets to commit ...
        }

        // Wait until the offsets are flushed ...
        try {
            flush.get(Math.max(timeout - clock.currentTimeInMillis(), 0), TimeUnit.MILLISECONDS);
            // if we've gotten this far, the offsets have been committed so notify the task
            task.commit();
            recordsSinceLastCommit = 0;
            timeOfLastCommitMillis = clock.currentTimeInMillis();
        }
        catch (InterruptedException e) {
            logger.warn("Flush of {} offsets interrupted, cancelling", this);
            offsetWriter.cancelFlush();
        }
        catch (ExecutionException e) {
            logger.error("Flush of {} offsets threw an unexpected exception: ", this, e);
            offsetWriter.cancelFlush();
        }
        catch (TimeoutException e) {
            logger.error("Timed out waiting to flush {} offsets to storage", this);
            offsetWriter.cancelFlush();
        }
    }

    //......

}
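  • EmbeddedEngine's buildRecordCommitter method creates an anonymous RecordCommitter implementation. Its markProcessed calls task.commitRecord, increments recordsSinceLastCommit, and hands the record's source partition and offset to offsetWriter; its markBatchFinished calls maybeFlush, which uses policy.performCommit to decide whether to run commitOffsets. commitOffsets mainly calls offsetWriter.doFlush and waits for the flush up to commitTimeout; on success it calls task.commit() and resets recordsSinceLastCommit and timeOfLastCommitMillis, while on interruption, failure, or timeout it calls offsetWriter.cancelFlush() and leaves both values unchanged.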
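
The interplay between the counters and the policy can be reproduced with a simplified simulation. The sketch below is not the engine's code: FlushLoopSketch is a hypothetical class, a BiPredicate stands in for OffsetCommitPolicy.performCommit, the clock is a plain counter, and the flush is assumed to always succeed, so it only shows when commits happen and when the counters are reset.

```java
import java.time.Duration;
import java.util.function.BiPredicate;

class FlushLoopSketch {

    // Stand-in (assumption) for OffsetCommitPolicy.performCommit(count, elapsed).
    private final BiPredicate<Long, Duration> policy;

    private long nowMillis = 0;              // simulated clock
    private long timeOfLastCommitMillis = 0;
    private long recordsSinceLastCommit = 0;
    private int commits = 0;

    FlushLoopSketch(BiPredicate<Long, Duration> policy) {
        this.policy = policy;
    }

    void advance(long millis) {
        nowMillis += millis;
    }

    void markProcessed() {
        recordsSinceLastCommit += 1;
    }

    void markBatchFinished() {
        Duration sinceLast = Duration.ofMillis(nowMillis - timeOfLastCommitMillis);
        if (policy.test(recordsSinceLastCommit, sinceLast)) {
            // The real engine flushes the OffsetStorageWriter here; this sketch assumes
            // the flush succeeds and therefore always resets the counters.
            commits++;
            recordsSinceLastCommit = 0;
            timeOfLastCommitMillis = nowMillis;
        }
    }

    // Processes the given number of batches, stepMillis apart, and returns the commit count.
    static int run(BiPredicate<Long, Duration> policy, long stepMillis, int batches) {
        FlushLoopSketch loop = new FlushLoopSketch(policy);
        for (int i = 0; i < batches; i++) {
            loop.advance(stepMillis);
            loop.markProcessed();
            loop.markBatchFinished();
        }
        return loop.commits;
    }

    public static void main(String[] args) {
        // Periodic-style policy with a 1000 ms interval, batches every 400 ms:
        // commits happen at 1200 ms and 2400 ms.
        System.out.println(run((count, elapsed) -> elapsed.toMillis() >= 1000, 400, 6)); // 2
        // Always-style policy: one commit per batch.
        System.out.println(run((count, elapsed) -> true, 400, 6)); // 6
    }
}
```

Note that with a periodic-style policy the check only runs when markBatchFinished is called, so the actual gap between commits can be longer than the configured interval.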

Summary

OffsetCommitPolicy defines the performCommit method and provides the static always method for creating an AlwaysCommitOffsetPolicy and the static periodic method for creating a PeriodicCommitOffsetPolicy.