eureka-server
是經過提供rest
接口來跟eureka-client
交互,咱們重點關注跟eureka-client
相關的幾個接口。
註冊的uri
是GET /apps
。入口在ApplicationsResource
。
@Path("/{version}/apps") @Produces({"application/xml", "application/json"}) public class ApplicationsResource { private final ResponseCache responseCache; @GET public Response getContainers(@PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_WITH_REMOTE_REGIONS.increment(); } // Check if the server allows the access to the registry. The server can // restrict access if it is not // ready to serve traffic depending on various reasons. if (!registry.shouldAllowAccess(isRemoteRegionRequested)) { return Response.status(Status.FORBIDDEN).build(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); Response response; if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { response = Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { response = Response.ok(responseCache.get(cacheKey)) .build(); } return response; } }
version
爲eureka
版本號,最新API默認爲V2
,Spring Cloud Netflix Eureka 跟 Spring Boot 適配以後,提供的 REST API 與原始的 REST API 有一點點不一樣,其路徑中的 version
值固定爲eureka
。服務信息是從responseCache
裏獲取,從key
的建立參數上能夠看到,緩存里根據key
分紅了不少個類別。
/**
 * Excerpt of the response cache: lookups go through an optional read-only map
 * and a loading read-write cache; payloads are generated from the registry.
 */
public class ResponseCacheImpl implements ResponseCache {

    private final ConcurrentMap<Key, Value> readOnlyCacheMap = new ConcurrentHashMap<Key, Value>();
    private final LoadingCache<Key, Value> readWriteCacheMap;
    private final AbstractInstanceRegistry registry;

    /**
     * Looks up the payload for {@code key}, using the read-only cache when it is enabled.
     *
     * @param key cache key describing the requested payload
     * @return the payload, or {@code null} when nothing (or only the empty payload) is available
     */
    public String get(final Key key) {
        return get(key, shouldUseReadOnlyResponseCache);
    }

    /**
     * Same as {@link #get(Key)} but with an explicit choice of cache level.
     */
    @VisibleForTesting
    String get(final Key key, boolean useReadOnlyCache) {
        final Value cached = getValue(key, useReadOnlyCache);
        if (cached != null && !cached.getPayload().equals(EMPTY_PAYLOAD)) {
            return cached.getPayload();
        }
        return null;
    }

    /**
     * Fetches the cached value for {@code key}.
     *
     * <p>With the read-only cache enabled, a miss there falls through to the
     * read-write cache and the result is copied into the read-only map. Any
     * failure is logged and results in whatever was obtained so far (possibly
     * {@code null}).
     */
    @VisibleForTesting
    Value getValue(final Key key, boolean useReadOnlyCache) {
        Value result = null;
        try {
            if (!useReadOnlyCache) {
                result = readWriteCacheMap.get(key);
            } else {
                final Value readOnlyHit = readOnlyCacheMap.get(key);
                if (readOnlyHit == null) {
                    result = readWriteCacheMap.get(key);
                    readOnlyCacheMap.put(key, result);
                } else {
                    result = readOnlyHit;
                }
            }
        } catch (Throwable t) {
            logger.error("Cannot get value for key : {}", key, t);
        }
        return result;
    }

    /**
     * Builds the payload for {@code key} from the registry, timing the
     * serialization with the timer that matches the kind of request.
     */
    private Value generatePayload(Key key) {
        Stopwatch timing = null;
        try {
            String body;
            switch (key.getEntityType()) {
                case Application:
                    final boolean withRemoteRegions = key.hasRegions();
                    if (ALL_APPS.equals(key.getName())) {
                        // Full registry fetch.
                        if (!withRemoteRegions) {
                            timing = serializeAllAppsTimer.start();
                            body = getPayLoad(key, registry.getApplications());
                        } else {
                            timing = serializeAllAppsWithRemoteRegionTimer.start();
                            body = getPayLoad(key, registry.getApplicationsFromMultipleRegions(key.getRegions()));
                        }
                    } else if (ALL_APPS_DELTA.equals(key.getName())) {
                        // Delta fetch: the delta version counters are bumped on every generation.
                        if (!withRemoteRegions) {
                            timing = serializeDeltaAppsTimer.start();
                            versionDelta.incrementAndGet();
                            versionDeltaLegacy.incrementAndGet();
                            body = getPayLoad(key, registry.getApplicationDeltas());
                        } else {
                            timing = serializeDeltaAppsWithRemoteRegionTimer.start();
                            versionDeltaWithRegions.incrementAndGet();
                            versionDeltaWithRegionsLegacy.incrementAndGet();
                            body = getPayLoad(key,
                                    registry.getApplicationDeltasFromMultipleRegions(key.getRegions()));
                        }
                    } else {
                        // Any other name is treated as a single application.
                        timing = serializeOneApptimer.start();
                        body = getPayLoad(key, registry.getApplication(key.getName()));
                    }
                    break;
                case VIP:
                case SVIP:
                    timing = serializeViptimer.start();
                    body = getPayLoad(key, getApplicationsForVip(key, registry));
                    break;
                default:
                    logger.error("Unidentified entity type: {} found in the cache key.", key.getEntityType());
                    body = "";
                    break;
            }
            return new Value(body);
        } finally {
            if (timing != null) {
                timing.stop();
            }
        }
    }
}
eureka
爲了保證性能,引入了三級緩存機制:
readOnlyCacheMap
,是ConcurrentHashMap
,springcloud項目能夠經過配置eureka.server.use-read-only-response-cache
來指定是否開啓,默認爲true
。readWriteCacheMap
,是guava cache
。registry
存儲在線服務信息,ConcurrentHashMap<String,ConcurrentHashMap<String, Lease<InstanceInfo>>>
類型,存儲格式爲ConcurrentHashMap<服務名,ConcurrentHashMap<機器ID(默認爲「IP:服務名:配置的端口」), 服務信息(Lease)>>
,例如<SERVICE-TEST,ConcurrentHashMap<localhost:SERVICE-TEST:6666, Lease<InstanceInfo>>>
。總體流程:
若是開啓了useReadOnlyCache
,則從readOnlyCacheMap
獲取數據。若是沒有開啓useReadOnlyCache
,或者readOnlyCacheMap
獲取不到數據,則從readWriteCacheMap
獲取;獲取到數據後,若是開啓了useReadOnlyCache
,則將結果設置到readOnlyCacheMap
中。若是readWriteCacheMap
獲取不到數據,則從registry
中獲取數據,獲取到數據後,將結果設置到readWriteCacheMap
中。
/**
 * Excerpt of the response cache showing how the read-write cache is built and
 * how the read-only cache is periodically refreshed from it.
 */
public class ResponseCacheImpl implements ResponseCache {

    /**
     * Wires the cache: builds the loading read-write cache and, when the
     * read-only cache is enabled, schedules the periodic refresh task.
     *
     * @param serverConfig source of cache sizing, expiry and refresh settings
     * @param serverCodecs codecs kept for payload serialization
     * @param registry     registry the payloads are generated from
     */
    ResponseCacheImpl(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, AbstractInstanceRegistry registry) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.shouldUseReadOnlyResponseCache = serverConfig.shouldUseReadOnlyResponseCache();
        this.registry = registry;

        final long refreshIntervalMs = serverConfig.getResponseCacheUpdateIntervalMs();

        this.readWriteCacheMap =
                CacheBuilder.newBuilder()
                        .initialCapacity(serverConfig.getInitialCapacityOfResponseCache())
                        .expireAfterWrite(serverConfig.getResponseCacheAutoExpirationInSeconds(), TimeUnit.SECONDS)
                        .removalListener(new RemovalListener<Key, Value>() {
                            @Override
                            public void onRemoval(RemovalNotification<Key, Value> notification) {
                                final Key evicted = notification.getKey();
                                if (!evicted.hasRegions()) {
                                    return;
                                }
                                // Drop the region-specific key from the index kept under its region-less twin.
                                final Key regionless = evicted.cloneWithoutRegions();
                                regionSpecificKeys.remove(regionless, evicted);
                            }
                        })
                        .build(new CacheLoader<Key, Value>() {
                            @Override
                            public Value load(Key key) throws Exception {
                                if (key.hasRegions()) {
                                    // Index the region-specific key under its region-less twin.
                                    final Key regionless = key.cloneWithoutRegions();
                                    regionSpecificKeys.put(regionless, key);
                                }
                                return generatePayload(key);
                            }
                        });

        if (shouldUseReadOnlyResponseCache) {
            final TimerTask refreshTask = getCacheUpdateTask();
            // First run is aligned to the next multiple of the refresh interval.
            final Date firstRun = new Date(
                    ((System.currentTimeMillis() / refreshIntervalMs) * refreshIntervalMs) + refreshIntervalMs);
            timer.schedule(refreshTask, firstRun, refreshIntervalMs);
        }

        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register the JMX monitor for the InstanceRegistry", e);
        }
    }

    /**
     * Creates the task that copies, for every key currently in the read-only
     * map, the value held by the read-write cache when the two differ.
     */
    private TimerTask getCacheUpdateTask() {
        return new TimerTask() {
            @Override
            public void run() {
                logger.debug("Updating the client cache from response cache");
                for (Key key : readOnlyCacheMap.keySet()) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                                key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                    }
                    try {
                        CurrentRequestVersion.set(key.getVersion());
                        final Value fresh = readWriteCacheMap.get(key);
                        final Value stale = readOnlyCacheMap.get(key);
                        // Identity comparison: replace only when a different Value object is cached.
                        if (fresh != stale) {
                            readOnlyCacheMap.put(key, fresh);
                        }
                    } catch (Throwable th) {
                        logger.error("Error while updating the client cache from response cache for key {}",
                                key.toStringCompact(), th);
                    }
                }
            }
        };
    }
}
readOnlyCacheMap
,是經過定時任務,定時從readWriteCacheMap
讀取數據刷新值,定時任務默認30秒
間隔。readWriteCacheMap
,經過guava cache
自身的機制來實現過期並刷新,默認是從寫入時間起算,180秒以後過期
。註冊的uri
是GET /apps/delta
。入口在ApplicationsResource
。
@Path("/{version}/apps") @Produces({"application/xml", "application/json"}) public class ApplicationsResource { @Path("delta") @GET public Response getContainerDifferential( @PathParam("version") String version, @HeaderParam(HEADER_ACCEPT) String acceptHeader, @HeaderParam(HEADER_ACCEPT_ENCODING) String acceptEncoding, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept, @Context UriInfo uriInfo, @Nullable @QueryParam("regions") String regionsStr) { boolean isRemoteRegionRequested = null != regionsStr && !regionsStr.isEmpty(); // If the delta flag is disabled in discovery or if the lease expiration // has been disabled, redirect clients to get all instances if ((serverConfig.shouldDisableDelta()) || (!registry.shouldAllowAccess(isRemoteRegionRequested))) { return Response.status(Status.FORBIDDEN).build(); } String[] regions = null; if (!isRemoteRegionRequested) { EurekaMonitors.GET_ALL_DELTA.increment(); } else { regions = regionsStr.toLowerCase().split(","); Arrays.sort(regions); // So we don't have different caches for same regions queried in different order. EurekaMonitors.GET_ALL_DELTA_WITH_REMOTE_REGIONS.increment(); } CurrentRequestVersion.set(Version.toEnum(version)); KeyType keyType = Key.KeyType.JSON; String returnMediaType = MediaType.APPLICATION_JSON; if (acceptHeader == null || !acceptHeader.contains(HEADER_JSON_VALUE)) { keyType = Key.KeyType.XML; returnMediaType = MediaType.APPLICATION_XML; } Key cacheKey = new Key(Key.EntityType.Application, ResponseCacheImpl.ALL_APPS_DELTA, keyType, CurrentRequestVersion.get(), EurekaAccept.fromString(eurekaAccept), regions ); if (acceptEncoding != null && acceptEncoding.contains(HEADER_GZIP_VALUE)) { return Response.ok(responseCache.getGZIP(cacheKey)) .header(HEADER_CONTENT_ENCODING, HEADER_GZIP_VALUE) .header(HEADER_CONTENT_TYPE, returnMediaType) .build(); } else { return Response.ok(responseCache.get(cacheKey)) .build(); } } }
增量的邏輯基本上跟全量同樣,差異在於cacheKey
的名稱(Key 的 name 參數)
爲ResponseCacheImpl.ALL_APPS_DELTA
。ResponseCache
裏,全量獲取是經過registry.getApplications()
遍歷registry
的Map
獲取全部的服務;而增量的registry.getApplicationDeltas()
方法,則是有專門的queue
存儲最近變化的信息。
/**
 * Excerpt of the instance registry showing how the delta (recent changes)
 * view is assembled.
 */
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Builds an {@code Applications} object containing every instance currently
     * held in the recently-changed queue, optionally adds applications from
     * remote region registries that are unknown locally, and stamps the result
     * with the reconcile hash code of the full registry.
     *
     * <p>The write lock is held while the queue is read.
     *
     * @return the delta view of the registry
     */
    public Applications getApplicationDeltas() {
        GET_ALL_CACHE_MISS_DELTA.increment();
        Applications apps = new Applications();
        apps.setVersion(responseCache.getVersionDelta().get());
        Map<String, Application> applicationInstancesMap = new HashMap<String, Application>();
        // Acquire the lock BEFORE entering the try block: if acquisition fails,
        // the finally clause must not attempt to release a lock that is not held.
        write.lock();
        try {
            Iterator<RecentlyChangedItem> iter = this.recentlyChangedQueue.iterator();
            logger.debug("The number of elements in the delta queue is : {}",
                    this.recentlyChangedQueue.size());
            while (iter.hasNext()) {
                Lease<InstanceInfo> lease = iter.next().getLeaseInfo();
                InstanceInfo instanceInfo = lease.getHolder();
                logger.debug(
                        "The instance id {} is found with status {} and actiontype {}",
                        instanceInfo.getId(), instanceInfo.getStatus().name(), instanceInfo.getActionType().name());
                // Group changed instances by application name, creating the application on first sight.
                Application app = applicationInstancesMap.get(instanceInfo
                        .getAppName());
                if (app == null) {
                    app = new Application(instanceInfo.getAppName());
                    applicationInstancesMap.put(instanceInfo.getAppName(), app);
                    apps.addApplication(app);
                }
                app.addInstance(new InstanceInfo(decorateInstanceInfo(lease)));
            }

            boolean disableTransparentFallback = serverConfig.disableTransparentFallbackToOtherRegion();

            if (!disableTransparentFallback) {
                // Add remote-region delta applications that do not exist in the local region.
                Applications allAppsInLocalRegion = getApplications(false);

                for (RemoteRegionRegistry remoteRegistry : this.regionNameVSRemoteRegistry.values()) {
                    Applications applications = remoteRegistry.getApplicationDeltas();
                    for (Application application : applications.getRegisteredApplications()) {
                        Application appInLocalRegistry =
                                allAppsInLocalRegion.getRegisteredApplications(application.getName());
                        if (appInLocalRegistry == null) {
                            apps.addApplication(application);
                        }
                    }
                }
            }

            // The hash code of the full registry is attached to the delta.
            Applications allApps = getApplications(!disableTransparentFallback);
            apps.setAppsHashCode(allApps.getReconcileHashCode());
            return apps;
        } finally {
            write.unlock();
        }
    }
}
recentlyChangedQueue
由定時任務定時清理過時數據,定時任務30
秒執行一次,清理180
秒以前的數據。
/**
 * Excerpt of the instance registry showing how the recently-changed queue is
 * pruned by a periodic task.
 */
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Stores the configuration and codecs, creates the bookkeeping queues and
     * the renewal rate meter, and schedules the delta retention task using the
     * configured interval as both initial delay and period.
     */
    protected AbstractInstanceRegistry(EurekaServerConfig serverConfig,
                                       EurekaClientConfig clientConfig,
                                       ServerCodecs serverCodecs) {
        this.serverConfig = serverConfig;
        this.clientConfig = clientConfig;
        this.serverCodecs = serverCodecs;

        this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
        this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

        this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);

        this.deltaRetentionTimer.schedule(
                getDeltaRetentionTask(),
                serverConfig.getDeltaRetentionTimerIntervalInMs(),
                serverConfig.getDeltaRetentionTimerIntervalInMs());
    }

    /**
     * Creates the task that removes entries from the head of the
     * recently-changed queue until it meets one that is still within the
     * configured retention time.
     */
    private TimerTask getDeltaRetentionTask() {
        return new TimerTask() {
            @Override
            public void run() {
                final Iterator<RecentlyChangedItem> cursor = recentlyChangedQueue.iterator();
                while (cursor.hasNext()) {
                    final long updatedAt = cursor.next().getLastUpdateTime();
                    final long cutoff =
                            System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue();
                    if (updatedAt >= cutoff) {
                        // Stop at the first entry that is recent enough.
                        break;
                    }
                    cursor.remove();
                }
            }
        };
    }
}
獲取服務總體流程以下:
註冊的uri
是POST /apps/{serviceId}
。入口在ApplicationResource
。
@Produces({"application/xml", "application/json"}) public class ApplicationResource { private static final Logger logger = LoggerFactory.getLogger(ApplicationResource.class); private final PeerAwareInstanceRegistry registry; /** * Registers information about a particular instance for an * {@link com.netflix.discovery.shared.Application}. * * @param info * {@link InstanceInfo} information of the instance. * @param isReplication * a header parameter containing information whether this is * replicated from other nodes. */ @POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // validate that the instanceinfo contains all the necessary required fields if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // handle cases where clients may be registering with bad DataCenterInfo with missing data DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) 
{ boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible } }
InstanceInfo info
是服務節點信息,String isReplication
用來標識是否集羣內其餘eureka-server
節點發送過來的複製信息。接口主要代碼在PeerAwareInstanceRegistry.register
。
/**
 * Excerpt of the peer-aware registry: registration is performed locally and
 * then handed to the peer replication step.
 */
@Singleton
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Registers the instance with the lease duration it asks for (falling back
     * to {@code Lease.DEFAULT_DURATION_IN_SECS} when none or a non-positive one
     * is supplied) and then replicates the registration to peers.
     *
     * @param info          the instance to register
     * @param isReplication whether this call is itself a replication from a peer
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        final boolean hasCustomDuration =
                info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
        final int leaseDuration = hasCustomDuration
                ? info.getLeaseInfo().getDurationInSecs()
                : Lease.DEFAULT_DURATION_IN_SECS;

        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
}
PeerAwareInstanceRegistry.register
主要兩件事,一是調用父類的register
,再就是向集羣內其餘節點傳播註冊信息。先看下父類的註冊方法。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>(); protected volatile ResponseCache responseCache; public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { read.lock(); Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // Retain the last dirty timestamp without overwriting it, if there is already a lease if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); // this is a > instead of a >= because if the timestamps are equal, we still take the remote transmitted // InstanceInfo instead of the server local copy. 
if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { // The lease does not exist and hence it is a new registration synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to register it, increase the number of clients sending renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1; updateRenewsPerMinThreshold(); } } logger.debug("No previous lease information found; it is new registration"); } Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } // This is where the initial state transfer of overridden status happens if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. 
Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // Set the status based on the overridden status rules InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); // If the lease is registered with UP status, set lease service up timestamp if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } } }
方法用lock
保證線程安全,大概流程以下:
將服務信息保存到registry
,registry
存儲在線服務信息,ConcurrentHashMap<String,ConcurrentHashMap<String, Lease<InstanceInfo>>>
類型,存儲格式爲ConcurrentHashMap<服務名,ConcurrentHashMap<機器ID(默認爲「IP:服務名:配置的端口」), 服務信息(Lease)>>
,例如<SERVICE-TEST,ConcurrentHashMap<localhost:SERVICE-TEST:6666, Lease<InstanceInfo>>>
。將註冊信息加入到最近註冊隊列-recentRegisteredQueue
中。將變動信息加入到最近變動隊列-recentlyChangedQueue
中。清理緩存:eureka
爲了保證性能,引入了三級緩存機制(後續會講到)。進入清理緩存看下:
protected volatile ResponseCache responseCache; private void invalidateCache(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { // invalidate cache responseCache.invalidate(appName, vipAddress, secureVipAddress); }
實際清理代碼在ResponseCache
內。
public class ResponseCacheImpl implements ResponseCache { private final LoadingCache<Key, Value> readWriteCacheMap; @Override public void invalidate(String appName, @Nullable String vipAddress, @Nullable String secureVipAddress) { for (Key.KeyType type : Key.KeyType.values()) { for (Version v : Version.values()) { invalidate( new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, appName, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS, type, v, EurekaAccept.compact), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.full), new Key(Key.EntityType.Application, ALL_APPS_DELTA, type, v, EurekaAccept.compact) ); if (null != vipAddress) { invalidate(new Key(Key.EntityType.VIP, vipAddress, type, v, EurekaAccept.full)); } if (null != secureVipAddress) { invalidate(new Key(Key.EntityType.SVIP, secureVipAddress, type, v, EurekaAccept.full)); } } } } public void invalidate(Key... keys) { for (Key key : keys) { logger.debug("Invalidating the response cache key : {} {} {} {}, {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(key); Collection<Key> keysWithRegions = regionSpecificKeys.get(key); if (null != keysWithRegions && !keysWithRegions.isEmpty()) { for (Key keysWithRegion : keysWithRegions) { logger.debug("Invalidating the response cache key : {} {} {} {} {}", key.getEntityType(), key.getName(), key.getVersion(), key.getType(), key.getEurekaAccept()); readWriteCacheMap.invalidate(keysWithRegion); } } } } ,}
再回到PeerAwareInstanceRegistry.replicateToPeers
看怎麼傳播信息到集羣其餘節點。
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // If it is a replication already, do not replicate again as this will create a poison replication if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } } private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
Action
爲Register
,進入到PeerEurekaNode.register
方法
/**
 * Excerpt of the peer node abstraction showing how a registration is queued
 * for replication to that peer.
 */
public class PeerEurekaNode {

    /**
     * Submits a replication task for the given registration to the batching
     * dispatcher. The task expires one lease renewal period from now.
     *
     * @param info the instance whose registration is replicated
     * @throws Exception declared by the original signature
     */
    public void register(final InstanceInfo info) throws Exception {
        final long deadlineMs = System.currentTimeMillis() + getLeaseRenewalOf(info);

        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        // The actual work is a plain register call on the replication client.
                        return replicationClient.register(info);
                    }
                },
                deadlineMs
        );
    }
}
傳播節點是經過TaskExecutors
異步執行(也就是即便當eureka-client
成功調用註冊接口,也並不表明全部的eureka-server
都已經有這個client
的信息),最終執行方法是HttpReplicationClient.register
,HttpReplicationClient
默認是JerseyReplicationClient
。
/**
 * Excerpt of the Jersey based replication client: every outgoing request is
 * tagged with the replication header.
 */
public class JerseyReplicationClient extends AbstractJerseyEurekaHttpClient implements HttpReplicationClient {

    /**
     * Sets {@code PeerEurekaNode.HEADER_REPLICATION} to {@code "true"} on the request.
     *
     * @param webResource request builder to decorate
     */
    @Override
    protected void addExtraHeaders(Builder webResource) {
        final String replicationFlag = "true";
        webResource.header(PeerEurekaNode.HEADER_REPLICATION, replicationFlag);
    }
}
JerseyReplicationClient
繼承自AbstractJerseyEurekaHttpClient
,因此傳播實際上也是調用其餘節點的註冊接口,只是額外在頭部添加了x-netflix-discovery-replication
爲true
,而經過前面的代碼@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
,咱們知道若是isReplication
爲true
時,表示這是一個集羣其餘節點的複製信息
,就不會再作傳播這一步。
因爲傳播是異步進行的,異步有失敗的風險,這個在續約(renew)
時會作補充。
註冊的uri
是PUT apps/{serviceId}/{instanceId}
。入口在InstanceResource
。
public class InstanceResource { public Response renewLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) { boolean isFromReplicaNode = "true".equals(isReplication); boolean isSuccess = registry.renew(app.getName(), id, isFromReplicaNode); // Not found in the registry, immediately ask for a register if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } // Check if we need to sync based on dirty time stamp, the client // instance might have changed some value Response response; if (lastDirtyTimestamp != null && serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); // Store the overridden status since the validation found out the node that replicates wins if (response.getStatus() == Response.Status.NOT_FOUND.getStatusCode() && (overriddenStatus != null) && !(InstanceStatus.UNKNOWN.name().equals(overriddenStatus)) && isFromReplicaNode) { registry.storeOverriddenStatusIfRequired(app.getAppName(), id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", app.getName(), id, response.getStatus()); return response; } }
renewLease
方法主要作兩件事:
調用registry.renew
執行續約。校驗lastDirtyTimestamp
。續約失敗時返回404
。
/**
 * Excerpt of the peer-aware registry: a successful local renewal is
 * replicated to the peers as a heartbeat.
 */
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry {

    /**
     * Renews the lease locally and, on success, replicates a heartbeat to peers.
     *
     * @param appName       application name of the instance
     * @param id            instance id
     * @param isReplication whether this call is itself a replication from a peer
     * @return {@code true} when the local renewal succeeded, {@code false} otherwise
     */
    public boolean renew(final String appName, final String id, final boolean isReplication) {
        if (!super.renew(appName, id, isReplication)) {
            return false;
        }
        replicateToPeers(Action.Heartbeat, appName, id, null, null, isReplication);
        return true;
    }
}
跟註冊同樣,續約也是調用父類的方法,而後傳播到集羣其餘節點。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { public boolean renew(String appName, String id, boolean isReplication) { RENEW.increment(isReplication); Map<String, Lease<InstanceInfo>> gMap = registry.get(appName); Lease<InstanceInfo> leaseToRenew = null; if (gMap != null) { leaseToRenew = gMap.get(id); } if (leaseToRenew == null) { RENEW_NOT_FOUND.increment(isReplication); logger.warn("DS: Registry: lease doesn't exist, registering resource: {} - {}", appName, id); return false; } else { InstanceInfo instanceInfo = leaseToRenew.getHolder(); if (instanceInfo != null) { // touchASGCache(instanceInfo.getASGName()); InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus( instanceInfo, leaseToRenew, isReplication); if (overriddenInstanceStatus == InstanceStatus.UNKNOWN) { logger.info("Instance status UNKNOWN possibly due to deleted override for instance {}" + "; re-register required", instanceInfo.getId()); RENEW_NOT_FOUND.increment(isReplication); return false; } if (!instanceInfo.getStatus().equals(overriddenInstanceStatus)) { logger.info( "The instance status {} is different from overridden instance status {} for instance {}. " + "Hence setting the status to overridden status", instanceInfo.getStatus().name(), instanceInfo.getOverriddenStatus().name(), instanceInfo.getId()); instanceInfo.setStatusWithoutDirty(overriddenInstanceStatus); } } renewsLastMin.increment(); leaseToRenew.renew(); return true; } } } public class Lease<T> { private volatile long lastUpdateTimestamp; public void renew() { lastUpdateTimestamp = System.currentTimeMillis() + duration; } }
從registry中獲取實例的Lease
,若是沒有,就返回失敗。若是實例當前狀態與最終狀態
不一致,則用最終狀態
覆蓋掉當前狀態。傳播續租
給集羣其餘節點,跟註冊也同樣:異步調用其餘節點的續租
接口,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
參數爲true
。
註冊的uri
是DELETE apps/{serviceId}/{instanceId}
。入口在InstanceResource
。
public class InstanceResource { @DELETE public Response cancelLease( @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { try { boolean isSuccess = registry.cancel(app.getName(), id, "true".equals(isReplication)); if (isSuccess) { logger.debug("Found (Cancel): {} - {}", app.getName(), id); return Response.ok().build(); } else { logger.info("Not Found (Cancel): {} - {}", app.getName(), id); return Response.status(Status.NOT_FOUND).build(); } } catch (Throwable e) { logger.error("Error (cancel): {} - {}", app.getName(), id, e); return Response.serverError().build(); } } }
一樣的,registry
是PeerAwareInstanceRegistryImpl
。
public class PeerAwareInstanceRegistryImpl extends AbstractInstanceRegistry implements PeerAwareInstanceRegistry { @Override public boolean cancel(final String appName, final String id, final boolean isReplication) { if (super.cancel(appName, id, isReplication)) { replicateToPeers(Action.Cancel, appName, id, null, null, isReplication); synchronized (lock) { if (this.expectedNumberOfClientsSendingRenews > 0) { // Since the client wants to cancel it, reduce the number of clients to send renews this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews - 1; updateRenewsPerMinThreshold(); } } return true; } return false; } protected void updateRenewsPerMinThreshold() { this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds()) * serverConfig.getRenewalPercentThreshold()); } }
PeerAwareInstanceRegistryImpl
除了調用父類的cancel
、傳播信息給集羣其餘節點以外,還會從新計算numberOfRenewsPerMinThreshold
,numberOfRenewsPerMinThreshold
在自我保護機制裏會用到。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Cancels the lease of instance {@code id} under {@code appName}; delegates to
     * {@link #internalCancel(String, String, boolean)}.
     *
     * @return {@code true} if a lease was found and cancelled, {@code false} otherwise
     */
    @Override
    public boolean cancel(String appName, String id, boolean isReplication) {
        return internalCancel(appName, id, isReplication);
    }

    /**
     * Removes the instance's lease from the registry and records the cancellation.
     *
     * <p>Steps, in order: remove the lease from the application's lease map, append an entry
     * to {@code recentCanceledQueue} (this happens whether or not a lease was found), drop any
     * entry for {@code id} from {@code overriddenInstanceStatusMap}, and then either report
     * "not found" or cancel the lease, mark the instance as {@code DELETED}, enqueue it in
     * {@code recentlyChangedQueue} and invalidate the response cache for the app and its VIPs.
     *
     * @param appName       application name used as the key into the registry
     * @param id            instance id used as the key into the application's lease map
     * @param isReplication flag forwarded to the counters and included in the log message
     * @return {@code true} if a lease was found and cancelled, {@code false} otherwise
     */
    protected boolean internalCancel(String appName, String id, boolean isReplication) {
        // Acquire the lock BEFORE entering try: if acquisition were to fail inside the try,
        // the finally block would call unlock() on a lock this thread never obtained.
        read.lock();
        try {
            CANCEL.increment(isReplication);
            Map<String, Lease<InstanceInfo>> gMap = registry.get(appName);
            Lease<InstanceInfo> leaseToCancel = null;
            if (gMap != null) {
                leaseToCancel = gMap.remove(id);
            }
            synchronized (recentCanceledQueue) {
                recentCanceledQueue.add(new Pair<Long, String>(System.currentTimeMillis(), appName + "(" + id + ")"));
            }
            InstanceStatus instanceStatus = overriddenInstanceStatusMap.remove(id);
            if (instanceStatus != null) {
                logger.debug("Removed instance id {} from the overridden map which has value {}", id, instanceStatus.name());
            }
            if (leaseToCancel == null) {
                CANCEL_NOT_FOUND.increment(isReplication);
                logger.warn("DS: Registry: cancel failed because Lease is not registered for: {}/{}", appName, id);
                return false;
            }

            leaseToCancel.cancel();
            InstanceInfo instanceInfo = leaseToCancel.getHolder();
            String vip = null;
            String svip = null;
            if (instanceInfo != null) {
                instanceInfo.setActionType(ActionType.DELETED);
                recentlyChangedQueue.add(new RecentlyChangedItem(leaseToCancel));
                instanceInfo.setLastUpdatedTimestamp();
                vip = instanceInfo.getVIPAddress();
                svip = instanceInfo.getSecureVipAddress();
            }
            invalidateCache(appName, vip, svip);
            logger.info("Cancelled instance {}/{} (replication={})", appName, id, isReplication);
            return true;
        } finally {
            read.unlock();
        }
    }
}
super.cancel
主要作:清理數據(registry)、添加事件、清理緩存(清理readWriteCacheMap,但不清理readOnlyCacheMap,readOnlyCacheMap只經過定時任務來刷新)。
再回到PeerAwareInstanceRegistryImpl.replicateToPeers
方法,跟註冊/續租同樣,是異步調其餘eureka-server
節點的接口,@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication
參數爲true
。
正常狀況下,應用實例下線時候會主動向eureka-Server
發起下線請求。但實際狀況下,應用實例可能異常崩潰,又或者是網絡異常等緣由,致使下線請求沒法被成功提交。
public abstract class AbstractInstanceRegistry implements InstanceRegistry {

    /**
     * Starts the renews-per-minute counter and (re)schedules the eviction task.
     *
     * <p>Any eviction task already held in {@code evictionTaskRef} is cancelled first; a new
     * {@code EvictionTask} is then stored and scheduled on {@code evictionTimer}, using the
     * configured eviction interval both as the initial delay and as the period.
     */
    protected void postInit() {
        renewsLastMin.start();

        final TimerTask previousTask = evictionTaskRef.get();
        if (previousTask != null) {
            previousTask.cancel();
        }

        evictionTaskRef.set(new EvictionTask());

        final TimerTask scheduledTask = evictionTaskRef.get();
        final long initialDelayMs = serverConfig.getEvictionIntervalTimerInMs();
        final long periodMs = serverConfig.getEvictionIntervalTimerInMs();
        evictionTimer.schedule(scheduledTask, initialDelayMs, periodMs);
    }
}
AbstractInstanceRegistry
裏會啓動定時任務EvictionTask
,定期下線/清理失效的服務信息,默認是60秒執行一次清理任務,清理超過90秒
未續約服務,但實際上計算時,還會算上補償時間(補償時間 = 當前時間 - 最後任務執行時間 - 任務執行頻率),至於爲何要算上補償時間,可以看Lease.isExpired(long additionalLeaseMs)
方法的註釋。
public abstract class AbstractInstanceRegistry implements InstanceRegistry { class EvictionTask extends TimerTask { private final AtomicLong lastExecutionNanosRef = new AtomicLong(0l); @Override public void run() { try { long compensationTimeMs = getCompensationTimeMs(); logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs); evict(compensationTimeMs); } catch (Throwable e) { logger.error("Could not run the evict task", e); } } long getCompensationTimeMs() { long currNanos = getCurrentTimeNano(); long lastNanos = lastExecutionNanosRef.getAndSet(currNanos); if (lastNanos == 0l) { return 0l; } long elapsedMs = TimeUnit.NANOSECONDS.toMillis(currNanos - lastNanos); long compensationTime = elapsedMs - serverConfig.getEvictionIntervalTimerInMs(); return compensationTime <= 0l ? 0l : compensationTime; } long getCurrentTimeNano() { // for testing return System.nanoTime(); } } public void evict(long additionalLeaseMs) { logger.debug("Running the evict task"); if (!isLeaseExpirationEnabled()) { logger.debug("DS: lease expiration is currently disabled."); return; } // We collect first all expired items, to evict them in random order. For large eviction sets, // if we do not that, we might wipe out whole apps before self preservation kicks in. By randomizing it, // the impact should be evenly distributed across all applications. List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>(); for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) { Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue(); if (leaseMap != null) { for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) { Lease<InstanceInfo> lease = leaseEntry.getValue(); if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) { expiredLeases.add(lease); } } } } // To compensate for GC pauses or drifting local time, we need to use current registry size as a base for // triggering self-preservation. 
Without that we would wipe out full registry. int registrySize = (int) getLocalRegistrySize(); int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold()); int evictionLimit = registrySize - registrySizeThreshold; int toEvict = Math.min(expiredLeases.size(), evictionLimit); if (toEvict > 0) { logger.info("Evicting {} items (expired={}, evictionLimit={})", toEvict, expiredLeases.size(), evictionLimit); Random random = new Random(System.currentTimeMillis()); for (int i = 0; i < toEvict; i++) { // Pick a random item (Knuth shuffle algorithm) int next = i + random.nextInt(expiredLeases.size() - i); Collections.swap(expiredLeases, i, next); Lease<InstanceInfo> lease = expiredLeases.get(i); String appName = lease.getHolder().getAppName(); String id = lease.getHolder().getId(); EXPIRED.increment(); logger.warn("DS: Registry: expired lease for {}/{}", appName, id); internalCancel(appName, id, false); } } } } public class Lease<T> { /** * Checks if the lease of a given {@link com.netflix.appinfo.InstanceInfo} has expired or not. * 關於補償時間的說明: * Note that due to renew() doing the 'wrong" thing and setting lastUpdateTimestamp to +duration more than * what it should be, the expiry will actually be 2 * duration. This is a minor bug and should only affect * instances that ungracefully shutdown. Due to possible wide ranging impact to existing usage, this will * not be fixed. * * @param additionalLeaseMs any additional lease time to add to the lease evaluation in ms. */ public boolean isExpired(long additionalLeaseMs) { return (evictionTimestamp > 0 || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs)); } }
isLeaseExpirationEnabled
是判斷是否開啓了自我保護機制
,當前是否符合自我保護機制
條件。
清理的時候:
分批清理例子:
// 假設 20 個租約,其中有 10 個租約過期。
// 第一輪執行開始
int registrySize = 20;
int registrySizeThreshold = (int) (20 * 0.85) = 17;
int evictionLimit = 20 - 17 = 3;
int toEvict = Math.min(10, 3) = 3;
// 第一輪執行結束,剩餘 17 個租約,其中有 7 個租約過期。
// 第二輪執行開始
int registrySize = 17;
int registrySizeThreshold = (int) (17 * 0.85) = 14;
int evictionLimit = 17 - 14 = 3;
int toEvict = Math.min(7, 3) = 3;
// 第二輪執行結束,剩餘 14 個租約,其中有 4 個租約過期。
// 第三輪執行開始
int registrySize = 14;
int registrySizeThreshold = (int) (14 * 0.85) = 11;
int evictionLimit = 14 - 11 = 3;
int toEvict = Math.min(4, 3) = 3;
// 第三輪執行結束,剩餘 11 個租約,其中有 1 個租約過期。
// 第四輪執行開始
int registrySize = 11;
int registrySizeThreshold = (int) (11 * 0.85) = 9;
int evictionLimit = 11 - 9 = 2;
int toEvict = Math.min(1, 2) = 1;
// 第四輪執行結束,剩餘 10 個租約,其中有 0 個租約過期。結束。
下線
的代碼internalCancel
跟前面的下線是同一個方法。
參考:
《 Eureka 源碼解析 —— 應用實例註冊發現(五)之過時》