本文主要研究一下spring cloud的ConsulCatalogWatch。
spring-cloud-consul-discovery-2.1.2.RELEASE-sources.jar!/org/springframework/cloud/consul/discovery/ConsulCatalogWatch.java
public class ConsulCatalogWatch implements ApplicationEventPublisherAware, SmartLifecycle { private static final Log log = LogFactory.getLog(ConsulDiscoveryClient.class); private final ConsulDiscoveryProperties properties; private final ConsulClient consul; private final TaskScheduler taskScheduler; private final AtomicReference<BigInteger> catalogServicesIndex = new AtomicReference<>(); private final AtomicBoolean running = new AtomicBoolean(false); private ApplicationEventPublisher publisher; private ScheduledFuture<?> watchFuture; public ConsulCatalogWatch(ConsulDiscoveryProperties properties, ConsulClient consul) { this(properties, consul, getTaskScheduler()); } public ConsulCatalogWatch(ConsulDiscoveryProperties properties, ConsulClient consul, TaskScheduler taskScheduler) { this.properties = properties; this.consul = consul; this.taskScheduler = taskScheduler; } private static ThreadPoolTaskScheduler getTaskScheduler() { ThreadPoolTaskScheduler taskScheduler = new ThreadPoolTaskScheduler(); taskScheduler.initialize(); return taskScheduler; } @Override public void setApplicationEventPublisher(ApplicationEventPublisher publisher) { this.publisher = publisher; } @Override public boolean isAutoStartup() { return true; } @Override public void stop(Runnable callback) { this.stop(); callback.run(); } @Override public void start() { if (this.running.compareAndSet(false, true)) { this.watchFuture = this.taskScheduler.scheduleWithFixedDelay( this::catalogServicesWatch, this.properties.getCatalogServicesWatchDelay()); } } @Override public void stop() { if (this.running.compareAndSet(true, false) && this.watchFuture != null) { this.watchFuture.cancel(true); } } @Override public boolean isRunning() { return false; } @Override public int getPhase() { return 0; } @Timed("consul.watch-catalog-services") public void catalogServicesWatch() { try { long index = -1; if (this.catalogServicesIndex.get() != null) { index = this.catalogServicesIndex.get().longValue(); } 
Response<Map<String, List<String>>> response = this.consul.getCatalogServices( new QueryParams(this.properties.getCatalogServicesWatchTimeout(), index), this.properties.getAclToken()); Long consulIndex = response.getConsulIndex(); if (consulIndex != null) { this.catalogServicesIndex.set(BigInteger.valueOf(consulIndex)); } if (log.isTraceEnabled()) { log.trace("Received services update from consul: " + response.getValue() + ", index: " + consulIndex); } this.publisher.publishEvent(new HeartbeatEvent(this, consulIndex)); } catch (Exception e) { log.error("Error watching Consul CatalogServices", e); } } }
spring-cloud-consul-discovery-2.1.2.RELEASE-sources.jar!/org/springframework/cloud/consul/discovery/ConsulDiscoveryClientConfiguration.java
@Configuration @ConditionalOnConsulEnabled @ConditionalOnProperty(value = "spring.cloud.consul.discovery.enabled", matchIfMissing = true) @ConditionalOnDiscoveryEnabled @EnableConfigurationProperties @AutoConfigureBefore({ SimpleDiscoveryClientAutoConfiguration.class, CommonsClientAutoConfiguration.class }) public class ConsulDiscoveryClientConfiguration { /** * Name of the catalog watch task scheduler bean. */ public static final String CATALOG_WATCH_TASK_SCHEDULER_NAME = "catalogWatchTaskScheduler"; @Autowired private ConsulClient consulClient; @Bean @ConditionalOnMissingBean @ConditionalOnProperty("spring.cloud.consul.discovery.heartbeat.enabled") // TODO: move to service-registry for Edgware public TtlScheduler ttlScheduler(HeartbeatProperties heartbeatProperties) { return new TtlScheduler(heartbeatProperties, this.consulClient); } @Bean @ConditionalOnMissingBean // TODO: move to service-registry for Edgware public HeartbeatProperties heartbeatProperties() { return new HeartbeatProperties(); } @Bean @ConditionalOnMissingBean // TODO: Split appropriate values to service-registry for Edgware public ConsulDiscoveryProperties consulDiscoveryProperties(InetUtils inetUtils) { return new ConsulDiscoveryProperties(inetUtils); } @Bean @ConditionalOnMissingBean public ConsulDiscoveryClient consulDiscoveryClient( ConsulDiscoveryProperties discoveryProperties) { return new ConsulDiscoveryClient(this.consulClient, discoveryProperties); } @Bean @ConditionalOnMissingBean @ConditionalOnProperty(name = "spring.cloud.consul.discovery.catalog-services-watch.enabled", matchIfMissing = true) public ConsulCatalogWatch consulCatalogWatch( ConsulDiscoveryProperties discoveryProperties, @Qualifier(CATALOG_WATCH_TASK_SCHEDULER_NAME) TaskScheduler taskScheduler) { return new ConsulCatalogWatch(discoveryProperties, this.consulClient, taskScheduler); } @Bean(name = CATALOG_WATCH_TASK_SCHEDULER_NAME) @ConditionalOnProperty(name = 
"spring.cloud.consul.discovery.catalog-services-watch.enabled", matchIfMissing = true) public TaskScheduler catalogWatchTaskScheduler() { return new ThreadPoolTaskScheduler(); } }
ConsulCatalogWatch構造器接收ConsulDiscoveryProperties、ConsulClient、TaskScheduler;其start方法會使用taskScheduler.scheduleWithFixedDelay註冊catalogServicesWatch的定時任務;stop方法則是cancel掉這個定時任務;catalogServicesWatch方法使用consul.getCatalogServices方法獲取consulIndex而後更新本地的catalogServicesIndex,發佈HeartbeatEvent。