本文主要研究一下spring cloud的DefaultEurekaServerContextjava
@Configuration @Import(EurekaServerInitializerConfiguration.class) @ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class) @EnableConfigurationProperties({ EurekaDashboardProperties.class, InstanceRegistryProperties.class }) @PropertySource("classpath:/eureka/server.properties") public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter { //...... @Bean public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) { return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs, registry, peerEurekaNodes, this.applicationInfoManager); } //...... }
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/DefaultEurekaServerContext.javanode
@PostConstruct @Override public void initialize() throws Exception { logger.info("Initializing ..."); peerEurekaNodes.start(); registry.init(peerEurekaNodes); logger.info("Initialized"); } @PreDestroy @Override public void shutdown() throws Exception { logger.info("Shutting down ..."); registry.shutdown(); peerEurekaNodes.shutdown(); logger.info("Shut down"); }
實例化後的時候執行peerEurekaNodes.start();以及registry.init(peerEurekaNodes); 銷燬以前執行registry.shutdown();以及peerEurekaNodes.shutdown();git
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.javagithub
public void start() { taskExecutor = Executors.newSingleThreadScheduledExecutor( new ThreadFactory() { @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r, "Eureka-PeerNodesUpdater"); thread.setDaemon(true); return thread; } } ); try { updatePeerEurekaNodes(resolvePeerUrls()); Runnable peersUpdateTask = new Runnable() { @Override public void run() { try { updatePeerEurekaNodes(resolvePeerUrls()); } catch (Throwable e) { logger.error("Cannot update the replica Nodes", e); } } }; taskExecutor.scheduleWithFixedDelay( peersUpdateTask, serverConfig.getPeerEurekaNodesUpdateIntervalMs(), serverConfig.getPeerEurekaNodesUpdateIntervalMs(), TimeUnit.MILLISECONDS ); } catch (Exception e) { throw new IllegalStateException(e); } for (PeerEurekaNode node : peerEurekaNodes) { logger.info("Replica node URL: {}", node.getServiceUrl()); } }
這裏首先執行updatePeerEurekaNodes,以後註冊定時任務去定時觸發updatePeerEurekaNodes,時間間隔爲erverConfig.getPeerEurekaNodesUpdateIntervalMs()spring
/** * Resolve peer URLs. * * @return peer URLs with node's own URL filtered out */ protected List<String> resolvePeerUrls() { InstanceInfo myInfo = applicationInfoManager.getInfo(); String zone = InstanceInfo.getZone(clientConfig.getAvailabilityZones(clientConfig.getRegion()), myInfo); List<String> replicaUrls = EndpointUtils .getDiscoveryServiceUrls(clientConfig, zone, new EndpointUtils.InstanceInfoBasedUrlRandomizer(myInfo)); int idx = 0; while (idx < replicaUrls.size()) { if (isThisMyUrl(replicaUrls.get(idx))) { replicaUrls.remove(idx); } else { idx++; } } return replicaUrls; }
首先經過resolvePeerUrls來獲取replicaUrls,這裏獲取的是健康的url,而後剔除本身的urlapp
/** * Given new set of replica URLs, destroy {@link PeerEurekaNode}s no longer available, and * create new ones. * * @param newPeerUrls peer node URLs; this collection should have local node's URL filtered out */ protected void updatePeerEurekaNodes(List<String> newPeerUrls) { if (newPeerUrls.isEmpty()) { logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry"); return; } Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls); toShutdown.removeAll(newPeerUrls); Set<String> toAdd = new HashSet<>(newPeerUrls); toAdd.removeAll(peerEurekaNodeUrls); if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change return; } // Remove peers no long available List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes); if (!toShutdown.isEmpty()) { logger.info("Removing no longer available peer nodes {}", toShutdown); int i = 0; while (i < newNodeList.size()) { PeerEurekaNode eurekaNode = newNodeList.get(i); if (toShutdown.contains(eurekaNode.getServiceUrl())) { newNodeList.remove(i); eurekaNode.shutDown(); } else { i++; } } } // Add new peers if (!toAdd.isEmpty()) { logger.info("Adding new peer nodes {}", toAdd); for (String peerUrl : toAdd) { newNodeList.add(createPeerEurekaNode(peerUrl)); } } this.peerEurekaNodes = newNodeList; this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls); }
主要是跟原始或上次更新的peerEurekaNodeUrls對比,移除掉不健康的節點,移除的時候會調用PeerEurekaNode的shutdown方法,添加的時候經過createPeerEurekaNode建立dom
protected PeerEurekaNode createPeerEurekaNode(String peerEurekaNodeUrl) { HttpReplicationClient replicationClient = JerseyReplicationClient.createReplicationClient(serverConfig, serverCodecs, peerEurekaNodeUrl); String targetHost = hostFromUrl(peerEurekaNodeUrl); if (targetHost == null) { targetHost = "host"; } return new PeerEurekaNode(registry, targetHost, peerEurekaNodeUrl, replicationClient, serverConfig); }
添加的時候會建立新的PeerEurekaNodeide
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.javathis
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception { this.numberOfReplicationsLastMin.start(); this.peerEurekaNodes = peerEurekaNodes; initializedResponseCache(); scheduleRenewalThresholdUpdateTask(); initRemoteRegionRegistry(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e); } }
這裏初始化ResponseCache、調度RenewalThresholdUpdateTask、還有初始化RemoteRegionRegistryurl
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/PeerAwareInstanceRegistryImpl.java
/** * Perform all cleanup and shutdown operations. */ @Override public void shutdown() { try { DefaultMonitorRegistry.getInstance().unregister(Monitors.newObjectMonitor(this)); } catch (Throwable t) { logger.error("Cannot shutdown monitor registry", t); } try { peerEurekaNodes.shutdown(); } catch (Throwable t) { logger.error("Cannot shutdown ReplicaAwareInstanceRegistry", t); } numberOfReplicationsLastMin.stop(); super.shutdown(); }
這裏主要是調用peerEurekaNodes.shutdown(),還有super的shutdown
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/registry/AbstractInstanceRegistry.java
/** * Perform all cleanup and shutdown operations. */ @Override public void shutdown() { deltaRetentionTimer.cancel(); evictionTimer.cancel(); renewsLastMin.stop(); }
主要是關閉一些計時器
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNodes.java
public void shutdown() { taskExecutor.shutdown(); List<PeerEurekaNode> toRemove = this.peerEurekaNodes; this.peerEurekaNodes = Collections.emptyList(); this.peerEurekaNodeUrls = Collections.emptySet(); for (PeerEurekaNode node : toRemove) { node.shutDown(); } }
除了系統關閉會調用外,該eureka node被認爲不健康的時候,被剔除時,也會調用shutdown方法
eureka-core-1.8.8-sources.jar!/com/netflix/eureka/cluster/PeerEurekaNode.java
/** * Shuts down all resources used for peer replication. */ public void shutDown() { batchingDispatcher.shutdown(); nonBatchingDispatcher.shutdown(); }
主要是關閉dispatcher
EurekaServerContext主要註冊了bean初始化及銷燬時執行的操做。初始化時,啓動peerNodes,而後初始化registry;銷燬時,關閉registry,而後關閉peerNodes。關於peerNodes,主要的是定時任務以erverConfig.getPeerEurekaNodesUpdateIntervalMs()時間間隔去定時觸發updatePeerEurekaNodes,而這個操做是跟原始或上次更新的peerEurekaNodeUrls對比,移除掉不健康的節點,添加新的節點若是有的話。移除的時候會調用PeerEurekaNode的shutdown方法,添加新的peerNode的時候經過createPeerEurekaNode建立。