Spring Cloud Eureka Service Registration: Source Code Analysis

How Eureka works

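How does the Eureka client send the registration information of a local service to the remote registry, the Eureka Server? The source code analysis below shows that a scheduled task in the Eureka Client calls the REST interface of the Eureka Server. When the Eureka Server receives the request, it handles the service registration and the synchronization of data between Eureka Server nodes.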

Service registration

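The source code shows that service registration can be regarded as something the Eureka client completes on its own; the service itself does not need to care about it.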

 

The Eureka Client's scheduled task calls the interface exposed by the Eureka Server

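When com.netflix.discovery.DiscoveryClient starts, it initializes a scheduled task that periodically pushes the local service configuration, that is, the service information to be registered remotely, to the registry server.
Start with the Eureka code. The spring-cloud-netflix-eureka-server project depends on eureka-client-1.4.11.jar.
In that jar, com.netflix.discovery.DiscoveryClient.java has the comment "Initializes all scheduled tasks" at around line 1240, and the InstanceInfoReplicator scheduled task appears at around line 1277.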

 

DiscoveryClient initializes an InstanceInfoReplicator, which in fact wraps a scheduled task.

/**
     * Initializes all scheduled tasks.
     */
    private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }
        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: " + renewalIntervalInSecs);
            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);
            // InstanceInfo replicator
            // The InstanceInfoReplicator created here wraps a scheduled task
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize
            statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
                @Override
                public String getId() {
                    return "statusChangeListener";
                }
                @Override
                public void notify(StatusChangeEvent statusChangeEvent) {
                    if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
                            InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
                        // log at warn level if DOWN was involved
                        logger.warn("Saw local status change event {}", statusChangeEvent);
                    } else {
                        logger.info("Saw local status change event {}", statusChangeEvent);
                    }
                    instanceInfoReplicator.onDemandUpdate();
                }
            };
            if (clientConfig.shouldOnDemandUpdateStatusChange()) {
                applicationInfoManager.registerStatusChangeListener(statusChangeListener);
            }
            // See the start method of InstanceInfoReplicator for details
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }
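
The pattern in the listing above (flag the instance as changed, let a scheduled task push it to the server, and let a status change trigger an immediate push) can be sketched roughly as follows. This is a minimal illustration under stated assumptions, not the Eureka implementation: ReplicatorSketch, markDirty, pushIfDirty and the counter that stands in for the register() call are hypothetical names, and the burstSize rate limiting seen in the listing is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical, simplified stand-in for InstanceInfoReplicator (not the Eureka class). */
public class ReplicatorSketch implements Runnable {
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "replicator-sketch");
                t.setDaemon(true); // do not keep the JVM alive for this sketch
                return t;
            });
    private final AtomicBoolean dirty = new AtomicBoolean(false);
    private final AtomicInteger registerCalls = new AtomicInteger();
    private final int intervalSeconds;

    public ReplicatorSketch(int intervalSeconds) {
        this.intervalSeconds = intervalSeconds;
    }

    /** Flags the local instance info as changed since the last push. */
    public void markDirty() {
        dirty.set(true);
    }

    /** Pushes to the server only if something changed; returns true if it pushed. */
    public boolean pushIfDirty() {
        if (dirty.compareAndSet(true, false)) {
            registerCalls.incrementAndGet(); // stands in for the real register() call
            return true;
        }
        return false;
    }

    /** One scheduled pass: push if needed, then schedule the next pass. */
    @Override
    public void run() {
        try {
            pushIfDirty();
        } finally {
            if (!scheduler.isShutdown()) {
                scheduler.schedule(this, intervalSeconds, TimeUnit.SECONDS);
            }
        }
    }

    /** Marks the instance dirty so the first pass registers it, then starts the cycle. */
    public void start(int initialDelaySeconds) {
        markDirty();
        scheduler.schedule(this, initialDelaySeconds, TimeUnit.SECONDS);
    }

    /** Status-change hook: flag the change and run a pass right away. */
    public void onDemandUpdate() {
        markDirty();
        scheduler.execute(this::pushIfDirty);
    }

    public void stop() {
        scheduler.shutdownNow();
    }

    public int getRegisterCalls() {
        return registerCalls.get();
    }
}
```

With this sketch, new ReplicatorSketch(30).start(5) would register once after five seconds and then check for changes every 30 seconds, while onDemandUpdate() pushes a change without waiting for the next interval.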

 

The register() method in com.netflix.discovery.DiscoveryClient, at around line 811:

/**
 * Register with the eureka service by making the appropriate REST call.
 */
boolean register() throws Throwable {
    logger.info(PREFIX + appPathIdentifier + ": registering service...");
    EurekaHttpResponse<Void> httpResponse;
    try {
        // Entry point where the Eureka client calls the Eureka server
        httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
    } catch (Exception e) {
        logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
        throw e;
    }
    if (logger.isInfoEnabled()) {
        logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
    }
    return httpResponse.getStatusCode() == 204;
}
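
On the wire, the register() call above comes down to an HTTP POST that is considered successful when the server answers 204. The sketch below only builds such a request with the JDK's java.net.http API (Java 11 or later) and does not send it. The class and method names are hypothetical, the base URL is an assumption about the deployment, the "/apps/{appName}" path is an assumption about the server's REST layout, and the JSON body is left to the caller rather than spelled out here.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper; the real client goes through eurekaTransport.registrationClient. */
public class RegisterRequestSketch {

    /**
     * Builds (but does not send) a registration request.
     * baseUrl, for example "http://localhost:8761/eureka", is an assumed deployment value.
     */
    public static HttpRequest buildRegisterRequest(String baseUrl, String appName, String instanceJson) {
        return HttpRequest.newBuilder()
                .uri(URI.create(baseUrl + "/apps/" + appName))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(instanceJson))
                .build();
    }

    /** Mirrors the check at the end of register(): 204 means the instance was accepted. */
    public static boolean isRegistered(int statusCode) {
        return statusCode == 204;
    }
}
```

Sending the request would be a matter of passing it to HttpClient.send and feeding the response's status code to isRegistered.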

 

 

Request entry point on the Eureka Server side
At line 183 of ApplicationResource.java, shown below, you can see that Eureka registers services via HTTP POST.

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        if(this.isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if(this.isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if(this.isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if(!this.appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
        } else if(info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if(info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        } else {
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if(dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
                if(this.isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if(experimental) {
                        String amazonInfo1 = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(amazonInfo1).build();
                    }

                    if(dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                        String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                        if(effectiveId == null) {
                            amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }
       
            // Delegates to the register method in PeerAwareInstanceRegistryImpl
            this.registry.register(info, "true".equals(isReplication));

            return Response.status(204).build();
        }
    }

 

 

At line 52 of InstanceRegistry.java, you can see the call to the register method at line 278 of PeerAwareInstanceRegistryImpl.

 public void register(InstanceInfo info, boolean isReplication) {
        this.handleRegistration(info, this.resolveInstanceLeaseDuration(info), isReplication);
        super.register(info, isReplication);
 }

 

The register method at line 278 of PeerAwareInstanceRegistryImpl:

    public void register(InstanceInfo info, boolean isReplication) {
        int leaseDuration = 90;
        if(info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        // Call the parent class method to register the instance
        super.register(info, leaseDuration, isReplication);
        // Replicate the registration to the other Eureka Server peers
        this.replicateToPeers(PeerAwareInstanceRegistryImpl.Action.Register, info.getAppName(), info.getId(), info, (InstanceStatus)null, isReplication);
    }
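
The two steps in the method above (register locally with a 90-second default lease, then replicate to peers) can be illustrated with a small in-memory model. This is a sketch under assumptions: Node and its members are hypothetical, real peers talk over HTTP, and the sketch assumes that a request already marked as a replication is not forwarded again, which is what the isReplication flag suggests.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class PeerReplicationSketch {

    /** Hypothetical in-memory server node; not a Eureka class. */
    public static class Node {
        public static final int DEFAULT_LEASE_SECONDS = 90;
        public final String name;
        public final List<Node> peers = new ArrayList<>();
        // instanceId -> lease duration in seconds
        public final Map<String, Integer> leases = new ConcurrentHashMap<>();
        public int registerCalls = 0;

        public Node(String name) {
            this.name = name;
        }

        public void register(String instanceId, int requestedLeaseSeconds, boolean isReplication) {
            registerCalls++;
            // Fall back to the default when the client did not ask for a lease duration
            int lease = requestedLeaseSeconds > 0 ? requestedLeaseSeconds : DEFAULT_LEASE_SECONDS;
            leases.put(instanceId, lease); // stands in for super.register(...)
            replicateToPeers(instanceId, lease, isReplication);
        }

        private void replicateToPeers(String instanceId, int lease, boolean isReplication) {
            if (isReplication) {
                return; // assumed: a replicated request is never forwarded again
            }
            for (Node peer : peers) {
                peer.register(instanceId, lease, true);
            }
        }
    }

    /** Builds two nodes that list each other as peers. */
    public static Node[] twoNodeCluster() {
        Node a = new Node("a");
        Node b = new Node("b");
        a.peers.add(b);
        b.peers.add(a);
        return new Node[] {a, b};
    }
}
```

Registering an instance on the first node leaves it in both registries after exactly one call on each node, so the flag is what keeps two peers from echoing the same registration back and forth.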

At line 151 of AbstractInstanceRegistry.java, you can see the code that actually implements service registration in Eureka.

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            this.read.lock();
            Object gMap = (Map)this.registry.get(registrant.getAppName());
            EurekaMonitors.REGISTER.increment(isReplication);
            if(gMap == null) {
                ConcurrentHashMap existingLease = new ConcurrentHashMap();
                gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), existingLease);
                if(gMap == null) {
                    gMap = existingLease;
                }
            }

            Lease existingLease1 = (Lease)((Map)gMap).get(registrant.getId());
            if(existingLease1 != null && existingLease1.getHolder() != null) {
                Long lease1 = ((InstanceInfo)existingLease1.getHolder()).getLastDirtyTimestamp();
                Long overriddenStatusFromMap = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", lease1, overriddenStatusFromMap);
                if(lease1.longValue() > overriddenStatusFromMap.longValue()) {
                    logger.warn("There is an existing lease and the existing lease\'s dirty timestamp {} is greater than the one that is being registered {}", lease1, overriddenStatusFromMap);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = (InstanceInfo)existingLease1.getHolder();
                }
            } else {
                Object lease = this.lock;
                synchronized(this.lock) {
                    if(this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin += 2;
                        this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold());
                    }
                }

                logger.debug("No previous lease information found; it is new registration");
            }

            Lease lease2 = new Lease(registrant, leaseDuration);
            if(existingLease1 != null) {
                lease2.setServiceUpTimestamp(existingLease1.getServiceUpTimestamp());
            }

            ((Map)gMap).put(registrant.getId(), lease2);
            AbstractInstanceRegistry.CircularQueue overriddenStatusFromMap1 = this.recentRegisteredQueue;
            synchronized(this.recentRegisteredQueue) {
                this.recentRegisteredQueue.add(new Pair(Long.valueOf(System.currentTimeMillis()), registrant.getAppName() + "(" + registrant.getId() + ")"));
            }

            if(!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId());
                if(!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }

            InstanceStatus overriddenStatusFromMap2 = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId());
            if(overriddenStatusFromMap2 != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap2);
                registrant.setOverriddenStatus(overriddenStatusFromMap2);
            }

            InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease1, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);
            if(InstanceStatus.UP.equals(registrant.getStatus())) {
                lease2.serviceUp();
            }

            registrant.setActionType(ActionType.ADDED);
            this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease2));
            registrant.setLastUpdatedTimestamp();
            this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), Boolean.valueOf(isReplication)});
        } finally {
            this.read.unlock();
        }

    }

 

Note: the registration information is in fact stored in a field named registry, whose type is ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>.

 private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap();
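
To make the two-level structure concrete, here is a pared-down sketch of a registry keyed by application name and then by instance id. It is an illustration under assumptions, not the Eureka code: RegistrySketch and its Lease class are hypothetical, computeIfAbsent and merge are used in place of the putIfAbsent logic in the listing, and keeping the whole existing lease when its dirty timestamp is newer is a simplification of what register does.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class RegistrySketch {

    /** Hypothetical lease holding only the fields this sketch needs. */
    public static final class Lease {
        public final String instanceId;
        public final long lastDirtyTimestamp;
        public final int durationSeconds;

        public Lease(String instanceId, long lastDirtyTimestamp, int durationSeconds) {
            this.instanceId = instanceId;
            this.lastDirtyTimestamp = lastDirtyTimestamp;
            this.durationSeconds = durationSeconds;
        }
    }

    // appName -> (instanceId -> lease), the same shape as the registry field above
    private final ConcurrentHashMap<String, Map<String, Lease>> registry = new ConcurrentHashMap<>();

    /** Registers an instance and returns the lease that ends up stored. */
    public Lease register(String appName, Lease incoming) {
        Map<String, Lease> instances =
                registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>());
        // Keep the existing entry when its dirty timestamp is newer than the incoming one
        return instances.merge(incoming.instanceId, incoming,
                (existing, fresh) ->
                        existing.lastDirtyTimestamp > fresh.lastDirtyTimestamp ? existing : fresh);
    }

    /** Returns the stored lease, or null if the app or instance is unknown. */
    public Lease lookup(String appName, String instanceId) {
        Map<String, Lease> instances = registry.get(appName);
        return instances == null ? null : instances.get(instanceId);
    }
}
```

The outer map lets a lookup by application name return all of that application's instances at once, while the inner map makes re-registration of the same instance id an overwrite rather than a duplicate.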

 

 

 

Summary

 

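The ApplicationResource class receives the HTTP request and calls the register method of PeerAwareInstanceRegistryImpl. Once PeerAwareInstanceRegistryImpl has completed the registration, it calls replicateToPeers to synchronize the state to the other Eureka Server nodes (peers), as shown in the figure below.
Eureka Server registration sequence diagram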

 

 

Related classes

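EurekaHttpClient: provides the Eureka client API methods; its implementation class JerseyReplicationClient is the class that sends the HTTP requests

ApplicationResource: the class on the Eureka server that handles requests sent by Eureka clients

InstanceRegistry: the registry class on the Eureka server

LookupService: the interface for looking up active service instances

DefaultEurekaClientConfig: the default Eureka client configuration class

EurekaClientConfigBean: the Eureka client configuration bean, the class that supplies the property hints in configuration files; configuration prefix: eureka.client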
