文章首發於微信公衆號《程序員果果》
地址:https://mp.weixin.qq.com/s/47TUd96NMz67_PCDyvyInQjava
Eureka是一種基於REST(Representational State Transfer)的服務,主要用於AWS雲,用於定位服務,以實現中間層服務器的負載平衡和故障轉移。咱們將此服務稱爲Eureka Server。Eureka還附帶了一個基於Java的客戶端組件Eureka Client,它使與服務的交互變得更加容易。客戶端還有一個內置的負載均衡器,能夠進行基本的循環負載均衡。在Netflix,一個更復雜的負載均衡器包含Eureka基於流量,資源使用,錯誤條件等多種因素提供加權負載平衡,以提供卓越的彈性。
先看一張 github 上 Netflix Eureka 的一架構圖,以下:git
從圖能夠看出在這個體系中,有2個角色,即Eureka Server和Eureka Client。而Eureka Client又分爲Applicaton Service和Application Client,即服務提供者何服務消費者。 每一個區域有一個Eureka集羣,而且每一個區域至少有一個eureka服務器能夠處理區域故障,以防服務器癱瘓。程序員
Eureka Client 在 Eureka Server 註冊,而後Eureka Client 每30秒向 Eureka Server 發送一次心跳來更新一次租約。若是 Eureka Client 沒法續訂租約幾回,則會在大約90秒內 Eureka Server 將其從服務器註冊表中刪除。註冊信息和續訂將複製到羣集中的全部 Eureka Server 節點。來自任何區域的客戶端均可以查找註冊表信息(每30秒發生一次)根據這些註冊表信息,Application Client 能夠遠程調用 Applicaton Service 來消費服務。github
基於Spring Cloud的 eureka 的 client 端在啓動類上加上 @EnableDiscoveryClient 註解,就能夠 用 NetFlix 提供的 Eureka client。下面就以 @EnableDiscoveryClient 爲入口,進行Eureka Client的源碼分析。spring
@EnableDiscoveryClient,經過源碼能夠發現這是一個標記註解:服務器
/** * Annotation to enable a DiscoveryClient implementation. * @author Spencer Gibb */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Inherited @Import(EnableDiscoveryClientImportSelector.class) public @interface EnableDiscoveryClient { boolean autoRegister() default true; }
經過註釋能夠知道 @EnableDiscoveryClient 註解是用來 啓用 DiscoveryClient 的實現,DiscoveryClient接口代碼以下:微信
public interface DiscoveryClient { String description(); List<ServiceInstance> getInstances(String serviceId); List<String> getServices(); }
接口說明:架構
DiscoveryClient 接口的實現結構圖:app
EurekaDiscoveryClient 是 Eureka 對 DiscoveryClient接口的實現,代碼以下:負載均衡
public class EurekaDiscoveryClient implements DiscoveryClient { public static final String DESCRIPTION = "Spring Cloud Eureka Discovery Client"; private final EurekaInstanceConfig config; private final EurekaClient eurekaClient; public EurekaDiscoveryClient(EurekaInstanceConfig config, EurekaClient eurekaClient) { this.config = config; this.eurekaClient = eurekaClient; } @Override public String description() { return DESCRIPTION; } @Override public List<ServiceInstance> getInstances(String serviceId) { List<InstanceInfo> infos = this.eurekaClient.getInstancesByVipAddress(serviceId, false); List<ServiceInstance> instances = new ArrayList<>(); for (InstanceInfo info : infos) { instances.add(new EurekaServiceInstance(info)); } return instances; } @Override public List<String> getServices() { Applications applications = this.eurekaClient.getApplications(); if (applications == null) { return Collections.emptyList(); } List<Application> registered = applications.getRegisteredApplications(); List<String> names = new ArrayList<>(); for (Application app : registered) { if (app.getInstances().isEmpty()) { continue; } names.add(app.getName().toLowerCase()); } return names; } }
從代碼能夠看出 EurekaDiscoveryClient 實現了 DiscoveryClient 定義的規範接口,真正實現發現服務的是 EurekaClient,下面是 EurekaClient 依賴結構圖:
EurekaClient 惟一實現類 DiscoveryClient,DiscoveryClient 的構造方法以下:
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { //省略... try { // default size of 2 - 1 each for heartbeat and cacheRefresh scheduler = Executors.newScheduledThreadPool(2, new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-%d") .setDaemon(true) .build()); heartbeatExecutor = new ThreadPoolExecutor( 1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-HeartbeatExecutor-%d") .setDaemon(true) .build() ); // use direct handoff cacheRefreshExecutor = new ThreadPoolExecutor( 1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), new ThreadFactoryBuilder() .setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d") .setDaemon(true) .build() ); // use direct handoff //省略... initScheduledTasks(); try { Monitors.registerObject(this); } catch (Throwable e) { logger.warn("Cannot register timers", e); } //省略... }
能夠看到這個構造方法裏面,主要作了下面幾件事:
上面說了,initScheduledTasks()方法中調用了InstanceInfoReplicator.start()方法,InstanceInfoReplicator 的 run()方法代碼以下:
public void run() { try { discoveryClient.refreshInstanceInfo(); Long dirtyTimestamp = instanceInfo.isDirtyWithTime(); if (dirtyTimestamp != null) { discoveryClient.register(); instanceInfo.unsetIsDirty(dirtyTimestamp); } } catch (Throwable t) { logger.warn("There was a problem with the instance info replicator", t); } finally { Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS); scheduledPeriodicRef.set(next); } }
發現 InstanceInfoReplicator的run方法,run方法中會調用DiscoveryClient的register方法。DiscoveryClient 的 register方法 代碼以下:
/** * Register with the eureka service by making the appropriate REST call. */ boolean register() throws Throwable { logger.info(PREFIX + "{}: registering service...", appPathIdentifier); EurekaHttpResponse<Void> httpResponse; try { httpResponse = eurekaTransport.registrationClient.register(instanceInfo); } catch (Exception e) { logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e); throw e; } if (logger.isInfoEnabled()) { logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode()); } return httpResponse.getStatusCode() == 204; }
最終又通過一系列調用,最終會調用到AbstractJerseyEurekaHttpClient的register方法,代碼以下:
public EurekaHttpResponse<Void> register(InstanceInfo info) { String urlPath = "apps/" + info.getAppName(); ClientResponse response = null; try { Builder resourceBuilder = jerseyClient.resource(serviceUrl).path(urlPath).getRequestBuilder(); addExtraHeaders(resourceBuilder); response = resourceBuilder .header("Accept-Encoding", "gzip") .type(MediaType.APPLICATION_JSON_TYPE) .accept(MediaType.APPLICATION_JSON) .post(ClientResponse.class, info); return anEurekaHttpResponse(response.getStatus()).headers(headersOf(response)).build(); } finally { if (logger.isDebugEnabled()) { logger.debug("Jersey HTTP POST {}/{} with instance {}; statusCode={}", serviceUrl, urlPath, info.getId(), response == null ? "N/A" : response.getStatus()); } if (response != null) { response.close(); } } }
能夠看到最終經過http rest請求eureka server端,把應用自身的InstanceInfo實例註冊給server端,咱們再來完整梳理一下服務註冊流程:
服務續約和服務註冊很是相似,HeartbeatThread 代碼以下:
private class HeartbeatThread implements Runnable { public void run() { if (renew()) { //更新最後一次心跳的時間 lastSuccessfulHeartbeatTimestamp = System.currentTimeMillis(); } } } // 續約的主方法 boolean renew() { EurekaHttpResponse<InstanceInfo> httpResponse; try { httpResponse = eurekaTransport.registrationClient.sendHeartBeat(instanceInfo.getAppName(), instanceInfo.getId(), instanceInfo, null); logger.debug(PREFIX + "{} - Heartbeat status: {}", appPathIdentifier, httpResponse.getStatusCode()); if (httpResponse.getStatusCode() == 404) { REREGISTER_COUNTER.increment(); logger.info(PREFIX + "{} - Re-registering apps/{}", appPathIdentifier, instanceInfo.getAppName()); long timestamp = instanceInfo.setIsDirtyWithTime(); boolean success = register(); if (success) { instanceInfo.unsetIsDirty(timestamp); } return success; } return httpResponse.getStatusCode() == 200; } catch (Throwable e) { logger.error(PREFIX + "{} - was unable to send heartbeat!", appPathIdentifier, e); return false; } }
發送心跳 ,請求eureka server 端 ,若是接口返回值爲404,就是說服務不存在,那麼從新走註冊流程。
若是接口返回值爲404,就是說不存在,歷來沒有註冊過,那麼從新走註冊流程。
服務續約流程以下圖:
在服務shutdown的時候,須要及時通知服務端把本身剔除,以免客戶端調用已經下線的服務,shutdown()方法代碼以下:
public synchronized void shutdown() { if (isShutdown.compareAndSet(false, true)) { logger.info("Shutting down DiscoveryClient ..."); if (statusChangeListener != null && applicationInfoManager != null) { applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId()); } // 關閉各類定時任務 // 關閉刷新實例信息/註冊的定時任務 // 關閉續約(心跳)的定時任務 // 關閉獲取註冊信息的定時任務 cancelScheduledTasks(); // If APPINFO was registered if (applicationInfoManager != null && clientConfig.shouldRegisterWithEureka() && clientConfig.shouldUnregisterOnShutdown()) { // 更改實例狀態,使實例再也不接收流量 applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN); //向EurekaServer端發送下線請求 unregister(); } if (eurekaTransport != null) { eurekaTransport.shutdown(); } heartbeatStalenessMonitor.shutdown(); registryStalenessMonitor.shutdown(); logger.info("Completed shut down of DiscoveryClient"); } } private void cancelScheduledTasks() { if (instanceInfoReplicator != null) { instanceInfoReplicator.stop(); } if (heartbeatExecutor != null) { heartbeatExecutor.shutdownNow(); } if (cacheRefreshExecutor != null) { cacheRefreshExecutor.shutdownNow(); } if (scheduler != null) { scheduler.shutdownNow(); } } void unregister() { // It can be null if shouldRegisterWithEureka == false if(eurekaTransport != null && eurekaTransport.registrationClient != null) { try { logger.info("Unregistering ..."); EurekaHttpResponse<Void> httpResponse = eurekaTransport.registrationClient.cancel(instanceInfo.getAppName(), instanceInfo.getId()); logger.info(PREFIX + "{} - deregister status: {}", appPathIdentifier, httpResponse.getStatusCode()); } catch (Exception e) { logger.error(PREFIX + "{} - de-registration failed{}", appPathIdentifier, e.getMessage(), e); } } }
先關閉各類定時任務,而後向eureka server 發送服務下線通知。服務下線流程以下圖:
https://github.com/Netflix/eureka/wiki
http://yeming.me/2016/12/01/eureka1/
http://blog.didispace.com/springcloud-sourcecode-eureka/
https://www.jianshu.com/p/71a8bdbf03f4
歡迎關注個人公衆號《程序員果果》,關注有驚喜~~