直接看圖
node
上圖中虛線表示進入具體流程,實線表示下一步,爲了後面講解方便每一個步驟都加了編號。
先簡單介紹下啓動流程主要涉及的類:apache
// we want the JVM to think there is a security manager installed so that if internal policy decisions that would be based on the
// presence of a security manager or lack thereof act as if there is a security manager present (e.g., DNS cache policy)
//咱們但願JVM認爲已經安裝了一個安全管理器,這樣,若是基於安全管理器的存在或缺乏安全管理器的內部策略決策就會像有一個安全管理器同樣(e.g.、DNS緩存策略)
// grant all permissions so that we can later set the security manager to the one that we want
//授予全部權限,以便稍後能夠將安全管理器設置爲所需的權限
添加StatusConsoleListener到STATUS_LOGGER:bootstrap
We want to detect situations where we touch logging before the configuration is loaded . If we do this , Log 4 j will status log an error message at the error level . With this error listener , we can capture if this happens . More broadly , we can detect any error - level status log message which likely indicates that something is broken . The listener is installed immediately on startup , and then when we get around to configuring logging we check that no error - level log messages have been logged by the status logger . If they have we fail startup and any such messages can be seen on the console
咱們但願檢測在加載配置以前進行日誌記錄的狀況。若是這樣作,log4j將在錯誤級別記錄一條錯誤消息。使用這個錯誤監聽器,咱們能夠捕捉到這種狀況。更普遍地說,咱們能夠檢測任何錯誤級別的狀態日誌消息,這些消息可能表示某個東西壞了。偵聽器在啓動時當即安裝,而後在配置日誌記錄時,咱們檢查狀態日誌記錄器沒有記錄錯誤級別的日誌消息。若是它們啓動失敗,咱們能夠在控制檯上看到任何此類消息。
實例化Elasticsearch:緩存
Elasticsearch() { super("starts elasticsearch", () -> {}); // () -> {} 是啓動前的回調 //下面解析version,daemonize,pidfile,quiet參數 versionOption = parser.acceptsAll(Arrays.asList("V", "version"), "Prints elasticsearch version information and exits"); daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"), "Starts Elasticsearch in the background") .availableUnless(versionOption); pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"), "Creates a pid file in the specified path on start") .availableUnless(versionOption) .withRequiredArg() .withValuesConvertedBy(new PathConverter()); quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"), "Turns off standard output/error streams logging in console") .availableUnless(versionOption) .availableUnless(daemonizeOption); }
shutdownHookThread = new Thread(() -> { try { this.close(); } catch (final IOException e) { try ( StringWriter sw = new StringWriter(); PrintWriter pw = new PrintWriter(sw)) { e.printStackTrace(pw); terminal.println(sw.toString()); } catch (final IOException impossible) { // StringWriter#close declares a checked IOException from the Closeable interface but the Javadocs for StringWriter // say that an exception here is impossible throw new AssertionError(impossible); } } }); Runtime.getRuntime().addShutdownHook(shutdownHookThread);
而後調用beforeMain.run(),其實就是上面實例化Elasticsearch對象時建立的()->{} lambda表達式。安全
void mainWithoutErrorHandling(String[] args, Terminal terminal) throws Exception { final OptionSet options = parser.parse(args);//根據提供給解析器的選項規範解析給定的命令行參數 if (options.has(helpOption)) { printHelp(terminal); return; } if (options.has(silentOption)) {//terminal打印最少內容 terminal.setVerbosity(Terminal.Verbosity.SILENT); } else if (options.has(verboseOption)) {//terminal打印詳細內容 terminal.setVerbosity(Terminal.Verbosity.VERBOSE); } else { terminal.setVerbosity(Terminal.Verbosity.NORMAL); } execute(terminal, options); }
protected void execute(Terminal terminal, OptionSet options) throws Exception { final Map<String, String> settings = new HashMap<>(); for (final KeyValuePair kvp : settingOption.values(options)) { if (kvp.value.isEmpty()) { throw new UserException(ExitCodes.USAGE, "setting [" + kvp.key + "] must not be empty"); } if (settings.containsKey(kvp.key)) { final String message = String.format( Locale.ROOT, "setting [%s] already set, saw [%s] and [%s]", kvp.key, settings.get(kvp.key), kvp.value); throw new UserException(ExitCodes.USAGE, message); } settings.put(kvp.key, kvp.value); } //確保給定的設置存在,若是還沒有設置,則從系統屬性中讀取它。 putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data"); putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home"); putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs"); execute(terminal, options, createEnv(terminal, settings)); }
protected void execute(Terminal terminal, OptionSet options, Environment env) throws UserException { if (options.nonOptionArguments().isEmpty() == false) { throw new UserException(ExitCodes.USAGE, "Positional arguments not allowed, found " + options.nonOptionArguments()); } if (options.has(versionOption)) { //若是有 -v 參數,打印版本號後直接退出 terminal.println("Version: " + Version.displayVersion(Version.CURRENT, Build.CURRENT.isSnapshot()) + ", Build: " + Build.CURRENT.shortHash() + "/" + Build.CURRENT.date() + ", JVM: " + JvmInfo.jvmInfo().version()); return; } final boolean daemonize = options.has(daemonizeOption); final Path pidFile = pidfileOption.value(options); final boolean quiet = options.has(quietOption); try { init(daemonize, pidFile, quiet, env); } catch (NodeValidationException e) { throw new UserException(ExitCodes.CONFIG, e.getMessage()); } }
Bootstrap() { keepAliveThread = new Thread(new Runnable() { @Override public void run() { try { keepAliveLatch.await(); } catch (InterruptedException e) { // bail out } } }, "elasticsearch[keepAlive/" + Version.CURRENT + "]"); keepAliveThread.setDaemon(false); // keep this thread alive (non daemon thread) until we shutdown 保持這個線程存活(非守護進程線程),直到咱們關機 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { keepAliveLatch.countDown(); } }); }
private static void checkLucene() { if (Version.CURRENT.luceneVersion.equals(org.apache.lucene.util.Version.LATEST) == false) { throw new AssertionError("Lucene version mismatch this version of Elasticsearch requires lucene version [" + Version.CURRENT.luceneVersion + "] but the current lucene version is [" + org.apache.lucene.util.Version.LATEST + "]"); } }
// install the default uncaught exception handler; must be done before security is // initialized as we do not want to grant the runtime permission // 安裝默認未捕獲異常處理程序;必須在初始化security以前完成,由於咱們不想授予運行時權限 // setDefaultUncaughtExceptionHandler Thread.setDefaultUncaughtExceptionHandler( new ElasticsearchUncaughtExceptionHandler(() -> Node.NODE_NAME_SETTING.get(environment.settings())));
檢查用戶是否做爲根用戶運行,是的話拋異常;系統調用和mlockAll檢查;嘗試設置最大線程數,最大虛擬內存,最大FD等。
初始化探針initializeProbes(),用於操做系統,進程,jvm的監控。app
if (addShutdownHook) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { IOUtils.close(node, spawner); LoggerContext context = (LoggerContext) LogManager.getContext(false); Configurator.shutdown(context); } catch (IOException ex) { throw new ElasticsearchException("failed to stop node", ex); } } }); }
try { // look for jar hell JarHell.checkJarHell(); } catch (IOException | URISyntaxException e) { throw new BootstrapException(e); } // Log ifconfig output before SecurityManager is installed IfConfig.logIfNecessary(); // install SM after natives, shutdown hooks, etc. try { Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings)); } catch (IOException | NoSuchAlgorithmException e) { throw new BootstrapException(e); }
重寫validateNodeBeforeAcceptingRequests方法。具體主要包括三部分,第一是啓動插件服務(es提供了插件功能來進行擴展功能,這也是它的一個亮點),加載須要的插件,第二是配置node環境,最後就是經過guice加載各個模塊。下面22~32就是具體步驟。框架
// create the environment based on the finalized (processed) view of the settings 根據設置的最終(處理)視圖建立環境 // this is just to makes sure that people get the same settings, no matter where they ask them from 這只是爲了確保人們獲得相同的設置,不管他們從哪裏詢問 this.environment = new Environment(this.settings, environment.configFile());
ResourceWatcherService、NetworkService、ClusterService、IngestService、ClusterInfoService、UsageService、MonitorService、CircuitBreakerService、MetaStateService、IndicesService、MetaDataIndexUpgradeService、TemplateUpgradeService、TransportService、ResponseCollectorService、SearchTransportService、NodeService、SearchService、PersistentTasksClusterServiceless
ScriptModule、AnalysisModule、SettingsModule、pluginModule、ClusterModule、IndicesModule、SearchModule、GatewayModule、RepositoriesModule、ActionModule、NetworkModule、DiscoveryModulejvm
client.initialize(injector.getInstance(new Key<Map<GenericAction, TransportAction>>() {}), () -> clusterService.localNode().getId());
if (NetworkModule.HTTP_ENABLED.get(settings)) { logger.debug("initializing HTTP handlers ..."); // 初始化http handler actionModule.initRestHandlers(() -> clusterService.state().nodes()); }
LifecycleComponent、IndicesService、IndicesClusterStateService、SnapshotsService、SnapshotShardsService、RoutingService、SearchService、MonitorService、NodeConnectionsService、ResourceWatcherService、GatewayService、Discovery、TransportServiceelasticsearch
if (WRITE_PORTS_FILE_SETTING.get(settings)) { if (NetworkModule.HTTP_ENABLED.get(settings)) { HttpServerTransport http = injector.getInstance(HttpServerTransport.class); writePortsFile("http", http.boundAddress()); } TransportService transport = injector.getInstance(TransportService.class); writePortsFile("transport", transport.boundAddress()); }
1.知道底層實現,可以更好地使用,出問題可以快速定位和解決。2.學習別人優秀的代碼和處理問題的方式,提升本身的系統設計能力。3.有機會能夠對其進行擴展和改造。