【源碼系列】Eureka源碼分析

  對於服務註冊中心、服務提供者、服務消費者這個三個主要元素來講,服務提供者和服務消費者(即Eureka客戶端)在整個運行機制中是大部分通訊行爲的主動發起者(服務註冊、續約、下線等),而註冊中心主要是處理請求的接收者。因此,咱們從Eureka的客戶端爲入口分析它是如何完成這些主動通訊的。
  通常狀況下,咱們將一個SpringBoot應用註冊到 Eureka Server 或者從 Eureka Server 獲取服務器列表時,就作了兩件事:html

  1. 在應用啓動類添加註解 @EnableDiscoveryClient
  2. 在 application.properties 文件上用 eureka.client.service-url.defaultZone 參數指定註冊中心的地址

咱們先看看 @EnableDiscoveryClient 這個註解的源碼,以下:java

/** * Annotation to enable a DiscoveryClient implementation. * @author Spencer Gibb */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /** * If true, the ServiceRegistry will automatically register the local server. */
    boolean autoRegister() default true;
}

經過註釋能夠知道,該註解能夠開啓 DiscoveryClient 實例,而後咱們搜索 DiscoveryClient 會發現一個類和一個接口,它們的關係如圖。算法

 

enter description here
enter description here

右邊的 org.springframework.cloud.client.discovery.DiscoveryClient 是SpringCloud的接口,體現了面向接口編程的思想,定義了用來發現服務的經常使用抽象方法。 org.springframework.cloud.netflix.eureka.EurekaDiscoveryClient是該接口的實現,是對Eureka發現服務的封裝,內部依賴了一個EurekaClient接口,因此真正實現發現服務的是 com.netflix.discovery.DiscoveryClient類。
查看類註釋的內容:

 

/** * The class that is instrumental for interactions with <tt>Eureka Server</tt>. * * <p> * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from * <tt>Eureka Server</tt> during shutdown * <p> * d) <em>Querying</em> the list of services/instances registered with * <tt>Eureka Server</tt> * <p> * * <p> * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt> * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips * which do not change. All of the functions defined above fail-over to other * {@link java.net.URL}s specified in the list in the case of failure. * </p> * * @author Karthik Ranganathan, Greg Kim * @author Spencer Gibb * */
@Singleton
public class DiscoveryClient implements EurekaClient {
    ...
}

這個類用於幫助與 Eureka Server 相互協做
Eureka Client客戶端負責如下內容:spring

  1. 向Eureka Server 註冊服務實例
  2. 向 Eureka Server 服務續約
  3. 服務關閉時取消租約
  4. 查詢註冊在 Eureka Server 上的服務或實例列表
    Eureka Client 還須要配置一個 Eureka Server 的服務列表。

哪裏對Eureka Server的URL列表配置?

根據咱們配置的屬性名eureka.client.serviceUrl.defaultZone,經過serviceUrl能夠找到該屬性相關的加載屬性,就是DiscoveryClient裏有個getEurekaServiceUrlsFromConfig()方法可是棄用了,改用EndpointUtils這個工具類,代碼以下:編程

  1. /** 
  2. * Get the list of all eureka service urls from properties file for the eureka client to talk to. 
  3. * 
  4. * @param clientConfig the clientConfig to use 
  5. * @param instanceZone The zone in which the client resides 
  6. * @param preferSameZone true if we have to prefer the same zone as the client, false otherwise 
  7. * @return an (ordered) map of zone -> list of urls mappings, with the preferred zone first in iteration order 
  8. */ 
  9. public static Map<String, List<String>> getServiceUrlsMapFromConfig(EurekaClientConfig clientConfig, String instanceZone, boolean preferSameZone) { 
  10. Map<String, List<String>> orderedUrls = new LinkedHashMap<>(); 
  11. String region = getRegion(clientConfig); 
  12. String[] availZones = clientConfig.getAvailabilityZones(clientConfig.getRegion()); 
  13. if (availZones == null || availZones.length == 0) { 
  14. availZones = new String[1]; 
  15. availZones[0] = DEFAULT_ZONE; 
  16. logger.debug("The availability zone for the given region {} are {}", region, availZones); 
  17. int myZoneOffset = getZoneOffset(instanceZone, preferSameZone, availZones); 
  18.  
  19. String zone = availZones[myZoneOffset]; 
  20. List<String> serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); 
  21. if (serviceUrls != null) { 
  22. orderedUrls.put(zone, serviceUrls); 
  23. int currentOffset = myZoneOffset == (availZones.length - 1) ? 0 : (myZoneOffset + 1); 
  24. while (currentOffset != myZoneOffset) { 
  25. zone = availZones[currentOffset]; 
  26. serviceUrls = clientConfig.getEurekaServerServiceUrls(zone); 
  27. if (serviceUrls != null) { 
  28. orderedUrls.put(zone, serviceUrls); 
  29. if (currentOffset == (availZones.length - 1)) { 
  30. currentOffset = 0
  31. } else
  32. currentOffset++; 
  33.  
  34. if (orderedUrls.size() < 1) { 
  35. throw new IllegalArgumentException("DiscoveryClient: invalid serviceUrl specified!"); 
  36. return orderedUrls; 
  37.  

Region,Zone

getRegion()方法能夠看出一個微服務應用只能夠屬於一個Region,若是沒配置則爲default,能夠經過eureka.client.region屬性來定義。json

public static String getRegion(EurekaClientConfig clientConfig) {
        String region = clientConfig.getRegion();
        if (region == null) {
            region = DEFAULT_REGION;
        }
        region = region.trim().toLowerCase();
        return region;
    }

getAvailabilityZones()方法能夠看出Region與Zone的關係,一個Region能夠有多個Zone,設置時能夠用逗號來分隔。默認採用defaultZone。服務器

public String[] getAvailabilityZones(String region) {
        String value = (String)this.availabilityZones.get(region);
        if(value == null) {
            value = "defaultZone";
        }

        return value.split(",");
    }

在獲取Region和Zone的信息後,根據傳入的參數按必定的算法肯定加載位於哪個Zone的serviceUrls。app

 

enter description here
enter description here

getEurekaServerServiceUrls方法是EurekaClientConfigBean的實現類,該方法用來獲取一個Zone下配置的因此serviceUrl,經過標註出來的地方能夠知道,eureka.client.serviceUrl.defaultZone屬性能夠配置多個,用逗號來分隔。

 

 

enter description here
enter description here

注意: Ribbon具備區域親和特性,Ribbon的默認策略會優先訪問同客戶端處於同一個Zone中的實例。因此經過Zone屬性的定義,配置實際部署的物理結構,咱們就能夠有效地設計出對區域性故障的容錯集羣。

 

服務註冊

前面說了多個服務註冊中心信息的加載,這裏再看看 DiscoveryClient 類是如何實現服務註冊的。經過查看該類的構造函數,發現它調用瞭如下方法。ide

/** * Initializes all scheduled tasks. */
    private void initScheduledTasks() {
        ...

        if (clientConfig.shouldRegisterWithEureka()) {
           ...

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

            // InstanceInfo replicator
            instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize

            ...
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

這裏先根據配置判斷是否是要註冊到 Eureka,而後建立心跳檢測任務,獲取 instanceInfoReplicator。InstanceInfoReplicator類實現 Runnable接口,instanceInfoReplicator實例會執行一個定時任務,這個定時任務的內容能夠查看該類的run()方法。函數

 

enter description here
enter description here

這裏定時刷新實例信息,discoveryClient.register()這裏觸發了服務註冊,register()的內容以下:

 

 

服務註冊的方法
服務註冊的方法

經過註釋也能看出來,這裏是經過發送REST請求的方式進行的, com.netflix.appinfo.InstanceInfo就是註冊時客戶端給服務端的元數據。

 

服務獲取與服務續約

上面說到的 initScheduledTasks() 方法還有兩個定時任務,分別是服務獲取和服務續約。

private void initScheduledTasks() {
        if (clientConfig.shouldFetchRegistry()) {
            // registry cache refresh timer
            int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
            int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "cacheRefresh",
                            scheduler,
                            cacheRefreshExecutor,
                            registryFetchIntervalSeconds,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new CacheRefreshThread()
                    ),
                    registryFetchIntervalSeconds, TimeUnit.SECONDS);
        }

        if (clientConfig.shouldRegisterWithEureka()) {
            int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
            int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
            logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);

            // Heartbeat timer
            scheduler.schedule(
                    new TimedSupervisorTask(
                            "heartbeat",
                            scheduler,
                            heartbeatExecutor,
                            renewalIntervalInSecs,
                            TimeUnit.SECONDS,
                            expBackOffBound,
                            new HeartbeatThread()
                    ),
                    renewalIntervalInSecs, TimeUnit.SECONDS);

           	...
            instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
        } else {
            logger.info("Not registering with Eureka server per configuration");
        }
    }

clientConfig.shouldFetchRegistry()這裏實際上是經過eureka.client.fetch-registry參數來判斷的,默認爲true,它能夠按期更新客戶端的服務清單,從而客戶端能訪問到健康的服務實例。
服務續約也是發送REST請求實現的。

boolean renew() {
        EurekaHttpResponse<InstanceInfo> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
            logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
            if (httpResponse.getStatusCode() == 404) {
                REREGISTER_COUNTER.increment();
                logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
                long timestamp = instanceInfo.setIsDirtyWithTime();
                boolean success = register();
                if (success) {
                    instanceInfo.unsetIsDirty(timestamp);
                }
                return success;
            }
            return httpResponse.getStatusCode() == 200;
        } catch (Throwable e) {
            logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
            return false;
        }
    }

服務獲取的過程省略。

服務下線

服務端根據實例Id和appName執行remove操做。

void unregister() {
        // It can be null if shouldRegisterWithEureka == false
        if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
            try {
                logger.info("Unregistering ...");
                EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
                logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
            } catch (Exception e) {
                logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
            }
        }
    }

註冊中心處理

前面的分析都是從客戶端出發的,如今看看 Eureka Server是如何處理各類Rest請求的。這種請求的定義都在com.netflix.eureka.resources包下。
以服務註冊爲例:
調用 ApplicationResource 類下的 addInstance()方法。

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        if(this.isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if(this.isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if(this.isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if(this.isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if(!this.appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
        } else if(info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if(info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        } else {
            DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
            if(dataCenterInfo instanceof UniqueIdentifier) {
                String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
                if(this.isBlank(dataCenterInfoId)) {
                    boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                    if(experimental) {
                        String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                        return Response.status(400).entity(entity).build();
                    }

                    if(dataCenterInfo instanceof AmazonInfo) {
                        AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                        String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                        if(effectiveId == null) {
                            amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                        }
                    } else {
                        logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                    }
                }
            }

            this.registry.register(info, "true".equals(isReplication));
            return Response.status(204).build();
        }
    }

在對註冊信息進行校驗後,會調用org.springframework.cloud.netflix.eureka.server.InstanceRegistry的register(InstanceInfo info, int leaseDuration, boolean isReplication)方法。

 

enter description here
enter description here

 

 

enter description here
enter description here

首先會把新服務註冊事件傳播出去,而後調用父類com.netflix.eureka.registry.AbstractInstanceRegistry中的實現。

 

public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
            this.read.lock();
            Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
            ...
        } finally {
            this.read.unlock();
        }

    }

 

保存實例信息的雙層Map
保存實例信息的雙層Map
InstanceInfo的元數據信息保存在一個ConcurrentHashMap中,它是一個雙層的Map結構,第一層的key是服務名(即InstanceInfo的appName屬性),第二層的key是實例名(即InstanceInfo的InstanceId屬性)。 ApplicationResource中的其餘方法能夠自行研究。
相關文章
相關標籤/搜索