A look at Debezium's ChangeEventQueue

This article takes a look at Debezium's ChangeEventQueue.

ChangeEventQueueMetrics

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.java

public interface ChangeEventQueueMetrics {

    int totalCapacity();

    int remainingCapacity();
}
  • The ChangeEventQueueMetrics interface defines two methods, totalCapacity and remainingCapacity

ChangeEventQueue

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.java

public class ChangeEventQueue<T> implements ChangeEventQueueMetrics {

    private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class);

    private final Duration pollInterval;
    private final int maxBatchSize;
    private final int maxQueueSize;
    private final BlockingQueue<T> queue;
    private final Metronome metronome;
    private final Supplier<PreviousContext> loggingContextSupplier;

    private volatile RuntimeException producerException;

    private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
        this.pollInterval = pollInterval;
        this.maxBatchSize = maxBatchSize;
        this.maxQueueSize = maxQueueSize;
        this.queue = new LinkedBlockingDeque<>(maxQueueSize);
        this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM);
        this.loggingContextSupplier = loggingContextSupplier;
    }

    public static class Builder<T> {

        private Duration pollInterval;
        private int maxQueueSize;
        private int maxBatchSize;
        private Supplier<LoggingContext.PreviousContext> loggingContextSupplier;

        public Builder<T> pollInterval(Duration pollInterval) {
            this.pollInterval = pollInterval;
            return this;
        }

        public Builder<T> maxQueueSize(int maxQueueSize) {
            this.maxQueueSize = maxQueueSize;
            return this;
        }

        public Builder<T> maxBatchSize(int maxBatchSize) {
            this.maxBatchSize = maxBatchSize;
            return this;
        }

        public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) {
            this.loggingContextSupplier = loggingContextSupplier;
            return this;
        }

        public ChangeEventQueue<T> build() {
            return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier);
        }
    }

    /**
     * Enqueues a record so that it can be obtained via {@link #poll()}. This method
     * will block if the queue is full.
     *
     * @param record
     *            the record to be enqueued
     * @throws InterruptedException
     *             if this thread has been interrupted
     */
    public void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }

        // The calling thread has been interrupted, let's abort
        if (Thread.interrupted()) {
            throw new InterruptedException();
        }

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("Enqueuing source record '{}'", record);
        }

        // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue
        queue.put(record);
    }

    /**
     * Returns the next batch of elements from this queue. May be empty in case no
     * elements have arrived in the maximum waiting time.
     *
     * @throws InterruptedException
     *             if this thread has been interrupted while waiting for more
     *             elements to arrive
     */
    public List<T> poll() throws InterruptedException {
        LoggingContext.PreviousContext previousContext = loggingContextSupplier.get();

        try {
            LOGGER.debug("polling records...");
            List<T> records = new ArrayList<>();
            final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL));
            while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) {
                throwProducerExceptionIfPresent();

                LOGGER.debug("no records available yet, sleeping a bit...");
                // no records yet, so wait a bit
                metronome.pause();
                LOGGER.debug("checking for more records...");
            }
            return records;
        }
        finally {
            previousContext.restore();
        }
    }

    public void producerException(final RuntimeException producerException) {
        this.producerException = producerException;
    }

    private void throwProducerExceptionIfPresent() {
        if (producerException != null) {
            throw producerException;
        }
    }

    @Override
    public int totalCapacity() {
        return maxQueueSize;
    }

    @Override
    public int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
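  • ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates a bounded BlockingQueue (a LinkedBlockingDeque with capacity maxQueueSize) and a Metronome, and stores the loggingContextSupplier. The enqueue method ignores null records, throws InterruptedException if the calling thread has already been interrupted, and otherwise calls queue.put(record), which blocks while the queue is full. The poll method first obtains a previousContext from loggingContextSupplier.get(), then creates a timeout of max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL) and loops: as long as the timeout has not expired and queue.drainTo(records, maxBatchSize) returns 0, it rethrows any exception recorded via producerException and calls metronome.pause(). The loop ends as soon as the timeout expires or drainTo moves at least one record into the batch, and previousContext.restore() runs in the finally block. totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity()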
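To make the batching behavior described above concrete, here is a minimal stdlib-only sketch. It is not the Debezium class: `BatchQueueSketch` and its fields are hypothetical names, and the Metronome and Timer are replaced by a plain deadline plus a short `Thread.sleep`, which is an assumption made to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Hypothetical, simplified stand-in for ChangeEventQueue (not Debezium code).
class BatchQueueSketch<T> {
    private final BlockingQueue<T> queue;
    private final int maxBatchSize;
    private final long pollIntervalMillis;
    private volatile RuntimeException producerException;

    BatchQueueSketch(int maxQueueSize, int maxBatchSize, long pollIntervalMillis) {
        this.queue = new LinkedBlockingDeque<>(maxQueueSize); // bounded, like the original
        this.maxBatchSize = maxBatchSize;
        this.pollIntervalMillis = pollIntervalMillis;
    }

    // Producer side: blocks while the queue is full.
    void enqueue(T record) throws InterruptedException {
        if (record == null) {
            return;
        }
        queue.put(record);
    }

    // Lets the producer hand a failure over to the polling thread.
    void producerException(RuntimeException e) {
        this.producerException = e;
    }

    // Consumer side: returns up to maxBatchSize records, or an empty list
    // once the deadline has passed without any record arriving.
    List<T> poll() throws InterruptedException {
        List<T> records = new ArrayList<>();
        long deadline = System.currentTimeMillis() + pollIntervalMillis;
        while (System.currentTimeMillis() <= deadline
                && queue.drainTo(records, maxBatchSize) == 0) {
            if (producerException != null) {
                throw producerException;
            }
            // Simplification: a short fixed sleep instead of Metronome.pause().
            Thread.sleep(Math.min(10L, pollIntervalMillis));
        }
        return records;
    }

    int remainingCapacity() {
        return queue.remainingCapacity();
    }
}
```

With five records enqueued and a maxBatchSize of 3, the first poll returns three records and the second returns the remaining two. Note that, as in the original, a recorded producer exception only surfaces when a drain attempt comes back empty.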

Threads

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.java

public class Threads {

    //......

    public static interface TimeSince {
        /**
         * Reset the elapsed time to 0.
         */
        void reset();

        /**
         * Get the time that has elapsed since the last call to {@link #reset() reset}.
         *
         * @return the number of milliseconds
         */
        long elapsedTime();
    }

    public static interface Timer {

        /**
         * @return true if current time is greater than start time plus requested time period
         */
        boolean expired();

        Duration remaining();
    }

    public static Timer timer(Clock clock, Duration time) {
        final TimeSince start = timeSince(clock);
        start.reset();

        return new Timer() {

            @Override
            public boolean expired() {
                return start.elapsedTime() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                return time.minus(start.elapsedTime(), ChronoUnit.MILLIS);
            }
        };
    }

    public static TimeSince timeSince(Clock clock) {
        return new TimeSince() {
            private long lastTimeInMillis;

            @Override
            public void reset() {
                lastTimeInMillis = clock.currentTimeInMillis();
            }

            @Override
            public long elapsedTime() {
                long elapsed = clock.currentTimeInMillis() - lastTimeInMillis;
                return elapsed <= 0L ? 0L : elapsed;
            }
        };
    }

    //......

}
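  • Threads defines the Timer interface with the expired and remaining methods. The timer method first creates a TimeSince via timeSince and resets it, then returns an anonymous Timer: expired returns true once the elapsed time is strictly greater than the requested duration, and remaining returns the duration minus the elapsed time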
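The timer semantics are easier to see with a clock that can be advanced by hand. The following is a small sketch under that assumption: `TimerSketch` and `MillisClock` are hypothetical names standing in for Debezium's Threads and Clock, and only the expired/remaining logic mirrors the code quoted above.

```java
import java.time.Duration;

// Hypothetical sketch of the Timer logic; not Debezium's Threads class.
class TimerSketch {

    // Stand-in for Debezium's Clock, reduced to the one method used here.
    interface MillisClock {
        long currentTimeInMillis();
    }

    interface Timer {
        boolean expired();

        Duration remaining();
    }

    static Timer timer(MillisClock clock, Duration time) {
        final long start = clock.currentTimeInMillis();
        return new Timer() {
            // Elapsed time is clamped at zero, as in TimeSince.elapsedTime().
            private long elapsed() {
                return Math.max(0L, clock.currentTimeInMillis() - start);
            }

            @Override
            public boolean expired() {
                // Strict comparison: exactly at the deadline is not yet expired.
                return elapsed() > time.toMillis();
            }

            @Override
            public Duration remaining() {
                // Can become negative once the deadline has passed.
                return time.minusMillis(elapsed());
            }
        };
    }
}
```

Driving it with an `AtomicLong` as the clock shows two details that are easy to miss: the timer is not expired at exactly the requested duration, and remaining is not clamped at zero.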

LoggingContext

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.java

public class LoggingContext {

    /**
     * The key for the connector type MDC property.
     */
    public static final String CONNECTOR_TYPE = "dbz.connectorType";
    /**
     * The key for the connector logical name MDC property.
     */
    public static final String CONNECTOR_NAME = "dbz.connectorName";
    /**
     * The key for the connector context name MDC property.
     */
    public static final String CONNECTOR_CONTEXT = "dbz.connectorContext";

    private LoggingContext() {
    }

    /**
     * A snapshot of an MDC context that can be {@link #restore()}.
     */
    public static final class PreviousContext {
        private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap();
        private final Map<String, String> context;

        protected PreviousContext() {
            Map<String, String> context = MDC.getCopyOfContextMap();
            this.context = context != null ? context : EMPTY_CONTEXT;
        }

        /**
         * Restore this logging context.
         */
        public void restore() {
            MDC.setContextMap(context);
        }
    }

    //......

}
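  • LoggingContext defines PreviousContext, whose constructor takes a copy of the current MDC via MDC.getCopyOfContextMap() (falling back to an empty map when there is none); its restore method writes that saved copy back into the MDC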
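The snapshot-then-restore pattern can be sketched without SLF4J. In the example below a `ThreadLocal` map plays the role of the MDC; `ContextSnapshotSketch`, `CONTEXT`, `Snapshot`, and `runWithContext` are all hypothetical names for illustration, not part of Debezium or SLF4J.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the PreviousContext idea; a ThreadLocal map stands in for org.slf4j.MDC.
class ContextSnapshotSketch {

    // Per-thread diagnostic context, analogous to the MDC.
    static final ThreadLocal<Map<String, String>> CONTEXT = ThreadLocal.withInitial(HashMap::new);

    // Captures a copy of the context at construction time.
    static final class Snapshot {
        private final Map<String, String> saved = new HashMap<>(CONTEXT.get());

        // Replaces the current context with the saved copy.
        void restore() {
            CONTEXT.set(new HashMap<>(saved));
        }
    }

    // Temporarily adds a key, reads it back, and always restores the previous context.
    static String runWithContext(String key, String value) {
        Snapshot previous = new Snapshot();
        try {
            CONTEXT.get().put(key, value);
            return CONTEXT.get().get(key);
        } finally {
            previous.restore();
        }
    }
}
```

This has the same shape as poll(): take the snapshot first, do the work inside try, and restore in finally so the caller's context survives even if the work throws.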

Metronome

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.java

@FunctionalInterface
public interface Metronome {

    public void pause() throws InterruptedException;

    public static Metronome sleeper(Duration period, Clock timeSystem) {
        long periodInMillis = period.toMillis();
        return new Metronome() {
            private long next = timeSystem.currentTimeInMillis() + periodInMillis;

            @Override
            public void pause() throws InterruptedException {
                for (;;) {
                    final long now = timeSystem.currentTimeInMillis();
                    if (next <= now) {
                        break;
                    }
                    Thread.sleep(next - now);
                }
                next = next + periodInMillis;
            }

            @Override
            public String toString() {
                return "Metronome (sleep for " + periodInMillis + " ms)";
            }
        };
    }

    //......

}
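  • The Metronome interface defines the pause method. Its static sleeper method creates an anonymous Metronome implementation whose pause method calls Thread.sleep in a loop until the next scheduled tick has been reached, then advances that tick by one period, so ticks follow a fixed cadence rather than being measured from the end of each pause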
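The fixed-cadence behavior of the sleeper can be shown with a hand-driven clock. This is a sketch only: `MetronomeSketch` and `nextTick` are hypothetical names, and a `LongSupplier` replaces Debezium's Clock so that the example needs nothing beyond the JDK.

```java
import java.util.function.LongSupplier;

// Hypothetical sketch of the sleeper logic; not Debezium's Metronome.
class MetronomeSketch {
    private final long periodInMillis;
    private final LongSupplier clock; // stand-in for Clock.currentTimeInMillis()
    private long next;

    MetronomeSketch(long periodInMillis, LongSupplier clock) {
        this.periodInMillis = periodInMillis;
        this.clock = clock;
        this.next = clock.getAsLong() + periodInMillis;
    }

    // Sleeps until the next scheduled tick, then schedules the following one.
    void pause() throws InterruptedException {
        for (;;) {
            final long now = clock.getAsLong();
            if (next <= now) {
                break;
            }
            // Re-check the clock after waking, since sleep may return early or late.
            Thread.sleep(next - now);
        }
        // Advance from the previous tick, not from "now", to keep a fixed cadence.
        next = next + periodInMillis;
    }

    // Exposed only so the schedule can be inspected in this sketch.
    long nextTick() {
        return next;
    }
}
```

If the caller falls behind, pause returns immediately and the schedule moves forward by exactly one period per call until it has caught up with the clock.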

Summary

ChangeEventQueue implements the ChangeEventQueueMetrics interface. Its constructor creates a bounded BlockingQueue and a Metronome and stores the loggingContextSupplier. enqueue calls queue.put(record), blocking while the queue is full. poll obtains a previousContext from loggingContextSupplier.get(), creates a timeout, and loops over queue.drainTo(records, maxBatchSize) and metronome.pause() until either the timeout expires or drainTo returns at least one record, then calls previousContext.restore(). totalCapacity returns maxQueueSize, and remainingCapacity returns queue.remainingCapacity().
