Elasticsearch源碼分析 | 單節點的啓動和關閉

本文主要簡要介紹Elasticsearch單節點的啓動和關閉流程。Elasticsearch版本:6.3.2html

相關文章

一、Google Guice 快速入門
二、Elasticsearch 中的 Guice
三、教你編譯調試Elasticsearch 6.3.2源碼
四、Elasticsearch 6.3.2 啓動過程java

建立節點

Elasticsearch的啓動引導類爲 Bootstrap 類,在建立節點 Node 對象以前,Bootstrap 會解析配置和進行一些安全檢查等node

建立節點對象

environment 對象主要是解析出來的配置信息react

environment 對象

建立節點過程的主要工做是建立各個模塊對象和服務對象,完成 Guice 依賴綁定,獲取並初始化探測器。bootstrap

ModulesBuilder 用於統一管理 Module設計模式

ModulesBuilder modules = new ModulesBuilder();
ClusterModule clusterModule = new ClusterModule(settings, clusterService, clusterPlugins, clusterInfoService);
modules.add(clusterModule);     // 將模塊加入管理
//....
// 實例綁定
modules.add(b -> {
        b.bind(Node.class).toInstance(this);
        b.bind(NodeService.class).toInstance(nodeService);
        b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
        b.bind(PluginsService.class).toInstance(pluginsService);
        b.bind(Client.class).toInstance(client);
        b.bind(NodeClient.class).toInstance(client);
        b.bind(Environment.class).toInstance(this.environment);
        b.bind(ThreadPool.class).toInstance(threadPool);
        b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
        // ....
    }
);
injector = modules.createInjector();    // 生成注入器

主要的服務類簡介以下:安全

服務 簡介
ResourceWatcherService 通用資源監視服務
HttpServerTransport HTTP傳輸服務,提供Rest接口服務
SnapshotsService 快照服務
SnapshotShardsService 負責啓動和中止shard級快照
IndicesClusterStateService 根據收到的集羣狀態信息,處理相關索引
Discovery 集羣拓撲管理
RoutingService 處理路由(節點之間遷移shard)
ClusterService 集羣管理服務,主要處理集羣任務,發佈集羣狀態
NodeConnectionsService 節點鏈接管理服務
MonitorService 提供進程級、系統級、文件系統和JVM的監控服務
GatewayService 負責集羣元數據持久化與恢復
SearchService 處理搜索請求
TransportService 底層傳輸服務
plugins 插件
IndicesService 負責建立、刪除索引等索引操做

啓動節點

啓動節點的主要工做是啓動各個模塊的服務對象,服務對象從注入器 injector 中取出來,而後調用它們的 start 方法,服務對象的 start 方法的工做基本是初始化內部數據、建立線程池、啓動線程池等,詳細的流程留到後面的文章中再介紹。微信

injector.getInstance(MappingUpdatedAction.class).setClient(client);
injector.getInstance(IndicesService.class).start();
injector.getInstance(IndicesClusterStateService.class).start();

在啓動 Discovery 和 ClusterService 以前,還會調用 validateNodeBeforeAcceptingRequests 方法來檢測環境外部,外部環境主要是JVM、操做系統相關參數,將一些影響性能的配置標記爲錯誤以引發用戶的重視。網絡

環境檢測

節點的環境檢測代碼都封裝在 BootstrapChecks 類中,BootstrapChecks 類經過責任鏈模式對十幾個檢測項進行檢測,關於責任鏈模式能夠翻看這篇文章《設計模式之責任鏈模式及典型應用多線程

這裏的責任鏈模式中的抽象處理者由 BootstrapCheck 接口扮演,它定義了一個處理方法 check,而每一個檢查項則是具體處理者,都有對應的一個靜態類,具體的檢查則在 check 接口中完成

以第一個檢查項 "堆大小檢查" 爲例,從 JvmInfo 類中獲取配置的堆的初始值和最大值進行比較,不相等則格式化提示信息,最後返回檢查結果

static class HeapSizeCheck implements BootstrapCheck {
        @Override
        public BootstrapCheckResult check(BootstrapContext context) {
            final long initialHeapSize = getInitialHeapSize();
            final long maxHeapSize = getMaxHeapSize();
            if (initialHeapSize != 0 && maxHeapSize != 0 && initialHeapSize != maxHeapSize) {
                final String message = String.format(Locale.ROOT,
                        "initial heap size [%d] not equal to maximum heap size [%d]; " +
                                "this can cause resize pauses and prevents mlockall from locking the entire heap",
                        getInitialHeapSize(), getMaxHeapSize());
                return BootstrapCheckResult.failure(message);
            } else {
                return BootstrapCheckResult.success();
            }
        }
        long getInitialHeapSize() { 
            return JvmInfo.jvmInfo().getConfiguredInitialHeapSize();
        }
        long getMaxHeapSize() {
            return JvmInfo.jvmInfo().getConfiguredMaxHeapSize();
        }
    }

把全部檢查項的對象添加到一個 List 鏈中

static List<BootstrapCheck> checks() {
        final List<BootstrapCheck> checks = new ArrayList<>();
        checks.add(new HeapSizeCheck());
        final FileDescriptorCheck fileDescriptorCheck
            = Constants.MAC_OS_X ? new OsXFileDescriptorCheck() : new FileDescriptorCheck();
        checks.add(fileDescriptorCheck);
        checks.add(new MlockallCheck());
        if (Constants.LINUX) {
            checks.add(new MaxNumberOfThreadsCheck());
        }
        if (Constants.LINUX || Constants.MAC_OS_X) {
            checks.add(new MaxSizeVirtualMemoryCheck());
        }
        if (Constants.LINUX || Constants.MAC_OS_X) {
            checks.add(new MaxFileSizeCheck());
        }
        if (Constants.LINUX) {
            checks.add(new MaxMapCountCheck());
        }
        checks.add(new ClientJvmCheck());
        checks.add(new UseSerialGCCheck());
        checks.add(new SystemCallFilterCheck());
        checks.add(new OnErrorCheck());
        checks.add(new OnOutOfMemoryErrorCheck());
        checks.add(new EarlyAccessCheck());
        checks.add(new G1GCCheck());
        checks.add(new AllPermissionCheck());
        return Collections.unmodifiableList(checks);
    }

for 循環分別調用 check 方法進行檢查,有些檢查項檢查不經過是能夠忽略的,若是有不能忽略的錯誤則會拋出異常

for (final BootstrapCheck check : checks) {
    final BootstrapCheck.BootstrapCheckResult result = check.check(context);
    if (result.isFailure()) {
        if (!(enforceLimits || enforceBootstrapChecks) && !check.alwaysEnforce()) {
            ignoredErrors.add(result.getMessage());
        } else {
            errors.add(result.getMessage());
        }
    }
}

那麼檢查項有哪些呢?

  • 堆大小檢查:若是開啓了bootstrap.memory_lock,則JVM在啓動時將鎖定堆的初始大小,若配置的初始值與最大值不等,堆變化後沒法保證堆都鎖定在內存中
  • 文件描述符檢查:ES進程須要很是多的文件描述符,因此須配置系統的文件描述符的最大數量 ulimit -n 65535
  • 內存鎖定檢查:ES容許進程只使用物理內存,若使用交換分區可能會帶來不少問題,因此最好讓ES鎖定內存
  • 最大線程數檢查:ES進程會建立不少線程,這個數最少需2048
  • 最大虛擬內存檢查
  • 最大文件大小檢查:段文件和事務日誌文件可能會很是大,建議這個數設置爲無限
  • 虛擬內存區域最大數量檢查
  • JVM Client模式檢查
  • 串行收集檢查:ES默認使用 CMS 垃圾回收器,而不是 Serial 收集器
  • 系統調用過濾器檢查
  • OnError與OnOutOfMemoryError檢查
  • Early-access檢查:ES最好運行在JVM的穩定版本上
  • G1GC檢查

順便一提,JvmInfo 則是利用了 JavaSDK 自帶的 ManagementFactory 類來獲取JVM信息的,獲取的 JVM 屬性以下所示

long pid;   // 進程ID
String version; // Java版本
String vmName;  // JVM名稱
String vmVersion;   // JVM版本
String vmVendor;    // JVM開發商
long startTime;     // 啓動時間
long configuredInitialHeapSize; // 配置的堆的初始值
long configuredMaxHeapSize;     // 配置的堆的最大值
Mem mem;            // 內存信息
String[] inputArguments;    // JVM啓動時輸入的參數
String bootClassPath;
String classPath;   
Map<String, String> systemProperties;   // 系統環境變量
String[] gcCollectors;
String[] memoryPools;
String onError;
String onOutOfMemoryError;
String useCompressedOops;
String useG1GC;     // 是否使用 G1 垃圾回收器
String useSerialGC; // 是否使用 Serial 垃圾回收器

keepAlive 線程

在啓動引導類 Bootstrap 的 start 方法中,啓動節點以後還會啓動一個 keepAlive 線程

private void start() throws NodeValidationException {
    node.start();
    keepAliveThread.start();
}

// CountDownLatch 初始值爲 1
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);
Bootstrap() {
    keepAliveThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                keepAliveLatch.await(); // 一直等待直到 CountDownLatch 減爲 0
            } catch (InterruptedException e) {
                // bail out
            }
        }
    }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
    keepAliveThread.setDaemon(false);   // false 用戶線程
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            // 當進程收到關閉 SIGTERM 或 SIGINT 信號時,CountDownLatch 減1 
            keepAliveLatch.countDown();
        }
    });
}

if (addShutdownHook) {
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            try {
                IOUtils.close(node, spawner);
                LoggerContext context = (LoggerContext) LogManager.getContext(false);
                Configurator.shutdown(context);
            } catch (IOException ex) {
                throw new ElasticsearchException("failed to stop node", ex);
            }
        }
    });
}

keepAliveThread 線程自己不作具體的工做。主線程執行完啓動流程後會退出,keepAliveThread 線程是惟一的用戶線程,做用是保持進程運行。在Java程序中,一個進程至少須要有一個用戶線程,當用戶線程爲零時將退出進程。

作個試驗,將 keepAliveThread.setDaemon(false); 中的 false 改成 true,會發現Elasticsearch啓動後立刻就中止了

[2019-01-08T01:28:47,522][INFO ][o.e.n.Node               ] [1yGidog] started
[2019-01-08T01:28:47,525][INFO ][o.e.n.Node               ] [1yGidog] stopping ...

關閉節點

關閉的順序大體爲:

  • 關閉快照和HTTPServer,再也不響應用戶REST請求
  • 關閉集羣拓撲管理,再也不響應ping請求
  • 關閉網絡模塊,讓節點離線
  • 執行各個插件的關閉流程
  • 關閉IndicesService,這期間須要等待釋放的資源最多,時間最長
public static void close(final Exception ex, final Iterable<? extends Closeable> objects) throws IOException {
    Exception firstException = ex;
    for (final Closeable object : objects) {
        try {
            if (object != null) {
                object.close();
            }
        } catch (final IOException | RuntimeException e) {
            if (firstException == null) {
                firstException = e;
            } else {
                firstException.addSuppressed(e);
            }
        }
    }
    // ...
}

private Node stop() {
    if (!lifecycle.moveToStopped()) {
        return this;
    }
    Logger logger = Loggers.getLogger(Node.class, NODE_NAME_SETTING.get(settings));
    logger.info("stopping ...");

    injector.getInstance(ResourceWatcherService.class).stop();
    if (NetworkModule.HTTP_ENABLED.get(settings)) {
        injector.getInstance(HttpServerTransport.class).stop();
    }

    injector.getInstance(SnapshotsService.class).stop();
    injector.getInstance(SnapshotShardsService.class).stop();
    // stop any changes happening as a result of cluster state changes
    injector.getInstance(IndicesClusterStateService.class).stop();
    // close discovery early to not react to pings anymore.
    // This can confuse other nodes and delay things - mostly if we're the master and we're running tests.
    injector.getInstance(Discovery.class).stop();
    // we close indices first, so operations won't be allowed on it
    injector.getInstance(RoutingService.class).stop();
    injector.getInstance(ClusterService.class).stop();
    injector.getInstance(NodeConnectionsService.class).stop();
    nodeService.getMonitorService().stop();
    injector.getInstance(GatewayService.class).stop();
    injector.getInstance(SearchService.class).stop();
    injector.getInstance(TransportService.class).stop();

    pluginLifecycleComponents.forEach(LifecycleComponent::stop);
    // we should stop this last since it waits for resources to get released
    // if we had scroll searchers etc or recovery going on we wait for to finish.
    injector.getInstance(IndicesService.class).stop();
    logger.info("stopped");

    return this;
}

節點的關閉固然沒那麼簡單。更多細節敬請期待。

參考:
張超.Elasticsearch源碼解析與優化實戰

後記

歡迎評論、轉發、分享,您的支持是我最大的動力

更多內容可訪問個人我的博客:http://laijianfeng.org

關注【小旋鋒】微信公衆號,及時接收博文推送

關注_小旋鋒_微信公衆號

原文出處:https://www.cnblogs.com/whirly/p/10236827.html

相關文章
相關標籤/搜索