As the server side, Eureka Server has the following functions:
As shown in "Eureka - Simple Example", the server side needs the @EnableEurekaServer annotation. @EnableEurekaServer carries an @Import(EurekaServerMarkerConfiguration.class):
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {

}
The main job of EurekaServerMarkerConfiguration is to register the Marker bean.
@Configuration(proxyBeanMethods = false)
public class EurekaServerMarkerConfiguration {

    @Bean
    public Marker eurekaServerMarkerBean() {
        return new Marker();
    }

    class Marker {

    }

}
In short, all @EnableEurekaServer does is register Marker, and the Marker class is empty. So what is this Marker for?
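Spring Boot auto-configuration reads the spring.factories file of Eureka Server, which lists the EurekaServerAutoConfiguration class. This class carries a @ConditionalOnBean annotation whose condition is exactly the Marker class above. So adding @EnableEurekaServer creates the Marker bean, which in turn allows EurekaServerAutoConfiguration to load.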
@Configuration(proxyBeanMethods = false)
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
        InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration implements WebMvcConfigurer {
    // rest omitted
}
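EurekaServerConfig is loaded here; every setting with the eureka.server prefix lives in this class. When the server is itself configured to register with Eureka, the number of registry sync retries (used when reading from the other servers fails) defaults to 5.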
@Configuration(proxyBeanMethods = false)
protected static class EurekaServerConfigBeanConfiguration {

    @Bean
    @ConditionalOnMissingBean
    public EurekaServerConfig eurekaServerConfig(EurekaClientConfig clientConfig) {
        EurekaServerConfigBean server = new EurekaServerConfigBean();
        if (clientConfig.shouldRegisterWithEureka()) {
            // Set a sensible default if we are supposed to replicate
            server.setRegistrySyncRetries(5);
        }
        return server;
    }

}
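PeerAwareInstanceRegistry is mainly the registry used in a cluster. The constructor shown here starts a timer thread that purges expired delta (incremental change) information.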
@Bean
public PeerAwareInstanceRegistry peerAwareInstanceRegistry(
        ServerCodecs serverCodecs) {
    this.eurekaClient.getApplications(); // force initialization
    return new InstanceRegistry(this.eurekaServerConfig, this.eurekaClientConfig,
            serverCodecs, this.eurekaClient,
            this.instanceRegistryProperties.getExpectedNumberOfClientsSendingRenews(),
            this.instanceRegistryProperties.getDefaultOpenForTrafficCount());
}

protected AbstractInstanceRegistry(EurekaServerConfig serverConfig, EurekaClientConfig clientConfig, ServerCodecs serverCodecs) {
    this.serverConfig = serverConfig;
    this.clientConfig = clientConfig;
    this.serverCodecs = serverCodecs;
    this.recentCanceledQueue = new CircularQueue<Pair<Long, String>>(1000);
    this.recentRegisteredQueue = new CircularQueue<Pair<Long, String>>(1000);

    this.renewsLastMin = new MeasuredRate(1000 * 60 * 1);
    // Purge expired delta information; runs every 30 seconds by default
    this.deltaRetentionTimer.schedule(getDeltaRetentionTask(),
            serverConfig.getDeltaRetentionTimerIntervalInMs(),
            serverConfig.getDeltaRetentionTimerIntervalInMs());
}

private TimerTask getDeltaRetentionTask() {
    return new TimerTask() {

        @Override
        public void run() {
            Iterator<RecentlyChangedItem> it = recentlyChangedQueue.iterator();
            while (it.hasNext()) {
                // Remove the item if it is older than the time delta information is retained for clients
                if (it.next().getLastUpdateTime() <
                        System.currentTimeMillis() - serverConfig.getRetentionTimeInMSInDeltaQueue()) {
                    it.remove();
                } else {
                    break;
                }
            }
        }

    };
}
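The purge loop above can be tried in isolation. The following is a minimal sketch, not Eureka code: the class DeltaQueueSketch, its Item type and the purge method are hypothetical names, and the current time is passed in as a parameter so the behaviour is deterministic. It assumes, as the early break in the original does, that entries are appended in time order.

```java
import java.util.Iterator;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for Eureka's recentlyChangedQueue; not the real class.
class DeltaQueueSketch {

    // Each entry only records a name and when it was last updated (epoch millis).
    static final class Item {
        final String name;
        final long lastUpdateTime;

        Item(String name, long lastUpdateTime) {
            this.name = name;
            this.lastUpdateTime = lastUpdateTime;
        }
    }

    final ConcurrentLinkedQueue<Item> queue = new ConcurrentLinkedQueue<>();

    // Removes entries older than retentionMs and stops at the first entry
    // that is still fresh, relying on the queue being in time order.
    int purge(long nowMs, long retentionMs) {
        int removed = 0;
        Iterator<Item> it = queue.iterator();
        while (it.hasNext()) {
            if (it.next().lastUpdateTime < nowMs - retentionMs) {
                it.remove();
                removed++;
            } else {
                break;
            }
        }
        return removed;
    }
}
```

With entries updated at 1 000 ms, 50 000 ms and 190 000 ms, a call to purge(200_000L, 180_000L) uses a cutoff of 20 000 ms, so only the first entry is dropped. The retention value here is chosen purely for illustration.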
PeerEurekaNodes mainly holds information about the other nodes in the cluster and is used to synchronize data between cluster nodes.
@Bean
@ConditionalOnMissingBean
public PeerEurekaNodes peerEurekaNodes(PeerAwareInstanceRegistry registry,
        ServerCodecs serverCodecs,
        ReplicationClientAdditionalFilters replicationClientAdditionalFilters) {
    return new RefreshablePeerEurekaNodes(registry, this.eurekaServerConfig,
            this.eurekaClientConfig, serverCodecs, this.applicationInfoManager,
            replicationClientAdditionalFilters);
}
EurekaServerContext mainly holds the context information of the Eureka Server.
@Bean
@ConditionalOnMissingBean
public EurekaServerContext eurekaServerContext(ServerCodecs serverCodecs,
        PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes) {
    return new DefaultEurekaServerContext(this.eurekaServerConfig, serverCodecs,
            registry, peerEurekaNodes, this.applicationInfoManager);
}
When EurekaServerContext is loaded, the @PostConstruct annotation causes its initialize method to be called, which in turn calls PeerEurekaNodes#start() and PeerAwareInstanceRegistry#init.
@PostConstruct
@Override
public void initialize() {
    logger.info("Initializing ...");
    peerEurekaNodes.start();
    try {
        registry.init(peerEurekaNodes);
    } catch (Exception e) {
        throw new RuntimeException(e);
    }
    logger.info("Initialized");
}
Let's look at PeerEurekaNodes#start() first. It mainly starts a thread that reads the information of the other nodes and keeps it up to date.
public void start() {
    taskExecutor = Executors.newSingleThreadScheduledExecutor(
            new ThreadFactory() {
                @Override
                public Thread newThread(Runnable r) {
                    Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                    thread.setDaemon(true);
                    return thread;
                }
            }
    );
    try {
        // Update the peer nodes.
        // resolvePeerUrls mainly removes this node itself from the peer list; not covered in detail here
        updatePeerEurekaNodes(resolvePeerUrls());
        Runnable peersUpdateTask = new Runnable() {
            @Override
            public void run() {
                try {
                    updatePeerEurekaNodes(resolvePeerUrls());
                } catch (Throwable e) {
                    logger.error("Cannot update the replica Nodes", e);
                }

            }
        };
        // Runs every 10 minutes by default
        taskExecutor.scheduleWithFixedDelay(
                peersUpdateTask,
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                TimeUnit.MILLISECONDS
        );
    } catch (Exception e) {
        throw new IllegalStateException(e);
    }
    for (PeerEurekaNode node : peerEurekaNodes) {
        logger.info("Replica node URL: {}", node.getServiceUrl());
    }
}
updatePeerEurekaNodes adds and removes peer nodes. The createPeerEurekaNode method is in EurekaServerAutoConfiguration.
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
    if (newPeerUrls.isEmpty()) {
        logger.warn("The replica size seems to be empty. Check the route 53 DNS Registry");
        return;
    }
    // Start from the old nodes
    Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
    // old (1,2,3) minus new (2,3,4): what remains (1) is the set of nodes to remove
    toShutdown.removeAll(newPeerUrls);
    Set<String> toAdd = new HashSet<>(newPeerUrls);
    // new (2,3,4) minus old (1,2,3): what remains (4) is the set of nodes to add
    toAdd.removeAll(peerEurekaNodeUrls);
    // Nothing to add and nothing to remove means nothing changed
    if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
        return;
    }

    // Remove peers no long available
    List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
    // Shut down and drop the nodes that are to be removed
    if (!toShutdown.isEmpty()) {
        logger.info("Removing no longer available peer nodes {}", toShutdown);
        int i = 0;
        while (i < newNodeList.size()) {
            PeerEurekaNode eurekaNode = newNodeList.get(i);
            if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                newNodeList.remove(i);
                eurekaNode.shutDown();
            } else {
                i++;
            }
        }
    }

    // Add new peers
    if (!toAdd.isEmpty()) {
        logger.info("Adding new peer nodes {}", toAdd);
        for (String peerUrl : toAdd) {
            newNodeList.add(createPeerEurekaNode(peerUrl));
        }
    }
    // Reassign the fields
    this.peerEurekaNodes = newNodeList;
    this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
}
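The two set differences at the top of updatePeerEurekaNodes are the heart of the method. As a small sketch of just that step, with a hypothetical class PeerDiffSketch and made-up peer names rather than real service URLs:

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper that isolates the set arithmetic; not part of Eureka.
class PeerDiffSketch {

    // URLs that are currently known but missing from the latest list.
    static Set<String> toShutdown(Set<String> current, List<String> latest) {
        Set<String> result = new HashSet<>(current);
        result.removeAll(latest);
        return result;
    }

    // URLs that appear in the latest list but are not known yet.
    static Set<String> toAdd(Set<String> current, List<String> latest) {
        Set<String> result = new HashSet<>(latest);
        result.removeAll(current);
        return result;
    }
}
```

With current = {peer1, peer2, peer3} and latest = [peer2, peer3, peer4], toShutdown yields {peer1} and toAdd yields {peer4}, matching the (1,2,3) and (2,3,4) example in the comments. Copying into a fresh HashSet before removeAll keeps both inputs unmodified.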
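PeerAwareInstanceRegistry#init mainly sets up the read-only response cache and schedules the self-preservation threshold update.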
@Override
public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
    this.numberOfReplicationsLastMin.start();
    this.peerEurekaNodes = peerEurekaNodes;
    // Set up the read-only response cache
    initializedResponseCache();
    // Schedule the self-preservation threshold update
    scheduleRenewalThresholdUpdateTask();
    initRemoteRegionRegistry();

    try {
        Monitors.registerObject(this);
    } catch (Throwable e) {
        logger.warn("Cannot register the JMX monitor for the InstanceRegistry :", e);
    }
}
initializedResponseCache creates the response cache, whose read-only level is then refreshed periodically.
@Override
public synchronized void initializedResponseCache() {
    if (responseCache == null) {
        responseCache = new ResponseCacheImpl(serverConfig, serverCodecs, this);
    }
}

// ResponseCacheImpl constructor omitted; it schedules the TimerTask below so that its run method is called every 30 seconds
private TimerTask getCacheUpdateTask() {
    return new TimerTask() {
        @Override
        public void run() {
            logger.debug("Updating the client cache from response cache");
            // Iterate over readOnlyCacheMap
            for (Key key : readOnlyCacheMap.keySet()) {
                if (logger.isDebugEnabled()) {
                    logger.debug("Updating the client cache from response cache for key : {} {} {} {}",
                            key.getEntityType(), key.getName(), key.getVersion(), key.getType());
                }
                try {
                    CurrentRequestVersion.set(key.getVersion());
                    Value cacheValue = readWriteCacheMap.get(key);
                    Value currentCacheValue = readOnlyCacheMap.get(key);
                    // Replace the read-only entry when the two differ
                    if (cacheValue != currentCacheValue) {
                        readOnlyCacheMap.put(key, cacheValue);
                    }
                } catch (Throwable th) {
                    logger.error("Error while updating the client cache from response cache for key {}", key.toStringCompact(), th);
                } finally {
                    CurrentRequestVersion.remove();
                }
            }
        }
    };
}
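The effect of this refresh task is that readers of the read-only map may see slightly stale data until the next run. A minimal sketch of the idea follows, using plain string values and a hypothetical class TwoLevelCacheSketch. The get method, which fills the read-only level on a miss, is an assumption added for illustration and is not shown in the code above; the sketch also compares with equals where the original compares references.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical two-level cache; only refresh() mirrors the task shown above.
class TwoLevelCacheSketch {

    final Map<String, String> readWrite = new ConcurrentHashMap<>();
    final Map<String, String> readOnly = new ConcurrentHashMap<>();

    // Assumed read path: serve from the read-only level, fall back to read-write.
    String get(String key) {
        String value = readOnly.get(key);
        if (value == null) {
            value = readWrite.get(key);
            if (value != null) {
                readOnly.put(key, value);
            }
        }
        return value;
    }

    // Periodic refresh: for each key already cached read-only, copy the
    // read-write value over when it differs. Returns how many were replaced.
    int refresh() {
        int replaced = 0;
        for (String key : readOnly.keySet()) {
            String fresh = readWrite.get(key);
            String stale = readOnly.get(key);
            if (fresh != null && !fresh.equals(stale)) {
                readOnly.put(key, fresh);
                replaced++;
            }
        }
        return replaced;
    }
}
```

After a value changes in readWrite, get keeps returning the old value until refresh runs, which is the trade-off this design makes: cheaper reads in exchange for a bounded delay in visibility.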
scheduleRenewalThresholdUpdateTask calls updateRenewalThreshold periodically, at the self-preservation threshold update interval, which defaults to 15 minutes.
private void scheduleRenewalThresholdUpdateTask() {
    timer.schedule(new TimerTask() {
                       @Override
                       public void run() {
                           updateRenewalThreshold();
                       }
                   }, serverConfig.getRenewalThresholdUpdateIntervalMs(),
            serverConfig.getRenewalThresholdUpdateIntervalMs());
}

private void updateRenewalThreshold() {
    try {
        // Count the current application instances
        Applications apps = eurekaClient.getApplications();
        int count = 0;
        for (Application app : apps.getRegisteredApplications()) {
            for (InstanceInfo instance : app.getInstances()) {
                if (this.isRegisterable(instance)) {
                    ++count;
                }
            }
        }
        synchronized (lock) {
            // renewalPercentThreshold defaults to 0.85.
            // expectedNumberOfClientsSendingRenews is the number of clients expected to send renewals;
            // it is incremented by 1 each time a service registers.
            // If self-preservation mode is disabled, the threshold is recomputed on every run.
            if ((count) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfClientsSendingRenews)
                    || (!this.isSelfPreservationModeEnabled())) {
                // Reset expectedNumberOfClientsSendingRenews to the current instance count
                this.expectedNumberOfClientsSendingRenews = count;
                // Update the expected minimum number of renewals per minute. When the heartbeats per minute
                // (renewsLastMin) drop below numberOfRenewsPerMinThreshold and self-preservation is switched on
                // (eureka.enableSelfPreservation = true), self-preservation kicks in and leases are no longer expired automatically.
                updateRenewsPerMinThreshold();
            }
        }
        logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
    } catch (Throwable e) {
        logger.error("Cannot update renewal threshold", e);
    }
}

protected void updateRenewsPerMinThreshold() {
    // expectedClientRenewalIntervalSeconds: how often a client is expected to renew
    this.numberOfRenewsPerMinThreshold = (int) (this.expectedNumberOfClientsSendingRenews
            * (60.0 / serverConfig.getExpectedClientRenewalIntervalSeconds())
            * serverConfig.getRenewalPercentThreshold());
}
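To make the formula in updateRenewsPerMinThreshold concrete, here is a small worked sketch. The class and method names are hypothetical, and the 30-second renewal interval in the usage note is an assumed value for illustration; only the 0.85 threshold comes from the text above.

```java
// Hypothetical helper reproducing the arithmetic of updateRenewsPerMinThreshold.
class RenewalThresholdSketch {

    // expectedClients * (renewals per client per minute) * percent threshold,
    // truncated to an int exactly as in the original expression.
    static int renewsPerMinThreshold(int expectedClients,
                                     int renewalIntervalSeconds,
                                     double percentThreshold) {
        return (int) (expectedClients
                * (60.0 / renewalIntervalSeconds)
                * percentThreshold);
    }
}
```

For 10 expected clients renewing every 30 seconds, each client sends 2 renewals per minute, so 20 renewals are expected in total and renewsPerMinThreshold(10, 30, 0.85) gives 17. Note that the cast truncates rather than rounds.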
Loading EurekaServerBootstrap:
@Bean
public EurekaServerBootstrap eurekaServerBootstrap(PeerAwareInstanceRegistry registry,
        EurekaServerContext serverContext) {
    return new EurekaServerBootstrap(this.applicationInfoManager,
            this.eurekaClientConfig, this.eurekaServerConfig, registry,
            serverContext);
}
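[Figure: relationship diagram of the classes above]
Is that all? So far there is only a timer that maintains the peer nodes, plus the self-preservation task. Where is the rest of the functionality? EurekaServerAutoConfiguration also imports another class: EurekaServerInitializerConfiguration.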
EurekaServerInitializerConfiguration implements the SmartLifecycle interface, so its start method is called when it is loaded. This is where the Eureka Server startup process begins.
@Override
public void start() {
    new Thread(() -> {
        try {
            // Initialize and start Eureka Server
            eurekaServerBootstrap.contextInitialized(
                    EurekaServerInitializerConfiguration.this.servletContext);
            log.info("Started Eureka Server");
            // Publish an event to listeners
            publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
            // Mark the server as running
            EurekaServerInitializerConfiguration.this.running = true;
            // Publish an event to listeners
            publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
        } catch (Exception ex) {
            // Help!
            log.error("Could not initialize Eureka servlet context", ex);
        }
    }).start();
}
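contextInitialized mainly does two things: it initializes the environment information and it initializes the server context. Initializing the context includes syncing the initial registration information and starting a thread that periodically evicts clients that have stopped sending heartbeats.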
public void contextInitialized(ServletContext context) {
    try {
        // Initialize the environment information
        initEurekaEnvironment();
        // Initialize the server context
        initEurekaServerContext();

        context.setAttribute(EurekaServerContext.class.getName(), this.serverContext);
    } catch (Throwable e) {
        log.error("Cannot bootstrap eureka server :", e);
        throw new RuntimeException("Cannot bootstrap eureka server :", e);
    }
}

protected void initEurekaServerContext() throws Exception {
    // rest omitted
    // Sync the initial registration information
    int registryCount = this.registry.syncUp();
    // Start the thread that periodically evicts clients without heartbeats
    this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    // rest omitted
}
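@Override
public int syncUp() {
    // Rest omitted. The omitted logic: if fetching fails, sleep for
    // serverConfig.getRegistrySyncRetryWaitMs() and then retry, at most
    // serverConfig.getRegistrySyncRetries() times. That value was mentioned
    // above when EurekaServerConfigBeanConfiguration was loaded.

    // Copy the registration information of the other servers into the local registry
    register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
    // rest omitted
}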
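Since the retry part of syncUp is elided above, the following sketch only illustrates the described behaviour (sleep, then retry, up to a maximum number of attempts). It is not the Eureka implementation: SyncRetrySketch, the fetch supplier and the parameter names are all hypothetical.

```java
import java.util.function.IntSupplier;

// Hypothetical retry loop following the description of syncUp; not Eureka code.
class SyncRetrySketch {

    // Calls fetch up to maxRetries times, sleeping waitMs between attempts.
    // Returns the first positive count, or 0 if every attempt came back empty.
    static int syncUp(IntSupplier fetch, int maxRetries, long waitMs) {
        int count = 0;
        for (int i = 0; i < maxRetries && count == 0; i++) {
            if (i > 0) {
                try {
                    Thread.sleep(waitMs);
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and give up early.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            count = fetch.getAsInt();
        }
        return count;
    }
}
```

A supplier that returns 0 twice and then 7 makes syncUp return 7 after three calls; one that always returns 0 is called maxRetries times and yields 0.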
register copies the registration information into the local registry.
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    // Acquire the read lock
    read.lock();
    try {
        // Look up the instances of this application in the local registry
        Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
        REGISTER.increment(isReplication);
        // If there is none, create a ConcurrentHashMap and put it into the registry
        if (gMap == null) {
            final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
            gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
            if (gMap == null) {
                gMap = gNewMap;
            }
        }
        Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
        // A lease already exists: keep whichever copy has the newer timestamp
        if (existingLease != null && (existingLease.getHolder() != null)) {
            Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
            Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
            if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                registrant = existingLease.getHolder();
            }
        } else {
            // No lease exists, so this is a new registration: update
            // expectedNumberOfClientsSendingRenews and numberOfRenewsPerMinThreshold
            synchronized (lock) {
                if (this.expectedNumberOfClientsSendingRenews > 0) {
                    // Since the client wants to register it, increase the number of clients sending renews
                    this.expectedNumberOfClientsSendingRenews = this.expectedNumberOfClientsSendingRenews + 1;
                    updateRenewsPerMinThreshold();
                }
            }
        }
        // Create the lease. leaseDuration is the expiry time. ServiceUpTimestamp, the service start time,
        // is carried over from the existing lease, so it always stays at the first value.
        Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
        if (existingLease != null) {
            lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
        }
        // Add to or update gMap
        gMap.put(registrant.getId(), lease);
        recentRegisteredQueue.add(new Pair<Long, String>(
                System.currentTimeMillis(),
                registrant.getAppName() + "(" + registrant.getId() + ")"));
        // This is where the initial state transfer of overridden status happens
        if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
            logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                    + "overrides", registrant.getOverriddenStatus(), registrant.getId());
            if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
            }
        }
        InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
        if (overriddenStatusFromMap != null) {
            logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
            registrant.setOverriddenStatus(overriddenStatusFromMap);
        }

        // Set the status based on the overridden status rules
        InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
        registrant.setStatusWithoutDirty(overriddenInstanceStatus);

        // If the lease is registered with UP status, set lease service up timestamp
        if (InstanceStatus.UP.equals(registrant.getStatus())) {
            lease.serviceUp();
        }
        // Mark the action type as ADDED
        registrant.setActionType(ActionType.ADDED);
        recentlyChangedQueue.add(new RecentlyChangedItem(lease));
        // Set the last-updated timestamp
        registrant.setLastUpdatedTimestamp();
        // Invalidate the cache
        invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
        logger.info("Registered instance {}/{} with status {} (replication={})",
                registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
    } finally {
        // Release the read lock
        read.unlock();
    }
}
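Two ideas in register are easy to miss among the bookkeeping: the putIfAbsent idiom that creates the per-application map safely, and the rule that the copy with the newer lastDirtyTimestamp wins. The sketch below shows just those two steps. RegistrySketch and its Instance type are hypothetical simplifications with no leases, locks, or status handling.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical, heavily simplified registry; not the Eureka data model.
class RegistrySketch {

    static final class Instance {
        final String app;
        final String id;
        final long lastDirtyTimestamp;

        Instance(String app, String id, long lastDirtyTimestamp) {
            this.app = app;
            this.id = id;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
        }
    }

    // application name -> (instance id -> instance)
    final ConcurrentHashMap<String, Map<String, Instance>> registry = new ConcurrentHashMap<>();

    // Stores the registrant unless an existing copy is newer; returns what was stored.
    Instance register(Instance registrant) {
        Map<String, Instance> instances = registry.get(registrant.app);
        if (instances == null) {
            Map<String, Instance> created = new ConcurrentHashMap<>();
            // putIfAbsent returns the previous value, or null if ours was stored.
            instances = registry.putIfAbsent(registrant.app, created);
            if (instances == null) {
                instances = created;
            }
        }
        Instance existing = instances.get(registrant.id);
        if (existing != null && existing.lastDirtyTimestamp > registrant.lastDirtyTimestamp) {
            registrant = existing;
        }
        instances.put(registrant.id, registrant);
        return registrant;
    }
}
```

Registering the same instance id with timestamp 200 and then 100 leaves the 200 copy in place, while a later registration with timestamp 300 replaces it.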
This is where the thread that periodically evicts services is started.
@Override
public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
    // rest omitted
    super.postInit();
}
AbstractInstanceRegistry#postInit starts the eviction timer, with a default interval of 60s.
protected void postInit() {
    renewsLastMin.start();
    if (evictionTaskRef.get() != null) {
        // Cancel the previous task
        evictionTaskRef.get().cancel();
    }
    evictionTaskRef.set(new EvictionTask());
    // Schedule the eviction task; the default interval is 60s
    evictionTimer.schedule(evictionTaskRef.get(),
            serverConfig.getEvictionIntervalTimerInMs(),
            serverConfig.getEvictionIntervalTimerInMs());
}
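EvictionTask#run removes expired instances. Eviction is subject to the protection mechanism: by default, at most 15% of the registered instances can be evicted in one run.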
@Override
public void run() {
    try {
        // Compute the compensation time, since the timer fires with a slight delay
        long compensationTimeMs = getCompensationTimeMs();
        logger.info("Running the evict task with compensationTime {}ms", compensationTimeMs);
        evict(compensationTimeMs);
    } catch (Throwable e) {
        logger.error("Could not run the evict task", e);
    }
}

public void evict(long additionalLeaseMs) {
    // Lease expiration is disabled, so do not evict
    if (!isLeaseExpirationEnabled()) {
        return;
    }
    // Collect the expired leases
    List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
    for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
        Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
        if (leaseMap != null) {
            for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                Lease<InstanceInfo> lease = leaseEntry.getValue();
                if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                    expiredLeases.add(lease);
                }
            }
        }
    }
    // Compute the maximum number that may be evicted; by default 85% are kept, so at most 15% are evicted
    int registrySize = (int) getLocalRegistrySize();
    int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
    int evictionLimit = registrySize - registrySizeThreshold;

    int toEvict = Math.min(expiredLeases.size(), evictionLimit);
    if (toEvict > 0) {
        // Evict in random order
        Random random = new Random(System.currentTimeMillis());
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(expiredLeases.size() - i);
            Collections.swap(expiredLeases, i, next);
            Lease<InstanceInfo> lease = expiredLeases.get(i);

            String appName = lease.getHolder().getAppName();
            String id = lease.getHolder().getId();
            EXPIRED.increment();
            logger.warn("DS: Registry: expired lease for {}/{}", appName, id);
            // Remove it from registry.get(appName)
            internalCancel(appName, id, false);
        }
    }
}
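The eviction cap and the random selection can be separated from the registry types and tried on plain strings. This is a sketch under that simplification: EvictionSketch and its method names are hypothetical, and the Random is passed in so that callers can seed it.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Random;

// Hypothetical helper isolating the arithmetic and shuffle used by evict.
class EvictionSketch {

    // Number of instances that may be evicted in one run.
    static int evictionLimit(int registrySize, double renewalPercentThreshold) {
        int keep = (int) (registrySize * renewalPercentThreshold);
        return registrySize - keep;
    }

    // Picks up to 'limit' distinct entries at random with a partial
    // Fisher-Yates shuffle, working on a copy so the input is left untouched.
    static List<String> pickToEvict(List<String> expired, int limit, Random random) {
        List<String> pool = new ArrayList<>(expired);
        int toEvict = Math.min(pool.size(), limit);
        List<String> picked = new ArrayList<>();
        for (int i = 0; i < toEvict; i++) {
            int next = i + random.nextInt(pool.size() - i);
            Collections.swap(pool, i, next);
            picked.add(pool.get(i));
        }
        return picked;
    }
}
```

With 20 registered instances and a threshold of 0.85, 17 are kept and evictionLimit returns 3, so even if 5 leases have expired only 3 of them are evicted in that run. Picking at random spreads the evictions across applications instead of always hitting whichever one is iterated first.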
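Besides loading the beans above, Eureka Server startup also sets up several periodic tasks: purging stale delta data from recentlyChangedQueue every 30 seconds, refreshing the cluster peer nodes every 10 minutes, refreshing the read-only response cache every 30 seconds, updating the expected minimum number of renewals per minute every 15 minutes, and evicting expired services every 60 seconds.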