本文主要研究一下nacos Service的processClientBeatjava
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/core/Service.javagit
public class Service extends com.alibaba.nacos.api.naming.pojo.Service implements Record, RecordListener<Instances> { private static final String SERVICE_NAME_SYNTAX = "[0-9a-zA-Z@\\.:_-]+"; @JSONField(serialize = false) private ClientBeatCheckTask clientBeatCheckTask = new ClientBeatCheckTask(this); private String token; private List<String> owners = new ArrayList<>(); private Boolean resetWeight = false; private Boolean enabled = true; private Selector selector = new NoneSelector(); private String namespaceId; /** * IP will be deleted if it has not send beat for some time, default timeout is 30 seconds. */ private long ipDeleteTimeout = 30 * 1000; private volatile long lastModifiedMillis = 0L; private volatile String checksum; /** * TODO set customized push expire time: */ private long pushCacheMillis = 0L; private Map<String, Cluster> clusterMap = new HashMap<>(); //...... public void processClientBeat(final RsInfo rsInfo) { ClientBeatProcessor clientBeatProcessor = new ClientBeatProcessor(); clientBeatProcessor.setService(this); clientBeatProcessor.setRsInfo(rsInfo); HealthCheckReactor.scheduleNow(clientBeatProcessor); } //...... }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/ClientBeatProcessor.javagithub
public class ClientBeatProcessor implements Runnable { public static final long CLIENT_BEAT_TIMEOUT = TimeUnit.SECONDS.toMillis(15); private RsInfo rsInfo; private Service service; @JSONField(serialize = false) public PushService getPushService() { return SpringContext.getAppContext().getBean(PushService.class); } public RsInfo getRsInfo() { return rsInfo; } public void setRsInfo(RsInfo rsInfo) { this.rsInfo = rsInfo; } public Service getService() { return service; } public void setService(Service service) { this.service = service; } @Override public void run() { Service service = this.service; if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] processing beat: {}", rsInfo.toString()); } String ip = rsInfo.getIp(); String clusterName = rsInfo.getCluster(); int port = rsInfo.getPort(); Cluster cluster = service.getClusterMap().get(clusterName); List<Instance> instances = cluster.allIPs(true); for (Instance instance : instances) { if (instance.getIp().equals(ip) && instance.getPort() == port) { if (Loggers.EVT_LOG.isDebugEnabled()) { Loggers.EVT_LOG.debug("[CLIENT-BEAT] refresh beat: {}", rsInfo.toString()); } instance.setLastBeat(System.currentTimeMillis()); if (!instance.isMarked()) { if (!instance.isHealthy()) { instance.setHealthy(true); Loggers.EVT_LOG.info("service: {} {POS} {IP-ENABLED} valid: {}:{}@{}, region: {}, msg: client beat ok", cluster.getService().getName(), ip, port, cluster.getName(), UtilsAndCommons.LOCALHOST_SITE); getPushService().serviceChanged(service); } } } } } }
nacos-1.1.3/naming/src/main/java/com/alibaba/nacos/naming/healthcheck/HealthCheckReactor.javaapi
public class HealthCheckReactor { private static final ScheduledExecutorService EXECUTOR; private static Map<String, ScheduledFuture> futureMap = new ConcurrentHashMap<>(); static { int processorCount = Runtime.getRuntime().availableProcessors(); EXECUTOR = Executors .newScheduledThreadPool(processorCount <= 1 ? 1 : processorCount / 2, new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setDaemon(true); thread.setName("com.alibaba.nacos.naming.health"); return thread; } }); } public static ScheduledFuture<?> scheduleCheck(HealthCheckTask task) { task.setStartTime(System.currentTimeMillis()); return EXECUTOR.schedule(task, task.getCheckRTNormalized(), TimeUnit.MILLISECONDS); } public static void scheduleCheck(ClientBeatCheckTask task) { futureMap.putIfAbsent(task.taskKey(), EXECUTOR.scheduleWithFixedDelay(task, 5000, 5000, TimeUnit.MILLISECONDS)); } public static void cancelCheck(ClientBeatCheckTask task) { ScheduledFuture scheduledFuture = futureMap.get(task.taskKey()); if (scheduledFuture == null) { return; } try { scheduledFuture.cancel(true); } catch (Exception e) { Loggers.EVT_LOG.error("[CANCEL-CHECK] cancel failed!", e); } } public static ScheduledFuture<?> scheduleNow(Runnable task) { return EXECUTOR.schedule(task, 0, TimeUnit.MILLISECONDS); } }