Spring-Cloud之Eureka註冊與發現-2

  1、Eureka是Netflix開發的服務發現框架,自己是一個基於REST的服務,主要用於定位運行在AWS域中的中間層服務,以達到負載均衡和中間層服務故障轉移的目的。SpringCloud將它集成在其子項目spring-cloud-netflix中,以實現SpringCloud的服務發現功能。java

  2、Spring爲何選擇Eureka。在Spring Cloud中可選的註冊中心其實包含:Consul、Zookeeper和Eureka,爲何選擇Eureka。node

  1)徹底開源:通過Netflix公司的生存環境的考驗,以及這麼年時間的不斷迭代,在功能和性能上都很是穩定,能夠放心使用。web

  2)無縫對接:Eureka是Spring Cloud的首選推薦的服務註冊與發現組件,可以達到無縫對接。spring

  3)相互配合:Eureka 和其餘組件,好比負載均衡組件 Ribbon、熔斷器組件Hystrix、熔斷器監控組件Hystrix Dashboard 組件、熔斷器聚合監控Turbine 組件,以及網關 Zuul 組件相 配合,可以很容易實現服務註冊、負載均衡、熔斷和智能路由等功能。json

  3、Eureka基本架構:瀏覽器

  1)Register Service :服務註冊中心,它是一個 Eureka Server ,提供服務註冊和發現的功能。緩存

  2)Provider Service :服務提供者,它是 Eureka Client ,提供服務服務器

  3)Consumer Service :服務消費者,它是 Eureka Cient ,消費服務架構

  

  4、編寫Eureka Server併發

  1)加入spring cloud基礎依賴,官方配置以下

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Finchley.SR4</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

  官方提供的版本選擇:

  

   我這裏用的是2.0.X的版本,因此直接使用的是Finchley的版本,具體版本號查看官方

  2)加入server依賴(有些地方配置成spring-cloud-starter-eureka-server)可是官方建議配置成

  

     <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>

  3)編寫啓動項加入@EnableEurekaServer註解

package com.cetc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer public class EurekaServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

  4)編寫配置文件application.yaml

server:
  port: 8670
eureka:
  instance:
    appname: server
  client:
    register-with-eureka: false # 關閉自己註冊
    fetch-registry: false # 是否從server獲取註冊信息
    service-url:
      defaultZone:
        http://127.0.0.1:8670/eureka/ # 實際開發中建議使用域名的方式
spring:
  application:
    name: server

  5)啓動項目瀏覽器查看http://127.0.0.1:8670/

  

  5、編寫Eureka client

  1)加入依賴,cloud基礎配置和server同樣

     <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

  2)編寫啓動類加入@EnableEurekaClient註解

package com.cetc;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient public class EurekaClientApplication {

    public static void main(String[] args) {
        SpringApplication.run(EurekaClientApplication.class, args);
    }
}

  3)配置application.yaml

server:
  port: 8673
eureka:
  instance:
    appname: client
  client:
    service-url:
      defaultZone:
        http://127.0.0.1:8670/eureka/ # 實際開發中建議使用域名的方式
spring:
  application:
    name: client

  4)啓用項目瀏覽器輸入http://127.0.0.1:8670/

  

   6、源碼解析

  1)Eureka的一些概念

  Registe 一一服務註冊:Eureka Client向Eureka Server 註冊時,Eureka Client 提供自身的元數據,好比 IP 地址、端口、運行情況H1標的 Uri 主頁地址等信息。

  Renew一一服務續約:Eureka client 在默認的狀況下會每隔 30 秒發送一次心跳來進行服務續約。經過服務續約來告知 Eureka Server。Eureka Client 仍然可用,沒有出現故障。正常狀況下,若是 Eureka Server90 秒內沒有收到 Eureka Client 的心跳, Eureka Server 會將 Eureka Client 實例從註冊列表中刪除。注意:’官網建議不要更改服務續約的間隔時間。

  Fetch Registries一一獲取服務註冊列表信息:Eureka Client從Eureka Server 獲取服務註冊表信息,井將其緩存在本地。 Eureka Client使用服務註冊列表信息查找其餘服務的信息,從而進行遠程調用。該註冊列表信息定時(每30 秒) 更新1次,每次返回註冊列表信息可能與 Eureka Client 的緩存信息不一樣, Eureka Client會本身處理這些信息。如過因爲某種緣由致使註冊列表信息不能及時匹配, Eureka Client 會從新獲取整個註冊表信息。Eureka Server 緩存了全部的服務註冊列表信息,並將整個註冊列表以及每一個應用程序信息進行了壓縮,壓縮內容和沒有壓縮的內容徹底相同。 Eureka Client和Eureka Server 可使用 JSON/XML 數據格式進行通訊。在默認的狀況下, Eureka Client使用JSON 格式的方式來獲取服務註冊列表的信息。

  Cancel——服務下線:Eureka Client 在程序關閉時能夠向 Eureka Server 發送下線請求。發送請求後,該客戶端的實例信息將從 Eureka Server 的服務註冊列表中刪除。

DiscoveryManager.getinstance().shutdownComponent();

  Eviction一一服務剔除:在默認狀況下,當 Eureka Client連續90秒沒有向 Eureka Server 發送服務續約(即心跳〉時, Eureka Server 會將該服務實例從服務註冊列表刪除,即服務剔除。

  2)Eureka的高可用架構

  

 

 

   在這個架構圖中有兩個角色 ,即 Eureka Server和Eureka Client。而EurekaClient 又分爲 Applicaton Service和Application Client 即服務提供者和服務消費者。每一個區域有一個Eureka 集羣, 而且每一個區域至少有一個Eureka Server 以處理區域故障 以防服務器癱瘓。

  Eureka Client向Eureka Server註冊時, 將本身客戶端信息提交給 Eureka Server 而後,Eureka Client 經過向 Eureka Server 發送心跳 (每 30 次)來續約服務。 若是某個客戶端不能持續續約,那 Eureka Server 定該客戶端不可用 該不可用的客戶端將在大約 90 秒後從Eureka Server 服務註冊列表中刪除 ,服務註冊列表信息和服務續約信息會被複 到集羣中的每Eureka Server 節點。來自任何區域 Eureka Client 均可 獲取整個系統的服務註冊列表信息。根據這些註冊列表信息, Application Client 遠程調用 Applicaton Service 來消費服務

  3)Register服務註冊

  服務註冊,即 Eureka Client向Eureka Server 提交本身服務信息 包括 IP 地址、 端口、Serviceld 等信息。 Eureka Client配置文件中 沒有配置 Serviceld ,則默認爲配置文件中配置的服務名 ,即$ {spring application.name }的值。

  Eureka Client 啓動時, 會將自身 的服務信息發送到 Eureka Server 這個過程其實很是簡單,如今從源碼角度分析服務註冊的過程,在Maven 的依賴包下,找到eureka-client-1.6.2.jar 包。在 com.netflix.discovery 包下有 DiscoveryClient 類,該類包含了Eureka Client和Eureka Server註冊的相關方法。其中, DiscoveryClient 實現了 EurekaClient而且它是單例模式,而 EurekaClient 繼承了 LookupServic 接口。

  

  在DiscoveryClient 類中有個服務註冊的方法register(), 該方法 經過Http 請求向Eureka Server註冊。

   boolean register() throws Throwable {
        logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
        EurekaHttpResponse httpResponse;
        try {
            httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo);
        } catch (Exception var3) {
            logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3});
            throw var3;
        }
        if (logger.isInfoEnabled()) {
            logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

  經過查詢register()調用狀況能夠知道,在InstanceInfoReplicator被調用,而且InstanceInfoReplicator實現了Runnable,能夠看出執行在run()方法中

public void run() {
        boolean var6 = false;

        ScheduledFuture next;
        label53: {
            try {
                var6 = true;
                this.discoveryClient.refreshInstanceInfo();
                Long dirtyTimestamp = this.instanceInfo.isDirtyWithTime();
                if (dirtyTimestamp != null) {
                    this.discoveryClient.register(); this.instanceInfo.unsetIsDirty(dirtyTimestamp.longValue());
                    var6 = false;
                } else {
                    var6 = false;
                }
                break label53;
            } catch (Throwable var7) {
                logger.warn("There was a problem with the instance info replicator", var7);
                var6 = false;
            } finally {
                if (var6) {
                    ScheduledFuture next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
                    this.scheduledPeriodicRef.set(next);
                }
            }
            next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
            this.scheduledPeriodicRef.set(next);
            return;
        }
        next = this.scheduler.schedule(this, (long)this.replicationIntervalSeconds, TimeUnit.SECONDS);
        this.scheduledPeriodicRef.set(next);
    }

  上面是具體的執行類,那具體的調用類在哪裏呢?經過在DiscoveryClient搜索能夠得知在initScheduledTasks()方法,initScheduledTasks()的調用就是在構造函數中實現的

private void initScheduledTasks() {
        int renewalIntervalInSecs;
        int expBackOffBound;
        if (this.clientConfig.shouldFetchRegistry()) {
       //獲取默認時間配置,默認30秒 renewalIntervalInSecs
= this.clientConfig.getRegistryFetchIntervalSeconds(); expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
       //定時任務 
this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); } if (this.clientConfig.shouldRegisterWithEureka()) { renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs);
       //定時任務
this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread(null)), (long)renewalIntervalInSecs, TimeUnit.SECONDS); this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); this.statusChangeListener = new StatusChangeListener() { public String getId() { return "statusChangeListener"; } public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) { DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent); } else { DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent); } DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); } }

  而後跳出來,跟中Eureka Server發現EurekaBootStrap,咱們能夠得知EurekaBootStrap繼承具備初始化的權限,跟蹤得知

  

ServletContextListener:存在兩個方法:contextInitialized和contextDestroyed,意思就是容器初始化執行和容器銷燬時執行。
  protected void initEurekaServerContext() throws Exception {
        EurekaServerConfig eurekaServerConfig = new DefaultEurekaServerConfig();
        JsonXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        XmlXStream.getInstance().registerConverter(new V1AwareInstanceInfoConverter(), 10000);
        logger.info("Initializing the eureka client...");
        logger.info(eurekaServerConfig.getJsonCodecName());
        ServerCodecs serverCodecs = new DefaultServerCodecs(eurekaServerConfig);
        ApplicationInfoManager applicationInfoManager = null;
        Object registry;
        if (this.eurekaClient == null) {
            registry = this.isCloud(ConfigurationManager.getDeploymentContext()) ? new CloudInstanceConfig() : new MyDataCenterInstanceConfig();
            applicationInfoManager = new ApplicationInfoManager((EurekaInstanceConfig)registry, (new EurekaConfigBasedInstanceInfoProvider((EurekaInstanceConfig)registry)).get());
            EurekaClientConfig eurekaClientConfig = new DefaultEurekaClientConfig();
            this.eurekaClient = new DiscoveryClient(applicationInfoManager, eurekaClientConfig);
        } else {
            applicationInfoManager = this.eurekaClient.getApplicationInfoManager();
        }

        if (this.isAws(applicationInfoManager.getInfo())) {
            registry = new AwsInstanceRegistry(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, this.eurekaClient);
            this.awsBinder = new AwsBinderDelegate(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), (PeerAwareInstanceRegistry)registry, applicationInfoManager);
            this.awsBinder.start();
        } else {
            registry = new PeerAwareInstanceRegistryImpl(eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, this.eurekaClient);
        }

        PeerEurekaNodes peerEurekaNodes = this.getPeerEurekaNodes((PeerAwareInstanceRegistry)registry, eurekaServerConfig, this.eurekaClient.getEurekaClientConfig(), serverCodecs, applicationInfoManager);
        this.serverContext = new DefaultEurekaServerContext(eurekaServerConfig, serverCodecs, (PeerAwareInstanceRegistry)registry, peerEurekaNodes, applicationInfoManager);
        EurekaServerContextHolder.initialize(this.serverContext);
        this.serverContext.initialize();
        logger.info("Initialized server context");
        int registryCount = ((PeerAwareInstanceRegistry)registry).syncUp();
        ((PeerAwareInstanceRegistry)registry).openForTraffic(applicationInfoManager, registryCount);
        EurekaMonitors.registerAllStats();
    }

  其中PeerAwareInstanceRegistryImpl和PeerEurekaNodes爲應該可高可用有關。

  看看PeerAwareInstanceRegistryImpl存在一個register()方法。

public void register(InstanceInfo info, boolean isReplication) {
        int leaseDuration = 90;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }

        super.register(info, leaseDuration, isReplication); this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
    }

  該方法提供了服務註冊功能,並同步到Eureka Server中。

  在父類AbstractInstanceRegistry中咱們看到更多細節

  public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            this.read.lock();
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
            EurekaMonitors.REGISTER.increment(isReplication);
            if (gMap == null) {
                ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap();
                gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }

            Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId());
            if (existingLease != null && existingLease.getHolder() != null) {
                Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp.longValue() > registrationLastDirtyTimestamp.longValue()) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = (InstanceInfo)existingLease.getHolder();
                }
            } else {
                Object var6 = this.lock;
                synchronized(this.lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin += 2;
                        this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
                    }
                }

                logger.debug("No previous lease information found; it is new registration");
            }

            Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }

            ((Map)gMap).put(registrant.getId(), lease);
            AbstractInstanceRegistry.CircularQueue var20 = this.recentRegisteredQueue;
            synchronized(this.recentRegisteredQueue) {
                this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")"));
            }

            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }

            InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }

            registrant.setActionType(ActionType.ADDED);
            this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication});
        } finally {
            this.read.unlock();
        }

    }

  能夠看到結果在一個Map中。

  而PeerAwareInstanceRegistryImpl的replicateToPeers()方法,爲把註冊信息同步到其餘Eureka Server中。

private void replicateToPeers(PeerAwareInstanceRegistryImpl.Action action, String appName, String id, InstanceInfo info, InstanceStatus newStatus, boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();

        try {
            if (isReplication) {
                this.numberOfReplicationsLastMin.increment();
            }

            if (this.peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            Iterator var8 = this.peerEurekaNodes.getPeerEurekaNodes().iterator();

            while(var8.hasNext()) {
                PeerEurekaNode node = (PeerEurekaNode)var8.next();
                if (!this.peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    this.replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
                }
            }
        } finally {
            tracer.stop();
        }

    }

  上面講述了Eureka Server的服務註冊和同步其餘Eureka的方式了,那麼誰來調用PeerAwareInstanceRegistryImpl的register()方法呢。

  前面也說過了,Eureka是經過http的方式進行通訊的,因此會存在調用接口來實現的。經過追蹤能夠看到爲ApplicationResourceaddInstance()添加實例方法

   @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        if (this.isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (this.isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (this.isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (this.isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!this.appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        } else {
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if (dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
                if (this.isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if (experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    }

                    if (dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                        String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                        if (effectiveId == null) {
                            amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }

            this.registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();
        }
    }

  4)Renew 服務續約

  服務續約和服務註冊很是類似,經過前文中的分析能夠知 ,服務註冊Eureka Client程序啓動以後 ,並同時開啓服務續約的定時任 務。DiscoveryClient 的類下有 renew()方法。注意從新註冊

boolean renew() {
        try {
       //發送續約請求 EurekaHttpResponse
<InstanceInfo> httpResponse = this.eurekaTransport.registrationClient.sendHeartBeat(this.instanceInfo.getAppName(), this.instanceInfo.getId(), this.instanceInfo, (InstanceStatus)null); logger.debug("DiscoveryClient_{} - Heartbeat status: {}", this.appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { this.REREGISTER_COUNTER.increment(); logger.info("DiscoveryClient_{} - Re-registering apps/{}", this.appPathIdentifier, this.instanceInfo.getAppName()); long timestamp = this.instanceInfo.setIsDirtyWithTime();
          //若是404,則從新註冊
boolean success = this.register(); if (success) { this.instanceInfo.unsetIsDirty(timestamp); } return success; } else { return httpResponse.getStatusCode() == 200; } } catch (Throwable var5) { logger.error("DiscoveryClient_{} - was unable to send heartbeat!", this.appPathIdentifier, var5); return false; } }

   Eureka Server端續約在InstanceResource之下,renewLease()方法。

   @PUT
    public Response renewLease(@HeaderParam("x-netflix-discovery-replication") String isReplication, @QueryParam("overriddenstatus") String overriddenStatus, @QueryParam("status") String status, @QueryParam("lastDirtyTimestamp") String lastDirtyTimestamp) {
        boolean isFromReplicaNode = "true".equals(isReplication);
     //續約
boolean isSuccess = this.registry.renew(this.app.getName(), this.id, isFromReplicaNode); if (!isSuccess) { logger.warn("Not Found (Renew): {} - {}", this.app.getName(), this.id); return Response.status(Status.NOT_FOUND).build(); } else { Response response = null; if (lastDirtyTimestamp != null && this.serverConfig.shouldSyncWhenTimestampDiffers()) { response = this.validateDirtyTimestamp(Long.valueOf(lastDirtyTimestamp), isFromReplicaNode); if (response.getStatus() == Status.NOT_FOUND.getStatusCode() && overriddenStatus != null && !InstanceStatus.UNKNOWN.name().equals(overriddenStatus) && isFromReplicaNode) { this.registry.storeOverriddenStatusIfRequired(this.app.getAppName(), this.id, InstanceStatus.valueOf(overriddenStatus)); } } else { response = Response.ok().build(); } logger.debug("Found (Renew): {} - {}; reply status={}", new Object[]{this.app.getName(), this.id, response.getStatus()}); return response; } }

  另外服務續約有兩個參數是能夠配置的,即 Eureka Client 發送續約心跳的時間參數Eureka Server 在多長時間內沒有收到心跳將實例剔除的時間參數。在默認狀況下,這兩個分別爲 30 秒和90秒, 官方的建議是不要修改,若是有特殊需求仍是能夠調整的,只須要分別Eureka Client Eureka Server 的配置文件 application.yml 中加如下的配置:

eureka:
  instance:
    lease-renewal-interval-in-seconds: 30
    lease-expiration-duration-in-seconds: 90

  其餘部分就大同小異了,能夠本身追蹤,這裏不贅述了。

  5)Eureka Client 延遲問題。

  a、Eureka Client 註冊延遲:Eureka Client 啓動以後,不是當即向 Eureka Server 註冊的,而是有一個延遲向服務端註冊的時間。經過跟蹤源碼,能夠發現默認的延遲時間爲 40 秒,源碼在DefaultEurekaClientConfig 類中。

  public int getInitialInstanceInfoReplicationIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "appinfo.initial.replicate.time", 40).get();
    }

 

  b、Eureka Server 的響應緩存:Eureka Server 維護每 30 更新一次響應緩存,可經過更改配置eureka.server.responseCacheUpdatelntervalMs 來修改。因此即便是剛剛註冊的實例,也不會當即出如今服務註冊列表中。

  c、Eureka Client 緩存:Eureka Client 保留註冊表信息的緩存。該緩存每 30 秒更新1次(如前所述)。所以, Eureka Client刷新本地緩存並發現其餘新註冊的實例可能須要 30 秒。

  d、LoadBalancer 緩存:Ribbon 的負載平衡器從本地的 Eureka Client 獲取服務註冊列表信息。 Ribbon 自己還維護了緩存,以免每一個請求都須要從 Eureka Client 獲取服務註冊列表。此緩存每30秒刷新一次(可由 ribbon.ServerListRe eshlnterval 置) ,因此可能至少須要30秒的時間才能使用新註冊的實例。

   6)Eureka的自我保護機制:簡單點就是Eureka會從相鄰節點獲取註冊信息,若是節點出現故障,嘗試從其餘地方獲取。若是能過正常獲取則更具配置設置續約的閾值。在任什麼時候候續約的信息低於85%(15分鐘),則開啓自我保護,即不在剔除註冊列表信息。這樣作的目的就是,保證消費者在使用過程當中的正常訪問。

  7)Eureka的默認配置數據

//
// Source code recreated from a .class file by IntelliJ IDEA
// (powered by Fernflower decompiler)
//

package com.netflix.discovery;

import com.google.inject.ProvidedBy;
import com.netflix.appinfo.EurekaAccept;
import com.netflix.config.DynamicPropertyFactory;
import com.netflix.config.DynamicStringProperty;
import com.netflix.discovery.internal.util.Archaius1Utils;
import com.netflix.discovery.providers.DefaultEurekaClientConfigProvider;
import com.netflix.discovery.shared.transport.DefaultEurekaTransportConfig;
import com.netflix.discovery.shared.transport.EurekaTransportConfig;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import javax.annotation.Nullable;
import javax.inject.Singleton;

@Singleton
@ProvidedBy(DefaultEurekaClientConfigProvider.class)
public class DefaultEurekaClientConfig implements EurekaClientConfig {
    /** @deprecated */
    @Deprecated
    public static final String DEFAULT_NAMESPACE = "eureka.";
    public static final String DEFAULT_ZONE = "defaultZone";
    private final String namespace;
    private final DynamicPropertyFactory configInstance;
    private final EurekaTransportConfig transportConfig;

    public DefaultEurekaClientConfig() {
        this("eureka");
    }

    public DefaultEurekaClientConfig(String namespace) {
        this.namespace = namespace.endsWith(".") ? namespace : namespace + ".";
        this.configInstance = Archaius1Utils.initConfig("eureka-client");
        this.transportConfig = new DefaultEurekaTransportConfig(namespace, this.configInstance);
    }

    public int getRegistryFetchIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "client.refresh.interval", 30).get();
    }

    public int getInstanceInfoReplicationIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "appinfo.replicate.interval", 30).get();
    }

    public int getInitialInstanceInfoReplicationIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "appinfo.initial.replicate.time", 40).get();
    }

    public int getEurekaServiceUrlPollIntervalSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "serviceUrlPollIntervalMs", 300000).get() / 1000;
    }

    public String getProxyHost() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.proxyHost", (String)null).get();
    }

    public String getProxyPort() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.proxyPort", (String)null).get();
    }

    public String getProxyUserName() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.proxyUserName", (String)null).get();
    }

    public String getProxyPassword() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.proxyPassword", (String)null).get();
    }

    public boolean shouldGZipContent() {
        return this.configInstance.getBooleanProperty(this.namespace + "eurekaServer.gzipContent", true).get();
    }

    public int getEurekaServerReadTimeoutSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "eurekaServer.readTimeout", 8).get();
    }

    public int getEurekaServerConnectTimeoutSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "eurekaServer.connectTimeout", 5).get();
    }

    public String getBackupRegistryImpl() {
        return this.configInstance.getStringProperty(this.namespace + "backupregistry", (String)null).get();
    }

    public int getEurekaServerTotalConnections() {
        return this.configInstance.getIntProperty(this.namespace + "eurekaServer.maxTotalConnections", 200).get();
    }

    public int getEurekaServerTotalConnectionsPerHost() {
        return this.configInstance.getIntProperty(this.namespace + "eurekaServer.maxConnectionsPerHost", 50).get();
    }

    public String getEurekaServerURLContext() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.context", this.configInstance.getStringProperty(this.namespace + "context", (String)null).get()).get();
    }

    public String getEurekaServerPort() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.port", this.configInstance.getStringProperty(this.namespace + "port", (String)null).get()).get();
    }

    public String getEurekaServerDNSName() {
        return this.configInstance.getStringProperty(this.namespace + "eurekaServer.domainName", this.configInstance.getStringProperty(this.namespace + "domainName", (String)null).get()).get();
    }

    public boolean shouldUseDnsForFetchingServiceUrls() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldUseDns", false).get();
    }

    public boolean shouldRegisterWithEureka() {
        return this.configInstance.getBooleanProperty(this.namespace + "registration.enabled", true).get();
    }

    public boolean shouldUnregisterOnShutdown() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldUnregisterOnShutdown", true).get();
    }

    public boolean shouldPreferSameZoneEureka() {
        return this.configInstance.getBooleanProperty(this.namespace + "preferSameZone", true).get();
    }

    public boolean allowRedirects() {
        return this.configInstance.getBooleanProperty(this.namespace + "allowRedirects", false).get();
    }

    public boolean shouldLogDeltaDiff() {
        return this.configInstance.getBooleanProperty(this.namespace + "printDeltaFullDiff", false).get();
    }

    public boolean shouldDisableDelta() {
        return this.configInstance.getBooleanProperty(this.namespace + "disableDelta", false).get();
    }

    @Nullable
    public String fetchRegistryForRemoteRegions() {
        return this.configInstance.getStringProperty(this.namespace + "fetchRemoteRegionsRegistry", (String)null).get();
    }

    public String getRegion() {
        DynamicStringProperty defaultEurekaRegion = this.configInstance.getStringProperty("eureka.region", "us-east-1");
        return this.configInstance.getStringProperty(this.namespace + "region", defaultEurekaRegion.get()).get();
    }

    public String[] getAvailabilityZones(String region) {
        return this.configInstance.getStringProperty(this.namespace + region + "." + "availabilityZones", "defaultZone").get().split(",");
    }

    public List<String> getEurekaServerServiceUrls(String myZone) {
        String serviceUrls = this.configInstance.getStringProperty(this.namespace + "serviceUrl" + "." + myZone, (String)null).get();
        if (serviceUrls == null || serviceUrls.isEmpty()) {
            serviceUrls = this.configInstance.getStringProperty(this.namespace + "serviceUrl" + ".default", (String)null).get();
        }

        return (List)(serviceUrls != null ? Arrays.asList(serviceUrls.split(",")) : new ArrayList());
    }

    public boolean shouldFilterOnlyUpInstances() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldFilterOnlyUpInstances", true).get();
    }

    public int getEurekaConnectionIdleTimeoutSeconds() {
        return this.configInstance.getIntProperty(this.namespace + "eurekaserver.connectionIdleTimeoutInSeconds", 30).get();
    }

    public boolean shouldFetchRegistry() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldFetchRegistry", true).get();
    }

    public String getRegistryRefreshSingleVipAddress() {
        return this.configInstance.getStringProperty(this.namespace + "registryRefreshSingleVipAddress", (String)null).get();
    }

    public int getHeartbeatExecutorThreadPoolSize() {
        return this.configInstance.getIntProperty(this.namespace + "client.heartbeat.threadPoolSize", 5).get();
    }

    public int getHeartbeatExecutorExponentialBackOffBound() {
        return this.configInstance.getIntProperty(this.namespace + "client.heartbeat.exponentialBackOffBound", 10).get();
    }

    public int getCacheRefreshExecutorThreadPoolSize() {
        return this.configInstance.getIntProperty(this.namespace + "client.cacheRefresh.threadPoolSize", 5).get();
    }

    public int getCacheRefreshExecutorExponentialBackOffBound() {
        return this.configInstance.getIntProperty(this.namespace + "client.cacheRefresh.exponentialBackOffBound", 10).get();
    }

    public String getDollarReplacement() {
        return this.configInstance.getStringProperty(this.namespace + "dollarReplacement", "_-").get();
    }

    public String getEscapeCharReplacement() {
        return this.configInstance.getStringProperty(this.namespace + "escapeCharReplacement", "__").get();
    }

    public boolean shouldOnDemandUpdateStatusChange() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldOnDemandUpdateStatusChange", true).get();
    }

    public boolean shouldEnforceRegistrationAtInit() {
        return this.configInstance.getBooleanProperty(this.namespace + "shouldEnforceRegistrationAtInit", false).get();
    }

    public String getEncoderName() {
        return this.configInstance.getStringProperty(this.namespace + "encoderName", (String)null).get();
    }

    public String getDecoderName() {
        return this.configInstance.getStringProperty(this.namespace + "decoderName", (String)null).get();
    }

    public String getClientDataAccept() {
        return this.configInstance.getStringProperty(this.namespace + "clientDataAccept", EurekaAccept.full.name()).get();
    }

    public String getExperimental(String name) {
        return this.configInstance.getStringProperty(this.namespace + "experimental" + "." + name, (String)null).get();
    }

    public EurekaTransportConfig getTransportConfig() {
        return this.transportConfig;
    }
}
View Code

  7、Eureka Server的集羣配置。

  1)前面的配置基本和Eureka Server的配置同樣,主要是application.的配置yaml

  server1:

server:
  port: 8671
eureka:
  instance:
    appname: server-master
  client:
    register-with-eureka: false
    fetch-registry: false
    service-url:
      defaultZone:
        http://127.0.0.1:8672/eureka/ # 實際開發中建議使用域名的方式
spring:
  application:
    name: server1

  server2:

server:
  port: 8672
eureka:
  instance:
    appname: server-backup
  client:
    register-with-eureka: false
    fetch-registry: false
    service-url:
      defaultZone:
        http://127.0.0.1:8671/eureka/ # 實際開發中建議使用域名的方式
spring:
  application:
    name: server2

  2)啓動項目瀏覽器訪問:http://127.0.0.1:8671/ http://127.0.0.1:8672/

  

 

 

   

 

   說明:這裏的DS Replicas是展現的域名,我這裏都是本地,因此能夠經過修改hosts,來體現不一樣的域名效果。

   3)測試客戶端,使用上面的client代碼測試註冊到8671端口

  結果:

  8671:

  

 

   8672:

  

相關文章
相關標籤/搜索