Eureka 客戶端和服務端間的交互

Eureka 服務器客戶端相關配置

1.創建eureka服務器

只須要使用@EnableEurekaServer註解就可讓應用變爲Eureka服務器,這是由於spring boot封裝了Eureka Server,讓你能夠嵌入到應用中直接使用。至於真正的EurekaServer是Netflix公司的開源項目,也是能夠單獨下載使用的java

@SpringBootApplication

  @EnableEurekaServer

  public class EurekaServer {

    public static void main(String[] args) {

    SpringApplication.run(EurekaServer.class, args);

      }

  }
在application.properties配置文件中使用以下配置:

  server.port=8761

  eureka.instance.hostname=localhost

  eureka.client.registerWithEureka=false

  eureka.client.fetchRegistry=false

  eureka.client.serviceUrl.defaultZone=http://${eureka.instance.hostname}:${server.port}/eureka/

其中server.port配置eureka服務器端口號。Eureka的配置屬性都在開源項目spring-cloud-netflix-master中定義,在這個項目中有兩個類EurekaInstanceConfigBean 和EurekaClientConfigBean,分別含有eureka.instance和eureka.client相關屬性的解釋和定義。從中能夠看到,registerWithEureka表示是否註冊自身到eureka服務器,由於當前這個應用就是eureka服務器,不必註冊自身,因此這裏是false。fetchRegistry表示是否從eureka服務器獲取註冊信息,同上,這裏不須要。defaultZone就比較重要了,是設置eureka服務器所在的地址,查詢服務和註冊服務都須要依賴這個地址node

2.讓服務使用eureka服務器

讓服務使用eureka服務器,只需添加@EnableDiscoveryClient註解就能夠了。在main方法所在的Application類中,添加@EnableDiscoveryClient註解。而後在配置文件中添加:git

eureka.client.serviceUrl.defaultZone=http\://localhost\:8761/eureka/

  spring.application.name=simple-service

pom文件須要增長:github

<dependency>

       <groupId>org.springframework.cloud</groupId>

       <artifactId>spring-cloud-starter-eureka-server</artifactId>

  </dependency>

其中defaultZone是指定eureka服務器的地址,不管是註冊仍是發現服務都須要這個地址。application.name是指定進行服務註冊時該服務的名稱。這個名稱就是後面調用服務時的服務標識符,pom文件須要增長:web

<dependency>

    <groupId>org.springframework.cloud</groupId>

    <artifactId>spring-cloud-starter-eureka</artifactId>

  </dependency>

  如此以來該服務啓動後會自動註冊到eureka服務器。若是在該服務中還須要調用別的服務,那麼直接使用那個服務的服務名稱加方法名構成的url便可。
  spring

Eureka Client 客戶端 服務註冊發現接口

Netxflix 提供的主要操做定義在com.netflix.discovery.EurekaClient中。主要操做有
json

其實現類是 com.netflix.discovery.DiscoveryClient。緩存

Spring cloud中對其進行了封裝,定義在org.springframework.cloud.client.discovery.DiscoveryClient中
服務器

服務發現核心類DiscoveryClient(Netflix 非 Spring)

最核心的一個構造方法

@Inject //google guice 注入遵循 JSR-330規範
    DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, DiscoveryClientOptionalArgs args,
                    Provider<BackupRegistry> backupRegistryProvider) {
        if (args != null) {
            this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
            this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
            this.eventBus = args.eventBus;
            this.discoveryJerseyClient = args.eurekaJerseyClient;
        } else {
            this.healthCheckCallbackProvider = null;
            this.healthCheckHandlerProvider = null;
            this.eventBus = null;
            this.discoveryJerseyClient = null;
        }

        this.applicationInfoManager = applicationInfoManager;
        InstanceInfo myInfo = applicationInfoManager.getInfo();

        this.backupRegistryProvider = backupRegistryProvider;

        try {
            scheduler = Executors.newScheduledThreadPool(3,
                    new ThreadFactoryBuilder()
                            .setNameFormat("DiscoveryClient-%d")
                            .setDaemon(true)
                            .build());
            clientConfig = config;
            staticClientConfig = clientConfig;
            transportConfig = config.getTransportConfig();
            instanceInfo = myInfo;
            if (myInfo != null) {
                appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
            } else {
                logger.warn("Setting instanceInfo to a passed in null value");
            }

            this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
            String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion());
            final String zone = InstanceInfo.getZone(availZones, myInfo);
            localRegionApps.set(new Applications());

            heartbeatExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());  // use direct handoff

            cacheRefreshExecutor = new ThreadPoolExecutor(
                    1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
                    new SynchronousQueue<Runnable>());  // use direct handoff

            fetchRegistryGeneration = new AtomicLong(0);

            clientAccept = EurekaAccept.fromString(clientConfig.getClientDataAccept());

            eurekaTransport = new EurekaTransport();
            scheduleServerEndpointTask(eurekaTransport, zone);

            if (discoveryJerseyClient == null) {  // if not injected, create one

                EurekaJerseyClientBuilder clientBuilder = new EurekaJerseyClientBuilder()
                        .withUserAgent("Java-EurekaClient")
                        .withConnectionTimeout(clientConfig.getEurekaServerConnectTimeoutSeconds() * 1000)
                        .withReadTimeout(clientConfig.getEurekaServerReadTimeoutSeconds() * 1000)
                        .withMaxConnectionsPerHost(clientConfig.getEurekaServerTotalConnectionsPerHost())
                        .withMaxTotalConnections(clientConfig.getEurekaServerTotalConnections())
                        .withConnectionIdleTimeout(clientConfig.getEurekaConnectionIdleTimeoutSeconds())
                        .withEncoder(clientConfig.getEncoderName())
                        .withDecoder(clientConfig.getDecoderName(), clientConfig.getClientDataAccept());

                if (eurekaServiceUrls.get().get(0).startsWith("https://") &&
                        "true".equals(System.getProperty("com.netflix.eureka.shouldSSLConnectionsUseSystemSocketFactory"))) {
                    clientBuilder.withClientName("DiscoveryClient-HTTPClient-System")
                            .withSystemSSLConfiguration();
                } else if (clientConfig.getProxyHost() != null && clientConfig.getProxyPort() != null) {
                    clientBuilder.withClientName("Proxy-DiscoveryClient-HTTPClient")
                            .withProxy(
                                    clientConfig.getProxyHost(), clientConfig.getProxyPort(),
                                    clientConfig.getProxyUserName(), clientConfig.getProxyPassword()
                            );
                } else {
                    clientBuilder.withClientName("DiscoveryClient-HTTPClient");
                }
                discoveryJerseyClient = clientBuilder.build();
            }

            discoveryApacheClient = discoveryJerseyClient.getClient();

            remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
            remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
            AzToRegionMapper azToRegionMapper;
            if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
                azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
            } else {
                azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
            }
            if (null != remoteRegionsToFetch.get()) {
                azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
            }
            instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
            boolean enableGZIPContentEncodingFilter = config.shouldGZipContent();
            // should we enable GZip decoding of responses based on Response
            // Headers?
            if (enableGZIPContentEncodingFilter) {
                // compressed only if there exists a 'Content-Encoding' header
                // whose value is "gzip"
                discoveryApacheClient.addFilter(new GZIPContentEncodingFilter(false));
            }

            // always enable client identity headers
            String ip = instanceInfo == null ? null : instanceInfo.getIPAddr();
            EurekaClientIdentity identity = new EurekaClientIdentity(ip);
            discoveryApacheClient.addFilter(new EurekaIdentityHeaderFilter(identity));

            // add additional ClientFilters if specified
            if (args != null && args.additionalFilters != null) {
                for (ClientFilter filter : args.additionalFilters) {
                    discoveryApacheClient.addFilter(filter);
                }
            }

        } catch (Throwable e) {
            throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
        }

        if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
            fetchRegistryFromBackup();
        }

        initScheduledTasks();
        try {
            Monitors.registerObject(this);
        } catch (Throwable e) {
            logger.warn("Cannot register timers", e);
        }
        this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
        this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});

        // This is a bit of hack to allow for existing code using DiscoveryManager.getInstance()
        // to work with DI'd DiscoveryClient
        DiscoveryManager.getInstance().setDiscoveryClient(this);
        DiscoveryManager.getInstance().setEurekaClientConfig(config);

        initTimestampMs = System.currentTimeMillis();
    }

其中有初始化 部分定時任務的(心跳任務、獲取註冊信息任務)
client.refresh.interval 30 任務執行間隔app

/**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (shouldRegister(instanceInfo)) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }

                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };

            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }

            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

a. HeartbeatThread 心跳任務 (心跳線程默認30秒一次 第一次心跳即爲第一次註冊)

其中 renew() 方法 爲核心方法 ,此心跳機制用來通知Eureka Server 該服務狀態正常.若是超過90秒仍然鏈接失敗則此服務會被Eureka Server 從服務註冊表移除。此30秒默認間隔不建議調整。

/**
     * Renew with the eureka service by making the appropriate REST call
     * transport.query.enabled 此屬性是否爲true決定了
     * 是由 EurekaHttpClient 
     */
    boolean renew() {
        if (shouldUseExperimentalTransportForRegistration()) {
            EurekaHttpResponse<InstanceInfo> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
                if (httpResponse.getStatusCode() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
                return httpResponse.getStatusCode() == 200;
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            }
        } else {
            ClientResponse response = null;
            try {
                response = makeRemoteCall(Action.Renew);
                logger.debug("{} - Heartbeat status: {}", PREFIX + appPathIdentifier, (response != null ? response.getStatus() : "not sent"));
                if (response == null) {
                    return false;
                }
                if (response.getStatus() == 404) {
                    REREGISTER_COUNTER.increment();
                    logger.info("{} - Re-registering apps/{}", PREFIX + appPathIdentifier, instanceInfo.getAppName());
                    return register();
                }
            } catch (Throwable e) {
                logger.error("{} - was unable to send heartbeat!", PREFIX + appPathIdentifier, e);
                return false;
            } finally {
                if (response != null) {
                    response.close();
                }
            }
            return true;
        }
    }
    
    /**
     * Register with the eureka service by making the appropriate REST call.
     */
    boolean register() throws Throwable {
        logger.info(PREFIX + appPathIdentifier + ": registering service...");
        if (shouldUseExperimentalTransportForRegistration()) {
            EurekaHttpResponse<Void> httpResponse;
            try {
                httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
            } catch (Exception e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            }
            isRegisteredWithDiscovery = true;
            if (logger.isInfoEnabled()) {
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
            }
            return httpResponse.getStatusCode() == 204;
        } else {
            ClientResponse response = null;
            try {
                response = makeRemoteCall(Action.Register);
                isRegisteredWithDiscovery = true;
                logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, (response != null ? response.getStatus() : "not sent"));
                return response != null && response.getStatus() == 204;
            } catch (Throwable e) {
                logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
                throw e;
            } finally {
                closeResponse(response);
            }
        }
    }

註冊時所傳信息 以下:

b.CacheRefreshThread 獲取全部服務註冊信息的定時任務

獲取遠端註冊信息,根據參數會決定是調用getAndStoreFullRegistry()或者getAndUpdateDelta(applications)獲取服務端的註冊信息刷新client的緩存

第一次獲取到全部服務的註冊信息後會將其緩存到本地(getAndStoreFullRegistry()),
存放容器爲 DiscoveryClient 中的 AtomicReference (能夠用原子方式更新的對象引用)

private final AtomicReference<Applications> localRegionApps = 
new AtomicReference<Applications>();
/**
     * The task that fetches the registry information at specified intervals.
     *
     */
    class CacheRefreshThread implements Runnable {
        public void run() {
            try {
                boolean isFetchingRemoteRegionRegistries = isFetchingRemoteRegionRegistries();

                boolean remoteRegionsModified = false;
                // This makes sure that a dynamic change to remote regions to fetch is honored.
                String latestRemoteRegions = clientConfig.fetchRegistryForRemoteRegions();
                if (null != latestRemoteRegions) {
                    String currentRemoteRegions = remoteRegionsToFetch.get();
                    if (!latestRemoteRegions.equals(currentRemoteRegions)) {
                        // Both remoteRegionsToFetch and AzToRegionMapper.regionsToFetch need to be in sync
                        synchronized (instanceRegionChecker.getAzToRegionMapper()) {
                            if (remoteRegionsToFetch.compareAndSet(currentRemoteRegions, latestRemoteRegions)) {
                                String[] remoteRegions = latestRemoteRegions.split(",");
                                remoteRegionsRef.set(remoteRegions);
                                instanceRegionChecker.getAzToRegionMapper().setRegionsToFetch(remoteRegions);
                                remoteRegionsModified = true;
                            } else {
                                logger.info("Remote regions to fetch modified concurrently," +
                                        " ignoring change from {} to {}", currentRemoteRegions, latestRemoteRegions);
                            }
                        }
                    } else {
                        // Just refresh mapping to reflect any DNS/Property change
                        instanceRegionChecker.getAzToRegionMapper().refreshMapping();
                    }
                }

                boolean success = fetchRegistry(remoteRegionsModified);
                if (success) {
                    registrySize = localRegionApps.get().size();
                    lastSuccessfulRegistryFetchTimestamp = System.currentTimeMillis();
                }

                if (logger.isDebugEnabled()) {
                    StringBuilder allAppsHashCodes = new StringBuilder();
                    allAppsHashCodes.append("Local region apps hashcode: ");
                    allAppsHashCodes.append(localRegionApps.get().getAppsHashCode());
                    allAppsHashCodes.append(", is fetching remote regions? ");
                    allAppsHashCodes.append(isFetchingRemoteRegionRegistries);
                    for (Map.Entry<String, Applications> entry : remoteRegionVsApps.entrySet()) {
                        allAppsHashCodes.append(", Remote region: ");
                        allAppsHashCodes.append(entry.getKey());
                        allAppsHashCodes.append(" , apps hashcode: ");
                        allAppsHashCodes.append(entry.getValue().getAppsHashCode());
                    }
                    logger.debug("Completed cache refresh task for discovery. All Apps hash code is {} ",
                            allAppsHashCodes.toString());
                }
            } catch (Throwable th) {
                logger.error("Cannot fetch registry from server", th);
            }
        }
    }
    
    
   /**
     * Fetches the registry information.
     *
     * <p>
     * This method tries to get only deltas after the first fetch unless there
     * is an issue in reconciling eureka server and client registry information.
     * </p>
     *
     * @param forceFullRegistryFetch Forces a full registry fetch.
     *
     * @return true if the registry was fetched
     */
    private boolean fetchRegistry(boolean forceFullRegistryFetch) {
        Stopwatch tracer = FETCH_REGISTRY_TIMER.start();

        try {
            // If the delta is disabled or if it is the first time, get all
            // applications
            Applications applications = getApplications();

            if (clientConfig.shouldDisableDelta()
                    || (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
                    || forceFullRegistryFetch
                    || (applications == null)
                    || (applications.getRegisteredApplications().size() == 0)
                    || (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
            {
                logger.info("Disable delta property : {}", clientConfig.shouldDisableDelta());
                logger.info("Single vip registry refresh property : {}", clientConfig.getRegistryRefreshSingleVipAddress());
                logger.info("Force full registry fetch : {}", forceFullRegistryFetch);
                logger.info("Application is null : {}", (applications == null));
                logger.info("Registered Applications size is zero : {}",
                        (applications.getRegisteredApplications().size() == 0));
                logger.info("Application version is -1: {}", (applications.getVersion() == -1));
                getAndStoreFullRegistry();
            } else {
                getAndUpdateDelta(applications);
            }
            applications.setAppsHashCode(applications.getReconcileHashCode());
            logTotalInstances();
        } catch (Throwable e) {
            logger.error(PREFIX + appPathIdentifier + " - was unable to refresh its cache! status = " + e.getMessage(), e);
            return false;
        } finally {
            if (tracer != null) {
                tracer.stop();
            }
        }

        // Notify about cache refresh before updating the instance remote status
        onCacheRefreshed();

        // Update remote status based on refreshed data held in the cache
        updateInstanceRemoteStatus();

        // registry was fetched successfully, so return true
        return true;
    }

上文getAndStoreFullRegistry() 裏面有個httpclient

com.netflix.discovery.shared.transport.jersey.AbstractJerseyEurekaHttpClient.getApplications其實就是發起一個Rest請求

@Override
public EurekaHttpResponse<Applications> getApplications(String… regions) {
return getApplicationsInternal(「apps/」, regions);
}

 private EurekaHttpResponse<Applications> getApplicationsInternal(String urlPath, String[] regions) {
            WebResource webResource = jerseyClient.resource(serviceUrl).path(urlPath);
                regionsParamValue = StringUtil.join(regions);
                webResource = webResource.queryParam("regions", regionsParamValue);
            Builder requestBuilder = webResource.getRequestBuilder();
            addExtraHeaders(requestBuilder);
            response = requestBuilder.accept(MediaType.APPLICATION_JSON_TYPE).get(ClientResponse.class);
}

Eureka Server 服務端

1.服務端接受請求

能夠看到服務端的也是開放一個rest接口

@POST
@Consumes({「application/json」, 「application/xml」})
public Response addInstance(InstanceInfo info,
@HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
logger.debug(「Registering instance {} (replication={})」, info.getId(), isReplicat
registry.register(info, 「true」.equals(isReplication));
}

2.執行註冊com.netflix.eureka.registry.PeerAwareInstanceRegistryImpl.register(InstanceInfo info, boolean isReplication)

/**
     * Registers the information about the {@link InstanceInfo} and replicates
     * this information to all peer eureka nodes. If this is replication event
     * from other replica nodes then it is not replicated.
     *
     * @param info
     *            the {@link InstanceInfo} to be registered and replicated.
     * @param isReplication
     *            true if this is a replication event from other replica nodes,
     *            false otherwise.
     */
    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

3.註冊信息保存

在其父類 AbstractInstanceRegistry 中實現通常的註冊信息存儲的操做,其實就是存儲在一個 ConcurrentHashMap<String, Map<String, Lease >> registry的結構中。

private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry
 = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
/**
     * Registers a new instance with a given duration.
     *
     * @see com.netflix.eureka.lease.LeaseManager#register(java.lang.Object, int, boolean)
     */
    public void register(InstanceInfo r, int leaseDuration, boolean isReplication) {
        try {
            read.lock();
            Map<String, Lease<InstanceInfo>> gMap = registry.get(r.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap =
                        new ConcurrentHashMap<String, Lease<InstanceInfo>>();
                gMap = registry.putIfAbsent(r.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(r.getId());
            // Retain the last dirty timestamp without overwriting it, if there is already a lease
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = r.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is " +
                                    "greater than the one that is being registered {}",
                            existingLastDirtyTimestamp,
                            registrationLastDirtyTimestamp);
                    r.setLastDirtyTimestamp(existingLastDirtyTimestamp);
                }
            } else {
                // The lease does not exist and hence it is a new registration
                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(r, leaseDuration);
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(r.getId(), lease);
            synchronized (recentRegisteredQueue) {
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        r.getAppName() + "(" + r.getId() + ")"));
            }
            // This is where the initial state transfer of overridden status happens
            if (!InstanceStatus.UNKNOWN.equals(r.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", r.getOverriddenStatus(), r.getId());
                if (!overriddenInstanceStatusMap.containsKey(r.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", r.getId());
                    overriddenInstanceStatusMap.put(r.getId(), r.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(r.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                r.setOverriddenStatus(overriddenStatusFromMap);
            }

            // Set the status based on the overridden status rules
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(r, existingLease, isReplication);
            r.setStatusWithoutDirty(overriddenInstanceStatus);

            // If the lease is registered with UP status, set lease service up timestamp
            if (InstanceStatus.UP.equals(r.getStatus())) {
                lease.serviceUp();
            }
            r.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            r.setLastUpdatedTimestamp();
            invalidateCache(r.getAppName(), r.getVIPAddress(), r.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    r.getAppName(), r.getId(), r.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }

附:

Eureka REST operations

相關文章
相關標籤/搜索