爲了方便說明,就把上篇博客的圖再貼一遍。
上篇說到Application Service向Eureka Server註冊服務的過程。在完成註冊之後,由於Eureka Server是對等集羣,其他Server也需要同步這一註冊信息。與ZooKeeper相比,Eureka並不追求很強的一致性,而是認爲A(可用性)和P(分區容錯性)更重要。
基於此,Eureka採用複製的方式進行註冊信息的同步。
一、在完成註冊方法之後,進行複製。
PeerAwareInstanceRegistryImpl類:
/**
 * Registers an instance through the superclass and then forwards the
 * registration to the peer nodes via {@code replicateToPeers}.
 *
 * @param info          the instance being registered
 * @param isReplication passed through unchanged to both the superclass
 *                      registration and {@code replicateToPeers}
 */
public void register(final InstanceInfo info, final boolean isReplication) {
    // Use the duration carried by the instance only when lease info is present
    // and the duration is positive; otherwise fall back to the default.
    final boolean hasOwnDuration =
            info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0;
    final int leaseSeconds = hasOwnDuration
            ? info.getLeaseInfo().getDurationInSecs()
            : Lease.DEFAULT_DURATION_IN_SECS;

    super.register(info, leaseSeconds, isReplication);
    replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
}
二、具體的複製步驟
複製方法,複製eureka的全部action操做除了流量。 private void replicateToPeers(Action action, String appName, String id, InstanceInfo info /* optional */, InstanceStatus newStatus /* optional */, boolean isReplication) { Stopwatch tracer = action.getTimer().start(); try { if (isReplication) { numberOfReplicationsLastMin.increment(); } // 若是已經複製過,就再也不復制 if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) { return; } //遍歷eureka集羣中的全部節點,進行復制操做 for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) { // If the url represents this host, do not replicate to yourself. if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) { continue; } replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node); } } finally { tracer.stop(); } }
Eureka Server的複製不會進行第二次,可以參考上面的代碼。
判斷isReplication的值,如果已經複製過,則不再複製。如果沒有複製過,則遍歷集羣中的所有node節點(即Eureka Server集羣),依次進行複製操作。
具體的複製action,包括取消、註冊、心跳、狀態更新等。
/**
 * Replays one registry action against a single peer node.
 *
 * <p>Any {@link Throwable} raised while doing so is logged and not rethrown.
 *
 * @param action    which operation to replay on the peer
 * @param appName   application name of the affected instance
 * @param id        id of the affected instance
 * @param info      instance info, used by the {@code Register} action
 * @param newStatus new status, used by the {@code StatusUpdate} action
 * @param node      the peer that receives the action
 */
private void replicateInstanceActionsToPeers(Action action, String appName,
                                             String id, InstanceInfo info, InstanceStatus newStatus,
                                             PeerEurekaNode node) {
    try {
        CurrentRequestVersion.set(Version.V2);
        switch (action) {
            case Cancel:
                node.cancel(appName, id);
                break;
            case Heartbeat: {
                // Look up the overridden status first, then the registry copy of the instance.
                InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.heartbeat(appName, id, registered, overriddenStatus, false);
                break;
            }
            case Register:
                node.register(info);
                break;
            case StatusUpdate: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.statusUpdate(appName, id, newStatus, registered);
                break;
            }
            case DeleteStatusOverride: {
                InstanceInfo registered = getInstanceByAppAndId(appName, id, false);
                node.deleteStatusOverride(appName, id, registered);
                break;
            }
        }
    } catch (Throwable t) {
        logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
    }
}
首先,在PeerAwareInstanceRegistryImpl類中的init方法中找到了peerEurekaNodes的賦值。
/**
 * Initializes the registry: starts the replication counter, stores the given
 * peer node set, and runs the remaining setup steps in order.
 *
 * @param peerEurekaNodes the peer node set assigned to this registry
 * @throws Exception declared by the signature; propagated from the setup steps
 */
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;

    initializedResponseCache();
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    // A failure to register the monitor is only logged as a warning.
    try {
        Monitors.registerObject(this);
    } catch (Throwable registrationFailure) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", registrationFailure);
    }
}
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { super.init(peerEurekaNodes); this.awsAsgUtil = new AwsAsgUtil(serverConfig, clientConfig, this); // We first check if the instance is STARTING or DOWN, then we check explicit overrides, // then we see if our ASG is UP, then we check the status of a potentially existing lease. this.instanceStatusOverrideRule = new FirstMatchWinsCompositeRule(new DownOrStartingRule(), new OverrideExistsRule(overriddenInstanceStatusMap), new AsgEnabledRule(this.awsAsgUtil), new LeaseExistsRule()); }
/**
 * Starts the peer node set and then passes it to the registry's {@code init}.
 *
 * @throws Exception declared by the signature; propagated from the calls below
 */
@PostConstruct
@Override
public void initialize() throws Exception {
    logger.info("Initializing ...");
    // Start the peer nodes before handing them to the registry.
    peerEurekaNodes.start();
    registry.init(peerEurekaNodes);
    logger.info("Initialized");
}
protected void initEurekaServerContext() throws Exception { //省略配置信息 logger.info("Initializing the eureka client..."); //建立node組 PeerEurekaNodes peerEurekaNodes = new PeerEurekaNodes( registry, eurekaServerConfig, eurekaClientConfig, serverCodecs, applicationInfoManager ); serverContext = new DefaultEurekaServerContext( eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, applicationInfoManager ); EurekaServerContextHolder.initialize(serverContext); serverContext.initialize(); logger.info("Initialized server context"); // Copy registry from neighboring eureka node int registryCount = registry.syncUp(); registry.openForTraffic(applicationInfoManager, registryCount); // Register all monitoring statistics. EurekaMonitors.registerAllStats(); }