本文主要研究一下debezium的ElapsedTimeStrategyjava
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javagit
@FunctionalInterface public interface ElapsedTimeStrategy { /** * Determine if the time period has elapsed since this method was last called. * * @return {@code true} if this invocation caused the thread to sleep, or {@code false} if this method did not sleep */ boolean hasElapsed(); }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javagithub
public static ElapsedTimeStrategy none() { return () -> true; }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javaide
public static ElapsedTimeStrategy constant(Clock clock, long delayInMilliseconds) { if (delayInMilliseconds <= 0) { throw new IllegalArgumentException("Initial delay must be positive"); } return new ElapsedTimeStrategy() { private long nextTimestamp = 0L; @Override public boolean hasElapsed() { if (nextTimestamp == 0L) { // Initialize ... nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds; return true; } long current = clock.currentTimeInMillis(); if (current >= nextTimestamp) { do { long multiple = 1 + (current - nextTimestamp) / delayInMilliseconds; nextTimestamp += multiple * delayInMilliseconds; } while (current > nextTimestamp); return true; } return false; } }; }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javapost
public static ElapsedTimeStrategy step(Clock clock, long preStepDelayInMilliseconds, BooleanSupplier stepFunction, long postStepDelayInMilliseconds) { if (preStepDelayInMilliseconds <= 0) { throw new IllegalArgumentException("Pre-step delay must be positive"); } if (postStepDelayInMilliseconds <= 0) { throw new IllegalArgumentException("Post-step delay must be positive"); } return new ElapsedTimeStrategy() { private long nextTimestamp = 0L; private boolean elapsed = false; private long delta = 0L; @Override public boolean hasElapsed() { if (nextTimestamp == 0L) { // Initialize ... elapsed = stepFunction.getAsBoolean(); delta = elapsed ? postStepDelayInMilliseconds : preStepDelayInMilliseconds; nextTimestamp = clock.currentTimeInMillis() + delta; return true; } if (!elapsed) { elapsed = stepFunction.getAsBoolean(); if (elapsed) { delta = postStepDelayInMilliseconds; } } long current = clock.currentTimeInMillis(); if (current >= nextTimestamp) { do { assert delta > 0; long multiple = 1 + (current - nextTimestamp) / delta; nextTimestamp += multiple * delta; } while (nextTimestamp <= current); return true; } return false; } }; }
clock.currentTimeInMillis() + delta
,在elapsed爲false時經過stepFunction設置elapsed,若是爲true則更新delta爲postStepDelayInMilliseconds,以後在current小於nextTimestamp返回false,不然更新nextTimestamp,返回truedebezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javathis
public static ElapsedTimeStrategy linear(Clock clock, long delayInMilliseconds) { if (delayInMilliseconds <= 0) { throw new IllegalArgumentException("Initial delay must be positive"); } return new ElapsedTimeStrategy() { private long nextTimestamp = 0L; private long counter = 1L; @Override public boolean hasElapsed() { if (nextTimestamp == 0L) { // Initialize ... nextTimestamp = clock.currentTimeInMillis() + delayInMilliseconds; counter = 1L; return true; } long current = clock.currentTimeInMillis(); if (current >= nextTimestamp) { do { if (counter < Long.MAX_VALUE) { ++counter; } nextTimestamp += (delayInMilliseconds * counter); } while (nextTimestamp <= current); return true; } return false; } }; }
delayInMilliseconds * counter
來遞增nextTimestampdebezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/util/ElapsedTimeStrategy.javacode
public static ElapsedTimeStrategy exponential(Clock clock, long initialDelayInMilliseconds, long maxDelayInMilliseconds, double multiplier) { if (multiplier <= 1.0) { throw new IllegalArgumentException("Multiplier must be greater than 1"); } if (initialDelayInMilliseconds <= 0) { throw new IllegalArgumentException("Initial delay must be positive"); } if (initialDelayInMilliseconds >= maxDelayInMilliseconds) { throw new IllegalArgumentException("Maximum delay must be greater than initial delay"); } return new ElapsedTimeStrategy() { private long nextTimestamp = 0L; private long previousDelay = 0L; @Override public boolean hasElapsed() { if (nextTimestamp == 0L) { // Initialize ... nextTimestamp = clock.currentTimeInMillis() + initialDelayInMilliseconds; previousDelay = initialDelayInMilliseconds; return true; } long current = clock.currentTimeInMillis(); if (current >= nextTimestamp) { do { // Compute how long to delay ... long nextDelay = (long) (previousDelay * multiplier); if (nextDelay >= maxDelayInMilliseconds) { previousDelay = maxDelayInMilliseconds; // If we're not there yet, then we know the increment is linear from here ... if (nextTimestamp < current) { long multiple = 1 + (current - nextTimestamp) / maxDelayInMilliseconds; nextTimestamp += multiple * maxDelayInMilliseconds; } } else { previousDelay = nextDelay; } nextTimestamp += previousDelay; } while (nextTimestamp <= current); return true; } return false; } }; }
multiple * maxDelayInMilliseconds
來遞增nextTimestampElapsedTimeStrategy定義了hasElapsed方法,它提供了none、constant、step、linear、exponential這幾種實現ip