本文主要研究一下maxwell的BinlogConnectorDiagnosticjava
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/monitoring/MaxwellDiagnostic.javamysql
public interface MaxwellDiagnostic { String getName(); boolean isMandatory(); String getResource(); CompletableFuture<MaxwellDiagnosticResult.Check> check(); }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/monitoring/MaxwellDiagnosticResult.javagit
public class MaxwellDiagnosticResult { private final boolean success; private final boolean mandatoryFailed; private final List<Check> checks; public MaxwellDiagnosticResult(List<Check> checks) { success = checks.stream().allMatch(Check::isSuccess); mandatoryFailed = checks.stream().anyMatch(check -> !check.success && check.mandatory); this.checks = checks; } public boolean isSuccess() { return success; } public boolean isMandatoryFailed() { return mandatoryFailed; } public List<Check> getChecks() { return checks; } public static class Check { private final String name; private final boolean success; private final boolean mandatory; private final String resource; private final Map<String, String> info; public Check(MaxwellDiagnostic diagnostic, boolean success, Map<String, String> info) { this.name = diagnostic.getName(); this.success = success; this.mandatory = diagnostic.isMandatory(); this.resource = diagnostic.getResource(); this.info = info; } public String getName() { return name; } public boolean isSuccess() { return success; } public boolean isMandatory() { return mandatory; } public String getResource() { return resource; } public Map<String, String> getInfo() { return info; } } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorDiagnostic.javagithub
public class BinlogConnectorDiagnostic implements MaxwellDiagnostic { private final MaxwellContext context; public BinlogConnectorDiagnostic(MaxwellContext context) { this.context = context; } @Override public String getName() { return "binlog-connector"; } @Override public CompletableFuture<MaxwellDiagnosticResult.Check> check() { return getLatency().thenApply(this::normalResult).exceptionally(this::exceptionResult); } @Override public boolean isMandatory() { return true; } @Override public String getResource() { MaxwellMysqlConfig mysql = context.getConfig().maxwellMysql; return mysql.host + ":" + mysql.port; } public CompletableFuture<Long> getLatency() { HeartbeatObserver observer = new HeartbeatObserver(context.getHeartbeatNotifier(), Clock.systemUTC()); try { context.heartbeat(); } catch (Exception e) { observer.fail(e); } return observer.latency; } private MaxwellDiagnosticResult.Check normalResult(Long latency) { Map<String, String> info = new HashMap<>(); info.put("message", "Binlog replication lag is " + latency.toString() + "ms"); return new MaxwellDiagnosticResult.Check(this, true, info); } private MaxwellDiagnosticResult.Check exceptionResult(Throwable e) { Map<String, String> info = new HashMap<>(); info.put("error", e.getCause().toString()); return new MaxwellDiagnosticResult.Check(this, false, info); } //...... }
binlog-connector
;其isMandatory方法返回true;其getResource方法返回的是mysql的host和port信息;其check方法經過getLatency方法獲取latency的CompletableFuture,成功時經過normalResult轉換,異常經過exceptionResult轉換;getLatency方法建立了HeartbeatObserver並註冊到HeartbeatNotifier中,而後執行context.heartbeat(),若出現異常執行observer.fail(e),最後返回observer.latencymaxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorDiagnostic.javasql
static class HeartbeatObserver implements Observer { final CompletableFuture<Long> latency; private final HeartbeatNotifier notifier; private final Clock clock; HeartbeatObserver(HeartbeatNotifier notifier, Clock clock) { this.notifier = notifier; this.clock = clock; this.latency = new CompletableFuture<>(); this.latency.whenComplete((value, exception) -> close()); notifier.addObserver(this); } @Override public void update(Observable o, Object arg) { long heartbeatReadTime = clock.millis(); long latestHeartbeat = (long) arg; latency.complete(heartbeatReadTime - latestHeartbeat); } void fail(Exception e) { latency.completeExceptionally(e); } private void close() { notifier.deleteObserver(this); } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/HeartbeatNotifier.javaide
public class HeartbeatNotifier extends Observable { protected void heartbeat(long heartbeat) { setChanged(); notifyObservers(heartbeat); } }
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/replication/BinlogConnectorReplicator.javaoop
public class BinlogConnectorReplicator extends RunLoopProcess implements Replicator { //...... private RowMap processHeartbeats(RowMap row) { String hbClientID = (String) row.getData("client_id"); if ( !Objects.equals(hbClientID, this.clientID) ) return row; // plain row -- do not process. long lastHeartbeatRead = (Long) row.getData("heartbeat"); LOGGER.debug("replicator picked up heartbeat: " + lastHeartbeatRead); this.lastHeartbeatPosition = row.getPosition().withHeartbeat(lastHeartbeatRead); heartbeatNotifier.heartbeat(lastHeartbeatRead); return HeartbeatRowMap.valueOf(row.getDatabase(), this.lastHeartbeatPosition, row.getNextPosition().withHeartbeat(lastHeartbeatRead)); } //...... }
BinlogConnectorDiagnostic實現了MaxwellDiagnostic接口,它定義了MaxwellContext屬性,其getName返回binlog-connector
;其isMandatory方法返回true;其getResource方法返回的是mysql的host和port信息;其check方法經過getLatency方法獲取latency的CompletableFuture,成功時經過normalResult轉換,異常經過exceptionResult轉換;getLatency方法建立了HeartbeatObserver並註冊到HeartbeatNotifier中,而後執行context.heartbeat(),若出現異常執行observer.fail(e),最後返回observer.latencythis