SpringCloud--服務治理Eureka

1.前序
     服務治理是微服務架構中最爲核心和基礎的模塊,主要用來實現各個微服務實例的自動化註冊與發現。它旨在解決業務增加,系統功能愈來愈複雜,微服務應用不斷增多的狀況下,解決咱們手動維護的問題。
Eureka服務治理體系主要有三個核心角色:服務註冊中心,服務提供者以及服務消費者。
2.服務治理Eureka的基礎架構java

  • 服務註冊中心:Eureka提供的服務端,提供服務註冊與發現的功能
  • 服務提供者:提供服務的應用,但是是SpringBoot應用,也能夠是其餘技術平臺且遵循Eureka通訊機制的應用。它將本身提供的服務註冊到Eureka,以供其餘應用發現和使用。
  • 服務消費者:消費者應用從服務註冊中心獲取服務列表,從而使消費者能夠知道去何處調用其所需的服務。

       

 

3.服務治理的機制緩存


 服務註冊中心網絡


 (1)失效剔除:
   失效剔除的主要做用是剔除集羣中不能提供服務的實例。服務實例可能因爲內存溢出,網絡故障燈緣由使得服務不能正常工做,服務中心也未收到」服務下線「的相關請求,這時候針對這些服務,Eureka Server在啓動的時候會建立一個定時任務,默認每隔一段時間將當前清單中超時沒有續約的服務剔除。
參數:eureka.instance.leaseExpirationDurationInSeconds
註釋:Eureka 服務端在收到最後一次心跳以後等待時間上限,單位爲秒,超過該時間以後服務端會將該服務實例從服務清單中剔除,從而進制服務調用請求被髮送到該實例上。架構

 (2)自我保護
   Eureka Server 在運行期間會去統計心跳失敗比例在 15 分鐘以內是否低於 85%(renewalPercentThreshold:默認值是0.85),若是低於 85%,Eureka Server 會將這些實例保護起來,讓這些實例不會過時,可是在保護期內若是服務恰好這個服務提供者非正常下線了,此時服務消費者就會拿到一個無效的服務實例,此時會調用失敗,對於這個問題須要服務消費者端要有一些容錯機制,如重試,斷路器等。app

   關閉自我保護進制的參數:eureka.server.enable-self-preservation=false
   Renews threshold:Eureka Server 指望每分鐘收到客戶端實例續約的總數。
   Renews (last min):Eureka Server 最後 1 分鐘收到客戶端實例續約的總數。
   自我保護模式被激活的條件是:在 1 分鐘後,Renews (last min) < Renews threshold。
   eureka.server.renewal-percent-threshold:能夠設置renewalPercentThreshold的值,默認值爲0.85
   protected volatile int numberOfRenewsPerMinThreshold; 指望最小每分鐘續租次數。
   protected volatile int expectedNumberOfClientsSendingRenews;指望最大每分鐘續租次數。
 觸發條件:
   當每次心跳次數(renewLastMin)小於numberOfRenewsPerMinThreshold時,而且開啓自動保護模式開關是打開,就會觸發自動保護機制,再也不自動過時續約。ide

 1 #   PeerAwareInstanceRegistryImpl.java
 2 
 3 @Override
 4     public boolean isLeaseExpirationEnabled() {
 5         if (!isSelfPreservationModeEnabled()) {
 6             // The self preservation mode is disabled, hence allowing the instances to expire.
 7             return true;
 8         }
 9         return numberOfRenewsPerMinThreshold > 0 && getNumOfRenewsInLastMin() > numberOfRenewsPerMinThreshold;
10     }

 

服務提供者微服務

(1)服務註冊
「服務提供者」在啓動的時候會經過發送REST請求的方式將本身註冊到Eureka Server上,同時會帶上自身服務的一些元數據信息。Eureka Server在接收到REST請求後,將元數據存儲在一個雙層的CurrentHashMap中,第一層的key是服務名,第二層的key是具體服務的實例名。
參數:eureka.cli ent.register-with-eureka = true;纔會啓動註冊操做。性能

服務註冊流程源碼以下:fetch

 /**
     * 初始化全部的定時任務
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
/**
*服務同步的定時任務,調用DiscoveryClient的
refreshRegistry方法
*/
 scheduler.schedule( new TimedSupervisorTask( "cacheRefresh", scheduler, cacheRefreshExecutor, registryFetchIntervalSeconds, TimeUnit.SECONDS, expBackOffBound, new CacheRefreshThread() ), registryFetchIntervalSeconds, TimeUnit.SECONDS); } if (clientConfig.shouldRegisterWithEureka()) { int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs); // 服務續約
 scheduler.schedule( new TimedSupervisorTask( "heartbeat", scheduler, heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new HeartbeatThread() ), renewalIntervalInSecs, TimeUnit.SECONDS); // InstanceInfo replicator //InstanceInfoReplicator的run方法中會調用discoveryClient.register();進行服務註冊
            instanceInfoReplicator = new InstanceInfoReplicator( this, instanceInfo, clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); // burstSize
 statusChangeListener = new ApplicationInfoManager.StatusChangeListener() { @Override public String getId() { return "statusChangeListener"; } @Override public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN == statusChangeEvent.getStatus() || InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) { // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent); } else { logger.info("Saw local status change event {}", statusChangeEvent); } instanceInfoReplicator.onDemandUpdate(); } }; if (clientConfig.shouldOnDemandUpdateStatusChange()) { applicationInfoManager.registerStatusChangeListener(statusChangeListener); } instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }
InstanceInfoReplicator的服務註冊相關的代碼以下:
public void run() {
        try {
            discoveryClient.refreshInstanceInfo();

            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
            if (dirtyTimestamp != null) {
               //此方法進行服務註冊調用
                discoveryClient.register();
                instanceInfo.unsetIsDirty(dirtyTimestamp);
            }
        } catch (Throwable t) {
            logger.warn("There was a problem with the instance info replicator", t);
        } finally {
            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
            scheduledPeriodicRef.set(next);
        }
    }

DiscoveryClient中真正實現服務註冊的源碼以下:
 /**
     * 經過REST請求的方式進行服務註冊
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == Status.NO_CONTENT.getStatusCode();
    }

 

(2)服務同步
     兩個服務提供者分別註冊到了兩個不一樣的服務註冊中心上,可是它們的信息分別被兩個服務註冊中心所維護,當服務中心採用的是集羣方案的時候,當服務提供者發送註冊請求到一個服務註冊中心的時候,它會將該請求轉發給集羣中相鏈接的其餘的註冊中心,從而實現註冊中心之間的服務同步。
經過服務同步,兩個服務提供者的服務信息能夠在集羣中任意一個註冊中心上獲取到。this

 服務同步入口的源碼:
/**
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            refreshRegistry();
        }
    }

    @VisibleForTesting
    void refreshRegistry() {
        try {
            boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

            boolean remoteRegionsModified = false;
            // This makes sure that a dynamic change to remote regions to fetch is honored.
            String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
            if (null != latestRemoteRegions) {
                String currentRemoteRegions = remoteRegionsToFetch.get();
                if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                    // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                    synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                        if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                            String[] remoteRegions = latestRemoteRegions.split(",");
                            remoteRegionsRef.set(remoteRegions);
                            instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                            remoteRegionsModified = true;
                        } else {
                            logger.info("Remote regions to fetch modified concurrently," +
                                    " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                        }
                    }
                } else {
                    // Just refresh mapping to reflect any DNS/Property change
                    instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                }
            }

            boolean success = fetchRegistry(remoteRegionsModified);
            if (success) {
                registrySize = localRegionApps.get().size();
                lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
            }

         ............
    }

 

 

(3)服務續約
當服務註冊完以後,會啓動一個定時任務維護一個心跳來持續告訴Eureka Server,我還處於存活狀態,以防止被Eureka Server 剔除。
兩個重要參數:
eureka.instance.lease-renewal-interval-in-seconds=30 :用於定義服務續約任務的調用間隔時間 默認是30秒
eureka.instance.lease-expiration-duration-in-seconds=90 :用於定義服務失效的時間,默認爲90秒。

服務續約源碼入口:

 /**
     * The heartbeat task that renews the lease in the given intervals.
     */
    private class HeartbeatThread implements Runnable {

        public void run() {
            if (renew()) {
                lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis();
            }
        }
    }


/**
     * Renew with the eureka service by making the appropriate REST call
     */
    boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == Status.NOT_FOUND.getStatusCode()) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == Status.OK.getStatusCode();
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

 

服務消費者

 

(1)獲取服務
在咱們啓動服務消費者的實例以後,會發送一個REST請求到服務註冊中心,來獲取服務註冊中心上面已經註冊的服務清單,出於性能上的考慮,Eureka Server會維護一份只讀的服務清單,同時緩存該清單的時間間隔默認爲30s刷新一次。
注意:第一點,服務消費者必須配置保eureka.c巨ent.fetch-registry=true,這是服務消費者獲取服務列表的基礎。第二點:服務列表緩存刷新時間能夠經過該參數修改eureka.c巨ent.registry-fetch-interval-seconds=30,默認是30秒

(2)服務調用
服務消費者在獲取到服務清單以後,經過服務名能夠獲取到具體提供的服務的實例名和該實例的元數據信息(InstanceInfo),客戶端會根據本身的狀況決定選擇調用哪一個實例。
在這塊有兩個概念須要理解下,Region和Zone
//TODO

(3)服務下線
服務實例在進行正常的關閉操做的時候,它會觸發一個服務下線的REST請求給Eureka Servcer ,告訴服務註冊中心,服務端在接收到請求以後,就會將該服務的狀態設置爲DOWN,而且把這個下線事件傳播出去。

相關文章
相關標籤/搜索