本文主要研究一下debezium的ChangeEventQueuejava
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueueMetrics.javagit
public interface ChangeEventQueueMetrics { int totalCapacity(); int remainingCapacity(); }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/connector/base/ChangeEventQueue.javagithub
public class ChangeEventQueue<T> implements ChangeEventQueueMetrics { private static final Logger LOGGER = LoggerFactory.getLogger(ChangeEventQueue.class); private final Duration pollInterval; private final int maxBatchSize; private final int maxQueueSize; private final BlockingQueue<T> queue; private final Metronome metronome; private final Supplier<PreviousContext> loggingContextSupplier; private volatile RuntimeException producerException; private ChangeEventQueue(Duration pollInterval, int maxQueueSize, int maxBatchSize, Supplier<LoggingContext.PreviousContext> loggingContextSupplier) { this.pollInterval = pollInterval; this.maxBatchSize = maxBatchSize; this.maxQueueSize = maxQueueSize; this.queue = new LinkedBlockingDeque<>(maxQueueSize); this.metronome = Metronome.sleeper(pollInterval, Clock.SYSTEM); this.loggingContextSupplier = loggingContextSupplier; } public static class Builder<T> { private Duration pollInterval; private int maxQueueSize; private int maxBatchSize; private Supplier<LoggingContext.PreviousContext> loggingContextSupplier; public Builder<T> pollInterval(Duration pollInterval) { this.pollInterval = pollInterval; return this; } public Builder<T> maxQueueSize(int maxQueueSize) { this.maxQueueSize = maxQueueSize; return this; } public Builder<T> maxBatchSize(int maxBatchSize) { this.maxBatchSize = maxBatchSize; return this; } public Builder<T> loggingContextSupplier(Supplier<LoggingContext.PreviousContext> loggingContextSupplier) { this.loggingContextSupplier = loggingContextSupplier; return this; } public ChangeEventQueue<T> build() { return new ChangeEventQueue<T>(pollInterval, maxQueueSize, maxBatchSize, loggingContextSupplier); } } /** * Enqueues a record so that it can be obtained via {@link #poll()}. This method * will block if the queue is full. * * @param record * the record to be enqueued * @throws InterruptedException * if this thread has been interrupted */ public void enqueue(T record) throws InterruptedException { if (record == null) { return; } // The calling thread has been interrupted, let's abort if (Thread.interrupted()) { throw new InterruptedException(); } if (LOGGER.isDebugEnabled()) { LOGGER.debug("Enqueuing source record '{}'", record); } // this will also raise an InterruptedException if the thread is interrupted while waiting for space in the queue queue.put(record); } /** * Returns the next batch of elements from this queue. May be empty in case no * elements have arrived in the maximum waiting time. * * @throws InterruptedException * if this thread has been interrupted while waiting for more * elements to arrive */ public List<T> poll() throws InterruptedException { LoggingContext.PreviousContext previousContext = loggingContextSupplier.get(); try { LOGGER.debug("polling records..."); List<T> records = new ArrayList<>(); final Timer timeout = Threads.timer(Clock.SYSTEM, Temporals.max(pollInterval, ConfigurationDefaults.RETURN_CONTROL_INTERVAL)); while (!timeout.expired() && queue.drainTo(records, maxBatchSize) == 0) { throwProducerExceptionIfPresent(); LOGGER.debug("no records available yet, sleeping a bit..."); // no records yet, so wait a bit metronome.pause(); LOGGER.debug("checking for more records..."); } return records; } finally { previousContext.restore(); } } public void producerException(final RuntimeException producerException) { this.producerException = producerException; } private void throwProducerExceptionIfPresent() { if (producerException != null) { throw producerException; } } @Override public int totalCapacity() { return maxQueueSize; } @Override public int remainingCapacity() { return queue.remainingCapacity(); } }
queue.drainTo(records, maxBatchSize) == 0
爲false,最後執行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Threads.javaide
public class Threads { //...... public static interface TimeSince { /** * Reset the elapsed time to 0. */ void reset(); /** * Get the time that has elapsed since the last call to {@link #reset() reset}. * * @return the number of milliseconds */ long elapsedTime(); } public static interface Timer { /** * @return true if current time is greater than start time plus requested time period */ boolean expired(); Duration remaining(); } public static Timer timer(Clock clock, Duration time) { final TimeSince start = timeSince(clock); start.reset(); return new Timer() { @Override public boolean expired() { return start.elapsedTime() > time.toMillis(); } @Override public Duration remaining() { return time.minus(start.elapsedTime(), ChronoUnit.MILLIS); } }; } public static TimeSince timeSince(Clock clock) { return new TimeSince() { private long lastTimeInMillis; @Override public void reset() { lastTimeInMillis = clock.currentTimeInMillis(); } @Override public long elapsedTime() { long elapsed = clock.currentTimeInMillis() - lastTimeInMillis; return elapsed <= 0L ? 0L : elapsed; } }; } //...... }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/LoggingContext.javaui
public class LoggingContext { /** * The key for the connector type MDC property. */ public static final String CONNECTOR_TYPE = "dbz.connectorType"; /** * The key for the connector logical name MDC property. */ public static final String CONNECTOR_NAME = "dbz.connectorName"; /** * The key for the connector context name MDC property. */ public static final String CONNECTOR_CONTEXT = "dbz.connectorContext"; private LoggingContext() { } /** * A snapshot of an MDC context that can be {@link #restore()}. */ public static final class PreviousContext { private static final Map<String, String> EMPTY_CONTEXT = Collections.emptyMap(); private final Map<String, String> context; protected PreviousContext() { Map<String, String> context = MDC.getCopyOfContextMap(); this.context = context != null ? context : EMPTY_CONTEXT; } /** * Restore this logging context. */ public void restore() { MDC.setContextMap(context); } } //...... }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/Metronome.javathis
@FunctionalInterface public interface Metronome { public void pause() throws InterruptedException; public static Metronome sleeper(Duration period, Clock timeSystem) { long periodInMillis = period.toMillis(); return new Metronome() { private long next = timeSystem.currentTimeInMillis() + periodInMillis; @Override public void pause() throws InterruptedException { for (;;) { final long now = timeSystem.currentTimeInMillis(); if (next <= now) { break; } Thread.sleep(next - now); } next = next + periodInMillis; } @Override public String toString() { return "Metronome (sleep for " + periodInMillis + " ms)"; } }; } //...... }
ChangeEventQueue實現了ChangeEventQueueMetrics接口,其構造器建立了BlockingQueue、Metronome,並接收了loggingContextSupplier;其enqueue方法執行queue.put(record);其poll方法先經過loggingContextSupplier.get()獲取previousContext,以後建立timeout,並while循環執行queue.drainTo(records, maxBatchSize)及metronome.pause(),直到timeout.expired()或者queue.drainTo(records, maxBatchSize) == 0
爲false,最後執行previousContext.restore();其totalCapacity返回maxQueueSize;其remainingCapacity返回queue.remainingCapacity()spa