本文主要研究一下claudb的MasterReplication。
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java
public class MasterReplication implements Runnable { private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class); private static final String SELECT_COMMAND = "SELECT"; private static final String PING_COMMAND = "PING"; private static final int TASK_DELAY = 2; private final DBServerContext server; private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor(); public MasterReplication(DBServerContext server) { this.server = server; } public void start() { executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS); } public void stop() { executor.shutdown(); } public void addSlave(String id) { getServerState().addSlave(id); LOGGER.info("new slave: {}", id); } public void removeSlave(String id) { getServerState().removeSlave(id); LOGGER.info("slave revomed: {}", id); } @Override public void run() { List<RedisToken> commands = createCommands(); for (SafeString slave : getServerState().getSlaves()) { for (RedisToken command : commands) { server.publish(slave.toString(), command); } } } private List<RedisToken> createCommands() { List<RedisToken> commands = new LinkedList<>(); commands.add(pingCommand()); commands.addAll(commandsToReplicate()); return commands; } private List<RedisToken> commandsToReplicate() { List<RedisToken> commands = new LinkedList<>(); for (RedisToken command : server.getCommandsToReplicate()) { command.accept(new AbstractRedisTokenVisitor<Void>() { @Override public Void array(ArrayRedisToken token) { commands.add(selectCommand(token)); commands.add(command(token)); return null; } }); } return commands; } private RedisToken selectCommand(ArrayRedisToken token) { return array(string(SELECT_COMMAND), token.getValue().stream().findFirst().orElse(string("0"))); } private RedisToken pingCommand() { return array(string(PING_COMMAND)); } private RedisToken command(ArrayRedisToken token) { return array(token.getValue().stream().skip(1).collect(toList())); } private 
DBServerState getServerState() { return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state")); } private Option<DBServerState> serverState() { return server.getValue("state"); } }
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext { //...... @Override public ImmutableList<RedisToken> getCommandsToReplicate() { return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> { observable.onNext(getState().getCommandsToReplicate()); observable.onComplete(); })).blockingFirst(); } //...... }
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java
public class DBServerState { private static final int RDB_VERSION = 6; private static final SafeString SLAVES = safeString("slaves"); private static final DatabaseKey SLAVES_KEY = safeKey("slaves"); private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts"); private boolean master = true; private final List<Database> databases = new ArrayList<>(); private final Database admin; private final DatabaseFactory factory; private final Queue<RedisToken> queue = new LinkedList<>(); public void append(RedisToken command) { queue.offer(command); } //...... public ImmutableList<RedisToken> getCommandsToReplicate() { ImmutableList<RedisToken> list = ImmutableList.from(queue); queue.clear(); return list; } //...... }
claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java
public class ClauDB extends RespServerContext implements DBServerContext { //...... protected RedisToken executeCommand(RespCommand command, Request request) { if (!isReadOnly(request.getCommand())) { try { RedisToken response = command.execute(request); replication(request); notification(request); return response; } catch (RuntimeException e) { LOGGER.error("error executing command: " + request, e); return error("error executing command: " + request); } } else { return error("READONLY You can't write against a read only slave"); } } private void replication(Request request) { if (!isReadOnlyCommand(request.getCommand())) { RedisToken array = requestToArray(request); if (hasSlaves()) { getState().append(array); } persistence.ifPresent(manager -> manager.append(array)); } } @Override public void publish(String sourceKey, RedisToken message) { Session session = getSession(sourceKey); if (session != null) { session.publish(message); } } //...... }
MasterReplication實現了Runnable接口,其start方法經由scheduleWithFixedDelay調度執行自身的runnable,初始延遲2秒,以後每次執行結束後間隔2秒再執行下一次;其run方法先執行createCommands方法,而後遍歷slaves,而後遍歷commands,執行server.publish(slave.toString(), command);createCommands先添加ping命令,而後再添加commandsToReplicate;commandsToReplicate方法遍歷server.getCommandsToReplicate(),在visitor的array方法中先添加select命令,再添加command命令,最後返回commands。