本文主要研究一下elasticsearch的SeedHostsProviderjava
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsProvider.javagit
/** * A pluggable provider of the list of seed hosts to use for discovery. */ public interface SeedHostsProvider { /** * Returns a list of seed hosts to use for discovery. Called repeatedly while discovery is active (i.e. while there is no master) * so that this list may be dynamic. */ List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver); /** * Helper object that allows to resolve a list of hosts to a list of transport addresses. * Each host is resolved into a transport address (or a collection of addresses if the * number of ports is greater than one) */ interface HostsResolver { List<TransportAddress> resolveHosts(List<String> hosts, int limitPortCounts); } }
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SettingsBasedSeedHostsProvider.javagithub
public class SettingsBasedSeedHostsProvider implements SeedHostsProvider { private static final Logger logger = LogManager.getLogger(SettingsBasedSeedHostsProvider.class); public static final Setting<List<String>> LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING = Setting.listSetting("discovery.zen.ping.unicast.hosts", emptyList(), Function.identity(), Property.NodeScope, Property.Deprecated); public static final Setting<List<String>> DISCOVERY_SEED_HOSTS_SETTING = Setting.listSetting("discovery.seed_hosts", emptyList(), Function.identity(), Property.NodeScope); // these limits are per-address private static final int LIMIT_FOREIGN_PORTS_COUNT = 1; private static final int LIMIT_LOCAL_PORTS_COUNT = 5; private final List<String> configuredHosts; private final int limitPortCounts; public SettingsBasedSeedHostsProvider(Settings settings, TransportService transportService) { if (LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.exists(settings)) { if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) { throw new IllegalArgumentException("it is forbidden to set both [" + DISCOVERY_SEED_HOSTS_SETTING.getKey() + "] and [" + LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.getKey() + "]"); } configuredHosts = LEGACY_DISCOVERY_ZEN_PING_UNICAST_HOSTS_SETTING.get(settings); // we only limit to 1 address, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } else if (DISCOVERY_SEED_HOSTS_SETTING.exists(settings)) { configuredHosts = DISCOVERY_SEED_HOSTS_SETTING.get(settings); // we only limit to 1 address, makes no sense to ping 100 ports limitPortCounts = LIMIT_FOREIGN_PORTS_COUNT; } else { // if unicast hosts are not specified, fill with simple defaults on the local machine configuredHosts = transportService.getLocalAddresses(); limitPortCounts = LIMIT_LOCAL_PORTS_COUNT; } logger.debug("using initial hosts {}", configuredHosts); } @Override public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) { return hostsResolver.resolveHosts(configuredHosts, limitPortCounts); } }
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/FileBasedSeedHostsProvider.java多線程
public class FileBasedSeedHostsProvider implements SeedHostsProvider { private static final Logger logger = LogManager.getLogger(FileBasedSeedHostsProvider.class); public static final String UNICAST_HOSTS_FILE = "unicast_hosts.txt"; private final Path unicastHostsFilePath; public FileBasedSeedHostsProvider(Path configFile) { this.unicastHostsFilePath = configFile.resolve(UNICAST_HOSTS_FILE); } private List<String> getHostsList() { if (Files.exists(unicastHostsFilePath)) { try (Stream<String> lines = Files.lines(unicastHostsFilePath)) { return lines.filter(line -> line.startsWith("#") == false) // lines starting with `#` are comments .collect(Collectors.toList()); } catch (IOException e) { logger.warn(() -> new ParameterizedMessage("failed to read file [{}]", unicastHostsFilePath), e); return Collections.emptyList(); } } logger.warn("expected, but did not find, a dynamic hosts list at [{}]", unicastHostsFilePath); return Collections.emptyList(); } @Override public List<TransportAddress> getSeedAddresses(HostsResolver hostsResolver) { final List<TransportAddress> transportAddresses = hostsResolver.resolveHosts(getHostsList(), 1); logger.debug("seed addresses: {}", transportAddresses); return transportAddresses; } }
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/zen/UnicastZenPing.javaelasticsearch
private SeedHostsProvider.HostsResolver createHostsResolver() { return (hosts, limitPortCounts) -> SeedHostsResolver.resolveHostsLists(unicastZenPingExecutorService, logger, hosts, limitPortCounts, transportService, resolveTimeout); }
elasticsearch-7.0.0/server/src/main/java/org/elasticsearch/discovery/SeedHostsResolver.javaide
public class SeedHostsResolver extends AbstractLifecycleComponent implements ConfiguredHostsResolver { //...... public static List<TransportAddress> resolveHostsLists( final ExecutorService executorService, final Logger logger, final List<String> hosts, final int limitPortCounts, final TransportService transportService, final TimeValue resolveTimeout) { Objects.requireNonNull(executorService); Objects.requireNonNull(logger); Objects.requireNonNull(hosts); Objects.requireNonNull(transportService); Objects.requireNonNull(resolveTimeout); if (resolveTimeout.nanos() < 0) { throw new IllegalArgumentException("resolve timeout must be non-negative but was [" + resolveTimeout + "]"); } // create tasks to submit to the executor service; we will wait up to resolveTimeout for these tasks to complete final List<Callable<TransportAddress[]>> callables = hosts .stream() .map(hn -> (Callable<TransportAddress[]>) () -> transportService.addressesFromString(hn, limitPortCounts)) .collect(Collectors.toList()); final List<Future<TransportAddress[]>> futures; try { futures = executorService.invokeAll(callables, resolveTimeout.nanos(), TimeUnit.NANOSECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); return Collections.emptyList(); } final List<TransportAddress> transportAddresses = new ArrayList<>(); final Set<TransportAddress> localAddresses = new HashSet<>(); localAddresses.add(transportService.boundAddress().publishAddress()); localAddresses.addAll(Arrays.asList(transportService.boundAddress().boundAddresses())); // ExecutorService#invokeAll guarantees that the futures are returned in the iteration order of the tasks so we can associate the // hostname with the corresponding task by iterating together final Iterator<String> it = hosts.iterator(); for (final Future<TransportAddress[]> future : futures) { final String hostname = it.next(); if (!future.isCancelled()) { assert future.isDone(); try { final TransportAddress[] addresses = future.get(); logger.trace("resolved host [{}] to {}", hostname, addresses); for (int addressId = 0; addressId < addresses.length; addressId++) { final TransportAddress address = addresses[addressId]; // no point in pinging ourselves if (localAddresses.contains(address) == false) { transportAddresses.add(address); } } } catch (final ExecutionException e) { assert e.getCause() != null; final String message = "failed to resolve host [" + hostname + "]"; logger.warn(message, e.getCause()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); // ignore } } else { logger.warn("timed out after [{}] resolving host [{}]", resolveTimeout, hostname); } } return Collections.unmodifiableList(transportAddresses); } //...... }