本文主要簡要介紹Elasticsearch單節點的啓動和關閉流程。Elasticsearch版本:6.3.2java
一、Google Guice 快速入門
二、Elasticsearch 中的 Guice
三、教你編譯調試Elasticsearch 6.3.2源碼
四、Elasticsearch 6.3.2 啓動過程node
Elasticsearch的啓動引導類爲 Bootstrap 類,在建立節點 Node 對象以前,Bootstrap 會解析配置和進行一些安全檢查等react
environment 對象主要是解析出來的配置信息bootstrap
建立節點過程的主要工做是建立各個模塊對象和服務對象,完成 Guice 依賴綁定,獲取並初始化探測器。設計模式
ModulesBuilder 用於統一管理 Module安全
ModulesBuilder modules = new ModulesBuilder(); ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService); modules.add(clusterModule); // 將模塊加入管理 //.... // 實例綁定 modules.add(b -> { b.bind(Node.class).toInstance(this); b.bind(NodeService.class).toInstance(nodeService); b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry); b.bind(PluginsService.class).toInstance(pluginsService); b.bind(Client.class).toInstance(client); b.bind(NodeClient.class).toInstance(client); b.bind(Environment.class).toInstance(this.environment); b.bind(ThreadPool.class).toInstance(threadPool); b.bind(NodeEnvironment.class).toInstance(nodeEnvironment); // .... } ); injector = modules.createInjector(); // 生成注入器
主要的服務類簡介以下:微信
服務 | 簡介 |
---|---|
ResourceWatcherService | 通用資源監視服務 |
HttpServerTransport | HTTP傳輸服務,提供Rest接口服務 |
SnapshotsService | 快照服務 |
SnapshotShardsService | 負責啓動和中止shard級快照 |
IndicesClusterStateService | 根據收到的集羣狀態信息,處理相關索引 |
Discovery | 集羣拓撲管理 |
RoutingService | 處理路由(節點之間遷移shard) |
ClusterService | 集羣管理服務,主要處理集羣任務,發佈集羣狀態 |
NodeConnectionsService | 節點鏈接管理服務 |
MonitorService | 提供進程級、系統級、文件系統和JVM的監控服務 |
GatewayService | 負責集羣元數據持久化與恢復 |
SearchService | 處理搜索請求 |
TransportService | 底層傳輸服務 |
plugins | 插件 |
IndicesService | 負責建立、刪除索引等索引操做 |
啓動節點的主要工做是啓動各個模塊的服務對象,服務對象從注入器 injector
中取出來,而後調用它們的 start
方法,服務對象的 start
方法的工做基本是初始化內部數據、建立線程池、啓動線程池等,詳細的流程留到後面的文章中再介紹。網絡
injector.getInstance(MappingUpdatedAction.class).setClient(client); injector.getInstance(IndicesService.class).start(); injector.getInstance(IndicesClusterStateService.class).start();
在啓動 Discovery 和 ClusterService 以前,還會調用 validateNodeBeforeAcceptingRequests 方法來檢測環境外部,外部環境主要是JVM、操做系統相關參數,將一些影響性能的配置標記爲錯誤以引發用戶的重視。多線程
節點的環境檢測代碼都封裝在 BootstrapChecks 類中,BootstrapChecks 類經過責任鏈模式對十幾個檢測項進行檢測,關於責任鏈模式能夠翻看這篇文章《設計模式之責任鏈模式及典型應用》app
這裏的責任鏈模式中的抽象處理者由 BootstrapCheck 接口扮演,它定義了一個處理方法 check
,而每一個檢查項則是具體處理者,都有對應的一個靜態類,具體的檢查則在 check
接口中完成
以第一個檢查項 "堆大小檢查" 爲例,從 JvmInfo 類中獲取配置的堆的初始值和最大值進行比較,不相等則格式化提示信息,最後返回檢查結果
static class HeapSizeCheck implements BootstrapCheck { @Override public BootstrapCheckResult check(BootstrapContext context) { final long initialHeapSize = getInitialHeapSize(); final long maxHeapSize = getMaxHeapSize(); if (initialHeapSize != 0 && maxHeapSize != 0 && initialHeapSize != maxHeapSize) { final String message = String.format(Locale.ROOT, "initial heap size [%d] not equal to maximum heap size [%d]; " + "this can cause resize pauses and prevents mlockall from locking the entire heap", getInitialHeapSize(), getMaxHeapSize()); return BootstrapCheckResult.failure(message); } else { return BootstrapCheckResult.success(); } } long getInitialHeapSize() { return JvmInfo.jvmInfo().getConfiguredInitialHeapSize(); } long getMaxHeapSize() { return JvmInfo.jvmInfo().getConfiguredMaxHeapSize(); } }
把全部檢查項的對象添加到一個 List 鏈中
static List<BootstrapCheck> checks() { final List<BootstrapCheck> checks = new ArrayList<>(); checks.add(new HeapSizeCheck()); final FileDescriptorCheck fileDescriptorCheck = Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck(); checks.add(fileDescriptorCheck); checks.add(new MlockallCheck()); if (Constants.LINUX) { checks.add(new MaxNumberOfThreadsCheck()); } if (Constants.LINUX || Constants.MAC_OS_X) { checks.add(new MaxSizeVirtualMemoryCheck()); } if (Constants.LINUX || Constants.MAC_OS_X) { checks.add(new MaxFileSizeCheck()); } if (Constants.LINUX) { checks.add(new MaxMapCountCheck()); } checks.add(new ClientJvmCheck()); checks.add(new UseSerialGCCheck()); checks.add(new SystemCallFilterCheck()); checks.add(new OnErrorCheck()); checks.add(new OnOutOfMemoryErrorCheck()); checks.add(new EarlyAccessCheck()); checks.add(new G1GCCheck()); checks.add(new AllPermissionCheck()); return Collections.unmodifiableList(checks); }
for 循環分別調用 check 方法進行檢查,有些檢查項檢查不經過是能夠忽略的,若是有不能忽略的錯誤則會拋出異常
for (final BootstrapCheck check : checks) { final BootstrapCheck.BootstrapCheckResult result = check.check(context); if (result.isFailure()) { if (!(enforceLimits || enforceBootstrapChecks) && !check.alwaysEnforce()) { ignoredErrors.add(result.getMessage()); } else { errors.add(result.getMessage()); } } }
那麼檢查項有哪些呢?
堆大小檢查
:若是開啓了bootstrap.memory_lock
,則JVM在啓動時將鎖定堆的初始大小,若配置的初始值與最大值不等,堆變化後沒法保證堆都鎖定在內存中文件描述符檢查
:ES進程須要很是多的文件描述符,因此須配置系統的文件描述符的最大數量 ulimit -n 65535
內存鎖定檢查
:ES容許進程只使用物理內存,若使用交換分區可能會帶來不少問題,因此最好讓ES鎖定內存最大線程數檢查
:ES進程會建立不少線程,這個數最少需2048最大虛擬內存檢查
最大文件大小檢查
:段文件和事務日誌文件可能會很是大,建議這個數設置爲無限虛擬內存區域最大數量檢查
JVM Client模式檢查
串行收集檢查
:ES默認使用 CMS 垃圾回收器,而不是 Serial 收集器系統調用過濾器檢查
OnError與OnOutOfMemoryError檢查
Early-access檢查
:ES最好運行在JVM的穩定版本上G1GC檢查
順便一提,JvmInfo 則是利用了 JavaSDK 自帶的 ManagementFactory 類來獲取JVM信息的,獲取的 JVM 屬性以下所示
long pid; // 進程ID String version; // Java版本 String vmName; // JVM名稱 String vmVersion; // JVM版本 String vmVendor; // JVM開發商 long startTime; // 啓動時間 long configuredInitialHeapSize; // 配置的堆的初始值 long configuredMaxHeapSize; // 配置的堆的最大值 Mem mem; // 內存信息 String[] inputArguments; // JVM啓動時輸入的參數 String bootClassPath; String classPath; Map<String, String> systemProperties; // 系統環境變量 String[] gcCollectors; String[] memoryPools; String onError; String onOutOfMemoryError; String useCompressedOops; String useG1GC; // 是否使用 G1 垃圾回收器 String useSerialGC; // 是否使用 Serial 垃圾回收器
在啓動引導類 Bootstrap 的 start 方法中,啓動節點以後還會啓動一個 keepAlive 線程
private void start() throws NodeValidationException { node.start(); keepAliveThread.start(); } // CountDownLatch 初始值爲 1 private final CountDownLatch keepAliveLatch = new CountDownLatch(1); Bootstrap() { keepAliveThread = new Thread(new Runnable() { @Override public void run() { try { keepAliveLatch.await(); // 一直等待直到 CountDownLatch 減爲 0 } catch (InterruptedException e) { // bail out } } }, "elasticsearch[keepAlive/" + Version.CURRENT + "]"); keepAliveThread.setDaemon(false); // false 用戶線程 // keep this thread alive (non daemon thread) until we shutdown Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { // 當進程收到關閉 SIGTERM 或 SIGINT 信號時,CountDownLatch 減1 keepAliveLatch.countDown(); } }); } if (addShutdownHook) { Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { try { IOUtils.close(node, spawner); LoggerContext context = (LoggerContext) LogManager.getContext(false); Configurator.shutdown(context); } catch (IOException ex) { throw new ElasticsearchException("failed to stop node", ex); } } }); }
keepAliveThread 線程自己不作具體的工做。主線程執行完啓動流程後會退出,keepAliveThread 線程是惟一的用戶線程,做用是保持進程運行。在Java程序中,一個進程至少須要有一個用戶線程,當用戶線程爲零時將退出進程。
作個試驗,將 keepAliveThread.setDaemon(false);
中的 false
改成 true
,會發現Elasticsearch啓動後立刻就中止了
[2019-01-08T01:28:47,522][INFO ][o.e.n.Node ] [1yGidog] started [2019-01-08T01:28:47,525][INFO ][o.e.n.Node ] [1yGidog] stopping ...
關閉的順序大體爲:
public static void close(final Exception ex, final Iterable<? extends Closeable> objects) throws IOException { Exception firstException = ex; for (final Closeable object : objects) { try { if (object != null) { object.close(); } } catch (final IOException | RuntimeException e) { if (firstException == null) { firstException = e; } else { firstException.addSuppressed(e); } } } // ... } private Node stop() { if (!lifecycle.moveToStopped()) { return this; } Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings)); logger.info("stopping ..."); injector.getInstance(ResourceWatcherService.class).stop(); if (NetworkModule.HTTP_ENABLED.get(settings)) { injector.getInstance(HttpServerTransport.class).stop(); } injector.getInstance(SnapshotsService.class).stop(); injector.getInstance(SnapshotShardsService.class).stop(); // stop any changes happening as a result of cluster state changes injector.getInstance(IndicesClusterStateService.class).stop(); // close discovery early to not react to pings anymore. // This can confuse other nodes and delay things - mostly if we're the master and we're running tests. injector.getInstance(Discovery.class).stop(); // we close indices first, so operations won't be allowed on it injector.getInstance(RoutingService.class).stop(); injector.getInstance(ClusterService.class).stop(); injector.getInstance(NodeConnectionsService.class).stop(); nodeService.getMonitorService().stop(); injector.getInstance(GatewayService.class).stop(); injector.getInstance(SearchService.class).stop(); injector.getInstance(TransportService.class).stop(); pluginLifecycleComponents.forEach(LifecycleComponent::stop); // we should stop this last since it waits for resources to get released // if we had scroll searchers etc or recovery going on we wait for to finish. injector.getInstance(IndicesService.class).stop(); logger.info("stopped"); return this; }
節點的關閉固然沒那麼簡單。更多細節敬請期待。
參考:
張超.Elasticsearch源碼解析與優化實戰
歡迎評論、轉發、分享,您的支持是我最大的動力
更多內容可訪問個人我的博客:http://laijianfeng.org
關注【小旋鋒】微信公衆號,及時接收博文推送