本文主要研究一下maxwell的Recovery
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/recovery/Recovery.java
/**
 * Searches the binlogs visible through the replication connection pool for a
 * previously recorded heartbeat, so that replication can resume after a
 * master change.
 *
 * <p>Binlog files are scanned starting with the last file reported by
 * {@code SHOW BINARY LOGS} and working back towards the first one.
 */
public class Recovery {
	static final Logger LOGGER = LoggerFactory.getLogger(Recovery.class);

	private final ConnectionPool replicationConnectionPool;
	private final RecoveryInfo recoveryInfo;
	private final MaxwellMysqlConfig replicationConfig;
	private final String maxwellDatabaseName;
	private final RecoverySchemaStore schemaStore;

	/**
	 * @param replicationConfig         connection settings handed to each scanning replicator
	 * @param maxwellDatabaseName       name of the maxwell database; also used to build the recovery filter
	 * @param replicationConnectionPool pool used to list binlogs and to back the recovery schema store
	 * @param caseSensitivity           case-sensitivity setting passed to the recovery schema store
	 * @param recoveryInfo              the old server id, position, heartbeat and client id to recover from
	 */
	public Recovery(MaxwellMysqlConfig replicationConfig,
					String maxwellDatabaseName,
					ConnectionPool replicationConnectionPool,
					CaseSensitivity caseSensitivity,
					RecoveryInfo recoveryInfo) {
		this.replicationConfig = replicationConfig;
		this.replicationConnectionPool = replicationConnectionPool;
		this.recoveryInfo = recoveryInfo;
		this.schemaStore = new RecoverySchemaStore(replicationConnectionPool, maxwellDatabaseName, caseSensitivity);
		this.maxwellDatabaseName = maxwellDatabaseName;
	}

	/**
	 * Scans the binlog files, newest-listed first, for the heartbeat described
	 * by the recovery info.
	 *
	 * @return the matching heartbeat row, or {@code null} if no binlog contained it
	 * @throws Exception if listing the binlogs or reading from a replicator fails
	 */
	public HeartbeatRowMap recover() throws Exception {
		String recoveryMsg = String.format(
			"old-server-id: %d, position: %s",
			recoveryInfo.serverID,
			recoveryInfo.position
		);

		LOGGER.warn("attempting to recover from master-change: {}", recoveryMsg);

		List<BinlogPosition> list = getBinlogInfo();
		// Walk backwards so the most recently listed binlog is tried first.
		for ( int i = list.size() - 1; i >= 0 ; i-- ) {
			BinlogPosition binlogPosition = list.get(i);
			Position position = Position.valueOf(binlogPosition, recoveryInfo.getHeartbeat());
			Metrics metrics = new NoOpMetrics();

			LOGGER.debug("scanning binlog: {}", binlogPosition);
			Replicator replicator = new BinlogConnectorReplicator(
				this.schemaStore,
				null,
				null,
				replicationConfig,
				0L, // server-id of 0 activates "mysqlbinlog" behavior where the server will stop after each binlog
				maxwellDatabaseName,
				metrics,
				position,
				true,
				recoveryInfo.clientID,
				new HeartbeatNotifier(),
				null,
				new RecoveryFilter(this.maxwellDatabaseName),
				new MaxwellOutputConfig(),
				0.25f // Default memory usage size, not used
			);

			HeartbeatRowMap h = findHeartbeat(replicator);
			if ( h != null ) {
				LOGGER.warn("recovered new master position: {}", h.getNextPosition());
				return h;
			}
		}

		LOGGER.error("Could not recover from master-change: {}", recoveryMsg);
		return null;
	}

	/**
	 * Starts the replicator and reads rows until one is a heartbeat whose
	 * last-heartbeat-read value equals the heartbeat being recovered.
	 *
	 * @param r the replicator to read rows from
	 * @return the {@link HeartbeatRowMap} carrying the wanted heartbeat, or
	 *         {@code null} if the replicator ran out of rows first
	 * @throws Exception if starting the replicator or fetching a row fails
	 */
	private HeartbeatRowMap findHeartbeat(Replicator r) throws Exception {
		r.startReplicator();
		// NOTE(review): the replicator is not explicitly stopped here; presumably it
		// ends on its own once getRow() returns null -- confirm against Replicator.
		for (RowMap row = r.getRow(); row != null ; row = r.getRow()) {
			if (!(row instanceof HeartbeatRowMap)) {
				continue;
			}
			HeartbeatRowMap heartbeatRow = (HeartbeatRowMap) row;
			if (heartbeatRow.getPosition().getLastHeartbeatRead() == recoveryInfo.getHeartbeat()) {
				return heartbeatRow;
			}
		}
		return null;
	}

	/**
	 * Fetches a list of binlog positions representing the start of each binlog
	 * file, in the order returned by {@code SHOW BINARY LOGS}.
	 *
	 * @return a list of binlog positions to attempt recovery at
	 * @throws SQLException if the connection cannot be obtained or the query fails
	 */
	private List<BinlogPosition> getBinlogInfo() throws SQLException {
		List<BinlogPosition> list = new ArrayList<>();
		// Statement and ResultSet are closed together with the connection instead of
		// being left for the pool/driver to clean up. Statement is fully qualified so
		// that no additional import is required.
		try ( Connection c = replicationConnectionPool.getConnection();
			  java.sql.Statement stmt = c.createStatement();
			  ResultSet rs = stmt.executeQuery("SHOW BINARY LOGS") ) {
			while ( rs.next() ) {
				list.add(BinlogPosition.at(4, rs.getString("Log_name")));
			}
		}
		return list;
	}
}
複製代碼
maxwell-1.25.1/src/main/java/com/zendesk/maxwell/Maxwell.java
/**
 * Runnable entry object holding the Maxwell configuration, its context and
 * the replicator. This listing is an excerpt; members that were left out are
 * marked with "//......".
 */
public class Maxwell implements Runnable {
	protected MaxwellConfig config;
	protected MaxwellContext context;
	protected Replicator replicator;

	static final Logger LOGGER = LoggerFactory.getLogger(Maxwell.class);

	/**
	 * Wraps {@code config} in a new {@link MaxwellContext} and delegates to the
	 * context-based constructor.
	 */
	public Maxwell(MaxwellConfig config) throws SQLException, URISyntaxException {
		this(new MaxwellContext(config));
	}

	/**
	 * Stores the context and the configuration it carries, then probes the
	 * context's connections.
	 */
	protected Maxwell(MaxwellContext context) throws SQLException, URISyntaxException {
		this.context = context;
		this.config = context.getConfig();
		context.probeConnections();
	}

	//......

	/**
	 * Runs a {@link Recovery} when the position store reports recovery info, and
	 * on success clones the old server's schema for the current server id.
	 *
	 * @return the position following the recovered heartbeat, or {@code null}
	 *         when there is nothing to recover from or the heartbeat was not found
	 * @throws Exception if the recovery scan or the schema handling fails
	 */
	private Position attemptMasterRecovery() throws Exception {
		MysqlPositionStore store = this.context.getPositionStore();
		RecoveryInfo info = store.getRecoveryInfo(config);
		if ( info == null ) {
			return null;
		}

		Recovery recovery = new Recovery(
			config.replicationMysql,
			config.databaseName,
			this.context.getReplicationConnectionPool(),
			this.context.getCaseSensitivity(),
			info
		);

		HeartbeatRowMap heartbeat = recovery.recover();
		if ( heartbeat == null ) {
			return null;
		}

		// Load the schema stored for the recovery position so it can be chained
		// into the new server_id.
		MysqlSchemaStore previousSchemaStore = new MysqlSchemaStore(
			context.getMaxwellConnectionPool(),
			context.getReplicationConnectionPool(),
			context.getSchemaConnectionPool(),
			info.serverID,
			info.position,
			context.getCaseSensitivity(),
			config.filter,
			false
		);

		// The schema is associated with the start position of the heartbeat event,
		// so that it is picked up when resuming at the event after the heartbeat.
		previousSchemaStore.clone(context.getServerID(), heartbeat.getPosition());

		return heartbeat.getNextPosition();
	}

	//......
}
複製代碼
Recovery提供了recover方法,它先通過getBinlogInfo方法獲取BinlogPosition列表,之後從後往前遍歷BinlogPosition構建BinlogConnectorReplicator,然後通過findHeartbeat方法查找heartbeatRow.getPosition().getLastHeartbeatRead()等於recoveryInfo.getHeartbeat()的HeartbeatRowMap,如果不爲null則直接返回