public class MasterElection { private static final Logger SWITCH_LOGGER = LogUtils.SWITCH_LOGGER; private static volatile ServerState state = ServerState.BACKUPING; public static void election(final CountDownLatch cdl) { final CuratorFramework client = ZkUtils.getCuratorClient(); final LeaderSelector selector = new LeaderSelector(client, Constants.MASTER_PATH, new LeaderSelectorListenerAdapter() { @Override public void takeLeadership(CuratorFramework curatorFramework) throws Exception { SWITCH_LOGGER.info("take master leadership"); long seekTimestamp = MetaService.getSeekTimestamp(); long zkSeekTimestamp = MetaService.getZkSeekTimestamp(); final long sleepMs = 200; long sleepCount = 0; // 若是zk上的數據丟失了, 則zkSeekTimestamp爲0, 此時chronos則被block住 while (seekTimestamp < zkSeekTimestamp && zkSeekTimestamp > 0) { SWITCH_LOGGER.info("sleep {}ms to wait seekTimestamp:{} to catch up with zkSeekTimestamp:{}", sleepMs, seekTimestamp, zkSeekTimestamp); TimeUnit.MILLISECONDS.sleep(sleepMs); seekTimestamp = MetaService.getSeekTimestamp(); zkSeekTimestamp = MetaService.getZkSeekTimestamp(); sleepCount++; } state = ServerState.MASTERING; SWITCH_LOGGER.info("change server state to {}, totalSleepMs:{}ms", state, sleepCount * sleepMs); cdl.await(); state = ServerState.BACKUPING; SWITCH_LOGGER.info("release master leadership"); } }); selector.autoRequeue(); selector.start(); } public static boolean isMaster() { return state == ServerState.MASTERING; } public static boolean isBackup() { return state == ServerState.BACKUPING; } public static void standAlone() { state = ServerState.MASTERING; } public static ServerState getState() { return state; } }
public class ChronosStartup { private static final Logger LOGGER = LoggerFactory.getLogger(ChronosStartup.class); private CountDownLatch waitForShutdown; private String configFilePath = "chronos.yaml"; private PullWorker pullWorker; private PushWorker pushWorker; private DeleteBgWorker deleteBgWorker; private NettyHttpServer nettyHttpServer; ChronosStartup(final String configFilePath) { if (StringUtils.isNotBlank(configFilePath)) { this.configFilePath = configFilePath; } } public void start() throws Exception { LOGGER.info("start to launch chronos..."); final long start = System.currentTimeMillis(); Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { LOGGER.info("start to stop chronos..."); final long start = System.currentTimeMillis(); ChronosStartup.this.stop(); final long cost = System.currentTimeMillis() - start; LOGGER.info("succ stop chronos, cost:{}ms", cost); } catch (Exception e) { LOGGER.error("error while shutdown chronos, err:{}", e.getMessage(), e); } finally { /* shutdown log4j2 */ LogManager.shutdown(); } } }); /* 注意: 如下初始化順序有前後次序 */ /* init config */ ConfigManager.initConfig(configFilePath); /* init metrics */ if (!MetricService.init()) { System.exit(-1); } /* init rocksdb */ RDB.init(ConfigManager.getConfig().getDbConfig().getDbPath()); /* init zk */ ZkUtils.init(); /* init seektimestamp */ MetaService.load(); waitForShutdown = new CountDownLatch(1); if (ConfigManager.getConfig().isStandAlone()) { /* standalone */ MasterElection.standAlone(); } else { /* 集羣模式 master election */ MasterElection.election(waitForShutdown); } /* init pull worker */ if (ConfigManager.getConfig().isPullOn()) { pullWorker = PullWorker.getInstance(); pullWorker.start(); } /* init push worker */ if (ConfigManager.getConfig().isPushOn()) { pushWorker = PushWorker.getInstance(); pushWorker.start(); } /* init delete worker */ if (ConfigManager.getConfig().isDeleteOn()) { deleteBgWorker = DeleteBgWorker.getInstance(); deleteBgWorker.start(); } final long cost = System.currentTimeMillis() - start; LOGGER.info("succ start chronos, cost:{}ms", cost); /* init http server */ nettyHttpServer = NettyHttpServer.getInstance(); nettyHttpServer.start(); waitForShutdown.await(); } void stop() { /* shutdown netty http server */ if (nettyHttpServer != null) { nettyHttpServer.shutdown(); } /* stop pull from MQ */ if (pullWorker != null) { pullWorker.stop(); } /* stop push to MQ */ if (pushWorker != null) { pushWorker.stop(); } /* stop delete */ if (deleteBgWorker != null) { deleteBgWorker.stop(); } MqConsumeStatService.getInstance().stop(); /* close zk client */ ZkUtils.close(); /* close rocksdb */ RDB.close(); if (waitForShutdown != null) { waitForShutdown.countDown(); waitForShutdown = null; } } }
MasterElection提供了election、isMaster、isBackup、standAlone、getState方法;其中election方法使用的是curator recipes的LeaderSelector,其LeaderSelectorListenerAdapter的takeLeadership方法會先獲取seekTimestamp、zkSeekTimestamp,使用while循環直到seekTimestamp大於等於zkSeekTimestamp,以後更新state爲ServerState.MASTERING,而後調用CountDownLatch的await方法,以後就是更新state爲ServerState.BACKUPING,釋放leadership;建立LeaderSelector以後調用其autoRequeue及start方法segmentfault