在以前的EurekaClient自動裝配及啓動流程解析一文中咱們提到過,在構造DiscoveryClient
類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程node
boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
這個方法中包含的registrationClient
和instanceInfo
兩個對象在以前的文章中都已經單獨拿出來了:Eureka中重要的對象json
服務端接受註冊的Controller在ApplicationResource
類中,這裏須要注意的是普通客戶端註冊時其中參數isReplication
爲false,這個參數就是控制集羣是否同步的表示緩存
@POST @Consumes({"application/json", "application/xml"}) public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) { logger.debug("Registering instance {} (replication={})", info.getId(), isReplication); // 一系列的參數校驗 if (isBlank(info.getId())) { return Response.status(400).entity("Missing instanceId").build(); } else if (isBlank(info.getHostName())) { return Response.status(400).entity("Missing hostname").build(); } else if (isBlank(info.getIPAddr())) { return Response.status(400).entity("Missing ip address").build(); } else if (isBlank(info.getAppName())) { return Response.status(400).entity("Missing appName").build(); } else if (!appName.equals(info.getAppName())) { return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build(); } else if (info.getDataCenterInfo() == null) { return Response.status(400).entity("Missing dataCenterInfo").build(); } else if (info.getDataCenterInfo().getName() == null) { return Response.status(400).entity("Missing dataCenterInfo Name").build(); } // AWS的一些東西,不用細看 DataCenterInfo dataCenterInfo = info.getDataCenterInfo(); if (dataCenterInfo instanceof UniqueIdentifier) { String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId(); if (isBlank(dataCenterInfoId)) { boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId")); if (experimental) { String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id"; return Response.status(400).entity(entity).build(); } else if (dataCenterInfo instanceof AmazonInfo) { AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo; String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId); if (effectiveId == null) { amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId()); } } else { logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass()); } } } //註冊 registry.register(info, "true".equals(isReplication)); return Response.status(204).build(); // 204 to be backwards compatible }
接着往下看註冊的處理邏輯app
public void register(final InstanceInfo info, final boolean isReplication) { // 租約過時時間 int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS; if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) { leaseDuration = info.getLeaseInfo().getDurationInSecs(); } // 註冊 super.register(info, leaseDuration, isReplication); //集羣複製 replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication); }
Lease
租約這個類以前也分析過了,再也不展開了ide
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { //獲取鎖 read.lock(); //獲取該實例的註冊信息 Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName()); REGISTER.increment(isReplication); if (gMap == null) { final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>(); //若是不存在則添加 gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } Lease<InstanceInfo> existingLease = gMap.get(registrant.getId()); // 當存在這個應用的註冊信息時 if (existingLease != null && (existingLease.getHolder() != null)) { Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); //使用註冊時間長的一方的應用信息 if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" + " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = existingLease.getHolder(); } } else { synchronized (lock) { if (this.expectedNumberOfRenewsPerMin > 0) { this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2; this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold()); } } logger.debug("No previous lease information found; it is new registration"); } //建立租約 Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration); //若是存在租約則更新開始時間 if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } gMap.put(registrant.getId(), lease); synchronized (recentRegisteredQueue) { //添加到最近註冊隊列 recentRegisteredQueue.add(new Pair<Long, String>( System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); } if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the " + "overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); } // 得到應用實例最終狀態 InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); } registrant.setActionType(ActionType.ADDED); recentlyChangedQueue.add(new RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication); } finally { read.unlock(); } }
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
當isReplication
屬性爲true的時候,就會牽扯到集羣信息同步了ui
private void replicateToPeers(Action action, String appName, String id, InstanceInfo info , InstanceStatus newStatus , boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
關於PeerEurekaNode
在以前的文章也提到了,保存了集羣節點信息,這裏能夠看到是循環全部的集羣節點,而後排除自己的信息以後調用了replicateInstanceActionsToPeers
方法this
private void replicateInstanceActionsToPeers(Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, PeerEurekaNode node) { try { InstanceInfo infoFromRegistry = null; CurrentRequestVersion.set(Version.V2); switch (action) { case Cancel: node.cancel(appName, id); break; case Heartbeat: InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id); infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false); break; case Register: node.register(info); break; case StatusUpdate: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.statusUpdate(appName, id, newStatus, infoFromRegistry); break; case DeleteStatusOverride: infoFromRegistry = getInstanceByAppAndId(appName, id, false); node.deleteStatusOverride(appName, id, infoFromRegistry); break; } } catch (Throwable t) { logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t); } }
這裏重點關注Register分支spa
public void register(final InstanceInfo info) throws Exception { long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info); batchingDispatcher.process( taskId("register", info), new InstanceReplicationTask(targetHost, Action.Register, info, null, true) { public EurekaHttpResponse<Void> execute() { return replicationClient.register(info); } }, expiryTime ); }
在不關心Eureka自有的調度處理相關的前提下,核心代碼是replicationClient.register(info)
,也就是說,若是當客戶端註冊時指定須要同步集羣信息時,Eureka會把這個客戶端再註冊到它所在的集羣其它的節點上pwa
在以前得EurekaServer自動裝配及啓動流程解析一文中,咱們提到過在初始化服務端的時候會從EurekaServer集羣中同步數據,也就是下面這段代碼:debug
public class EurekaServerBootstrap { protected void initEurekaServerContext() throws Exception { //xxxx EurekaServerContextHolder.initialize(this.serverContext); log.info("Initialized server context"); // 從其餘 Eureka-Server 拉取註冊信息 int registryCount = this.registry.syncUp(); this.registry.openForTraffic(this.applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }
數據同步的代碼在syncUp
中
public int syncUp() { int count = 0; for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) { if (i > 0) { //未讀取到註冊信息則開始等待 try { Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs()); } catch (InterruptedException e) { logger.warn("Interrupted during registry transfer.."); break; } } // 獲取註冊信息 Applications apps = eurekaClient.getApplications(); for (Application app : apps.getRegisteredApplications()) { for (InstanceInfo instance : app.getInstances()) { try { //判斷是否能夠註冊,這裏基於AWS環境的判斷,若是不是AWS環境直接返回true,故不須要關心這個問題 if (isRegisterable(instance)) { //註冊 register(instance, instance.getLeaseInfo().getDurationInSecs(), true); count++; } } catch (Throwable t) { logger.error("During DS init copy", t); } } } } return count; }
這裏的處理流程就是當前EurekaServer獲取到客戶端的註冊信息以後,就會再次調用上邊我們提到的服務端Controller調用的register
方法,固然此次調用時isReplication
參數就變爲true了