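Of the three main elements (the service registry, service providers, and service consumers), the providers and consumers (that is, the Eureka clients) initiate most of the communication in the running system (registration, renewal, deregistration, and so on), while the registry mainly receives and handles requests. We therefore start from the Eureka client to analyze how it carries out this communication.
Typically, when we register a Spring Boot application with Eureka Server, or fetch the service list from Eureka Server, we do just two things:
Let's first look at the source of the @EnableDiscoveryClient annotation: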
/**
 * Annotation to enable a DiscoveryClient implementation.
 * @author Spencer Gibb
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@Import(EnableDiscoveryClientImportSelector.class)
public @interface EnableDiscoveryClient {

    /**
     * If true, the ServiceRegistry will automatically register the local server.
     */
    boolean autoRegister() default true;
}
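The Javadoc tells us that this annotation enables a DiscoveryClient implementation. Searching for DiscoveryClient then turns up both a class and an interface; their relationship is shown in the figure.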
/**
 * The class that is instrumental for interactions with <tt>Eureka Server</tt>.
 *
 * <p>
 * <tt>Eureka Client</tt> is responsible for a) <em>Registering</em> the
 * instance with <tt>Eureka Server</tt> b) <em>Renewal</em>of the lease with
 * <tt>Eureka Server</tt> c) <em>Cancellation</em> of the lease from
 * <tt>Eureka Server</tt> during shutdown
 * <p>
 * d) <em>Querying</em> the list of services/instances registered with
 * <tt>Eureka Server</tt>
 * <p>
 *
 * <p>
 * <tt>Eureka Client</tt> needs a configured list of <tt>Eureka Server</tt>
 * {@link java.net.URL}s to talk to.These {@link java.net.URL}s are typically amazon elastic eips
 * which do not change. All of the functions defined above fail-over to other
 * {@link java.net.URL}s specified in the list in the case of failure.
 * </p>
 *
 * @author Karthik Ranganathan, Greg Kim
 * @author Spencer Gibb
 *
 */
@Singleton
public class DiscoveryClient implements EurekaClient {
    ...
}
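This class handles the interaction with Eureka Server.
According to its Javadoc, the Eureka Client is responsible for registering the instance with Eureka Server, renewing its lease, cancelling the lease on shutdown, and querying the list of registered services and instances.
Starting from the property name we configure, eureka.client.serviceUrl.defaultZone, we can search for serviceUrl to find the code that loads it. DiscoveryClient has a getEurekaServiceUrlsFromConfig() method, but it is deprecated in favor of the EndpointUtils utility class. The code is as follows:
The getRegion() method shows that a microservice application can belong to only one Region. If none is configured, the Region is default; it can be set through the eureka.client.region property.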
public static String getRegion(EurekaClientConfig clientConfig) {
    String region = clientConfig.getRegion();
    if (region == null) {
        region = DEFAULT_REGION;
    }
    region = region.trim().toLowerCase();
    return region;
}
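The getAvailabilityZones() method shows the relationship between Region and Zone: one Region can contain several Zones, separated by commas in the configuration. If nothing is configured for the Region, defaultZone is used.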
public String[] getAvailabilityZones(String region) {
    String value = (String)this.availabilityZones.get(region);
    if(value == null) {
        value = "defaultZone";
    }
    return value.split(",");
}
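The two methods above can be combined into a small standalone sketch of the Region and Zone lookup. The class name RegionZoneSketch and its method names are hypothetical and are not part of Eureka; only the defaulting and comma-splitting behavior is taken from the code shown above.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper for illustration only; not an Eureka class.
class RegionZoneSketch {
    static final String DEFAULT_REGION = "default";
    static final String DEFAULT_ZONE = "defaultZone";

    // Mirrors getRegion(): fall back to the default, then trim and lower-case.
    static String normalizeRegion(String configured) {
        String region = (configured == null) ? DEFAULT_REGION : configured;
        return region.trim().toLowerCase();
    }

    // Mirrors getAvailabilityZones(): a comma-separated value per Region,
    // or the single default Zone when the Region has no entry.
    static List<String> zonesFor(String region, Map<String, String> availabilityZones) {
        String value = availabilityZones.get(region);
        if (value == null) {
            value = DEFAULT_ZONE;
        }
        return Arrays.asList(value.split(","));
    }

    public static void main(String[] args) {
        Map<String, String> zones = new HashMap<>();
        zones.put("us-east-1", "zone-a,zone-b");
        System.out.println(zonesFor(normalizeRegion(" US-East-1 "), zones)); // [zone-a, zone-b]
        System.out.println(zonesFor(normalizeRegion(null), zones));          // [defaultZone]
    }
}
```

Each application therefore resolves to exactly one Region, and that Region to one or more Zones.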
Once the Region and Zone information has been obtained, an algorithm driven by the parameters passed in decides which Zone's serviceUrls to load.
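The article does not spell that algorithm out. As a rough illustration only, one plausible strategy is to start from the client's own Zone and then walk the remaining Zones in order, so that same-Zone servers are tried first. The sketch below assumes exactly that; ZoneOrderSketch and orderedServiceUrls are hypothetical names, and this is not Eureka's actual implementation.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of "prefer my own Zone first"; not Eureka code.
class ZoneOrderSketch {

    // Returns service URLs starting with myZone, then the other Zones in
    // their configured order (wrapping around the list).
    static List<String> orderedServiceUrls(String myZone,
                                           List<String> zones,
                                           Map<String, List<String>> urlsByZone) {
        // If myZone is unknown, start from the first configured Zone.
        int start = Math.max(zones.indexOf(myZone), 0);
        List<String> ordered = new ArrayList<>();
        for (int i = 0; i < zones.size(); i++) {
            String zone = zones.get((start + i) % zones.size());
            List<String> urls = urlsByZone.get(zone);
            ordered.addAll(urls != null ? urls : Collections.<String>emptyList());
        }
        return ordered;
    }

    public static void main(String[] args) {
        Map<String, List<String>> urls = new HashMap<>();
        urls.put("zone-a", Arrays.asList("http://a1/eureka/"));
        urls.put("zone-b", Arrays.asList("http://b1/eureka/", "http://b2/eureka/"));
        urls.put("zone-c", Arrays.asList("http://c1/eureka/"));
        List<String> zones = Arrays.asList("zone-a", "zone-b", "zone-c");
        System.out.println(orderedServiceUrls("zone-b", zones, urls));
    }
}
```

The host names in main are made up for the example.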
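Having covered how the information for multiple service registries is loaded, let's look at how the DiscoveryClient class implements service registration. Its constructor calls the following method.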
/**
 * Initializes all scheduled tasks.
 */
private void initScheduledTasks() {
    ...
    if (clientConfig.shouldRegisterWithEureka()) {
        ...
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        // InstanceInfo replicator
        instanceInfoReplicator = new InstanceInfoReplicator(
                this,
                instanceInfo,
                clientConfig.getInstanceInfoReplicationIntervalSeconds(),
                2); // burstSize
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
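The method first checks the configuration to decide whether to register with Eureka, then creates the heartbeat task and obtains an instanceInfoReplicator. The InstanceInfoReplicator class implements the Runnable interface, and the instanceInfoReplicator instance runs a scheduled task; what that task does can be seen in the class's run() method.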
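The heartbeat task above is wrapped in a TimedSupervisorTask that receives both an interval and an expBackOffBound, but the excerpt does not show how the bound is used. One plausible reading is a bounded exponential backoff: when a run times out the delay doubles, capped at interval × bound, and a successful run resets it to the base interval. The sketch below illustrates only that assumed policy; BackoffSketch and nextDelayMs are hypothetical names.

```java
// Hypothetical illustration of a bounded exponential backoff policy.
// This is an assumption about how an "expBackOffBound" might be applied,
// not a copy of Eureka's TimedSupervisorTask.
class BackoffSketch {

    // Computes the delay before the next run of a periodic task.
    static long nextDelayMs(long currentDelayMs, boolean timedOut,
                            long baseDelayMs, int backOffBound) {
        if (!timedOut) {
            return baseDelayMs;                       // success: reset to the base interval
        }
        long maxDelayMs = baseDelayMs * backOffBound; // upper limit for the backoff
        return Math.min(maxDelayMs, currentDelayMs * 2);
    }

    public static void main(String[] args) {
        long base = 30_000L;  // a 30 s interval, chosen for the example
        int bound = 10;       // example bound
        long delay = base;
        // Simulate six consecutive timeouts followed by one success.
        for (int i = 0; i < 6; i++) {
            delay = nextDelayMs(delay, true, base, bound);
            System.out.println("after timeout " + (i + 1) + ": " + delay + " ms");
        }
        delay = nextDelayMs(delay, false, base, bound);
        System.out.println("after success: " + delay + " ms");
    }
}
```

Keeping the policy in a pure function makes it easy to test separately from the scheduler that applies it.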
The initScheduledTasks() method discussed above sets up two more scheduled tasks: service fetch and service renewal.
private void initScheduledTasks() {
    if (clientConfig.shouldFetchRegistry()) {
        // registry cache refresh timer
        int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
        int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
        scheduler.schedule(
                new TimedSupervisorTask(
                        "cacheRefresh",
                        scheduler,
                        cacheRefreshExecutor,
                        registryFetchIntervalSeconds,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new CacheRefreshThread()
                ),
                registryFetchIntervalSeconds, TimeUnit.SECONDS);
    }
    if (clientConfig.shouldRegisterWithEureka()) {
        int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
        int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
        logger.info("Starting heartbeat executor: " + "renew interval is: {}", renewalIntervalInSecs);
        // Heartbeat timer
        scheduler.schedule(
                new TimedSupervisorTask(
                        "heartbeat",
                        scheduler,
                        heartbeatExecutor,
                        renewalIntervalInSecs,
                        TimeUnit.SECONDS,
                        expBackOffBound,
                        new HeartbeatThread()
                ),
                renewalIntervalInSecs, TimeUnit.SECONDS);
        ...
        instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
    } else {
        logger.info("Not registering with Eureka server per configuration");
    }
}
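clientConfig.shouldFetchRegistry() is driven by the eureka.client.fetch-registry property, which defaults to true. When enabled, the client's service list is refreshed periodically so that the client can reach healthy service instances.
Service renewal is also implemented by sending a REST request.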
boolean renew() {
    EurekaHttpResponse<InstanceInfo> httpResponse;
    try {
        httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null);
        logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode());
        if (httpResponse.getStatusCode() == 404) {
            REREGISTER_COUNTER.increment();
            logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName());
            long timestamp = instanceInfo.setIsDirtyWithTime();
            boolean success = register();
            if (success) {
                instanceInfo.unsetIsDirty(timestamp);
            }
            return success;
        }
        return httpResponse.getStatusCode() == 200;
    } catch (Throwable e) {
        logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e);
        return false;
    }
}
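The control flow of renew() can be isolated from the HTTP transport: a 404 on the heartbeat means the server no longer knows the instance, so the client registers again; a 200 means the lease was renewed; anything else, or an exception, counts as a failure. The sketch below reproduces just that decision logic against a hypothetical RegistrationClient interface that returns bare status codes. It treats 204 as a successful registration, matching the status the server-side addInstance() excerpt in this article returns.

```java
// Hypothetical, transport-free model of the renew() decision logic.
class RenewSketch {

    // Stand-in for the real HTTP client; returns HTTP status codes only.
    interface RegistrationClient {
        int sendHeartBeat(String appName, String instanceId);
        int register(String appName, String instanceId);
    }

    // Test double that answers with fixed status codes and counts registrations.
    static class FixedClient implements RegistrationClient {
        final int heartbeatStatus;
        final int registerStatus;
        int registerCalls = 0;

        FixedClient(int heartbeatStatus, int registerStatus) {
            this.heartbeatStatus = heartbeatStatus;
            this.registerStatus = registerStatus;
        }

        public int sendHeartBeat(String appName, String instanceId) {
            return heartbeatStatus;
        }

        public int register(String appName, String instanceId) {
            registerCalls++;
            return registerStatus;
        }
    }

    static boolean renew(RegistrationClient client, String appName, String instanceId) {
        try {
            int status = client.sendHeartBeat(appName, instanceId);
            if (status == 404) {
                // Unknown to the server: fall back to a fresh registration.
                return client.register(appName, instanceId) == 204;
            }
            return status == 200;
        } catch (RuntimeException e) {
            // Any transport failure is reported as an unsuccessful renewal.
            return false;
        }
    }

    public static void main(String[] args) {
        FixedClient expired = new FixedClient(404, 204);
        System.out.println(renew(expired, "ORDER-SERVICE", "host-1:8080")); // true
        System.out.println(expired.registerCalls);                          // 1
    }
}
```

The application name and instance ID in main are made-up example values.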
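The service-fetch process is omitted here.
For deregistration, the server performs a remove operation based on the instance ID and appName.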
void unregister() {
    // It can be null if shouldRegisterWithEureka == false
    if(eurekaTransport != null && eurekaTransport.registrationClient != null) {
        try {
            logger.info("Unregistering ...");
            EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId());
            logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode());
        } catch (Exception e) {
            logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e);
        }
    }
}
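Everything so far has been analyzed from the client side. Now let's see how Eureka Server handles the various REST requests. These request handlers are all defined in the com.netflix.eureka.resources package.
Take service registration as an example:
It invokes the addInstance() method of the ApplicationResource class.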
@POST
@Consumes({"application/json", "application/xml"})
public Response addInstance(InstanceInfo info, @HeaderParam("x-netflix-discovery-replication") String isReplication) {
    logger.debug("Registering instance {} (replication={})", info.getId(), isReplication);
    if(this.isBlank(info.getId())) {
        return Response.status(400).entity("Missing instanceId").build();
    } else if(this.isBlank(info.getHostName())) {
        return Response.status(400).entity("Missing hostname").build();
    } else if(this.isBlank(info.getIPAddr())) {
        return Response.status(400).entity("Missing ip address").build();
    } else if(this.isBlank(info.getAppName())) {
        return Response.status(400).entity("Missing appName").build();
    } else if(!this.appName.equals(info.getAppName())) {
        return Response.status(400).entity("Mismatched appName, expecting " + this.appName + " but was " + info.getAppName()).build();
    } else if(info.getDataCenterInfo() == null) {
        return Response.status(400).entity("Missing dataCenterInfo").build();
    } else if(info.getDataCenterInfo().getName() == null) {
        return Response.status(400).entity("Missing dataCenterInfo Name").build();
    } else {
        DataCenterInfo dataCenterInfo = info.getDataCenterInfo();
        if(dataCenterInfo instanceof UniqueIdentifier) {
            String dataCenterInfoId = ((UniqueIdentifier)dataCenterInfo).getId();
            if(this.isBlank(dataCenterInfoId)) {
                boolean experimental = "true".equalsIgnoreCase(this.serverConfig.getExperimental("registration.validation.dataCenterInfoId"));
                if(experimental) {
                    String entity = "DataCenterInfo of type " + dataCenterInfo.getClass() + " must contain a valid id";
                    return Response.status(400).entity(entity).build();
                }
                if(dataCenterInfo instanceof AmazonInfo) {
                    AmazonInfo amazonInfo = (AmazonInfo)dataCenterInfo;
                    String effectiveId = amazonInfo.get(MetaDataKey.instanceId);
                    if(effectiveId == null) {
                        amazonInfo.getMetadata().put(MetaDataKey.instanceId.getName(), info.getId());
                    }
                } else {
                    logger.warn("Registering DataCenterInfo of type {} without an appropriate id", dataCenterInfo.getClass());
                }
            }
        }
        this.registry.register(info, "true".equals(isReplication));
        return Response.status(204).build();
    }
}
After the registration information has been validated, the call reaches the register(InstanceInfo info, int leaseDuration, boolean isReplication) method of org.springframework.cloud.netflix.eureka.server.InstanceRegistry.
public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) {
    try {
        this.read.lock();
        Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName());
        ...
    } finally {
        this.read.unlock();
    }
}
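The excerpt shows two structural facts: the registry is a two-level map keyed first by appName, and register() runs under a read lock even though it modifies the map. The elided body is not reproduced here. As a rough sketch of how such a structure could work, the example below assumes the inner maps are themselves thread-safe, so registrations and cancellations can share the read lock, while an operation that needs a consistent view of everything takes the write lock. RegistrySketch, its methods, and the use of a plain expiry timestamp in place of Lease<InstanceInfo> are all hypothetical simplifications.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified registry: appName -> (instanceId -> lease expiry).
class RegistrySketch {
    private final ConcurrentHashMap<String, Map<String, Long>> registry = new ConcurrentHashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    // Adds or replaces an instance. The read lock is shared, so many
    // registrations can proceed at once; the maps handle their own safety.
    void register(String appName, String instanceId, long leaseExpiryMs) {
        lock.readLock().lock();
        try {
            registry.computeIfAbsent(appName, k -> new ConcurrentHashMap<>())
                    .put(instanceId, leaseExpiryMs);
        } finally {
            lock.readLock().unlock();
        }
    }

    // Removes an instance by appName and instance ID, as deregistration does.
    boolean cancel(String appName, String instanceId) {
        lock.readLock().lock();
        try {
            Map<String, Long> leases = registry.get(appName);
            return leases != null && leases.remove(instanceId) != null;
        } finally {
            lock.readLock().unlock();
        }
    }

    // Takes the write lock to copy the registry while no update is in flight.
    Map<String, Map<String, Long>> snapshot() {
        lock.writeLock().lock();
        try {
            Map<String, Map<String, Long>> copy = new HashMap<>();
            for (Map.Entry<String, Map<String, Long>> e : registry.entrySet()) {
                copy.put(e.getKey(), new HashMap<>(e.getValue()));
            }
            return copy;
        } finally {
            lock.writeLock().unlock();
        }
    }

    public static void main(String[] args) {
        RegistrySketch r = new RegistrySketch();
        r.register("ORDER-SERVICE", "host-1:8080", System.currentTimeMillis() + 90_000L);
        System.out.println(r.snapshot().get("ORDER-SERVICE").size()); // 1
        System.out.println(r.cancel("ORDER-SERVICE", "host-1:8080")); // true
        System.out.println(r.cancel("ORDER-SERVICE", "host-1:8080")); // false
    }
}
```

Unlike the excerpt, the sketch acquires the lock before entering the try block, which is the usual Java idiom: if lock() itself fails, the finally block does not attempt to release a lock that was never held.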