Eureka應用註冊與集羣數據同步源碼解析

在以前的EurekaClient自動裝配及啓動流程解析一文中咱們提到過,在構造DiscoveryClient類時,會把自身註冊到服務端,本文就來分析一下這個註冊流程node

客戶端發起註冊
boolean register() throws Throwable {
        logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
        EurekaHttpResponse<Void> httpResponse;
        try {
            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
        } catch (Exception e) {
            logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
            throw e;
        }
        if (logger.isInfoEnabled()) {
            logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
        }
        return httpResponse.getStatusCode() == 204;
    }

這個方法中包含的registrationClientinstanceInfo兩個對象在以前的文章中都已經單獨拿出來了:Eureka中重要的對象json

服務端接受註冊

服務端接受註冊的Controller在ApplicationResource類中,這裏須要注意的是普通客戶端註冊時其中參數isReplication爲false,這個參數就是控制集羣是否同步的表示緩存

@POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // 一系列的參數校驗
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getIPAddr())) {
            return Response.status(400).entity("Missing ip address").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // AWS的一些東西,不用細看
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
	//註冊
        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

接着往下看註冊的處理邏輯app

public void register(final InstanceInfo info, final boolean isReplication) {
   // 租約過時時間
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
   // 註冊
        super.register(info, leaseDuration, isReplication);
   //集羣複製
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

Lease租約這個類以前也分析過了,再也不展開了ide

服務端保存註冊信息
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
        try {
		  //獲取鎖
            read.lock();
		  //獲取該實例的註冊信息
            Map<String, Lease<InstanceInfo>> gMap = registry.get(registrant.getAppName());
            REGISTER.increment(isReplication);
            if (gMap == null) {
                final ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap<String, Lease<InstanceInfo>>();
              //若是不存在則添加  
			  gMap = registry.putIfAbsent(registrant.getAppName(), gNewMap);
                if (gMap == null) {
                    gMap = gNewMap;
                }
            }
            Lease<InstanceInfo> existingLease = gMap.get(registrant.getId());
            // 當存在這個應用的註冊信息時
            if (existingLease != null && (existingLease.getHolder() != null)) {
                Long existingLastDirtyTimestamp = existingLease.getHolder().getLastDirtyTimestamp();
                Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp();
                logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);

                //使用註冊時間長的一方的應用信息
                if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) {
                    logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater" +
                            " than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp);
                    logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant");
                    registrant = existingLease.getHolder();
                }
            } else {

                synchronized (lock) {
                    if (this.expectedNumberOfRenewsPerMin > 0) {
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
            }
		  //建立租約
            Lease<InstanceInfo> lease = new Lease<InstanceInfo>(registrant, leaseDuration);
		  //若是存在租約則更新開始時間
            if (existingLease != null) {
                lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp());
            }
            gMap.put(registrant.getId(), lease);
            synchronized (recentRegisteredQueue) {
			  //添加到最近註冊隊列
                recentRegisteredQueue.add(new Pair<Long, String>(
                        System.currentTimeMillis(),
                        registrant.getAppName() + "(" + registrant.getId() + ")"));
            }
            if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) {
                logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the "
                                + "overrides", registrant.getOverriddenStatus(), registrant.getId());
                if (!overriddenInstanceStatusMap.containsKey(registrant.getId())) {
                    logger.info("Not found overridden id {} and hence adding it", registrant.getId());
                    overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus());
                }
            }
            InstanceStatus overriddenStatusFromMap = overriddenInstanceStatusMap.get(registrant.getId());
            if (overriddenStatusFromMap != null) {
                logger.info("Storing overridden status {} from map", overriddenStatusFromMap);
                registrant.setOverriddenStatus(overriddenStatusFromMap);
            }

           // 得到應用實例最終狀態
            InstanceStatus overriddenInstanceStatus = getOverriddenInstanceStatus(registrant, existingLease, isReplication);
            registrant.setStatusWithoutDirty(overriddenInstanceStatus);

            if (InstanceStatus.UP.equals(registrant.getStatus())) {
                lease.serviceUp();
            }
            registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
        } finally {
            read.unlock();
        }
    }
  1. 實例信息是由一個map對象保存的,在這個map中,key是應用的appName,value是另一個map,在這個map中key是應用的id,而value則是應用的租約信息,map對象以下:
private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> 
registry        = new ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>>();
  1. 大致流程爲,先查看是否存在該appName的註冊信息,如不存在則建立。
  2. 接着查看是否存在該id的註冊信息,若是存在判斷客戶端的最後修改時間,記錄最後修改的實例,若是不存在則設置自我保護模式的幾個參數
  3. 根據實例建立對應的租約信息,而後添加到map對象中
  4. 添加到最近註冊隊列
  5. 添加到應用實例覆蓋狀態映射
  6. 設置應用實例最終狀態,添加最近租約變動隊列,設置緩存等
集羣數據同步

isReplication屬性爲true的時候,就會牽扯到集羣信息同步了ui

private void replicateToPeers(Action action, String appName, String id,
                                  InstanceInfo info ,
                                  InstanceStatus newStatus , boolean isReplication) {
        Stopwatch tracer = action.getTimer().start();
        try {
            if (isReplication) {
                numberOfReplicationsLastMin.increment();
            }
            if (peerEurekaNodes == Collections.EMPTY_LIST || isReplication) {
                return;
            }

            for (final PeerEurekaNode node : peerEurekaNodes.getPeerEurekaNodes()) {
                if (peerEurekaNodes.isThisMyUrl(node.getServiceUrl())) {
                    continue;
                }
                replicateInstanceActionsToPeers(action, appName, id, info, newStatus, node);
            }
        } finally {
            tracer.stop();
        }
    }

關於PeerEurekaNode在以前的文章也提到了,保存了集羣節點信息,這裏能夠看到是循環全部的集羣節點,而後排除自己的信息以後調用了replicateInstanceActionsToPeers方法this

private void replicateInstanceActionsToPeers(Action action, String appName,
                                                 String id, InstanceInfo info, InstanceStatus newStatus,
                                                 PeerEurekaNode node) {
        try {
            InstanceInfo infoFromRegistry = null;
            CurrentRequestVersion.set(Version.V2);
            switch (action) {
                case Cancel:
                    node.cancel(appName, id);
                    break;
                case Heartbeat:
                    InstanceStatus overriddenStatus = overriddenInstanceStatusMap.get(id);
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.heartbeat(appName, id, infoFromRegistry, overriddenStatus, false);
                    break;
                case Register:
                    node.register(info);
                    break;
                case StatusUpdate:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.statusUpdate(appName, id, newStatus, infoFromRegistry);
                    break;
                case DeleteStatusOverride:
                    infoFromRegistry = getInstanceByAppAndId(appName, id, false);
                    node.deleteStatusOverride(appName, id, infoFromRegistry);
                    break;
            }
        } catch (Throwable t) {
            logger.error("Cannot replicate information to {} for action {}", node.getServiceUrl(), action.name(), t);
        }
    }

這裏重點關注Register分支spa

public void register(final InstanceInfo info) throws Exception {
        long expiryTime = System.currentTimeMillis() + getLeaseRenewalOf(info);
        batchingDispatcher.process(
                taskId("register", info),
                new InstanceReplicationTask(targetHost, Action.Register, info, null, true) {
                    public EurekaHttpResponse<Void> execute() {
                        return replicationClient.register(info);
                    }
                },
                expiryTime
        );
    }

在不關心Eureka自有的調度處理相關的前提下,核心代碼是replicationClient.register(info),也就是說,若是當客戶端註冊時指定須要同步集羣信息時,Eureka會把這個客戶端再註冊到它所在的集羣其它的節點上pwa

服務端發起集羣數據同步

在以前得EurekaServer自動裝配及啓動流程解析一文中,咱們提到過在初始化服務端的時候會從EurekaServer集羣中同步數據,也就是下面這段代碼:debug

public class EurekaServerBootstrap {   
    protected void initEurekaServerContext() throws Exception {
  //xxxx
    
                EurekaServerContextHolder.initialize(this.serverContext);
    
                log.info("Initialized server context");
    
                // 從其餘 Eureka-Server 拉取註冊信息
                int registryCount = this.registry.syncUp();
                this.registry.openForTraffic(this.applicationInfoManager, registryCount);
    
                // Register all monitoring statistics.
                EurekaMonitors.registerAllStats();
        }

數據同步的代碼在syncUp

public int syncUp() {
        int count = 0;

        for (int i = 0; ((i < serverConfig.getRegistrySyncRetries()) && (count == 0)); i++) {
            if (i > 0) {
			  //未讀取到註冊信息則開始等待
                try {
                    Thread.sleep(serverConfig.getRegistrySyncRetryWaitMs());
                } catch (InterruptedException e) {
                    logger.warn("Interrupted during registry transfer..");
                    break;
                }
            }
		   // 獲取註冊信息
            Applications apps = eurekaClient.getApplications();
            for (Application app : apps.getRegisteredApplications()) {
                for (InstanceInfo instance : app.getInstances()) {
                    try {
					  //判斷是否能夠註冊,這裏基於AWS環境的判斷,若是不是AWS環境直接返回true,故不須要關心這個問題
                        if (isRegisterable(instance)) {
						  //註冊
                            register(instance, instance.getLeaseInfo().getDurationInSecs(), true);
                            count++;
                        }
                    } catch (Throwable t) {
                        logger.error("During DS init copy", t);
                    }
                }
            }
        }
        return count;
    }

這裏的處理流程就是當前EurekaServer獲取到客戶端的註冊信息以後,就會再次調用上邊我們提到的服務端Controller調用的register方法,固然此次調用時isReplication參數就變爲true了

1

相關文章
相關標籤/搜索