推薦閱讀java
SpringCloud源碼閱讀0-SpringCloud必備知識spring
SpringCloud源碼閱讀1-EurekaServer源碼的祕密緩存
配置類的做用通常就是配置框架運行的基本組件,因此看懂配置類,也就入了框架的門。app
當咱們在啓動類上加入@EnableDiscoveryClient
或者@EnableEurekaClient
時,就能使Eureka客戶端生效。框架
這兩個註解最終都會使,Eureka客戶端對應的配置類EurekaClientAutoConfiguration
生效。這裏直接講配置類,具體註解如何使他生效,不在此處贅述。dom
EurekaClientAutoConfiguration 做爲EurekaClient的自動配置類,配了EurekaClient運行所須要的組件。ide
(1.註解上的Bean工具
@Configuration
@EnableConfigurationProperties//啓動屬性映射
@ConditionalOnClass(EurekaClientConfig.class)//須要EurekaClientConfig類存在
@Import(DiscoveryClientOptionalArgsConfiguration.class)//加載可選參數配置類到容器,
@ConditionalOnBean(EurekaDiscoveryClientConfiguration.Marker.class)//須要開關存在
@ConditionalOnProperty(value = "eureka.client.enabled", matchIfMissing = true)//須要eureka.client.enabled屬性
//在這三個自動注入類以前解析
@AutoConfigureBefore({ NoopDiscoveryClientAutoConfiguration.class,
CommonsClientAutoConfiguration.class, ServiceRegistryAutoConfiguration.class })
//在這三個自動注入類以後解析
@AutoConfigureAfter(name = {"org.springframework.cloud.autoconfigure.RefreshAutoConfiguration",
"org.springframework.cloud.netflix.eureka.EurekaDiscoveryClientConfiguration",
"org.springframework.cloud.client.serviceregistry.AutoServiceRegistrationAutoConfiguration"})
public class EurekaClientAutoConfiguration {
}
複製代碼
能夠看出,註解大部分都是在作EurekaClientAutoConfiguration 配置類生效條件的判斷。oop
其中@AutoConfigureAfter對應的三個配置類須要講下:
RefreshAutoConfiguration:與刷新做用域有關的配置類,
EurekaDiscoveryClientConfiguration:
* 向容器中注入開關EurekaDiscoveryClientConfiguration.Marker
* 建立RefreshScopeRefreshedEvent事件監聽器,
* 當eureka.client.healthcheck.enabled=true時,注入EurekaHealthCheckHandler用於健康檢查
AutoServiceRegistrationAutoConfiguration:關於服務自動註冊的相關配置。
複製代碼
註解上沒有引入重要組件。post
(2.類內部的Bean
(3. 內部類的Bean EurekaClientAutoConfiguration 內部有兩個關於EurekaClient的配置類,
不論是哪一種EurekaClient都會註冊三個組件:
(4. 分類總結:
EurekaClient 能夠看作是客戶端的上下文。他的初始化,卸載方法包括了客戶端的整個生命週期
上文講到,此時EurekaClient註冊的是CloudEurekaClient。
@Inject
DiscoveryClient(ApplicationInfoManager applicationInfoManager, EurekaClientConfig config, AbstractDiscoveryClientOptionalArgs args,
Provider<BackupRegistry> backupRegistryProvider) {
//留下擴展點,能夠經過參數配置各類處理器
if (args != null) {
this.healthCheckHandlerProvider = args.healthCheckHandlerProvider;
this.healthCheckCallbackProvider = args.healthCheckCallbackProvider;
this.eventListeners.addAll(args.getEventListeners());
this.preRegistrationHandler = args.preRegistrationHandler;
} else {
this.healthCheckCallbackProvider = null;
this.healthCheckHandlerProvider = null;
this.preRegistrationHandler = null;
}
//賦值applicationInfoManager屬性
this.applicationInfoManager = applicationInfoManager;
//applicationInfoManager在初始化時,
//new InstanceInfoFactory().create(config);建立了當前實例信息
InstanceInfo myInfo = applicationInfoManager.getInfo();
clientConfig = config;
staticClientConfig = clientConfig;
transportConfig = config.getTransportConfig();
instanceInfo = myInfo;
if (myInfo != null) {
appPathIdentifier = instanceInfo.getAppName() + "/" + instanceInfo.getId();
} else {
logger.warn("Setting instanceInfo to a passed in null value");
}
//備用註冊處理器
this.backupRegistryProvider = backupRegistryProvider;
this.urlRandomizer = new EndpointUtils.InstanceInfoBasedUrlRandomizer(instanceInfo);
//本地區域應用緩存初始化,Applications存儲Application 列表
localRegionApps.set(new Applications());
fetchRegistryGeneration = new AtomicLong(0);
remoteRegionsToFetch = new AtomicReference<String>(clientConfig.fetchRegistryForRemoteRegions());
remoteRegionsRef = new AtomicReference<>(remoteRegionsToFetch.get() == null ? null : remoteRegionsToFetch.get().split(","));
if (config.shouldFetchRegistry()) {
this.registryStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRY_PREFIX + "lastUpdateSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.registryStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
if (config.shouldRegisterWithEureka()) {
this.heartbeatStalenessMonitor = new ThresholdLevelsMetric(this, METRIC_REGISTRATION_PREFIX + "lastHeartbeatSec_", new long[]{15L, 30L, 60L, 120L, 240L, 480L});
} else {
this.heartbeatStalenessMonitor = ThresholdLevelsMetric.NO_OP_METRIC;
}
//開始初始化默認區域
logger.info("Initializing Eureka in region {}", clientConfig.getRegion());
// 若是既不要向eureka server註冊,又不要獲取服務列表,就什麼都不用初始化
if (!config.shouldRegisterWithEureka() && !config.shouldFetchRegistry()){
....
return;
}
try {
//默認建立2個線程的調度池,用於TimedSupervisorTask任務。
scheduler = Executors.newScheduledThreadPool(2,
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-%d")
.setDaemon(true)
.build());
//心跳線程池,2個線程
heartbeatExecutor = new ThreadPoolExecutor(
1, clientConfig.getHeartbeatExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-HeartbeatExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//緩存刷新線程池,2個線程
cacheRefreshExecutor = new ThreadPoolExecutor(
1, clientConfig.getCacheRefreshExecutorThreadPoolSize(), 0, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
new ThreadFactoryBuilder()
.setNameFormat("DiscoveryClient-CacheRefreshExecutor-%d")
.setDaemon(true)
.build()
); // use direct handoff
//初始化與EurekaServer真正通訊的通訊器。
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
//配置區域映射器。可配置DNS映射。
AzToRegionMapper azToRegionMapper;
if (clientConfig.shouldUseDnsForFetchingServiceUrls()) {
azToRegionMapper = new DNSBasedAzToRegionMapper(clientConfig);
} else {
azToRegionMapper = new PropertyBasedAzToRegionMapper(clientConfig);
}
if (null != remoteRegionsToFetch.get()) {
azToRegionMapper.setRegionsToFetch(remoteRegionsToFetch.get().split(","));
}
//建立實例區域檢查器。
instanceRegionChecker = new InstanceRegionChecker(azToRegionMapper, clientConfig.getRegion());
} catch (Throwable e) {
throw new RuntimeException("Failed to initialize DiscoveryClient!", e);
}
// 若是須要從eureka server獲取服務列表,而且嘗試fetchRegistry(false)失敗,
//調用BackupRegistry
if (clientConfig.shouldFetchRegistry() && !fetchRegistry(false)) {
fetchRegistryFromBackup();
}
//回調擴展點處理器。此處理器,在註冊前處理。
if (this.preRegistrationHandler != null) {
this.preRegistrationHandler.beforeRegistration();
}
if (clientConfig.shouldRegisterWithEureka() && clientConfig.shouldEnforceRegistrationAtInit()) {
try {
if (!register() ) {
throw new IllegalStateException("Registration error at startup. Invalid server response.");
}
} catch (Throwable th) {
logger.error("Registration error at startup: {}", th.getMessage());
throw new IllegalStateException(th);
}
}
// 初始化全部定時任務
initScheduledTasks();
try {
Monitors.registerObject(this);
} catch (Throwable e) {
logger.warn("Cannot register timers", e);
}
//將client,clientConfig放到DiscoveryManager 統一管理,以便其餘地方能夠DI依賴注入。
DiscoveryManager.getInstance().setDiscoveryClient(this);
DiscoveryManager.getInstance().setEurekaClientConfig(config);
}
複製代碼
Eureka初始化仍是比較複雜,咱們找重點說說。
在DiscoveryClient初始化時,初始化EurekaTransport。
eurekaTransport = new EurekaTransport();
scheduleServerEndpointTask(eurekaTransport, args);
複製代碼
EurekaTransport 是客戶端與服務端底層通訊器。 有5個重要的屬性:
scheduleServerEndpointTask方法完成EurekaTransport5個屬性的初始化
transportClientFactory 屬於低層次的http工廠。 EurekaHttpClientFactory 屬於高層次的http工廠。 經過具備不一樣功能的EurekaHttpClientFactory 工廠 對transportClientFactory 進行層層裝飾。生產的http工具也具備不一樣層次的功能。 列如: 最外層的SessionedEurekaHttpClient 具備會話功能的EurekaHttpClient 第二次RetryableEurekaHttpClient 具備重試功能的EurekaHttpClient
private boolean fetchRegistry(boolean forceFullRegistryFetch) {
Stopwatch tracer = FETCH_REGISTRY_TIMER.start();
try {
//獲取本地緩存
Applications applications = getApplications();
//若是增量拉取被禁用或是第一次拉取,全量拉取server端已經註冊的服務實例信息
if (clientConfig.shouldDisableDelta()
|| (!Strings.isNullOrEmpty(clientConfig.getRegistryRefreshSingleVipAddress()))
|| forceFullRegistryFetch
|| (applications == null)
|| (applications.getRegisteredApplications().size() == 0)
|| (applications.getVersion() == -1)) //Client application does not have latest library supporting delta
{
//獲取所有實例
getAndStoreFullRegistry();
} else {
//增量拉取服務實例
getAndUpdateDelta(applications);
}
applications.setAppsHashCode(applications.getReconcileHashCode());
logTotalInstances();
} catch (Throwable e) {
return false;
} finally {
if (tracer != null) {
tracer.stop();
}
}
// 刷新本地緩存
onCacheRefreshed();
// 基於緩存中的實例數據更新遠程實例狀態, (發佈StatusChangeEvent)
updateInstanceRemoteStatus();
// 註冊表拉取成功後返回true
return true;
}
複製代碼
全量獲取最終調用
EurekaHttpResponse<Applications> httpResponse = clientConfig.getRegistryRefreshSingleVipAddress() == null
? eurekaTransport.queryClient.getApplications(remoteRegionsRef.get())
: eurekaTransport.queryClient.getVip(clientConfig.getRegistryRefreshSingleVipAddress(), remoteRegionsRef.get());
複製代碼
增量獲取
EurekaHttpResponse<Applications> httpResponse = eurekaTransport.queryClient.getDelta(remoteRegionsRef.get());
複製代碼
能夠看出,客戶端與服務端通訊底層是EurekaTransport在提供支持。
在此以前有必要說說:TimedSupervisorTask。 TimedSupervisorTask 是自動調節間隔的週期性任務,當不超時,將以初始化的間隔執行。當任務超時時,將下一個週期的間隔調大。每次超時都會增大相應倍數,直到外部設置的最大參數。一旦新任務再也不超時,間隔自動恢復默認值。
也就是說,這是一個具備自適應的週期性任務。(很是棒的設計啊)
private void initScheduledTasks() {
//1.若是獲取服務列表,則建立週期性緩存更新(即獲取服務列表任務)任務
if (clientConfig.shouldFetchRegistry()) {
//初始間隔時間(默認30秒)
int registryFetchIntervalSeconds = clientConfig.getRegistryFetchIntervalSeconds();
//最大倍數 默認10倍
int expBackOffBound = clientConfig.getCacheRefreshExecutorExponentialBackOffBound();
//執行TimedSupervisorTask ,監督CacheRefreshThread任務的執行。
//具體執行線程池cacheRefreshExecutor,具體任務CacheRefreshThread
scheduler.schedule(
new TimedSupervisorTask(
"cacheRefresh",
scheduler,
cacheRefreshExecutor,
registryFetchIntervalSeconds,
TimeUnit.SECONDS,
expBackOffBound,
new CacheRefreshThread()//緩存刷新,調用fetchRegistry()獲取服務列表
),
registryFetchIntervalSeconds, TimeUnit.SECONDS);
}
//2. 如何註冊,就建立週期性續租任務,維持心跳。
if (clientConfig.shouldRegisterWithEureka()) {
//心跳間隔,默認30秒。
int renewalIntervalInSecs = instanceInfo.getLeaseInfo().getRenewalIntervalInSecs();
//最大倍數 默認10倍
int expBackOffBound = clientConfig.getHeartbeatExecutorExponentialBackOffBound();
//執行TimedSupervisorTask ,監督HeartbeatThread任務的執行。
//具體執行線程池heartbeatExecutor,具體任務HeartbeatThread
scheduler.schedule(
new TimedSupervisorTask(
"heartbeat",
scheduler,
heartbeatExecutor,
renewalIntervalInSecs,
TimeUnit.SECONDS,
expBackOffBound,
new HeartbeatThread()
),
renewalIntervalInSecs, TimeUnit.SECONDS);
//3.建立應用實例信息複製器。
instanceInfoReplicator = new InstanceInfoReplicator(
this,
instanceInfo,
clientConfig.getInstanceInfoReplicationIntervalSeconds(),
2); // burstSize
//4.建立狀態改變監聽器,監聽StatusChangeEvent
statusChangeListener = new ApplicationInfoManager.StatusChangeListener() {
@Override
public String getId() {
return "statusChangeListener";
}
@Override
public void notify(StatusChangeEvent statusChangeEvent) {
if (InstanceStatus.DOWN == statusChangeEvent.getStatus() ||
InstanceStatus.DOWN == statusChangeEvent.getPreviousStatus()) {
// log at warn level if DOWN was involved
logger.warn("Saw local status change event {}", statusChangeEvent);
} else {
logger.info("Saw local status change event {}", statusChangeEvent);
}
//狀態有變化,使用信息複製器,執行一個任務,更新狀態變化到註冊中心
instanceInfoReplicator.onDemandUpdate();
}
};
//是否關注狀態變化,將監聽器添加到applicationInfoManager
if (clientConfig.shouldOnDemandUpdateStatusChange()) {
applicationInfoManager.registerStatusChangeListener(statusChangeListener);
}
// 啓動InstanceInfo複製器
instanceInfoReplicator.start(clientConfig.getInitialInstanceInfoReplicationIntervalSeconds());
} else {
logger.info("Not registering with Eureka server per configuration");
}
}
複製代碼
總結下initScheduledTasks()的工做: 如何配置獲取服務
cacheRefresh
,監督服務獲取CacheRefreshThread(DiscoveryClient-CacheRefreshExecutor-%d)
線程的執行,默認每30秒執行一次。獲取服務列表更新到本地緩存。任務內容refreshRegistry
,本地緩存localRegionApps
如何配置註冊,
"heartbeat",
監督續約任務HeartbeatThread(DiscoveryClient-HeartbeatExecutor-%d)
的執行,默認每30秒執行一次。任務內容爲renew()
onDemandUpdate()
方法,更新變化到遠程server(DiscoveryClient-InstanceInfoReplicator-%d)
,定時(默認40秒)檢測當前實例的DataCenterInfo、LeaseInfo、InstanceStatus,若是有變動,執行InstanceInfoReplicator.this.run()
方法將變動信息同步到server下面咱們看看這幾個重要的任務內容:
eurekaTransport.registrationClient.sendHeartBeat
向server發送當前實例信息public void run() {
try {
//1.刷新DataCenterInfo
//2.刷新LeaseInfo 租約信息
//3.從HealthCheckHandler中獲取InstanceStatus
discoveryClient.refreshInstanceInfo();
//若是isInstanceInfoDirty=true代表須要更新,返回dirtyTimestamp。
Long dirtyTimestamp = instanceInfo.isDirtyWithTime();
if (dirtyTimestamp != null) {
discoveryClient.register();//註冊。
instanceInfo.unsetIsDirty(dirtyTimestamp);//註冊完成,設置沒有更新。
}
} catch (Throwable t) {
logger.warn("There was a problem with the instance info replicator", t);
} finally {
Future next = scheduler.schedule(this, replicationIntervalSeconds, TimeUnit.SECONDS);
scheduledPeriodicRef.set(next);
}
}
複製代碼
能夠看出實例註冊就在discoveryClient.register()
。
那麼第一次註冊發生在何時呢?
initScheduledTasks方法中,執行instanceInfoReplicator.start時,會首先調用instanceInfo.setIsDirty(),初始化是否更新標誌位爲ture ,開啓線程,40秒後發起第一次註冊。(固然若是在這40秒內,若是有狀態變化,會當即發起註冊。)
boolean register() throws Throwable {
logger.info(PREFIX + "{}: registering service...", appPathIdentifier);
EurekaHttpResponse<Void> httpResponse;
try {
//發起註冊
httpResponse = eurekaTransport.registrationClient.register(instanceInfo);
} catch (Exception e) {
logger.warn(PREFIX + "{} - registration failed {}", appPathIdentifier, e.getMessage(), e);
throw e;
}
if (logger.isInfoEnabled()) {
logger.info(PREFIX + "{} - registration status: {}", appPathIdentifier, httpResponse.getStatusCode());
}
return httpResponse.getStatusCode() == 204;
}
複製代碼
能夠看出,register方法本質也是經過eurekaTransport 來發起與server的通訊的。
註解@PreDestroy修飾的shutdown()會在Servlet被完全卸載以前執行。
public synchronized void shutdown() {
if (isShutdown.compareAndSet(false, true)) {
logger.info("Shutting down DiscoveryClient ...");
//移除監聽
if (statusChangeListener != null && applicationInfoManager != null) {
applicationInfoManager.unregisterStatusChangeListener(statusChangeListener.getId());
}
//取消任務
cancelScheduledTasks();
// If APPINFO was registered
if (applicationInfoManager != null
&& clientConfig.shouldRegisterWithEureka()
&& clientConfig.shouldUnregisterOnShutdown()) {
applicationInfoManager.setInstanceStatus(InstanceStatus.DOWN);
//下線
unregister();
}
//通訊中斷
if (eurekaTransport != null) {
eurekaTransport.shutdown();
}
heartbeatStalenessMonitor.shutdown();
registryStalenessMonitor.shutdown();
logger.info("Completed shut down of DiscoveryClient");
}
}
複製代碼
EurekaAutoServiceRegistration實現了SmartLifecycle,會在spring啓動完畢後,調用其start()方法。
@Override
public void start() {
// 設置端口
if (this.port.get() != 0) {
if (this.registration.getNonSecurePort() == 0) {
this.registration.setNonSecurePort(this.port.get());
}
if (this.registration.getSecurePort() == 0 && this.registration.isSecure()) {
this.registration.setSecurePort(this.port.get());
}
}
if (!this.running.get() && this.registration.getNonSecurePort() > 0) {
//註冊
this.serviceRegistry.register(this.registration);
//發佈註冊成功事件
this.context.publishEvent(
new InstanceRegisteredEvent<>(this, this.registration.getInstanceConfig()));
this.running.set(true);//
}
}
複製代碼
this.serviceRegistry.register(this.registration);
@Override
public void register(EurekaRegistration reg) {
maybeInitializeClient(reg);
// 更改狀態,會觸發監聽器的執行
reg.getApplicationInfoManager()
.setInstanceStatus(reg.getInstanceConfig().getInitialStatus());
reg.getHealthCheckHandler().ifAvailable(healthCheckHandler ->
reg.getEurekaClient().registerHealthCheck(healthCheckHandler));
}
複製代碼
1.註冊流程:
EurekaAutoServiceRegistration.start()
-->EurekaServiceRegistry.register(EurekaRegistration)
-->ApplicationInfoManager.setInstanceStatus 狀態改變,
-->StatusChangeListener 監聽器監聽到狀態改變
-->InstanceInfoReplicator.onDemandUpdate() 更新狀態到server
-->InstanceInfoReplicator.run()
-->DiscoveryClient.register()
-->eurekaTransport.registrationClient.register(instanceInfo);
-->jerseyClient
2.客戶端初始化的幾個定時任務:
3.客戶端的幾個主要操做:
因爲篇幅緣由,不少細節不能一一展現。本文志在說說一些原理,具體細節能夠研讀源碼,會發現框架真優秀啊。