A look at claudb's MasterReplication

This article takes a look at claudb's MasterReplication.

MasterReplication

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/replication/MasterReplication.java

public class MasterReplication implements Runnable {

  private static final Logger LOGGER = LoggerFactory.getLogger(MasterReplication.class);

  private static final String SELECT_COMMAND = "SELECT";
  private static final String PING_COMMAND = "PING";
  private static final int TASK_DELAY = 2;

  private final DBServerContext server;
  private final ScheduledExecutorService executor = Executors.newSingleThreadScheduledExecutor();

  public MasterReplication(DBServerContext server) {
    this.server = server;
  }

  public void start() {
    executor.scheduleWithFixedDelay(this, TASK_DELAY, TASK_DELAY, TimeUnit.SECONDS);
  }

  public void stop() {
    executor.shutdown();
  }

  public void addSlave(String id) {
    getServerState().addSlave(id);
    LOGGER.info("new slave: {}", id);
  }

  public void removeSlave(String id) {
    getServerState().removeSlave(id);
    LOGGER.info("slave revomed: {}", id);
  }

  @Override
  public void run() {
    List<RedisToken> commands = createCommands();

    for (SafeString slave : getServerState().getSlaves()) {
      for (RedisToken command : commands) {
        server.publish(slave.toString(), command);
      }
    }
  }

  private List<RedisToken> createCommands() {
    List<RedisToken> commands = new LinkedList<>();
    commands.add(pingCommand());
    commands.addAll(commandsToReplicate());
    return commands;
  }

  private List<RedisToken> commandsToReplicate() {
    List<RedisToken> commands = new LinkedList<>();

    for (RedisToken command : server.getCommandsToReplicate()) {
      command.accept(new AbstractRedisTokenVisitor<Void>() {
        @Override
        public Void array(ArrayRedisToken token) {
          commands.add(selectCommand(token));
          commands.add(command(token));
          return null;
        }
      });
    }
    return commands;
  }

  private RedisToken selectCommand(ArrayRedisToken token) {
    return array(string(SELECT_COMMAND),
        token.getValue().stream().findFirst().orElse(string("0")));
  }

  private RedisToken pingCommand() {
    return array(string(PING_COMMAND));
  }

  private RedisToken command(ArrayRedisToken token) {
    return array(token.getValue().stream().skip(1).collect(toList()));
  }

  private DBServerState getServerState() {
    return serverState().getOrElseThrow(() -> new IllegalStateException("missing server state"));
  }

  private Option<DBServerState> serverState() {
    return server.getValue("state");
  }
}
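  • MasterReplication implements Runnable. Its start method schedules the instance itself on a single-threaded scheduled executor with scheduleWithFixedDelay, using TASK_DELAY (2 seconds) both as the initial delay and as the delay between runs. Its run method first calls createCommands, then iterates over the slaves and, for each slave, over the commands, calling server.publish(slave.toString(), command). createCommands adds a PING command first and then everything returned by commandsToReplicate. commandsToReplicate iterates over server.getCommandsToReplicate(); for each token visited as an array it adds a SELECT command followed by the command itself, and finally returns the list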
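
The way createCommands and commandsToReplicate assemble a batch can be sketched without any claudb types. The following is a minimal illustration, not claudb code: it uses plain lists of strings instead of RedisToken, and it assumes (as selectCommand and command suggest) that the first element of each queued entry is the database index and the remaining elements are the command. The class name ReplicationBatchSketch is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for the batch-building part of MasterReplication.
final class ReplicationBatchSketch {

  // Builds the batch sent to every slave: a PING first, then, for each queued
  // entry, a SELECT for its database followed by the command itself.
  static List<List<String>> createCommands(List<List<String>> queued) {
    List<List<String>> commands = new ArrayList<>();
    commands.add(List.of("PING"));
    for (List<String> entry : queued) {
      // Assumption: element 0 is the database index; default to "0" if missing.
      String db = entry.stream().findFirst().orElse("0");
      commands.add(List.of("SELECT", db));
      // The command is everything after the database index.
      commands.add(entry.stream().skip(1).collect(Collectors.toList()));
    }
    return commands;
  }

  public static void main(String[] args) {
    List<List<String>> queued = List.of(
        List.of("0", "SET", "a", "1"),
        List.of("1", "DEL", "b"));
    createCommands(queued).forEach(System.out::println);
  }
}
```

Because a SELECT precedes every replicated command, each command in the batch carries its own database, and the PING is sent on every run even when nothing is queued.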

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

public class ClauDB extends RespServerContext implements DBServerContext {

    //......

  @Override
  public ImmutableList<RedisToken> getCommandsToReplicate() {
    return executeOn(Observable.<ImmutableList<RedisToken>>create(observable -> {
      observable.onNext(getState().getCommandsToReplicate());
      observable.onComplete();
    })).blockingFirst();
  }

    //......

}
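  • ClauDB's getCommandsToReplicate delegates to getState().getCommandsToReplicate(); it wraps the call in an Observable passed to executeOn and blocks on blockingFirst() until the result is available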

getCommandsToReplicate

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/DBServerState.java

public class DBServerState {

  private static final int RDB_VERSION = 6;

  private static final SafeString SLAVES = safeString("slaves");
  private static final DatabaseKey SLAVES_KEY = safeKey("slaves");
  private static final DatabaseKey SCRIPTS_KEY = safeKey("scripts");

  private boolean master = true;

  private final List<Database> databases = new ArrayList<>();
  private final Database admin;
  private final DatabaseFactory factory;

  private final Queue<RedisToken> queue = new LinkedList<>();

  public void append(RedisToken command) {
    queue.offer(command);
  }

    //......

  public ImmutableList<RedisToken> getCommandsToReplicate() {
    ImmutableList<RedisToken> list = ImmutableList.from(queue);
    queue.clear();
    return list;
  }

    //......

}
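  • DBServerState's getCommandsToReplicate copies the queue into an ImmutableList and then clears the queue, so each command is returned at most once; the append method is what adds a command to the queue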
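
The snapshot-then-clear behaviour of the queue can be illustrated with a small standalone class. This is a sketch under stated assumptions, not the DBServerState implementation: the name PendingCommandsSketch is hypothetical, commands are plain strings, and synchronized is used here only to keep the example safe on its own, whereas DBServerState as listed has no locking of its own.

```java
import java.util.LinkedList;
import java.util.List;
import java.util.Queue;

// Hypothetical stand-in for the pending-command queue in DBServerState.
final class PendingCommandsSketch {
  private final Queue<String> queue = new LinkedList<>();

  // Called on the write path: remember a command for the next replication run.
  synchronized void append(String command) {
    queue.offer(command);
  }

  // Called by the replication task: take an immutable snapshot, then empty the
  // queue so the same command is never handed out twice.
  synchronized List<String> drain() {
    List<String> snapshot = List.copyOf(queue);
    queue.clear();
    return snapshot;
  }

  public static void main(String[] args) {
    PendingCommandsSketch pending = new PendingCommandsSketch();
    pending.append("SET a 1");
    pending.append("DEL b");
    System.out.println(pending.drain()); // [SET a 1, DEL b]
    System.out.println(pending.drain()); // []
  }
}
```

Returning an immutable copy means the caller can iterate over the batch while new writes keep arriving in the (now empty) queue.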

executeCommand

claudb-1.7.1/src/main/java/com/github/tonivade/claudb/ClauDB.java

public class ClauDB extends RespServerContext implements DBServerContext {

    //......

  protected RedisToken executeCommand(RespCommand command, Request request) {
    if (!isReadOnly(request.getCommand())) {
      try {
        RedisToken response = command.execute(request);
        replication(request);
        notification(request);
        return response;
      } catch (RuntimeException e) {
        LOGGER.error("error executing command: " + request, e);
        return error("error executing command: " + request);
      }
    } else {
      return error("READONLY You can't write against a read only slave");
    }
  }

  private void replication(Request request) {
    if (!isReadOnlyCommand(request.getCommand())) {
      RedisToken array = requestToArray(request);
      if (hasSlaves()) {
        getState().append(array);
      }
      persistence.ifPresent(manager -> manager.append(array));
    }
  }

  @Override
  public void publish(String sourceKey, RedisToken message) {
    Session session = getSession(sourceKey);
    if (session != null) {
      session.publish(message);
    }
  }

    //......

}
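  • Besides calling command.execute, executeCommand also calls replication (and notification). For any command that is not a read-only command, replication converts the request to an array, appends it to the state only when there are slaves, and passes it to the persistence manager if one is present. publish looks up the session for the given key and, if it exists, calls session.publish(message) to send the message to the slave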
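
The two guards described above (queue a write only when a slave is registered, and deliver a message only when the target session exists) can be sketched as follows. This is an illustrative model rather than ClauDB code: WritePathSketch and all of its methods are hypothetical, sessions are modelled as in-memory inboxes, and persistence and notification are left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.Set;

// Hypothetical model of the replication-related parts of the write path.
final class WritePathSketch {
  private final Queue<List<String>> pending = new LinkedList<>();
  private final Set<String> slaves = new HashSet<>();
  private final Map<String, List<List<String>>> sessions = new HashMap<>();

  void addSlave(String id) { slaves.add(id); }

  void openSession(String id) { sessions.put(id, new ArrayList<>()); }

  // Mirrors replication(request): read-only commands are ignored, and a write
  // is queued only if at least one slave is registered.
  void replicate(String db, boolean readOnlyCommand, List<String> command) {
    if (readOnlyCommand) {
      return;
    }
    if (!slaves.isEmpty()) {
      List<String> entry = new ArrayList<>();
      entry.add(db); // database index first, then the command
      entry.addAll(command);
      pending.offer(entry);
    }
  }

  // Mirrors publish(sourceKey, message): silently skip unknown sessions.
  void publish(String id, List<String> message) {
    List<List<String>> inbox = sessions.get(id);
    if (inbox != null) {
      inbox.add(message);
    }
  }

  int pendingSize() { return pending.size(); }

  int delivered(String id) { return sessions.getOrDefault(id, List.of()).size(); }

  public static void main(String[] args) {
    WritePathSketch server = new WritePathSketch();
    server.replicate("0", false, List.of("SET", "a", "1")); // no slaves yet: dropped
    server.addSlave("s1");
    server.replicate("0", false, List.of("SET", "a", "2")); // queued
    System.out.println(server.pendingSize());
  }
}
```

One consequence visible in the sketch is that writes executed before the first slave is registered never enter the queue.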

Summary

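MasterReplication implements Runnable. Its start method schedules the instance itself with scheduleWithFixedDelay so that it runs with a 2-second delay between executions. Its run method first calls createCommands, then iterates over the slaves and, for each slave, over the commands, calling server.publish(slave.toString(), command). createCommands adds a PING command first and then the result of commandsToReplicate. commandsToReplicate iterates over server.getCommandsToReplicate(); for each token visited as an array it adds a SELECT command followed by the command itself, and finally returns the list.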
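
The scheduling pattern summarized above, a Runnable that schedules itself with scheduleWithFixedDelay, can be tried in isolation. The sketch below is an assumption-laden illustration: PeriodicTaskSketch is a hypothetical name, the delay is configurable in milliseconds instead of the fixed 2 seconds, and the try/catch in run is an addition that MasterReplication as listed does not have.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the start/stop/run skeleton of MasterReplication.
final class PeriodicTaskSketch implements Runnable {
  private final ScheduledExecutorService executor =
      Executors.newSingleThreadScheduledExecutor();
  private final long delayMillis;
  private final Runnable work;

  PeriodicTaskSketch(long delayMillis, Runnable work) {
    this.delayMillis = delayMillis;
    this.work = work;
  }

  // The delay is measured from the end of one run to the start of the next,
  // so runs never overlap on the single worker thread.
  void start() {
    executor.scheduleWithFixedDelay(this, delayMillis, delayMillis, TimeUnit.MILLISECONDS);
  }

  void stop() {
    executor.shutdown();
  }

  @Override
  public void run() {
    try {
      work.run();
    } catch (RuntimeException e) {
      // Addition in this sketch: ScheduledExecutorService suppresses all later
      // executions once a run throws, so the exception is swallowed here.
      System.err.println("periodic task failed: " + e);
    }
  }

  public static void main(String[] args) throws InterruptedException {
    PeriodicTaskSketch task = new PeriodicTaskSketch(100, () -> System.out.println("tick"));
    task.start();
    Thread.sleep(350);
    task.stop();
  }
}
```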
