聊聊debezium的Heartbeat

本文主要研究一下debezium的Heartbeatjava

Heartbeat

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.javagit

public interface Heartbeat {

    public static final String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms";

    @FunctionalInterface
    public static interface OffsetProducer {
        Map<String, ?> offset();
    }

    void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException;

    boolean isEnabled();

    //......

 }
  • Heartbeat定義了OffsetProducer接口,該接口定義了offset方法;它還定義了heartbeat、forcedBeat、isEnabled方法

HeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.javagithub

class HeartbeatImpl implements Heartbeat {

    private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class);
    private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER);

    /**
     * Default length of interval in which connector generates periodically
     * heartbeat messages. A size of 0 disables heartbeat.
     */
    static final int DEFAULT_HEARTBEAT_INTERVAL = 0;

    /**
     * Default prefix for names of heartbeat topics
     */
    static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat";

    private static final String SERVER_NAME_KEY = "serverName";

    private static Schema KEY_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey"))
            .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA)
            .build();
    private static Schema VALUE_SCHEMA = SchemaBuilder.struct()
            .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat"))
            .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA)
            .build();

    private final String topicName;
    private final Duration heartbeatInterval;
    private final String key;

    private volatile Timer heartbeatTimeout;

    HeartbeatImpl(Configuration configuration, String topicName, String key) {
        this.topicName = topicName;
        this.key = key;

        heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS);
        heartbeatTimeout = resetHeartbeat();
    }

    @Override
    public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offset, consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }

    @Override
    public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        if (heartbeatTimeout.expired()) {
            forcedBeat(partition, offsetProducer.offset(), consumer);
            heartbeatTimeout = resetHeartbeat();
        }
    }

    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer)
            throws InterruptedException {
        LOGGER.debug("Generating heartbeat event");
        if (offset == null || offset.isEmpty()) {
            // Do not send heartbeat message if no offset is available yet
            return;
        }
        consumer.accept(heartbeatRecord(partition, offset));
    }

    @Override
    public boolean isEnabled() {
        return true;
    }

    private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) {
        final Integer partition = 0;

        return new SourceRecord(sourcePartition, sourceOffset,
                topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue());
    }

    private Timer resetHeartbeat() {
        return Threads.timer(Clock.SYSTEM, heartbeatInterval);
    }

    //......

}
  • HeartbeatImpl實現了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()時執行forcedBeat,而後執行resetHeartbeat;其forcedBeat方法執行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法建立SourceRecord並返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval)

DatabaseHeartbeatImpl

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.javaide

public class DatabaseHeartbeatImpl extends HeartbeatImpl {
    private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class);

    public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query";

    public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME)
            .withDisplayName("The query to execute with every heartbeat")
            .withType(ConfigDef.Type.STRING)
            .withWidth(ConfigDef.Width.MEDIUM)
            .withImportance(ConfigDef.Importance.LOW)
            .withDescription("The query executed with every heartbeat. Defaults to an empty string.");

    private final String heartBeatActionQuery;
    private final JdbcConnection jdbcConnection;

    DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) {
        super(configuration, topicName, key);

        this.heartBeatActionQuery = heartBeatActionQuery;
        this.jdbcConnection = jdbcConnection;
    }

    @Override
    public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException {
        try {
            jdbcConnection.execute(heartBeatActionQuery);
        }
        catch (Exception e) {
            LOGGER.error("Could not execute heartbeat action", e);
        }
        LOGGER.debug("Executed heartbeat action query");

        super.forcedBeat(partition, offset, consumer);
    }
}
  • DatabaseHeartbeatImpl繼承了HeartbeatImpl,其forcedBeat方法執行jdbcConnection.execute(heartBeatActionQuery),而後再執行super.forcedBeat(partition, offset, consumer)

Heartbeat.create

debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.javaui

public interface Heartbeat {

    //......

    public static Heartbeat create(Configuration configuration, String topicName, String key) {
        return configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero() ? NULL : new HeartbeatImpl(configuration, topicName, key);
    }

    public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) {
        if (configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero()) {
            return NULL;
        }

        String heartBeatActionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY);

        if (heartBeatActionQuery != null) {
            return new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, heartBeatActionQuery);
        }

        return new HeartbeatImpl(configuration, topicName, key);
    }

    //......

}
  • Heartbeat提供了兩個create靜態方法,一個用於建立HeartbeatImpl,另一個在heartBeatActionQuery不爲hull時建立DatabaseHeartbeatImpl

小結

HeartbeatImpl實現了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()時執行forcedBeat,而後執行resetHeartbeat;其forcedBeat方法執行consumer.accept(heartbeatRecord(partition, offset));其isEnabled返回true;heartbeatRecord方法建立SourceRecord並返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval);DatabaseHeartbeatImpl繼承了HeartbeatImpl,其forcedBeat方法執行jdbcConnection.execute(heartBeatActionQuery),而後再執行super.forcedBeat(partition, offset, consumer)this

doc

相關文章
相關標籤/搜索