Eureka服務註冊過程

上篇博客《SpringCloud——Eureka服務註冊和發現》介紹了Eureka的基本功能,這篇咱們來聊聊eureka是如何實現的。java

上圖是eureka的架構圖,Eureka分爲Server和client,圖中,藍色爲Server端,綠色爲client。node

基本流程:json

一、最左邊的client(即服務提供者)發起us-east-1c註冊請求;緩存

二、以後,Eureka Server集羣中的其餘兩個node(us-east-1d和us-east-1e進行Replicate複製);架構

三、圖下放的兩個client(即服務消費者)分別向三個server獲取註冊信息及Get Registry。app


註冊過程:ide


Eureka Client:ui

一、DiscoveryClient中的initScheduledTasks()以定時任務的方式執行。this

private void initScheduledTasks() { 
	instanceInfoReplicator = new InstanceInfoReplicator(
                    this,
                    instanceInfo,
                    clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                    2); // burstSize            
	instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
}

2、在InstanceInfoReplicatorrun方法中,調用discoveryClient的註冊方法。spa

	public void run() {
	        try {
	            discoveryClient.refreshInstanceInfo();
	
	            Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
	            if (dirtyTimestamp != null) {
	                discoveryClient.register();
	                instanceInfo.unsetIsDirty(dirtyTimestamp);
	            }
	        } catch (Throwable t) {
	            logger.warn("There was a problem with the instance info replicator", t);
	        } finally {
	            Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
	            scheduledPeriodicRef.set(next);
	        }
	    }

三、DiscoveryClient,使用rest調用。

boolean register() throws Throwable {	        logger.info(PREFIX + appPathIdentifier + ": registering service...");
	        EurekaHttpResponse<Void> httpResponse;
	        try {
	            httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
	        } catch (Exception e) {
	            logger.warn("{} - registration failed {}", PREFIX + appPathIdentifier, e.getMessage(), e);
	            throw e;
	        }
	        if (logger.isInfoEnabled()) {
	            logger.info("{} - registration status: {}", PREFIX + appPathIdentifier, httpResponse.getStatusCode());
	        }
	        return httpResponse.getStatusCode() == 204;
	    }


Eureka Server:


一、rest接口,即服務端rest方式調用的接口。

 @POST
    @Consumes({"application/json", "application/xml"})
    public Response addInstance(InstanceInfo info,
                                @HeaderParam(PeerEurekaNode.HEADER_REPLICATION) String isReplication) {
        logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
        // validate that the instanceinfo contains all the necessary required fields
        if (isBlank(info.getId())) {
            return Response.status(400).entity("Missing instanceId").build();
        } else if (isBlank(info.getHostName())) {
            return Response.status(400).entity("Missing hostname").build();
        } else if (isBlank(info.getAppName())) {
            return Response.status(400).entity("Missing appName").build();
        } else if (!appName.equals(info.getAppName())) {
            return Response.status(400).entity("Mismatched appName, expecting " + appName + " but was " + info.getAppName()).build();
        } else if (info.getDataCenterInfo() == null) {
            return Response.status(400).entity("Missing dataCenterInfo").build();
        } else if (info.getDataCenterInfo().getName() == null) {
            return Response.status(400).entity("Missing dataCenterInfo Name").build();
        }

        // handle cases where clients may be registering with bad DataCenterInfo with missing data
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if (dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier) dataCenterInfo).getId();
            if (isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if (experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                } else if (dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo) dataCenterInfo;
                    String effectiveId = amazonInfo.get(AmazonInfo.MetaDataKey.instanceId);
                    if (effectiveId == null) {
                        amazonInfo.getMetadata().put(AmazonInfo.MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }

        registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();  // 204 to be backwards compatible
    }

二、在PeerAwareInstanceRegistryImpl中的register方法

    @Override
    public void register(final InstanceInfo info, final boolean isReplication) {
        int leaseDuration = Lease.DEFAULT_DURATION_IN_SECS;
        if (info.getLeaseInfo() != null && info.getLeaseInfo().getDurationInSecs() > 0) {
            leaseDuration = info.getLeaseInfo().getDurationInSecs();
        }
        super.register(info, leaseDuration, isReplication);
        replicateToPeers(Action.Register, info.getAppName(), info.getId(), info, null, isReplication);
    }

註冊方法中,租約不存在。具體以下

if (existingLease != null && (existingLease.getHolder() != null)) { 
	//續租
	.......
}else{  
       // The lease does not exist and hence it is a new registration          
	synchronized (lock) {                    
		if (this.expectedNumberOfRenewsPerMin > 0) {
                        // Since the client wants to cancel it, reduce the threshold
                        // (1
                        // for 30 seconds, 2 for a minute)
                        this.expectedNumberOfRenewsPerMin = this.expectedNumberOfRenewsPerMin + 2;
                        this.numberOfRenewsPerMinThreshold =
                                (int) (this.expectedNumberOfRenewsPerMin * serverConfig.getRenewalPercentThreshold());
                    }
                }
                logger.debug("No previous lease information found; it is new registration");
}


構造註冊信息,處理緩存信息。

 registrant.setActionType(ActionType.ADDED);
            recentlyChangedQueue.add(new RecentlyChangedItem(lease));
            registrant.setLastUpdatedTimestamp();
            invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress());
            logger.info("Registered instance {}/{} with status {} (replication={})",
                    registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication);
相關文章
相關標籤/搜索