SpringCloud源碼閱讀1-EurekaServer源碼的祕密

題外話:

Spring Cloud Netflix 做爲springcloud 咱們經常使用的一個項目,其子項目Eureka,zuul,Rebbion是我熟悉的。可是Spring Cloud Netflix 被宣佈進入了維護模式, 意思再也不添加新特性了,這對於咱們來講很不友好了。 你們紛紛尋找相應的替代工具。(具體能夠網上搜索)java

但這不影響咱們學習一些組件的框架思想。我對註冊發現,負載均衡這塊比較感興趣。因此在此記錄下本身的閱讀心得。node

版本說明:Finchley.SR1git

1.組件的配置:

1.1 啓用Eureka註冊中心

當咱們在springboot的啓動類上加上@EnableEurekaServer,一個基本的註冊中心就能夠生效了。github

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
}
}
複製代碼

@EnableEurekaServer僅僅是引入EurekaServerMarkerConfiguration類。 Marker的英文意思是標記的意思,spring相關框架中有不少相似xxxMarkerxxx這樣的註解.其實他們的意思就是一個開關。會在其餘地方進行開關的判斷,有對應xxxMarkerxxx類就表示打開,沒有表示關閉。web

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(EurekaServerMarkerConfiguration.class)
public @interface EnableEurekaServer {
}
複製代碼

EurekaServerMarkerConfiguration開關打開的是哪一個類呢??spring

org.springframework.cloud.netflix.eureka.server項目spring.factories資源文件中自動注入類EurekaServerAutoConfiguration,此類在自動注入的過程當中,會判斷開關是否打開來決定是否自動注入相關類json

org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
  org.springframework.cloud.netflix.eureka.server.EurekaServerAutoConfiguration
複製代碼
@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	.....
}
複製代碼

由此看出EurekaServerMarkerConfiguration開關打開的EurekaServerAutoConfiguration緩存

1.2 組件的配置。

下面咱們看看EurekaServerAutoConfiguration配置了什麼東西。 (1.先看註解上相關配置springboot

@Configuration
@Import(EurekaServerInitializerConfiguration.class)
@ConditionalOnBean(EurekaServerMarkerConfiguration.Marker.class)
@EnableConfigurationProperties({ EurekaDashboardProperties.class,
		InstanceRegistryProperties.class })
@PropertySource("classpath:/eureka/server.properties")
public class EurekaServerAutoConfiguration extends WebMvcConfigurerAdapter {
	...
}
複製代碼
  • 引入EurekaServerInitializerConfiguration類,此類繼承了SmartLifecycle接口,因此會在spring啓動完畢時回調此類的start()方法
  • EurekaDashboardProperties 表示Euerka面板相關配置屬性。例如:是否打開面板;面板的訪問路徑
  • InstanceRegistryProperties 表示實例註冊相關配置屬性。例如:每分鐘最大的續約數量,默認打開的通訊數量 等
  • 加載/eureka/server.properties的配置屬性。

(2.再看類內部相關配置(代碼比較長,這裏只講內容,建議打開源碼看) 尋找類中的Bean服務器

  • HasFeatures 註冊HasFeatures表示Eureka特徵,
  • EurekaServerConfigBean配置類,表示EurekaServer的配置信息。經過@ConfigurationProperties(「eureka.server」)映射咱們的配置文件中的eureka.server.xxxx格式的配置信息(此類很重要啊,咱們想修改EurekaServer的配置信息,能夠配置eureka.server.xxxx覆蓋此類中的默認配置)
  • EurekaController: 面板的訪問配置默認是「/」
  • 註冊編碼器(ServerCodecs)CloudServerCodecs
  • PeerAwareInstanceRegistry:對等節點同步器。 多個節點下複製相關。 與註冊中心高可用有關的組件。此處註冊的是 InstanceRegistry(注意PeerAwareInstanceRegistry實現了AbstractInstanceRegistry,這裏準確的說是 對等節點+當前節點同步器
  • PeerEurekaNodes: Eureka-Server 集羣節點的集合。存儲了集羣下各個節點信息。也是與高可用有關。
  • EurekaServerContext : 上下文。默認註冊的DefaultEurekaServerContext
  • EurekaServerBootstrap: EurekaServer啓動器。EurekaServerBootstrap
  • FilterRegistrationBean: 註冊 Jersey filter過濾器。這裏有必要講一下。Eureka也是servlet應用。不過他是經過Jersey 框架來提供接口的。Jersey 框架是一個類Springmvc的web框架。咱們項目中大多都是使用springmvc來處理。因此註冊 Jersey filter過濾器,把/eureka開頭的請求都交給Jersey 框架去解析。容器是com.sun.jersey.spi.container.servlet.ServletContainer
  • ApplicationResource: 暴漏com.netflix.discovery","com.netflix.eureka"包路徑下的接口。一般咱們再springmvc中經過Controller概念來表示接口,Jersey框架下用ApplicationResource的概念來表示接口。暴露的接口其實就是eureka各個應用通訊的接口。(下面再說這些接口)

EurekaServerAutoConfiguration基本上就作了這些工做。咱們來歸類總結下

針對當前Eureka實例的相關組件:

  • EurekaDashboardProperties:面板屬性
  • EurekaController: 面板的訪問的處理器。
  • InstanceRegistryProperties:實例註冊相關屬性
  • (EurekaServerConfig)EurekaServerConfigBean:當前ErekekaServer相關配置
  • EurekaServerContext : 當前Eureka 註冊中心上下文
  • 請求相關組件:註冊/eureka路徑的相關接口,註冊攔截/eureka的攔截器,註冊com.sun.jersey.spi.container.servlet.ServletContainer容器來處理對應的請求

兩個針對集羣下相關組件:

  • PeerAwareInstanceRegistry:用於集羣下的節點相關複製信息用
  • PeerEurekaNodes:集羣下的全部節點信息

兩個針對啓動相關類:

  • EurekaServerInitializerConfiguration: 對接spring,再spring啓動完成後,調用
  • EurekaServerBootstrap:啓動器,用於啓動當前Eureak實例的上下文

至此:咱們也能夠大體瞭解了一個EurekaServer大體長什麼樣子了。

2.EurekaServerContext初始化:

EurekaServerContext做爲上下文,應該是核心所在。上文講過註冊DefaultEurekaServerContext。此類中有@Inject,@PostConstruct, @PreDestroy註解的方法,重點來看看。

@Inject
public DefaultEurekaServerContext(EurekaServerConfig serverConfig, ServerCodecs serverCodecs, PeerAwareInstanceRegistry registry, PeerEurekaNodes peerEurekaNodes, ApplicationInfoManager applicationInfoManager) {
        this.serverConfig = serverConfig;
        this.serverCodecs = serverCodecs;
        this.registry = registry;
        this.peerEurekaNodes = peerEurekaNodes;
        this.applicationInfoManager = applicationInfoManager;
}
複製代碼

2.1 @Inject註解的構造方法

@Inject註解的方法,參數由IOC容器注入。serverConfig ,serverCodecs ,registry ,peerEurekaNodes咱們已經認識了。ApplicationInfoManager 是用來管理應用信息的,也就是實例註冊信息,由ApplicationInfoManager統一管理。

2.2 @PostConstruct註解的initialize()方法

@PostConstruct修飾的方法會在服務器加載Servle的時候運行,而且只會被服務器執行一次,被@PostConstruct修飾的方法會在構造函數以後,init()方法以前運行.

@PostConstruct
@Override
public void initialize() {
        logger.info("Initializing ...");
        peerEurekaNodes.start();
        try {
            registry.init(peerEurekaNodes);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        logger.info("Initialized");
}
複製代碼

這個方法很簡明,主要有兩個重要的的點:

  • peerEurekaNodes.start();
  • registry.init(peerEurekaNodes);

2.2.1 peerEurekaNodes.start()

PeerEurekaNodes: 用於管理PeerEurekaNode節點集合。 peerEurekaNodes.start();

public void start() {
		//建立一個單線程定時任務線程池:線程的名稱叫作Eureka-PeerNodesUpdater
        taskExecutor = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactory() {
                    @Override
                    public Thread newThread(Runnable r) {
                        Thread thread = new Thread(r, "Eureka-PeerNodesUpdater");
                        thread.setDaemon(true);
                        return thread;
                    }
                }
        );
        try {
        	// 解析Eureka Server URL,並更新PeerEurekaNodes列表
            updatePeerEurekaNodes(resolvePeerUrls());
            //建立任務
            //任務內容爲:解析Eureka Server URL,並更新PeerEurekaNodes列表
            Runnable peersUpdateTask = new Runnable() {
                @Override
                public void run() {
                    try {
                        updatePeerEurekaNodes(resolvePeerUrls());
                    } catch (Throwable e) {
                        logger.error("Cannot update the replica Nodes", e);
                    }

                }
            };
            //交給線程池執行,執行間隔10min
            taskExecutor.scheduleWithFixedDelay(
                    peersUpdateTask,
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    serverConfig.getPeerEurekaNodesUpdateIntervalMs(),
                    TimeUnit.MILLISECONDS
            );
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        for (PeerEurekaNode node : peerEurekaNodes) {
            logger.info("Replica node URL: {}", node.getServiceUrl());
        }
}
複製代碼
resolvePeerUrls():

解析配置的對等體URL。就是在配置文件中配置的多個Eureka註冊中心的URL.

updatePeerEurekaNodes:
protected void updatePeerEurekaNodes(List<String> newPeerUrls) {
        //計算須要移除的url= 原來-新配置。
        Set<String> toShutdown = new HashSet<>(peerEurekaNodeUrls);
        toShutdown.removeAll(newPeerUrls);
        //計算須要增長的url= 新配置-原來的。
        Set<String> toAdd = new HashSet<>(newPeerUrls);
        toAdd.removeAll(peerEurekaNodeUrls);
        //沒有變化就不更新
        if (toShutdown.isEmpty() && toAdd.isEmpty()) { // No change
            return;
        }
       
        List<PeerEurekaNode> newNodeList = new ArrayList<>(peerEurekaNodes);
         // 刪除須要移除url對應的節點。
        if (!toShutdown.isEmpty()) {
            int i = 0;
            while (i < newNodeList.size()) {
                PeerEurekaNode eurekaNode = newNodeList.get(i);
                if (toShutdown.contains(eurekaNode.getServiceUrl())) {
                    newNodeList.remove(i);
                    eurekaNode.shutDown();
                } else {
                    i++;
                }
            }
        }
        // 添加須要增長的url對應的節點
        if (!toAdd.isEmpty()) {
            logger.info("Adding new peer nodes {}", toAdd);
            for (String peerUrl : toAdd) {
                newNodeList.add(createPeerEurekaNode(peerUrl));
            }
        }
        //更新節點列表
        this.peerEurekaNodes = newNodeList;
        //更新節點url列表
        this.peerEurekaNodeUrls = new HashSet<>(newPeerUrls);
    }
複製代碼

總結:start()方法,其實就是完成新配置的eureka集羣信息的初始化更新工做。

2.2.2 registry.init(peerEurekaNodes)

對等節點同步器的初始化。

public void init(PeerEurekaNodes peerEurekaNodes) throws Exception {
        //統計最近X秒內的來自對等節點複製的續約數量(默認1秒)
        this.numberOfReplicationsLastMin.start();
        this.peerEurekaNodes = peerEurekaNodes;
        //初始化返回結果緩存
        initializedResponseCache();
        //更新續約閥值
        scheduleRenewalThresholdUpdateTask();
        //初始化遠程區域註冊 相關信息
        initRemoteRegionRegistry();
        ...
}
複製代碼
numberOfReplicationsLastMin.start():

啓動一個定時任務,任務名稱爲Eureka-MeasureRateTimer,每1秒統計從對等節點複製的續約數,將當前的桶的統計數據放到lastBucket,當前桶置爲0

this.numberOfReplicationsLastMin = new MeasuredRate(1000 * 60 * 1);
--
this.timer = new Timer("Eureka-MeasureRateTimer", true);
---
timer.schedule(new TimerTask() {
                @Override
                public void run() {
                    try {
                        // Zero out the current bucket.
                        lastBucket.set(currentBucket.getAndSet(0));
                    } catch (Throwable e) {
                        logger.error("Cannot reset the Measured Rate", e);
                    }
                }
}, sampleInterval, sampleInterval);
複製代碼

注意:此統計器用於節點之間複製的統計。

initializedResponseCache():

精闢,緩存來實現返回結果的緩存,優秀設計啊。

使用goole cache初始化一個緩存類ResponseCacheImpl,緩存(all applications, delta changes and for individual applications)請求的結果, 此類中有兩個緩存:

  • readWriteCacheMap: 讀寫緩存。初始化容量1000,失效時間3分鐘。
  • readOnlyCacheMap:只讀緩存,shouldUseReadOnlyResponseCache屬性控制是否啓用,默認是啓用的。此緩存會使用,名爲Eureka-CacheFillTimer的timer,每30s更新從 readWriteCacheMap中更新readOnlyCacheMap中的緩存值。

取值邏輯: 先從readOnlyCacheMap取值,沒有去readWriteCacheMap,沒有去經過CacheLoader加載,而CacheLoader會到維護應用實例註冊信息的Map中獲取。

這裏就產生了一個疑問,爲啥有搞個二級緩存來緩存結果呢?不是很理解。

scheduleRenewalThresholdUpdateTask()

使用名爲ReplicaAwareInstanceRegistry - RenewalThresholdUpdater的timer,每15(900s)分鐘執行updateRenewalThreshold()任務,更新續約閥值。

private void updateRenewalThreshold() {
        try {
            Applications apps = eurekaClient.getApplications();
            int count = 0;
            //統計全部註冊instance個數
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    if (this.isRegisterable(instance)) {
                        ++count;
                    }
                }
            }
            synchronized (lock) {
                當總數》預期值時 或者 關閉了自我保護模式,更新
                if ((count * 2) > (serverConfig.getRenewalPercentThreshold() * expectedNumberOfRenewsPerMin)
                        || (!this.isSelfPreservationModeEnabled())) {
                    this.expectedNumberOfRenewsPerMin = count * 2;
                    this.numberOfRenewsPerMinThreshold = (int) ((count * 2) * serverConfig.getRenewalPercentThreshold());
                }
            }
            logger.info("Current renewal threshold is : {}", numberOfRenewsPerMinThreshold);
        } catch (Throwable e) {
            logger.error("Cannot update renewal threshold", e);
        }
}
複製代碼
  • expectedNumberOfRenewsPerMin每分鐘最大的續約數量 (30s/次,2次/s): =客戶端數量count*2
  • numberOfRenewsPerMinThreshold每分鐘續約閾值。serverConfig.getRenewalPercentThreshold()*expectedNumberOfRenewsPerMin serverConfig.getRenewalPercentThreshold()默認是0.85

當每分鐘續約數小於numberOfRenewsPerMinThreshold閾值時,而且自我保護沒有關閉的狀況下,開啓自我保護,此期間不剔除任何一個客戶端。(下面的EvictionTask()驅逐任務會講到如何利用)

  • InstanceRegistry初始化
  • 客戶端cancle主動下線
  • 客戶端註冊
  • scheduleRenewalThresholdUpdateTask

此四個地方都會更新兩個值

initRemoteRegionRegistry()

初始化 遠程區域註冊 相關信息

2.3 @PreDestroy註解的initialize方法

@PreDestroy修飾的方法會在服務器卸載Servlet的時候執行,而且只會被服務器執行一次,被@PreDestroy修飾的方法會Destroy方法以後執行,在Servlet被完全卸載以前.

public void shutdown() {
        registry.shutdown();
        peerEurekaNodes.shutdown();
}
複製代碼

registry.shutdown();

停掉init()時啓動的定時任務

peerEurekaNodes.shutdown()

清空集羣url緩存,集羣節點緩存。


2.4 小結

總結:EurekaServerContext的初始化作了不少事情,很精闢,建議多閱讀,多學習


3.EurekaServer啓動:

EurekaServerInitializerConfiguration實現了SmartLifecycle接口,在spring啓動後,執行start()方法

eurekaServerBootstrap.contextInitialized(EurekaServerInitializerConfiguration.this.servletContext);
log.info("Started Eureka Server");
//發佈註冊中心能夠註冊事件
publish(new EurekaRegistryAvailableEvent(getEurekaServerConfig()));
//狀態爲運行狀態
EurekaServerInitializerConfiguration.this.running = true;
//發佈註冊中心啓動完成事件
publish(new EurekaServerStartedEvent(getEurekaServerConfig()));
複製代碼

這裏重點看先EurekaServerBootstrap.contextInitialized EurekaServerBootstrap的contextInitialized主要乾了兩件事

initEurekaEnvironment();初始化環境
initEurekaServerContext();初始化上下文
複製代碼

3.1 initEurekaEnvironment

主要是數據中心等環境變量的初始化

3.2 initEurekaServerContext

此方法中最重要的是

從相鄰eureka節點拷貝註冊列表信息
int registryCount = this.registry.syncUp();
容許開始與客戶端的數據傳輸,即開始做爲Server服務
this.registry.openForTraffic(this.applicationInfoManager, registryCount);
複製代碼

3.1.1 registry.syncUp()

@Override
    public int syncUp() {
        // Copy entire entry from neighboring DS node
        int count = 0;
		//重試次數
        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
            //從eurekaClient獲取服務列表
            Applications apps = eurekaClient.getApplications();
            //遍歷註冊
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
                        if (isRegisterable(instance)) {
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }
複製代碼

3.1.2 registry.openForTraffic

容許開始與客戶端的數據傳輸,即開始做爲Server服務

InstanceRegistry.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
		super.openForTraffic(applicationInfoManager,
				count == 0 ? this.defaultOpenForTrafficCount : count);
}
PeerAwareInstanceRegistryImpl.openForTraffic public void openForTraffic(ApplicationInfoManager applicationInfoManager, int count) {
       //每分鐘期待的續約數(默認30s續約,60s就是2次)
        this.expectedNumberOfRenewsPerMin = count * 2;
        // 每分鐘續約的閥值:0.85 * expectedNumberOfRenewsPerMin
        this.numberOfRenewsPerMinThreshold =
                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
      	....
        logger.info("Changing status to UP");
        //applicationInfoManager設置狀態爲UP
        applicationInfoManager.setInstanceStatus(InstanceStatus.UP);
        super.postInit();
}
複製代碼

3.1.3 EvictionTask() 驅逐任務

protected void postInit() {
		//又啓動了一個續約數統計器,此統計器用於配合驅逐任務
        renewsLastMin.start();
        if (evictionTaskRef.get() != null) {
            evictionTaskRef.get().cancel();
        }
        evictionTaskRef.set(new EvictionTask());
        evictionTimer.schedule(evictionTaskRef.get(),
                serverConfig.getEvictionIntervalTimerInMs(),
                serverConfig.getEvictionIntervalTimerInMs());
}
複製代碼

建立一個名爲Eureka-EvictionTimer的定時器來執行EvictionTask()任務。 EvictionTask()任務:

@Override
public void run() {
          	// 獲取延遲秒數,就是延遲幾秒下線時間。
            long compensationTimeMs = getCompensationTimeMs();
            //驅逐操做
            evict(compensationTimeMs);
}
複製代碼

evict()驅逐操做:清理過時租約

public void evict(long additionalLeaseMs) {
        // 判斷是否開啓自我保護,自我保護期間不剔除任何任務
        if (!isLeaseExpirationEnabled()) {
            logger.debug("DS: lease expiration is currently disabled.");
            return;
        }
        List<Lease<InstanceInfo>> expiredLeases = new ArrayList<>();
        //循環得到 全部過時的租約
        for (Entry<String, Map<String, Lease<InstanceInfo>>> groupEntry : registry.entrySet()) {
            Map<String, Lease<InstanceInfo>> leaseMap = groupEntry.getValue();
            if (leaseMap != null) {
                for (Entry<String, Lease<InstanceInfo>> leaseEntry : leaseMap.entrySet()) {
                    Lease<InstanceInfo> lease = leaseEntry.getValue();
                     // 判斷是否過時
                    if (lease.isExpired(additionalLeaseMs) && lease.getHolder() != null) {
                        expiredLeases.add(lease);
                    }
                }
            }
        }
		// 計算 最大容許清理租約數量
        int registrySize = (int) getLocalRegistrySize();
        int registrySizeThreshold = (int) (registrySize * serverConfig.getRenewalPercentThreshold());
        int evictionLimit = registrySize - registrySizeThreshold;
		// 計算 清理租約數量
        int toEvict = Math.min(expiredLeases.size(), evictionLimit);
        if (toEvict > 0) {
            Random random = new Random(System.currentTimeMillis());
            // 遍歷清理。
            for (int i = 0; i < toEvict; i++) { 
                int next = i + random.nextInt(expiredLeases.size() - i);
                Collections.swap(expiredLeases, i, next);
                Lease<InstanceInfo> lease = expiredLeases.get(i);

                String appName = lease.getHolder().getAppName();
                String id = lease.getHolder().getId();
                EXPIRED.increment();
                internalCancel(appName, id, false);
            }
        }
    }
複製代碼

isLeaseExpirationEnabled():判斷是否開啓自我保護的兩個條件

  1. 自我保護配置處於開啓狀態
  2. 當前單位續約數(renewsLastMin統計器統計的數據)<閾值

Lease.isExpire():是否過時的判斷:

public boolean isExpired(long additionalLeaseMs) {
        return (
        //或者明確實例下線時間。
        evictionTimestamp > 0 
        //或者距離最後更新時間已通過去至少3分鐘
        || System.currentTimeMillis() > (lastUpdateTimestamp + duration + additionalLeaseMs));
    }
複製代碼
  • evictionTimestamp : 實例下線時間,當客戶端下線時,會更新這個時間
  • duration : 過時間隔,默認爲90秒
  • lastUpdateTimestamp : 爲最後更新時間
//續約時更新lastUpdateTimestamp,加上了過時間隔?
public void renew() {
        lastUpdateTimestamp = System.currentTimeMillis() + duration;
}
複製代碼

過時時間判斷: System.currentTimeMillis()> lastUpdateTimestamp + duration + additionalLeaseMs 這裏加了兩次duration, 也就是180秒,加上延遲下線時間。也就是最少須要3分鐘才判斷下線。


3.3 小結

至此Eureka server的初始化就完成了。 這裏經過debug模式來看看初始化過程當中的定時任務。

在這裏插入圖片描述


4.API接口

Eureka Server 啓動後,就是對外提供服務了。等待客戶端來註冊。

Eureka是一個基於REST(Representational State Transfer)服務,咱們從官方文檔中能夠看到其對外提供的接口: 官方文檔

在這裏插入圖片描述
能夠推測,客戶端註冊時也是調用了這些接口來進行與服務端的通訊的。

上文說過,Eureka 使用jersey框架來作MVC框架,暴露接口。ApplicationResource相似springmvc中的Controller。

com.netflix.eureka.resources包下咱們能夠看到這些ApplicationResource

在這裏插入圖片描述

4.1註冊接口

ApplicationResource.addInstance對應的就是服務註冊接口

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info, @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
		....
		//使用PeerAwareInstanceRegistryImpl#register() 註冊實例信息。
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }
複製代碼
InstanceRegistry
@Override
	public void register(final InstanceInfo info, final boolean isReplication) {
		//發佈註冊事件,
		handleRegistration(info, resolveInstanceLeaseDuration(info), isReplication);
		super.register(info, isReplication);
}
PeerAwareInstanceRegistryImpl
@Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        //租期90s
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        //註冊實例
        super.register(info, leaseDuration, isReplication);
        //複製到其餘節點。
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }
複製代碼

4.1.1註冊到當前Eureka

AbstractInstanceRegistry public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
	read.lock()讀鎖
	1.從緩存中獲取實例名稱對應的租約信息
	Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
    2.統計數+1
    REGISTER.increment(isReplication); 
    //gmap爲null.則建立一個Map。
	
	3.租約的處理分兩種狀況:
	租約已經存在:
			比較新租約與舊租約的LastDirtyTimestamp,使用LastDirtyTimestamp最晚的租約
	租約不存在,即新註冊:        
			synchronized (lock) {
				更新期待每分鐘續約數
				更新續約閾值
			}
	將租約放入appname對應的map中。
	
	4.在最近註冊隊(recentRegisteredQueue)裏添加一個當前註冊信息
	5.狀態的處理:
		將當前實例的OverriddenStatus狀態,放到Eureka Server的overriddenInstanceStatusMap;
		根據OverriddenStatus狀態,設置狀態
	7.實例actionType=ADDED
	registrant.setActionType(ActionType.ADDED);
    8. 維護recentlyChangedQueue,保存最近操做
    recentlyChangedQueue.add(new RecentlyChangedItem(lease));
    9.更新最後更新時間
    registrant.setLastUpdatedTimestamp();
    10.使當前實例的結果緩存ResponseCache失效()
    invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
	
}
複製代碼

4.1.2複製到其餘節點

此處能夠看源碼閱讀,在此不講了

4.2查詢接口

咱們獲取的實例信息,其實都是從緩存中獲取的String payLoad = responseCache.get(cacheKey);

@GET
    public Response getApplication(@PathParam("version") String version, @HeaderParam("Accept") final String acceptHeader, @HeaderParam(EurekaAccept.HTTP_X_EUREKA_ACCEPT) String eurekaAccept) {
        if (!registry.shouldAllowAccess(false)) {
            return Response.status(Status.FORBIDDEN).build();
        }

        EurekaMonitors.GET_APPLICATION.increment();

        CurrentRequestVersion.set(Version.toEnum(version));
        KeyType keyType = Key.KeyType.JSON;
        if (acceptHeader == null || !acceptHeader.contains("json")) {
            keyType = Key.KeyType.XML;
        }

        Key cacheKey = new Key(
                Key.EntityType.Application,
                appName,
                keyType,
                CurrentRequestVersion.get(),
                EurekaAccept.fromString(eurekaAccept)
        );

        String payLoad = responseCache.get(cacheKey);

        if (payLoad != null) {
            logger.debug("Found: {}", appName);
            return Response.ok(payLoad).build();
        } else {
            logger.debug("Not Found: {}", appName);
            return Response.status(Status.NOT_FOUND).build();
        }
    }
複製代碼

總結

因爲篇幅限制:

  • Renew: 服務續約
  • Cancel: 服務下線 不說了。

至此:Eureka服務端內容大致講完,只講了些大概,具體建議跟源碼。

若有錯誤,敬請指出

相關文章
相關標籤/搜索