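The previous article covered the ES source code and how to build it locally. This article explains the ES startup process.
For reference, see: ElasticSearch Source Compilation and Debug.
Note: the ES version used in this article is 6.7.0.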
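Entry point: org.elasticsearch.bootstrap.Elasticsearch
Set the following breakpoints:
Start ES in one of the Debug modes introduced in the previous article; here I use remote Debug.
Stepping through the startup in the debugger shows that it roughly divides into the following stages: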
org.elasticsearch.node.Node: starts the single node and creates the keepAlive thread
Preparing to create the Node object, and finally creating it
Creating the Node object
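The program entry code is as follows:
Creating the SecurityManager
SecurityManager: in Java, the security manager checks whether an operation is permitted. If the check passes, execution continues; otherwise a SecurityException is thrown.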
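The following minimal sketch illustrates the contract described above: a check either returns normally, so the operation proceeds, or it throws. The DenyExitSecurityManager class and its policy are hypothetical. Its only rule is to deny JVM exit. The manager is never installed with System.setSecurityManager; instead, checkPermission is called directly. Note that SecurityManager has been deprecated for removal since Java 17, so compilers emit a warning for this code.

```java
import java.security.Permission;

public class SecurityManagerDemo {

    // Hypothetical policy: deny only JVM exit, allow everything else
    static class DenyExitSecurityManager extends SecurityManager {
        @Override
        public void checkPermission(Permission perm) {
            // JVM exit permissions are named "exitVM.<status>"
            if (perm instanceof RuntimePermission && perm.getName().startsWith("exitVM")) {
                throw new SecurityException("denied: " + perm.getName());
            }
            // returning normally means the operation may proceed
        }
    }

    // true if the check passes, false if it throws SecurityException
    static boolean isAllowed(SecurityManager sm, Permission perm) {
        try {
            sm.checkPermission(perm);
            return true;
        } catch (SecurityException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        SecurityManager sm = new DenyExitSecurityManager(); // not installed, only called directly
        System.out.println(isAllowed(sm, new RuntimePermission("exitVM.0")));       // false
        System.out.println(isAllowed(sm, new RuntimePermission("getClassLoader"))); // true
    }
}
```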
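The Elasticsearch class extends EnvironmentAwareCommand, which in turn extends Command. The full inheritance hierarchy is as follows:
Therefore Elasticsearch can also parse command-line arguments.
`elasticsearch.main(args, terminal);` calls the main method inherited from the parent class. Because of the inheritance chain, the methods execute in the following order:
The Bootstrap stage does quite a lot. Its main method is: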
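The call chain can be modeled as a template method: each class in the hierarchy fills in one step and hands off to the next. This is only a simplified sketch. SimpleCommand, SimpleEnvironmentAwareCommand and SimpleElasticsearch are hypothetical stand-ins for Command, EnvironmentAwareCommand and Elasticsearch. The real signatures (Terminal, OptionSet, Environment) are omitted, and the `-Ekey=value` parsing is an assumption rather than Elasticsearch's actual option parser.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CommandChainDemo {

    // Stand-in for Command: owns main() and the error handling around it
    abstract static class SimpleCommand {
        final List<String> trace = new ArrayList<>();

        public int main(String[] args) {
            trace.add("Command.main");
            try {
                mainWithoutErrorHandling(args);
                return 0;                      // success exit code
            } catch (RuntimeException e) {
                return 1;                      // error exit code
            }
        }

        void mainWithoutErrorHandling(String[] args) {
            trace.add("Command.mainWithoutErrorHandling");
            execute(Arrays.asList(args));      // "parsed options", simplified to a list
        }

        protected abstract void execute(List<String> options);
    }

    // Stand-in for EnvironmentAwareCommand: turns -Ekey=value options into settings
    abstract static class SimpleEnvironmentAwareCommand extends SimpleCommand {
        @Override
        protected void execute(List<String> options) {
            trace.add("EnvironmentAwareCommand.execute");
            Map<String, String> settings = new HashMap<>();
            for (String opt : options) {
                if (opt.startsWith("-E") && opt.contains("=")) {
                    String[] kv = opt.substring(2).split("=", 2);
                    settings.put(kv[0], kv[1]);
                }
            }
            execute(options, settings);        // hand the "environment" to the subclass
        }

        protected abstract void execute(List<String> options, Map<String, String> settings);
    }

    // Stand-in for Elasticsearch: the last hop before the real code calls Bootstrap.init(...)
    static class SimpleElasticsearch extends SimpleEnvironmentAwareCommand {
        Map<String, String> seenSettings;

        @Override
        protected void execute(List<String> options, Map<String, String> settings) {
            trace.add("Elasticsearch.execute");
            seenSettings = settings;
        }
    }

    public static void main(String[] args) {
        SimpleElasticsearch es = new SimpleElasticsearch();
        int status = es.main(new String[] {"-Enode.name=node-0"});
        // prints: 0 [Command.main, Command.mainWithoutErrorHandling, EnvironmentAwareCommand.execute, Elasticsearch.execute]
        System.out.println(status + " " + es.trace);
    }
}
```

Only the subclass at the bottom of the hierarchy knows what to do with the parsed environment. The generic steps (error handling, option parsing, settings extraction) live in the parent classes, which is why Elasticsearch gets command-line parsing for free.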
/**
 * This method is invoked by {@link Elasticsearch#main(String[])} to startup elasticsearch.
 */
static void init(
        final boolean foreground,
        final Path pidFile,
        final boolean quiet,
        final Environment initialEnv) throws BootstrapException, NodeValidationException, UserException {
    // force the class initializer for BootstrapInfo to run before
    // the security manager is installed
    BootstrapInfo.init();

    INSTANCE = new Bootstrap();

    final SecureSettings keystore = loadSecureSettings(initialEnv);
    final Environment environment = createEnvironment(foreground, pidFile, keystore, initialEnv.settings(), initialEnv.configFile());

    if (Node.NODE_NAME_SETTING.exists(environment.settings())) {
        LogConfigurator.setNodeName(Node.NODE_NAME_SETTING.get(environment.settings()));
    }
    try {
        LogConfigurator.configure(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }
    if (environment.pidFile() != null) {
        try {
            PidFile.create(environment.pidFile(), true);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }
    }

    final boolean closeStandardStreams = (foreground == false) || quiet;
    try {
        if (closeStandardStreams) {
            final Logger rootLogger = LogManager.getRootLogger();
            final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
            if (maybeConsoleAppender != null) {
                Loggers.removeAppender(rootLogger, maybeConsoleAppender);
            }
            closeSystOut();
        }

        // fail if somebody replaced the lucene jars
        checkLucene();

        // install the default uncaught exception handler; must be done before security is
        // initialized as we do not want to grant the runtime permission
        // setDefaultUncaughtExceptionHandler
        Thread.setDefaultUncaughtExceptionHandler(new ElasticsearchUncaughtExceptionHandler());

        INSTANCE.setup(true, environment);

        try {
            // any secure settings must be read during node construction
            IOUtils.close(keystore);
        } catch (IOException e) {
            throw new BootstrapException(e);
        }

        INSTANCE.start();

        if (closeStandardStreams) {
            closeSysError();
        }
    } catch (NodeValidationException | RuntimeException e) {
        // disable console logging, so user does not see the exception twice (jvm will show it already)
        final Logger rootLogger = LogManager.getRootLogger();
        final Appender maybeConsoleAppender = Loggers.findAppender(rootLogger, ConsoleAppender.class);
        if (foreground && maybeConsoleAppender != null) {
            Loggers.removeAppender(rootLogger, maybeConsoleAppender);
        }
        Logger logger = LogManager.getLogger(Bootstrap.class);
        // HACK, it sucks to do this, but we will run users out of disk space otherwise
        if (e instanceof CreationException) {
            // guice: log the shortened exc to the log file
            ByteArrayOutputStream os = new ByteArrayOutputStream();
            PrintStream ps = null;
            try {
                ps = new PrintStream(os, false, "UTF-8");
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
            new StartupException(e).printStackTrace(ps);
            ps.flush();
            try {
                logger.error("Guice Exception: {}", os.toString("UTF-8"));
            } catch (UnsupportedEncodingException uee) {
                assert false;
                e.addSuppressed(uee);
            }
        } else if (e instanceof NodeValidationException) {
            logger.error("node validation exception\n{}", e.getMessage());
        } else {
            // full exception
            logger.error("Exception", e);
        }
        // re-enable it if appropriate, so they can see any logging during the shutdown process
        if (foreground && maybeConsoleAppender != null) {
            Loggers.addAppender(rootLogger, maybeConsoleAppender);
        }

        throw e;
    }
}
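The detailed flow is:
`INSTANCE = new Bootstrap();` creates the Bootstrap instance. Its constructor creates a user (non-daemon) keepAlive thread that waits on a CountDownLatch. It also registers a Runtime shutdown hook that performs the countDown, which releases that thread.
CountDownLatch is a synchronization utility that lets one or more threads wait until operations being performed in other threads have completed. For example, an application's main thread may want to continue only after the threads responsible for starting the framework services have started all of them.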
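The keep-alive mechanism described above can be sketched with a CountDownLatch. KeepAliveDemo and its methods are hypothetical names. The sketch mirrors the idea (a non-daemon thread parked on a latch, plus a shutdown hook that counts the latch down) rather than Elasticsearch's exact code. The releaseAndWait method exists only so that the behavior can be observed without shutting down the JVM.

```java
import java.util.concurrent.CountDownLatch;

public class KeepAliveDemo {
    private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
    private final Thread keepAliveThread;

    public KeepAliveDemo() {
        keepAliveThread = new Thread(() -> {
            try {
                keepAliveLatch.await();              // park until the latch reaches zero
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // bail out
            }
        }, "demo[keepAlive]");
        keepAliveThread.setDaemon(false);            // a user thread keeps the JVM running
    }

    // The hook runs when the JVM begins shutting down (e.g. SIGTERM, Ctrl+C) and releases the thread
    public void registerShutdownHook() {
        Runtime.getRuntime().addShutdownHook(new Thread(keepAliveLatch::countDown));
    }

    public void start() {
        keepAliveThread.start();
    }

    public boolean isKeepingAlive() {
        return keepAliveThread.isAlive();
    }

    // Does what the shutdown hook does, then waits up to timeoutMillis for the thread to finish
    public boolean releaseAndWait(long timeoutMillis) {
        keepAliveLatch.countDown();
        try {
            keepAliveThread.join(timeoutMillis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return !keepAliveThread.isAlive();
    }
}
```

As long as the latch stays at 1, the non-daemon thread keeps the process alive even after the main thread returns. A single countDown is enough to let the thread finish.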
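LogConfigurator.configure(environment): loads the logging configuration from the log4j2.properties configuration file.
setDefaultUncaughtExceptionHandler: sets how exceptions that are not caught anywhere in the program are handled.
UncaughtExceptionHandler: in multithreaded code, an exception thrown in another thread often cannot be caught by the thread that started it. Some mechanism is needed to catch and handle such exceptions, and UncaughtExceptionHandler provides it.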
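The minimal sketch below installs a default handler with Thread.setDefaultUncaughtExceptionHandler, the same JDK call that Bootstrap.init makes. It then lets a worker thread throw and returns what the handler recorded. UncaughtHandlerDemo and captureFromWorker are hypothetical names. Here the handler only records a string, whereas Elasticsearch installs its own ElasticsearchUncaughtExceptionHandler.

```java
import java.util.concurrent.atomic.AtomicReference;

public class UncaughtHandlerDemo {

    // Starts a thread that throws and returns what the default handler recorded
    public static String captureFromWorker(String message) {
        AtomicReference<String> seen = new AtomicReference<>();
        Thread.UncaughtExceptionHandler previous = Thread.getDefaultUncaughtExceptionHandler();
        Thread.setDefaultUncaughtExceptionHandler(
                (thread, error) -> seen.set(thread.getName() + ": " + error.getMessage()));
        try {
            Thread worker = new Thread(() -> {
                throw new IllegalStateException(message); // the starting thread cannot catch this
            }, "worker-1");
            worker.start();
            worker.join(); // the handler runs on the worker thread before it terminates
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            Thread.setDefaultUncaughtExceptionHandler(previous); // restore global state
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(captureFromWorker("boom")); // prints "worker-1: boom"
    }
}
```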
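The last two steps of the second stage are both related to creating the node.
This method (setup) is called from Bootstrap.init.
The setup method is as follows: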
private void setup(boolean addShutdownHook, Environment environment) throws BootstrapException {
    Settings settings = environment.settings();

    try {
        spawner.spawnNativeControllers(environment);
    } catch (IOException e) {
        throw new BootstrapException(e);
    }

    initializeNatives(
            environment.tmpFile(),
            BootstrapSettings.MEMORY_LOCK_SETTING.get(settings),
            BootstrapSettings.SYSTEM_CALL_FILTER_SETTING.get(settings),
            BootstrapSettings.CTRLHANDLER_SETTING.get(settings));

    // initialize probes before the security manager is installed
    initializeProbes();

    if (addShutdownHook) {
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                try {
                    IOUtils.close(node, spawner);
                    LoggerContext context = (LoggerContext) LogManager.getContext(false);
                    Configurator.shutdown(context);
                } catch (IOException ex) {
                    throw new ElasticsearchException("failed to stop node", ex);
                }
            }
        });
    }

    try {
        // look for jar hell
        final Logger logger = LogManager.getLogger(JarHell.class);
        JarHell.checkJarHell(logger::debug);
    } catch (IOException | URISyntaxException e) {
        throw new BootstrapException(e);
    }

    // Log ifconfig output before SecurityManager is installed
    IfConfig.logIfNecessary();

    // install SM after natives, shutdown hooks, etc.
    try {
        Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
    } catch (IOException | NoSuchAlgorithmException e) {
        throw new BootstrapException(e);
    }

    node = new Node(environment) {
        @Override
        protected void validateNodeBeforeAcceptingRequests(
                final BootstrapContext context,
                final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
            BootstrapChecks.check(context, boundTransportAddress, checks);
        }

        @Override
        protected void registerDerivedNodeNameWithLogger(String nodeName) {
            LogConfigurator.setNodeName(nodeName);
        }
    };
}
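initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler): initializes native resources. As its parameters indicate, these include memory locking, the system call filter and the console control handler.
Creating a Node is complex, so here I only outline what it does; readers should study the source for the details. Part of the code is as follows: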
/**
 * Constructs a node
 *
 * @param environment                the environment for this node
 * @param classpathPlugins           the plugins to be loaded from the classpath
 * @param forbidPrivateIndexSettings whether or not private index settings are forbidden when creating an index; this is used in the
 *                                   test framework for tests that rely on being able to set private settings
 */
protected Node(
        final Environment environment, Collection<Class<? extends Plugin>> classpathPlugins, boolean forbidPrivateIndexSettings) {
    logger = LogManager.getLogger(Node.class);
    final List<Closeable> resourcesToClose = new ArrayList<>(); // register everything we need to release in the case of an error
    boolean success = false;
    try {
        Settings tmpSettings = Settings.builder().put(environment.settings())
            .put(Client.CLIENT_TYPE_SETTING_S.getKey(), CLIENT_TYPE).build();

        /*
         * Create the node environment as soon as possible so we can
         * recover the node id which we might have to use to derive the
         * node name. And it is important to get *that* as soon as possible
         * so that log lines can contain it.
         */
        boolean nodeNameExplicitlyDefined = NODE_NAME_SETTING.exists(tmpSettings);
        try {
            Consumer<String> nodeIdConsumer = nodeNameExplicitlyDefined ?
                nodeId -> {} : nodeId -> registerDerivedNodeNameWithLogger(nodeIdToNodeName(nodeId));
            nodeEnvironment = new NodeEnvironment(tmpSettings, environment, nodeIdConsumer);
            resourcesToClose.add(nodeEnvironment);
        } catch (IOException ex) {
            throw new IllegalStateException("Failed to create node environment", ex);
        }
        if (nodeNameExplicitlyDefined) {
            logger.info("node name [{}], node ID [{}]",
                    NODE_NAME_SETTING.get(tmpSettings), nodeEnvironment.nodeId());
        } else {
            tmpSettings = Settings.builder()
                    .put(tmpSettings)
                    .put(NODE_NAME_SETTING.getKey(), nodeIdToNodeName(nodeEnvironment.nodeId()))
                    .build();
            logger.info("node name derived from node ID [{}]; set [{}] to override",
                    nodeEnvironment.nodeId(), NODE_NAME_SETTING.getKey());
        }

        final JvmInfo jvmInfo = JvmInfo.jvmInfo();
        logger.info(
            "version[{}], pid[{}], build[{}/{}/{}/{}], OS[{}/{}/{}], JVM[{}/{}/{}/{}]",
            Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()),
            jvmInfo.pid(),
            Build.CURRENT.flavor().displayName(),
            Build.CURRENT.type().displayName(),
            Build.CURRENT.shortHash(),
            Build.CURRENT.date(),
            Constants.OS_NAME,
            Constants.OS_VERSION,
            Constants.OS_ARCH,
            Constants.JVM_VENDOR,
            Constants.JVM_NAME,
            Constants.JAVA_VERSION,
            Constants.JVM_VERSION);
        logger.info("JVM arguments {}", Arrays.toString(jvmInfo.getInputArguments()));
        warnIfPreRelease(Version.CURRENT, Build.CURRENT.isSnapshot(), logger);

        if (logger.isDebugEnabled()) {
            logger.debug("using config [{}], data [{}], logs [{}], plugins [{}]",
                environment.configFile(), Arrays.toString(environment.dataFiles()), environment.logsFile(), environment.pluginsFile());
        }
        ...
    }
The Node instantiation process is as follows:
If HTTP is enabled, all the REST ActionHandlers are registered to handle the various HTTP requests. The code is:
if (NetworkModule.HTTP_ENABLED.get(settings)) {
    logger.debug("initializing HTTP handlers ...");
    actionModule.initRestHandlers(() -> clusterService.state().nodes());
}
The PluginsService constructor contains the line `Set<Bundle> modules = getModuleBundles(modulesDirectory);`, which loads modules and plugins. Following that code leads to the org.elasticsearch.plugins.PluginsService#readPluginBundle method:
// get a bundle for a single plugin dir
private static Bundle readPluginBundle(final Set<Bundle> bundles, final Path plugin, String type) throws IOException {
    LogManager.getLogger(PluginsService.class).trace("--- adding [{}] [{}]", type, plugin.toAbsolutePath());
    final PluginInfo info;
    try {
        info = PluginInfo.readFromProperties(plugin);
    } catch (final IOException e) {
        throw new IllegalStateException("Could not load plugin descriptor for " + type +
                                        " directory [" + plugin.getFileName() + "]", e);
    }
    final Bundle bundle = new Bundle(info, plugin);
    if (bundles.add(bundle) == false) {
        throw new IllegalStateException("duplicate " + type + ": " + info);
    }
    return bundle;
}
Here `info = PluginInfo.readFromProperties(plugin);` reads the module or plugin descriptor from the given directory. The code is:
/**
 * Reads the plugin descriptor file.
 *
 * @param path           the path to the root directory for the plugin
 * @return the plugin info
 * @throws IOException if an I/O exception occurred reading the plugin descriptor
 */
public static PluginInfo readFromProperties(final Path path) throws IOException {
    final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);

    final Map<String, String> propsMap;
    {
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
    }

    final String name = propsMap.remove("name");
    if (name == null || name.isEmpty()) {
        throw new IllegalArgumentException(
                "property [name] is missing in [" + descriptor + "]");
    }
    final String description = propsMap.remove("description");
    if (description == null) {
        throw new IllegalArgumentException(
                "property [description] is missing for plugin [" + name + "]");
    }
    final String version = propsMap.remove("version");
    if (version == null) {
        throw new IllegalArgumentException(
                "property [version] is missing for plugin [" + name + "]");
    }

    final String esVersionString = propsMap.remove("elasticsearch.version");
    if (esVersionString == null) {
        throw new IllegalArgumentException(
                "property [elasticsearch.version] is missing for plugin [" + name + "]");
    }
    final Version esVersion = Version.fromString(esVersionString);
    final String javaVersionString = propsMap.remove("java.version");
    if (javaVersionString == null) {
        throw new IllegalArgumentException(
                "property [java.version] is missing for plugin [" + name + "]");
    }
    JarHell.checkVersionFormat(javaVersionString);
    final String classname = propsMap.remove("classname");
    if (classname == null) {
        throw new IllegalArgumentException(
                "property [classname] is missing for plugin [" + name + "]");
    }

    final String extendedString = propsMap.remove("extended.plugins");
    final List<String> extendedPlugins;
    if (extendedString == null) {
        extendedPlugins = Collections.emptyList();
    } else {
        extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
    }

    final String hasNativeControllerValue = propsMap.remove("has.native.controller");
    final boolean hasNativeController;
    if (hasNativeControllerValue == null) {
        hasNativeController = false;
    } else {
        switch (hasNativeControllerValue) {
            case "true":
                hasNativeController = true;
                break;
            case "false":
                hasNativeController = false;
                break;
            default:
                final String message = String.format(
                        Locale.ROOT,
                        "property [%s] must be [%s], [%s], or unspecified but was [%s]",
                        "has_native_controller",
                        "true",
                        "false",
                        hasNativeControllerValue);
                throw new IllegalArgumentException(message);
        }
    }

    if (esVersion.before(Version.V_6_3_0) && esVersion.onOrAfter(Version.V_6_0_0_beta2)) {
        propsMap.remove("requires.keystore");
    }

    if (propsMap.isEmpty() == false) {
        throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + propsMap.keySet());
    }

    return new PluginInfo(name, description, version, esVersion, javaVersionString,
                          classname, extendedPlugins, hasNativeController);
}
The PluginInfo class has two static constants:
public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
public static final String ES_PLUGIN_POLICY = "plugin-security.policy";
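These are two file names. plugin-descriptor.properties is the plugin descriptor, and plugin-security.policy is the plugin's security policy file. For every plugin and module, the settings are read from its plugin-descriptor.properties file:
name
description
version
elasticsearch.version
java.version
classname
extended.plugins
has.native.controller
A legacy requires.keystore entry is tolerated only for plugins built for versions from 6.0.0-beta2 up to, but not including, 6.3.0. These values are wrapped into a PluginInfo object, and the data structure finally returned to PluginsService is Set<Bundle(PluginInfo, path)>.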
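The validation in readFromProperties comes down to four steps:
1. Load the properties.
2. Remove each known key.
3. Fail if a required key is missing.
4. Fail if anything unknown is left over.

The sketch below reproduces that idea with plain JDK classes. SimplePluginInfo is a hypothetical class that parses descriptor text from a string. It skips version parsing, the JarHell.checkVersionFormat check and the legacy requires.keystore handling.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class SimplePluginInfo {
    // Keys that must be present, mirroring the null checks in readFromProperties
    static final List<String> REQUIRED = Arrays.asList(
            "name", "description", "version", "elasticsearch.version", "java.version", "classname");

    final Map<String, String> required = new LinkedHashMap<>();
    final List<String> extendedPlugins;
    final boolean hasNativeController;

    private SimplePluginInfo(Map<String, String> props) {
        for (String key : REQUIRED) {
            String value = props.remove(key);     // remove, so that leftovers count as unknown
            if (value == null) {
                throw new IllegalArgumentException("property [" + key + "] is missing");
            }
            required.put(key, value);
        }
        String extended = props.remove("extended.plugins");
        extendedPlugins = (extended == null || extended.isEmpty())
                ? Collections.emptyList()
                : Arrays.asList(extended.split(","));
        String nativeController = props.remove("has.native.controller");
        if (nativeController == null || nativeController.equals("false")) {
            hasNativeController = false;
        } else if (nativeController.equals("true")) {
            hasNativeController = true;
        } else {
            throw new IllegalArgumentException("has.native.controller must be true, false or unset");
        }
        if (!props.isEmpty()) {
            throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + props.keySet());
        }
    }

    // Parses the text of a plugin-descriptor.properties file
    public static SimplePluginInfo parse(String descriptorText) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(descriptorText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        Map<String, String> map = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            map.put(key, props.getProperty(key));
        }
        return new SimplePluginInfo(map);
    }

    public String get(String key) {
        return required.get(key);
    }
}
```

Consider a descriptor that contains only the six required keys. For it, parse returns an object with hasNativeController set to false and an empty extendedPlugins list. Adding an unexpected key such as foo=bar makes parse throw IllegalArgumentException, in the same way the original rejects unknown properties.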
ES thread pool types:
public enum ThreadPoolType {
    DIRECT("direct"),
    FIXED("fixed"),
    FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"),
    SCALING("scaling");

    private final String type;

    public String getType() {
        return type;
    }

    ThreadPoolType(String type) {
        this.type = type;
    }

    private static final Map<String, ThreadPoolType> TYPE_MAP;

    static {
        Map<String, ThreadPoolType> typeMap = new HashMap<>();
        for (ThreadPoolType threadPoolType : ThreadPoolType.values()) {
            typeMap.put(threadPoolType.getType(), threadPoolType);
        }
        TYPE_MAP = Collections.unmodifiableMap(typeMap);
    }

    public static ThreadPoolType fromType(String type) {
        ThreadPoolType threadPoolType = TYPE_MAP.get(type);
        if (threadPoolType == null) {
            throw new IllegalArgumentException("no ThreadPoolType for " + type);
        }
        return threadPoolType;
    }
}
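As shown above, there are four types:
direct: tasks run directly on the calling thread, with no pool threads and no queue.
fixed: a fixed number of threads, with a queue for tasks that have no thread available.
fixed_auto_queue_size: like fixed, but the queue size is adjusted automatically based on measured task execution times (Little's Law).
scaling: the number of threads varies between a core and a maximum size depending on the load; idle threads are terminated after a keep-alive time.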
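To make the direct and fixed types concrete, the sketch below imitates them with plain JDK executors. This is an assumption for illustration only. Elasticsearch builds its pools with its own executor helpers rather than these exact classes, and fixed_auto_queue_size and scaling are not modeled here. PoolTypesDemo and its methods are hypothetical names.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolTypesDemo {

    // "direct": no threads of its own, the task runs on the caller's thread
    static final Executor DIRECT = Runnable::run;

    // "fixed": core == max threads, plus a bounded queue; the default policy rejects overflow
    static ThreadPoolExecutor fixed(int threads, int queueSize) {
        return new ThreadPoolExecutor(threads, threads, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueSize));
    }

    // Returns the name of the thread that ran a task on the direct executor
    static String directRunsOn() {
        String[] ranOn = new String[1];
        DIRECT.execute(() -> ranOn[0] = Thread.currentThread().getName());
        return ranOn[0];
    }

    // Saturates a 1-thread / 1-slot fixed pool and reports whether a third task is rejected
    static boolean fixedRejectsWhenFull() {
        ThreadPoolExecutor pool = fixed(1, 1);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(blocker);       // handed directly to the single worker thread
            pool.execute(blocker);       // waits in the queue (capacity 1)
            try {
                pool.execute(blocker);   // no free thread and no queue slot
                return false;
            } catch (RejectedExecutionException expected) {
                return true;
            }
        } finally {
            release.countDown();         // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

The trade-off is visible in the two methods. The direct executor costs nothing but ties up the caller. The fixed pool isolates work on its own threads, but once its threads and queue are full it pushes back by rejecting new tasks.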
In this step the ThreadPool constructor creates many thread pools, with the following names:
public static class Names {
    public static final String SAME = "same";
    public static final String GENERIC = "generic";
    public static final String LISTENER = "listener";
    public static final String GET = "get";
    public static final String ANALYZE = "analyze";
    public static final String INDEX = "index";
    public static final String WRITE = "write";
    public static final String SEARCH = "search";
    public static final String SEARCH_THROTTLED = "search_throttled";
    public static final String MANAGEMENT = "management";
    public static final String FLUSH = "flush";
    public static final String REFRESH = "refresh";
    public static final String WARMER = "warmer";
    public static final String SNAPSHOT = "snapshot";
    public static final String FORCE_MERGE = "force_merge";
    public static final String FETCH_SHARD_STARTED = "fetch_shard_started";
    public static final String FETCH_SHARD_STORE = "fetch_shard_store";
}
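See the official documentation for each thread pool's purpose, type, number of threads, queue size, and so on.
This method (Bootstrap.start) is called from Bootstrap.init.
After the steps above, if the service was started from the console, the console shows output like the following.
When you see the log line: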
[elasticsearch] [2019-04-09T20:01:12,428][INFO ][o.e.n.Node ] [node-0] starting ...
it means the Node has begun starting.
Node startup roughly does the following:
It starts various services:
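Service | Description |
---|---|
IndicesService | Index management |
IndicesClusterStateService | Applies cluster state changes to the indices and shards on this node |
SnapshotsService | Creates snapshots |
SnapshotShardsService | Runs on data and master nodes and controls the shards of the snapshots currently running on those nodes; it starts and stops shard-level snapshots |
RoutingService | Listens to the cluster state; when it receives a ClusterChangedEvent it validates the cluster state, and the routing table may be updated |
SearchService | Search service |
ClusterService | Cluster management |
NodeConnectionsService | Connects to nodes once they are added to the cluster state and disconnects when they are removed. It also periodically checks that all connections are still open and restores them when needed. Note that this component is not responsible for removing nodes from the cluster when they disconnect or stop responding to pings; that is done by NodesFaultDetection. Master fault detection is done by MasterFaultDetection. |
ResourceWatcherService | Generic resource watcher service |
GatewayService | Gateway service |
Discovery | Node discovery |
TransportService | Network layer for communication between nodes |
TaskResultsService | |
HttpServerTransport | HTTP network service for external clients |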
When the console shows the following output, the node has started successfully:
[elasticsearch] [2019-04-09T20:04:16,388][INFO ][o.e.n.Node ] [node-0] started
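The steps above show that starting a single Elasticsearch node is quite complex, and this article only lists the main startup steps. Many details remain unexplored, such as how nodes discover and join the cluster, how data is synchronized between nodes, and how the cluster master is elected. Readers should dig into the source for those details.
Reference: http://laijianfeng.org/2018/09/Elasticsearch-6-3-2-%E5%90%AF%E5%8A%A8%E8%BF%87%E7%A8%8B/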