本文主要研究一下debezium的Heartbeat
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java
public interface Heartbeat { public static final String HEARTBEAT_INTERVAL_PROPERTY_NAME = "heartbeat.interval.ms"; @FunctionalInterface public static interface OffsetProducer { Map<String, ?> offset(); } void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException; void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException; void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException; boolean isEnabled(); //...... }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/HeartbeatImpl.java
class HeartbeatImpl implements Heartbeat { private static final Logger LOGGER = LoggerFactory.getLogger(HeartbeatImpl.class); private static final SchemaNameAdjuster schemaNameAdjuster = SchemaNameAdjuster.create(LOGGER); /** * Default length of interval in which connector generates periodically * heartbeat messages. A size of 0 disables heartbeat. */ static final int DEFAULT_HEARTBEAT_INTERVAL = 0; /** * Default prefix for names of heartbeat topics */ static final String DEFAULT_HEARTBEAT_TOPICS_PREFIX = "__debezium-heartbeat"; private static final String SERVER_NAME_KEY = "serverName"; private static Schema KEY_SCHEMA = SchemaBuilder.struct() .name(schemaNameAdjuster.adjust("io.debezium.connector.common.ServerNameKey")) .field(SERVER_NAME_KEY, Schema.STRING_SCHEMA) .build(); private static Schema VALUE_SCHEMA = SchemaBuilder.struct() .name(schemaNameAdjuster.adjust("io.debezium.connector.common.Heartbeat")) .field(AbstractSourceInfo.TIMESTAMP_KEY, Schema.INT64_SCHEMA) .build(); private final String topicName; private final Duration heartbeatInterval; private final String key; private volatile Timer heartbeatTimeout; HeartbeatImpl(Configuration configuration, String topicName, String key) { this.topicName = topicName; this.key = key; heartbeatInterval = configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS); heartbeatTimeout = resetHeartbeat(); } @Override public void heartbeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException { if (heartbeatTimeout.expired()) { forcedBeat(partition, offset, consumer); heartbeatTimeout = resetHeartbeat(); } } @Override public void heartbeat(Map<String, ?> partition, OffsetProducer offsetProducer, BlockingConsumer<SourceRecord> consumer) throws InterruptedException { if (heartbeatTimeout.expired()) { forcedBeat(partition, offsetProducer.offset(), consumer); heartbeatTimeout = resetHeartbeat(); } } @Override public void forcedBeat(Map<String, 
?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException { LOGGER.debug("Generating heartbeat event"); if (offset == null || offset.isEmpty()) { // Do not send heartbeat message if no offset is available yet return; } consumer.accept(heartbeatRecord(partition, offset)); } @Override public boolean isEnabled() { return true; } private SourceRecord heartbeatRecord(Map<String, ?> sourcePartition, Map<String, ?> sourceOffset) { final Integer partition = 0; return new SourceRecord(sourcePartition, sourceOffset, topicName, partition, KEY_SCHEMA, serverNameKey(key), VALUE_SCHEMA, messageValue()); } private Timer resetHeartbeat() { return Threads.timer(Clock.SYSTEM, heartbeatInterval); } //...... }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/DatabaseHeartbeatImpl.java
public class DatabaseHeartbeatImpl extends HeartbeatImpl { private static final Logger LOGGER = LoggerFactory.getLogger(DatabaseHeartbeatImpl.class); public static final String HEARTBEAT_ACTION_QUERY_PROPERTY_NAME = "heartbeat.action.query"; public static final Field HEARTBEAT_ACTION_QUERY = Field.create(HEARTBEAT_ACTION_QUERY_PROPERTY_NAME) .withDisplayName("The query to execute with every heartbeat") .withType(ConfigDef.Type.STRING) .withWidth(ConfigDef.Width.MEDIUM) .withImportance(ConfigDef.Importance.LOW) .withDescription("The query executed with every heartbeat. Defaults to an empty string."); private final String heartBeatActionQuery; private final JdbcConnection jdbcConnection; DatabaseHeartbeatImpl(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection, String heartBeatActionQuery) { super(configuration, topicName, key); this.heartBeatActionQuery = heartBeatActionQuery; this.jdbcConnection = jdbcConnection; } @Override public void forcedBeat(Map<String, ?> partition, Map<String, ?> offset, BlockingConsumer<SourceRecord> consumer) throws InterruptedException { try { jdbcConnection.execute(heartBeatActionQuery); } catch (Exception e) { LOGGER.error("Could not execute heartbeat action", e); } LOGGER.debug("Executed heartbeat action query"); super.forcedBeat(partition, offset, consumer); } }
debezium-v1.1.1.Final/debezium-core/src/main/java/io/debezium/heartbeat/Heartbeat.java
public interface Heartbeat { //...... public static Heartbeat create(Configuration configuration, String topicName, String key) { return configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero() ? NULL : new HeartbeatImpl(configuration, topicName, key); } public static Heartbeat create(Configuration configuration, String topicName, String key, JdbcConnection jdbcConnection) { if (configuration.getDuration(HeartbeatImpl.HEARTBEAT_INTERVAL, ChronoUnit.MILLIS).isZero()) { return NULL; } String heartBeatActionQuery = configuration.getString(DatabaseHeartbeatImpl.HEARTBEAT_ACTION_QUERY); if (heartBeatActionQuery != null) { return new DatabaseHeartbeatImpl(configuration, topicName, key, jdbcConnection, heartBeatActionQuery); } return new HeartbeatImpl(configuration, topicName, key); } //...... }
HeartbeatImpl實現了Heartbeat接口,其heartbeat方法在heartbeatTimeout.expired()時執行forcedBeat,而後執行resetHeartbeat;其forcedBeat方法在offset不爲空時執行consumer.accept(heartbeatRecord(partition, offset)),offset爲null或空時直接返回;其isEnabled返回true;heartbeatRecord方法建立SourceRecord並返回;resetHeartbeat方法返回Threads.timer(Clock.SYSTEM, heartbeatInterval);DatabaseHeartbeatImpl繼承了HeartbeatImpl,其forcedBeat方法執行jdbcConnection.execute(heartBeatActionQuery),而後再執行super.forcedBeat(partition, offset, consumer)。