提及微服務框架,可能你們首先想到的就是Spring Cloud的你們族了。微服務簡單理解就是很小、功能相對集中的服務。在微服務的思想下,原有功能都集中在一塊兒實現的業務,如今拆分出來不少的服務模塊,那服務之間的互相調用就不可避免,同時服務的高可用也要可以有所保證。要解決上述須要,少不了微服務中相當重要的內容,即服務治理。spring
服務治理包括三大組件:服務註冊中心、服務的註冊、服務的發現。舉個例子:就比如咱們租房,房東呢,想要把房子租出去;房客想要租房。房東把本身的房子信息掛在租房平臺上,房客經過租房平臺瞭解到出租房屋的信息,進而能夠給房東打電話租房。bootstrap
這裏呢租房平臺就至關於服務註冊中心,房東在平臺上登記出租房屋至關於房屋的註冊,房客從平臺上獲取出租信息至關於服務的發現。進入到程序的世界,Spring Cloud Eureka組件就提供了服務治理的功能。Eureka的服務端就是註冊中心;客戶端既能夠是消費者也能夠是提供者,承擔着服務的註冊和發現。瀏覽器
簡單總結各類解決要提供的主要功能:緩存
服務端:服務器
接收註冊。app
接收續期。框架
提供服務下線。ide
提供服務列表微服務
客戶端:工具
註冊服務。
續期。
獲取服務列表。
維持心跳。
大致上瞭解以後,咱們就來探索一下在底層是如何實現的。
搭建Eureka環境(單實例環境)
基於Springboot咱們能夠輕鬆的搭建Eureka的客戶端和服務端。
1、服務端搭建:
引入依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId></dependency>
主類上添加註解:
@SpringBootApplication@EnableEurekaServerpublic class EurekaServer_6001 { public static void main(String[] args) { SpringApplication.run(EurekaServer_6001.class, args); }}
在application.properties中相關Eureka的配置:
eureka: instance: hostname: eurekapeer1 #eureka的註冊實例名稱 client: register-with-eureka: false # 服務註冊,不讓本身註冊到eureka中 fetch-registry: false # 服務的發現 , 本身不從服務中發現信息 service-url: defaultZone: http://localhost:6001/eureka/
啓動服務,使用瀏覽器訪問配置的端口,能夠看到Eureka的服務端的頁面:
2、客戶端搭建:
引入依賴:
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId></dependency>
主類添加註解:
@EnableDiscoveryClient@SpringBootApplicationpublic class ManagerZuulFilter { public static void main(String[] args) { SpringApplication.run(ManagerZuulFilter.class,args); }}
在application.properties中相關Eureka的配置:
eureka: client: register-with-eureka: true #服務註冊開關 fetch-registry: true #服務發現開關 serviceUrl: defaultZone: http://localhost:6001/eureka/
啓動服務,刷新服務端Eureka的頁面,看到客戶端服務註冊到Eureka的頁面。
關閉客戶端服務,服務端Eureka的管理頁面,客戶端的服務從列表中下線。
源碼詳解
1、客戶端源碼分析
客戶端服務的治理包括:服務的註冊、服務的續期、服務的下線、服務列表的發現等。
Eureka的註冊和發現的內容在DiscoveryClient類中。
@Inject DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args, Provider<BackupRegistry> backupRegistryProvider) { //...省略一些初始化的內容 //不註冊到Eureka server而且不從Eureka server中獲取服務列表,至關於單節點的Eureka服務端 if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()) { logger.info("Client configured to neither register nor query for data."); this.scheduler = null; this.heartbeatExecutor = null; this.cacheRefreshExecutor = null; this.eurekaTransport = null; this.instanceRegionChecker = new InstanceRegionChecker(new PropertyBasedAzToRegionMapper(config), this.clientConfig.getRegion()); DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); this.initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size()); } else { try { //註冊到Eureka而且從Eureka中獲取服務列表的邏輯處理 this.scheduler = Executors.newScheduledThreadPool(2, (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-%d").setDaemon(true).build()); //建立心跳定時執行任務 this.heartbeatExecutor = new ThreadPoolExecutor(1, this.clientConfig.getHeartbeatExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-HeartbeatExecutor-%d").setDaemon(true).build()); //建立刷新緩存定時執行任務 this.cacheRefreshExecutor = new ThreadPoolExecutor(1, this.clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0L, TimeUnit.SECONDS, new SynchronousQueue(), (new ThreadFactoryBuilder()).setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d").setDaemon(true).build()); //註冊前處理器,能夠在服務註冊到Eureka以前作些操做 if (this.preRegistrationHandler != null) { this.preRegistrationHandler.beforeRegistration(); } //是否強制初始階段就註冊到Eureka if (this.clientConfig.shouldRegisterWithEureka() && this.clientConfig.shouldEnforceRegistrationAtInit()) { try { if (!this.register()) { throw new IllegalStateException("Registration error at startup. Invalid server response."); } } catch (Throwable var8) { logger.error("Registration error at startup: {}", var8.getMessage()); throw new IllegalStateException(var8); } }
//執行定時任務 this.initScheduledTasks();
try { Monitors.registerObject(this); } catch (Throwable var7) { logger.warn("Cannot register timers", var7); }
DiscoveryManager.getInstance().setDiscoveryClient(this); DiscoveryManager.getInstance().setEurekaClientConfig(config); this.initTimestampMs = System.currentTimeMillis(); logger.info("Discovery Client initialized at timestamp {} with initial instances count: {}", this.initTimestampMs, this.getApplications().size()); } }
//初始化定時任務private void initScheduledTasks() { int renewalIntervalInSecs; int expBackOffBound; //定時獲取服務列表任務開啓 if (this.clientConfig.shouldFetchRegistry()) { renewalIntervalInSecs = this.clientConfig.getRegistryFetchIntervalSeconds(); expBackOffBound = this.clientConfig.getCacheRefreshExecutorExponentialBackOffBound(); this.scheduler.schedule(new TimedSupervisorTask("cacheRefresh", this.scheduler, this.cacheRefreshExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.CacheRefreshThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); } //註冊到Eureka,心跳定時任務開始。要讓Eureka Server知道客戶端還活着 if (this.clientConfig.shouldRegisterWithEureka()) { renewalIntervalInSecs = this.instanceInfo.getLeaseInfo().getRenewalIntervalInSecs(); expBackOffBound = this.clientConfig.getHeartbeatExecutorExponentialBackOffBound(); logger.info("Starting heartbeat executor: renew interval is: {}", renewalIntervalInSecs); this.scheduler.schedule(new TimedSupervisorTask("heartbeat", this.scheduler, this.heartbeatExecutor, renewalIntervalInSecs, TimeUnit.SECONDS, expBackOffBound, new DiscoveryClient.HeartbeatThread()), (long)renewalIntervalInSecs, TimeUnit.SECONDS); this.instanceInfoReplicator = new InstanceInfoReplicator(this, this.instanceInfo, this.clientConfig.getInstanceInfoReplicationIntervalSeconds(), 2); this.statusChangeListener = new StatusChangeListener() { public String getId() { return "statusChangeListener"; }
public void notify(StatusChangeEvent statusChangeEvent) { if (InstanceStatus.DOWN != statusChangeEvent.getStatus() && InstanceStatus.DOWN != statusChangeEvent.getPreviousStatus()) { DiscoveryClient.logger.info("Saw local status change event {}", statusChangeEvent); } else { DiscoveryClient.logger.warn("Saw local status change event {}", statusChangeEvent); }
DiscoveryClient.this.instanceInfoReplicator.onDemandUpdate(); } }; if (this.clientConfig.shouldOnDemandUpdateStatusChange()) { this.applicationInfoManager.registerStatusChangeListener(this.statusChangeListener); } this.instanceInfoReplicator.start(this.clientConfig.getInitialInstanceInfoReplicationIntervalSeconds()); } else { logger.info("Not registering with Eureka server per configuration"); }
}
//註冊發送,REST請求,註冊服務到Eureka Server中。boolean register() throws Throwable { logger.info("DiscoveryClient_{}: registering service...", this.appPathIdentifier);
EurekaHttpResponse httpResponse; try { httpResponse = this.eurekaTransport.registrationClient.register(this.instanceInfo); } catch (Exception var3) { logger.warn("DiscoveryClient_{} - registration failed {}", new Object[]{this.appPathIdentifier, var3.getMessage(), var3}); throw var3; }
if (logger.isInfoEnabled()) { logger.info("DiscoveryClient_{} - registration status: {}", this.appPathIdentifier, httpResponse.getStatusCode()); }
return httpResponse.getStatusCode() == 204; }
查看線程,發現確實有上述幾類線程。
2、服務端源碼分析
服務端的主要邏輯就是:接收客戶端的註冊/續期/下線等請求、提供服務列表、維護註冊信息等。
#eureka server端的功能代碼主要是在eureka-core包中主要涉及類和接口有:
EurekaServerBootstrap
InstanceRegistry
下面主要爲你們講一下客戶端註冊在服務端如何處理:eureka-server包下org.springframework.cloud.netflix.eureka.server.EurekaServerBootstrap初始化eurekserver相關的內容:
public class EurekaServerBootstrap { public void contextInitialized(ServletContext context) { try { this.initEurekaEnvironment();//初始化eureka的部署環境,默認test this.initEurekaServerContext();//初始化eureka server的上下文 context.setAttribute(EurekaServerContext.class.getName(), this.serverContext); } catch (Throwable var3) { log.error("Cannot bootstrap eureka server :", var3); throw new RuntimeException("Cannot bootstrap eureka server :", var3); } }}
eureka-server包下的org.springframework.cloud.netflix.eureka.server.InstanceRegistry中方法爲跟服務治理有關係的內容。
public class InstanceRegistry extends PeerAwareInstanceRegistryImpl implements ApplicationContextAware { public void register(InstanceInfo info, int leaseDuration, boolean isReplication) { this.handleRegistration(info, leaseDuration, isReplication); super.register(info, leaseDuration, isReplication);//帶有租期的註冊,默認租期90s }}
public abstract class AbstractInstanceRegistry implements InstanceRegistry { //存放註冊到eureka server上的服務的數據接口 //第一層的String key是指服務名 //第二層的String key是指每一個服務的instanceId:默認hostmame:applicationName:port //最裏面的InstanceInfo就是一個服務的詳細信息 private final ConcurrentHashMap<String, Map<String, Lease<InstanceInfo>>> registry = new ConcurrentHashMap(); //客戶端註冊到服務端的具體處理過程 public void register(InstanceInfo registrant, int leaseDuration, boolean isReplication) { try { this.read.lock(); //該Map存儲的就是eureka server上的服務信息 //根據應用名獲取該應用名下的列表 Map<String, Lease<InstanceInfo>> gMap = (Map)this.registry.get(registrant.getAppName()); EurekaMonitors.REGISTER.increment(isReplication); if (gMap == null) { //沒有註冊過,則新建立 ConcurrentHashMap<String, Lease<InstanceInfo>> gNewMap = new ConcurrentHashMap(); gMap = (Map)this.registry.putIfAbsent(registrant.getAppName(), gNewMap); if (gMap == null) { gMap = gNewMap; } } //經過instanceId獲取具體的服務 Lease<InstanceInfo> existingLease = (Lease)((Map)gMap).get(registrant.getId()); if (existingLease != null && existingLease.getHolder() != null) { Long existingLastDirtyTimestamp = ((InstanceInfo)existingLease.getHolder()).getLastDirtyTimestamp(); Long registrationLastDirtyTimestamp = registrant.getLastDirtyTimestamp(); logger.debug("Existing lease found (existing={}, provided={}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); if (existingLastDirtyTimestamp > registrationLastDirtyTimestamp) { logger.warn("There is an existing lease and the existing lease's dirty timestamp {} is greater than the one that is being registered {}", existingLastDirtyTimestamp, registrationLastDirtyTimestamp); logger.warn("Using the existing instanceInfo instead of the new instanceInfo as the registrant"); registrant = (InstanceInfo)existingLease.getHolder(); } } else { synchronized(this.lock) { if (this.expectedNumberOfRenewsPerMin > 0) { this.expectedNumberOfRenewsPerMin += 2; this.numberOfRenewsPerMinThreshold = (int)((double)this.expectedNumberOfRenewsPerMin * this.serverConfig.getRenewalPercentThreshold()); } }
logger.debug("No previous lease information found; it is new registration"); } //建立新的租期信息 Lease<InstanceInfo> lease = new Lease(registrant, leaseDuration); if (existingLease != null) { lease.setServiceUpTimestamp(existingLease.getServiceUpTimestamp()); } //註冊服務或者更新服務 ((Map)gMap).put(registrant.getId(), lease); synchronized(this.recentRegisteredQueue) { this.recentRegisteredQueue.add(new Pair(System.currentTimeMillis(), registrant.getAppName() + "(" + registrant.getId() + ")")); }
if (!InstanceStatus.UNKNOWN.equals(registrant.getOverriddenStatus())) { logger.debug("Found overridden status {} for instance {}. Checking to see if needs to be add to the overrides", registrant.getOverriddenStatus(), registrant.getId()); if (!this.overriddenInstanceStatusMap.containsKey(registrant.getId())) { logger.info("Not found overridden id {} and hence adding it", registrant.getId()); this.overriddenInstanceStatusMap.put(registrant.getId(), registrant.getOverriddenStatus()); } } InstanceStatus overriddenStatusFromMap = (InstanceStatus)this.overriddenInstanceStatusMap.get(registrant.getId()); if (overriddenStatusFromMap != null) { logger.info("Storing overridden status {} from map", overriddenStatusFromMap); registrant.setOverriddenStatus(overriddenStatusFromMap); }
InstanceStatus overriddenInstanceStatus = this.getOverriddenInstanceStatus(registrant, existingLease, isReplication); registrant.setStatusWithoutDirty(overriddenInstanceStatus); if (InstanceStatus.UP.equals(registrant.getStatus())) { lease.serviceUp(); }
registrant.setActionType(ActionType.ADDED); this.recentlyChangedQueue.add(new AbstractInstanceRegistry.RecentlyChangedItem(lease)); registrant.setLastUpdatedTimestamp(); this.invalidateCache(registrant.getAppName(), registrant.getVIPAddress(), registrant.getSecureVipAddress()); logger.info("Registered instance {}/{} with status {} (replication={})", new Object[]{registrant.getAppName(), registrant.getId(), registrant.getStatus(), isReplication}); } finally { this.read.unlock(); } }}
配置類
經常使用配置說明:客戶端、服務端、以及實例配置能夠直接在配置類的屬性中查到:
客戶端配置:前綴爲eureka.client
org.springframework.cloud.netflix.eureka.EurekaClientConfigBean
服務端配置:前綴爲eureka.server
org.springframework.cloud.netflix.eureka.server.EurekaServerConfigBean
實例配置:前綴eureka.instance
org.springframework.cloud.netflix.eureka.EurekaInstanceConfigBean
經常使用配置
註冊中心配置:
#關閉註冊中心的保護機制,Eureka 會統計15分鐘以內心跳失敗的比例低於85%將會觸發保護機制,不剔除服務提供者eureka.server.enable-self-preservation = false
服務實例配置:
#不使用主機名來定義註冊中心的地址,而使用IP地址的形式eureka.instance.prefer-ip-address = true
服務註冊類配置:
#獲取服務註冊列表eureka.client.fetch-registery = true#註冊服務到Eureka Servereureka.client.register-with-eureka = true#從Eureka服務器端獲取註冊信息的間隔時間,單位:秒eureka.client.registery-fetch-interval-seconds = 30#讀取EurekaServer信息的超時時間,單位:秒eureka.client.eureka-server-read-timeout-seconds = 8#鏈接EurekaServer的超時時間,單位:秒eureka.client.eureka-server-connect-timeout-seconds = 5
通用配置:
eureka.client.service-url.defaultZone = http://localhost:8761/eureka
Spring cloud Eureka中還提供了其餘的一些功能,好比認證和限流的內容,有興趣的能夠了解一下。除Eureka提供的服務治理外,還有其餘的框架也能夠實現服務治理,如Dubbo+Zookeeper,有興趣能夠對比看一下。
框架能夠算是一種工具,瞭解工具的內部是爲了更好的讓工具爲咱們服務。優秀框架的設計思想是咱們要學習的重點,抽絲剝繭以後,原理其實很基礎。