本文主要研究下eureka的renewaljava
eureka client在實例化的時候註冊了一個定時任務,每隔renewalIntervalInSecs,向eureka server發送一次renewal。具體詳見聊聊eureka client的HeartbeatThreadnode
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.javasegmentfault
@Override public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; initializedResponseCache(); scheduleRenewalThresholdUpdateTask(); initRemoteRegionRegistry(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } }
這裏設定了更新RenewalThreshold的定時任務app
/** * Schedule the task that updates <em>renewal threshold</em> periodically. * The renewal threshold would be used to determine if the renewals drop * dramatically because of network partition and to protect expiring too * many instances at a time. * */ private void scheduleRenewalThresholdUpdateTask() { timer.schedule(new TimerTask() { @Override public void run() { updateRenewalThreshold(); } }, serverConfig.getRenewalThresholdUpdateIntervalMs(), serverConfig.getRenewalThresholdUpdateIntervalMs()); }
間隔RenewalThresholdUpdateIntervalMs調度一次dom
numberOfRenewsPerMinThreshold更新
)/** * Updates the <em>renewal threshold</em> based on the current number of * renewals. The threshold is a percentage as specified in * {@link EurekaServerConfig#getRenewalPercentThreshold()} of renewals * received per minute {@link #getNumOfRenewsInLastMin()}. */ private void updateRenewalThreshold() { try { Applications apps = eurekaClient.getApplications(); int count = 0; for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { if (this.isRegisterable(instance)) { ++count; } } } synchronized (lock) { // Update threshold only if the threshold is greater than the // current expected threshold of if the self preservation is disabled. if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * numberOfRenewsPerMinThreshold) || (!this.isSelfPreservationModeEnabled())) { this.expectedNumberOfRenewsPerMin = count * 2; this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold()); } } logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold); } catch (Throwable e) { logger.error("Cannot update renewal threshold", e); } }
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.javaide
@Override public boolean isLeaseExpirationEnabled() { if (!isSelfPreservationModeEnabled()) { // The self preservation mode is disabled, hence allowing the instances to expire. return true; } return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold; }
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.javapost
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) { // Renewals happen every 30 seconds and for a minute it should be a factor of 2. this.expectedNumberOfRenewsPerMin = count * 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); logger.info("Got {} instances from neighboring DS node", count); logger.info("Renew threshold is: {}", numberOfRenewsPerMinThreshold); this.startupTime = System.currentTimeMillis(); if (count > 0) { this.peerInstancesTransferEmptyOnStartup = false; } DataCenterInfo.Name selfName = applicationInfoManager.getInfo().getDataCenterInfo().getName(); boolean isAws = Name.Amazon == selfName; if (isAws && serverConfig.shouldPrimeAwsReplicaConnections()) { logger.info("Priming AWS connections for all replicas.."); primeAwsReplicas(applicationInfoManager); } logger.info("Changing status to UP"); applicationInfoManager.setInstanceStatus(InstanceStatus.UP); super.postInit(); }
這個方法調用了super.postInit()this
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.javadebug
protected void postInit() { renewsLastMin.start(); if (evictionTaskRef.get() != null) { evictionTaskRef.get().cancel(); } evictionTaskRef.set(new EvictionTask()); evictionTimer.schedule(evictionTaskRef.get(), serverConfig.getEvictionIntervalTimerInMs(), serverConfig.getEvictionIntervalTimerInMs()); }
這裏調度了EvictionTask,時間間隔爲EvictionIntervalTimerInMscode
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.java
class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } /** * compute a compensation time defined as the actual time this task was executed since the prev iteration, * vs the configured amount of time for execution. This is useful for cases where changes in time (due to * clock skew or gc for example) causes the actual eviction task to execute later than the desired time * according to the configured cycle. */ long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } }
補償時間=當前時間點-task最近一次執行的時間點-清除間隔,小於0則取0
public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } }
若最後更新時間 + lease的存活時間[默認是90s]+補償時間 < 當前時間,則過時。
對於renewal,一方面是client端定時任務主動去續約,一方面是server端不斷更新renewThreshold,而後又有定時任務去清理過時的renewal。