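How does the Eureka client send the local service's registration information to the remote registry server, the Eureka server? The source-code analysis below shows that a scheduled task in the Eureka client calls the Eureka server's REST interface; on receiving the request, the Eureka server handles the registration and synchronizes the data among the Eureka server nodes.
The analysis also shows that registration is carried out entirely by the Eureka client; the service itself does not need to be concerned with it.
When com.netflix.discovery.DiscoveryClient starts, it initializes a scheduled task that periodically pushes the local service's configuration, that is, the service information to be registered remotely, to the registry server.
First, look at the Eureka code. The spring-cloud-netflix-eureka-server project depends on eureka-client-1.4.11.jar. In that jar,
line 1240 of com.netflix.discovery.DiscoveryClient.java carries the comment "Initializes all scheduled tasks", and line 1277 sets up the InstanceInfoReplicator scheduled task.
DiscoveryClient initializes an InstanceInfoReplicator, which in fact wraps a scheduled task.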
/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }

    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);

        // InstanceInfo replicator
        /************************** wraps a scheduled task **************************/
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize

        statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
            @Override
            public String getId() {
                return "statusChangeListener";
            }

            @Override
            public void notify(StatusChangeEvent statusChangeEvent) {
                if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                        InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                    // log at warn level if DOWN was involved
                    logger.warn("Saw local status change event {}", statusChangeEvent);
                } else {
                    logger.info("Saw local status change event {}", statusChangeEvent);
                }
                instanceInfoReplicator.onDemandUpdate();
            }
        };

        if (clientConfig.shouldOnDemandUpdateStatusChange()) {
            applicationInfoManager.registerStatusChangeListener(statusChangeListener);
        }

        // see the start method of InstanceInfoReplicator
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
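To make the idea of "a replicator that wraps a scheduled task" concrete, here is a minimal sketch. It is not Eureka's InstanceInfoReplicator: the class ReplicatorSketch, its fields, and the counter that stands in for the call to register() are all hypothetical. It only illustrates the pattern as I understand it: a periodic check that pushes local information when it is marked dirty, plus an on-demand push when the status changes.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for a replicator; not the Eureka class. */
class ReplicatorSketch {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    // true means the local info has not been pushed to the server yet
    private final AtomicBoolean dirty = new AtomicBoolean(true);
    // counts the simulated register() calls
    final AtomicInteger registerCalls = new AtomicInteger();

    /** Start the periodic check after an initial delay. */
    void start(long initialDelayMs, long intervalMs) {
        scheduler.scheduleWithFixedDelay(
                this::replicateIfDirty, initialDelayMs, intervalMs, TimeUnit.MILLISECONDS);
    }

    /** Status changed: mark dirty and push without waiting for the next tick. */
    void onDemandUpdate() {
        dirty.set(true);
        scheduler.submit(this::replicateIfDirty);
    }

    private void replicateIfDirty() {
        if (dirty.compareAndSet(true, false)) {
            registerCalls.incrementAndGet(); // assumption: stands in for register()
        }
    }

    void stop() {
        scheduler.shutdown();
    }

    /** Runs one initial push and one on-demand push, then returns the call count. */
    static int runDemo() {
        ReplicatorSketch replicator = new ReplicatorSketch();
        try {
            replicator.start(0, 20);
            Thread.sleep(300);           // the first tick pushes the initial registration
            replicator.onDemandUpdate(); // simulated local status change
            Thread.sleep(300);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            replicator.stop();
        }
        return replicator.registerCalls.get();
    }

    public static void main(String[] args) {
        System.out.println("simulated register() calls: " + runDemo());
    }
}
```

The ticks between the two pushes do nothing because the dirty flag is already cleared, which is why a periodic task can run often without re-registering every time.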
The register() method in com.netflix.discovery.DiscoveryClient, at around line 811:
/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        // entry point where the Eureka client calls the Eureka server
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
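The REST call behind register() is an HTTP POST to the server, and only a 204 response counts as success. The sketch below builds such a request with the JDK's java.net.http API (Java 11+) without sending it. The base URL http://localhost:8761/eureka/, the apps/{appName} path, the app name DEMO-SERVICE, and the abbreviated JSON body are assumptions for illustration; they are not taken from the source above, and a real payload carries many more fields.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper; it only constructs the request and never sends it. */
class RegisterRequestSketch {

    /** Build the registration request for one instance. */
    static HttpRequest build(String serviceUrl, String appName, String instanceJson) {
        return HttpRequest.newBuilder(URI.create(serviceUrl + "apps/" + appName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** Mirrors the last line of register(): only 204 means "registered". */
    static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }

    public static void main(String[] args) {
        // Assumed values: local server URL, demo app name, abbreviated payload.
        String body = "{\"instance\":{\"app\":\"DEMO-SERVICE\",\"hostName\":\"localhost\"}}";
        HttpRequest request = build("http://localhost:8761/eureka/", "DEMO-SERVICE", body);
        System.out.println(request.method() + " " + request.uri());
        System.out.println("204 treated as success: " + isRegistered(204));
    }
}
```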
Request entry point on the Eureka server side
At line 183 of ApplicationResource.java, shown below, you can see that Eureka registers a service through an HTTP POST.
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    if (this.isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if (this.isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if (this.isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if (!this.appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
    } else if (info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if (info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    } else {
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
            if (this.isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String amazonInfo1 = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(amazonInfo1).build();
                }

                if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                    String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        // goes on to the register method of PeerAwareInstanceRegistryImpl
        this.registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();
    }
}
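The shape of addInstance is "validate, reject with 400 on the first problem, otherwise register and answer 204". The following sketch reproduces only the first four checks with plain strings. AddInstanceSketch and its method names are hypothetical, and the data-center checks are left out on purpose.

```java
/** Hypothetical, reduced version of the validation chain in addInstance. */
class AddInstanceSketch {

    private static boolean isBlank(String value) {
        return value == null || value.trim().isEmpty();
    }

    /** Returns the first validation error, or null when the request is acceptable. */
    static String firstError(String expectedAppName, String id, String hostName, String appName) {
        if (isBlank(id)) {
            return "Missing instanceId";
        } else if (isBlank(hostName)) {
            return "Missing hostname";
        } else if (isBlank(appName)) {
            return "Missing appName";
        } else if (!expectedAppName.equals(appName)) {
            return "Mismatched appName, expecting " + expectedAppName + " but was " + appName;
        }
        return null;
    }

    /** 400 for any validation error, 204 (no content) after a successful registration. */
    static int statusFor(String expectedAppName, String id, String hostName, String appName) {
        return firstError(expectedAppName, id, hostName, appName) == null ? 204 : 400;
    }

    public static void main(String[] args) {
        System.out.println(statusFor("DEMO", "demo-1", "localhost", "DEMO"));
        System.out.println(firstError("DEMO", "", "localhost", "DEMO"));
    }
}
```

The 204 returned here is the same status that the client-side register() method compares against.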
At line 52 of InstanceRegistry.java, you can see the call into the register method at line 278 of PeerAwareInstanceRegistryImpl:
public void register(InstanceInfo info, boolean isReplication) {
    this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
    super.register(info, isReplication);
}
The register method at line 278 of PeerAwareInstanceRegistryImpl:
public void register(InstanceInfo info, boolean isReplication) {
    int leaseDuration = 90;
    if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
        leaseDuration = info.getLeaseInfo().getDurationInSecs();
    }

    // call the parent class method to register
    super.register(info, leaseDuration, isReplication);
    // synchronize the service information to the other Eureka servers
    this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
}
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        this.read.lock();
        Object gMap = (Map)this.registry.get(registrant.getAppName());
        EurekaMonitors.REGISTER.increment(isReplication);
        if (gMap == null) {
            ConcurrentHashMap existingLease = new ConcurrentHashMap();
            gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), existingLease);
            if (gMap == null) {
                gMap = existingLease;
            }
        }

        Lease existingLease1 = (Lease)((Map)gMap).get(registrant.getId());
        if (existingLease1 != null && existingLease1.getHolder() != null) {
            Long lease1 = ((InstanceInfo)existingLease1.getHolder()).getLastDirtyTimestamp();
            Long overriddenStatusFromMap = registrant.getLastDirtyTimestamp();
            logger.debug("Existing lease found (existing={}, provided={}", lease1, overriddenStatusFromMap);
            if (lease1.longValue() > overriddenStatusFromMap.longValue()) {
                logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", lease1, overriddenStatusFromMap);
                logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                registrant = (InstanceInfo)existingLease1.getHolder();
            }
        } else {
            Object lease = this.lock;
            synchronized(this.lock) {
                if (this.expectedNumberOfRenewsPerMin > 0) {
                    this.expectedNumberOfRenewsPerMin += 2;
                    this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
                }
            }

            logger.debug("No previous lease information found; it is new registration");
        }

        Lease lease2 = new Lease(registrant, leaseDuration);
        if (existingLease1 != null) {
            lease2.setServiceUpTimestamp(existingLease1.getServiceUpTimestamp());
        }

        ((Map)gMap).put(registrant.getId(), lease2);
        AbstractInstanceRegistry.CircularQueue overriddenStatusFromMap1 = this.recentRegisteredQueue;
        synchronized(this.recentRegisteredQueue) {
            this.recentRegisteredQueue.add(new Pair(Long.valueOf(System.currentTimeMillis()), registrant.getAppName() + "(" + registrant.getId() + ")"));
        }

        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }

        InstanceStatus overriddenStatusFromMap2 = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap2 != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap2);
            registrant.setOverriddenStatus(overriddenStatusFromMap2);
        }

        InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease1, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease2.serviceUp();
        }

        registrant.setActionType(ActionType.ADDED);
        this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease2));
        registrant.setLastUpdatedTimestamp();
        this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), Boolean.valueOf(isReplication)});
    } finally {
        this.read.unlock();
    }
}
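One detail in the method above is easy to miss: every new registration raises the expected number of renewals per minute by 2 and recomputes the renewal threshold. The sketch below works through that arithmetic. The class name is hypothetical, and the value 0.85 for the renewal percent threshold is an assumption for the example (it is what I understand the default to be); the source only shows that the value comes from serverConfig.

```java
/** Hypothetical worked example of the renewal-threshold update on a new registration. */
class RenewThresholdSketch {

    /** Same formula as in the method above: truncate expected * percent to an int. */
    static int threshold(int expectedRenewsPerMin, double renewalPercentThreshold) {
        return (int) ((double) expectedRenewsPerMin * renewalPercentThreshold);
    }

    /** A new registration adds 2 expected renewals per minute, but only if the count is already positive. */
    static int afterNewRegistration(int expectedRenewsPerMin) {
        return expectedRenewsPerMin > 0 ? expectedRenewsPerMin + 2 : expectedRenewsPerMin;
    }

    public static void main(String[] args) {
        double percent = 0.85;   // assumed value for illustration
        int expected = 10;       // e.g. five instances, two renewals per minute each
        System.out.println("before: " + threshold(expected, percent));  // 8  (8.5 truncated)
        expected = afterNewRegistration(expected);                      // 12
        System.out.println("after:  " + threshold(expected, percent));  // 10 (10.2 truncated)
    }
}
```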
Note: the registration information is simply stored in a structure named registry, of type ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>.
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
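As a rough illustration of how such a two-level map behaves, the sketch below keys the outer map by application name and the inner map by instance id, and keeps the existing entry when its dirty timestamp is newer than the incoming one. RegistrySketch and its Instance class are hypothetical simplifications; leases, locking, and status handling are omitted, and computeIfAbsent is used in place of the get/putIfAbsent sequence of the original.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical two-level registry: app name -> (instance id -> instance). */
class RegistrySketch {

    /** Minimal stand-in for InstanceInfo. */
    static final class Instance {
        final String appName;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String appName, String id, long lastDirtyTimestamp) {
            this.appName = appName;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    private final ConcurrentHashMap<String, Map<String, Instance>> registry =
            new ConcurrentHashMap<>();

    /** Stores the registrant and returns the instance that ends up in the registry. */
    Instance register(Instance registrant) {
        Map<String, Instance> byId =
                registry.computeIfAbsent(registrant.appName, key -> new ConcurrentHashMap<>());
        Instance existing = byId.get(registrant.id);
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing; // the stored info is newer, so keep it
        }
        byId.put(registrant.id, registrant);
        return registrant;
    }

    int instanceCount(String appName) {
        Map<String, Instance> byId = registry.get(appName);
        return byId == null ? 0 : byId.size();
    }

    public static void main(String[] args) {
        RegistrySketch sketch = new RegistrySketch();
        sketch.register(new Instance("DEMO", "demo-1", 200L));
        Instance kept = sketch.register(new Instance("DEMO", "demo-1", 100L)); // stale update
        System.out.println("kept timestamp: " + kept.lastDirtyTimestamp);
        System.out.println("instances of DEMO: " + sketch.instanceCount("DEMO"));
    }
}
```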
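The ApplicationResource class receives the HTTP request and calls the register method of PeerAwareInstanceRegistryImpl. Once PeerAwareInstanceRegistryImpl has completed the registration,
it calls replicateToPeers to synchronize the state to the other Eureka server nodes (peers), as shown in the figure below.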
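The isReplication flag that runs through the code above is what keeps peer synchronization from looping: a node forwards a registration to its peers only when the call came from a client, and a replicated call is stored but not forwarded again. The sketch below models that rule with in-memory objects. PeerNodeSketch is hypothetical, and the direct method call stands in for the HTTP request that a real server would send to its peer.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical in-memory model of peer-aware registration. */
class PeerNodeSketch {
    final String name;
    final Set<String> instances = new HashSet<>();
    final List<PeerNodeSketch> peers = new ArrayList<>();
    int received = 0; // number of register calls this node has handled

    PeerNodeSketch(String name) {
        this.name = name;
    }

    void register(String instanceId, boolean isReplication) {
        received++;
        instances.add(instanceId);
        if (isReplication) {
            return; // a replicated call is never forwarded again
        }
        for (PeerNodeSketch peer : peers) {
            peer.register(instanceId, true); // assumption: stands in for the peer HTTP call
        }
    }

    public static void main(String[] args) {
        PeerNodeSketch a = new PeerNodeSketch("a");
        PeerNodeSketch b = new PeerNodeSketch("b");
        a.peers.add(b);
        b.peers.add(a); // the two servers point at each other

        a.register("demo-1", false); // call coming from a client
        System.out.println("b knows demo-1: " + b.instances.contains("demo-1"));
        System.out.println("calls handled by a: " + a.received + ", by b: " + b.received);
    }
}
```

Without the flag, the two nodes in the example would keep forwarding the same registration to each other indefinitely.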
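EurekaHttpClient: provides the Eureka client API methods; its implementation class JerseyReplicationClient is the class that sends the HTTP requests
ApplicationResource: the class on the Eureka server that handles requests sent by the Eureka client
InstanceRegistry: the instance registry class on the Eureka server
LookupService: the interface for looking up active service instances
DefaultEurekaClientConfig: the default configuration class for the Eureka client
EurekaClientConfigBean: the Eureka client configuration bean, the class that supplies property hints in the configuration file; configuration prefix: eureka.client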