Preface
This article explores the startup flow of Elasticsearch 6.3.2.
Environment Setup
Tools used: IDEA, XMind
For setting up an ES debugging environment, see the earlier article 《教你編譯調試Elasticsearch 6.3.2源碼》 (on compiling and debugging the Elasticsearch 6.3.2 source).
Then set breakpoints and debug step by step, starting from the entry point of org.elasticsearch.bootstrap.Elasticsearch.
The screenshot above shows a debugging session in IDEA 2018.2. The red dot at line 84 in the upper left is a breakpoint; the three buttons numbered 1, 2, and 3 are the most frequently used ones:
Button 1: Step Over — advance to the next line, stepping over any method calls.
Button 2: Step Into — advance to the next statement, stepping into method calls.
Button 3: Run to Cursor — continue execution to the next breakpoint, or run to completion if there are no more.
The entire ES startup flow was recorded with XMind.
Based on the diagram above, the startup flow can be divided roughly into four phases:
Elasticsearch parses the Command and loads the configuration
Bootstrap initialization and resource checks
Node creation
Bootstrap starts the node and the keep-alive thread
Elasticsearch Parses the Command and Loads the Configuration
First, look at the entry method Elasticsearch.main:
public static void main(final String[] args) throws Exception {
    System.setSecurityManager(new SecurityManager() {
        @Override
        public void checkPermission(Permission perm) {
            // grant all permissions so that we can later set the security manager to the one that we want
        }
    });
    LogConfigurator.registerErrorListener();
    final Elasticsearch elasticsearch = new Elasticsearch();
    int status = main(args, elasticsearch, Terminal.DEFAULT);
    if (status != ExitCodes.OK) {
        exit(status);
    }
}
1.1 Creating the SecurityManager
About SecurityManager:
In Java, the security manager checks whether an operation has permission to execute: if the check passes, execution continues; otherwise an exception is thrown.
A related article: Java安全——安全管理器、訪問控制器和類裝載器.
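As a hedged illustration of the mechanism (standard Java, not ES code): a custom SecurityManager can veto a specific operation by throwing a SecurityException, which is why the empty checkPermission in Elasticsearch.main above effectively grants everything.

import java.security.Permission;

public class DenyExitDemo {
    public static void main(String[] args) {
        System.setSecurityManager(new SecurityManager() {
            @Override
            public void checkPermission(Permission perm) {
                // empty body: grant all permissions, as Elasticsearch.main does above
            }

            @Override
            public void checkExit(int status) {
                // veto System.exit by throwing
                throw new SecurityException("exit(" + status + ") is not allowed");
            }
        });
        System.exit(1); // throws SecurityException
    }
}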
1.2 LogConfigurator.registerErrorListener() registers an error listener.
1.3 Creating the Elasticsearch object
The inheritance hierarchy of the Elasticsearch entry class is as follows: Elasticsearch extends EnvironmentAwareCommand, which extends Command. Briefly, from the Javadoc:
Elasticsearch: This class starts elasticsearch.
EnvironmentAwareCommand: A cli command which requires an org.elasticsearch.env.Environment to use current paths and settings.
Command: An action to execute within a cli.
Clearly, one important job of Elasticsearch is to parse the command-line arguments.
Run the Elasticsearch startup command with the -h flag to see the available options.
These options correspond to private fields of the Command and Elasticsearch classes.
The Elasticsearch constructor:
Elasticsearch() {
    super("starts elasticsearch", () -> {}); // we configure logging later so we override the base class from configuring logging
    versionOption = parser.acceptsAll(Arrays.asList("V", "version"), "Prints elasticsearch version information and exits");
    daemonizeOption = parser.acceptsAll(Arrays.asList("d", "daemonize"), "Starts Elasticsearch in the background")
        .availableUnless(versionOption);
    pidfileOption = parser.acceptsAll(Arrays.asList("p", "pidfile"), "Creates a pid file in the specified path on start")
        .availableUnless(versionOption).withRequiredArg().withValuesConvertedBy(new PathConverter());
    quietOption = parser.acceptsAll(Arrays.asList("q", "quiet"), "Turns off standard output/error streams logging in console")
        .availableUnless(versionOption).availableUnless(daemonizeOption);
}
1.4 Next comes the Command.main method.
It registers a shutdown-hook thread with the current Runtime; when the runtime shuts down abnormally, this hook prints the exception details.
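A minimal, self-contained sketch of that pattern (not the exact ES code):

public class ShutdownHookDemo {
    public static void main(String[] args) {
        // The hook thread runs when the JVM shuts down, whether by normal exit or Ctrl-C.
        Runtime.getRuntime().addShutdownHook(
                new Thread(() -> System.err.println("shutdown hook: printing pending error details")));
        System.out.println("main finished");
    }
}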
1.5 Command.mainWithoutErrorHandling prints or applies settings according to the command-line arguments, then executes the command, propagating any exception it throws.
1.6 EnvironmentAwareCommand.execute ensures that path.data, path.home, and path.logs are set, reading them from the es.path.data, es.path.home, and es.path.logs entries in System.properties when they are missing:
putSystemPropertyIfSettingIsMissing(settings, "path.data", "es.path.data");
putSystemPropertyIfSettingIsMissing(settings, "path.home", "es.path.home");
putSystemPropertyIfSettingIsMissing(settings, "path.logs", "es.path.logs");
execute(terminal, options, createEnv(terminal, settings));
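A hedged sketch of what putSystemPropertyIfSettingIsMissing plausibly does (the real method lives in EnvironmentAwareCommand; this sketch assumes the simplest fallback behavior):

import java.util.Map;

// Sketch only: copy a JVM system property (e.g. es.path.data) into the settings
// map when the corresponding setting (e.g. path.data) was not given explicitly.
static void putSystemPropertyIfSettingIsMissing(
        final Map<String, String> settings, final String setting, final String key) {
    final String value = System.getProperty(key);
    if (value != null && settings.containsKey(setting) == false) {
        settings.put(setting, value);
    }
}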
1.7 EnvironmentAwareCommand.createEnv reads the elasticsearch.yml configuration under the config directory and collects file information from the plugins, bin, lib, and modules directories.
createEnv finally returns an Environment object; the execution result is shown above.
1.8 Elasticsearch.execute reads the daemonize, pidFile, and quiet values and ensures that the configured temp directory is valid.
It then enters the Bootstrap initialization phase:
Bootstrap.init(!daemonize, pidFile, quiet, initialEnv);
Bootstrap Initialization
2.1 Enter Bootstrap.init. Per its Javadoc: "This method is invoked by Elasticsearch#main(String[]) to startup elasticsearch."
INSTANCE = new Bootstrap(); creates the Bootstrap singleton. Its constructor creates a user (non-daemon) keep-alive thread that waits on a CountDownLatch, and registers a Runtime shutdown hook that counts the latch down:
private final CountDownLatch keepAliveLatch = new CountDownLatch(1);

/** creates a new instance */
Bootstrap() {
    keepAliveThread = new Thread(new Runnable() {
        @Override
        public void run() {
            try {
                keepAliveLatch.await();
            } catch (InterruptedException e) {
            }
        }
    }, "elasticsearch[keepAlive/" + Version.CURRENT + "]");
    keepAliveThread.setDaemon(false);
    // keep this thread alive (non daemon thread) until we shutdown
    Runtime.getRuntime().addShutdownHook(new Thread() {
        @Override
        public void run() {
            keepAliveLatch.countDown();
        }
    });
}
CountDownLatch is a synchronization aid that allows one or more threads to wait until a set of operations being performed in other threads completes. For example, an application's main thread may want to proceed only after the threads responsible for starting framework services have started all of them.
A CountDownLatch is implemented with a counter, initialized to the number of threads. Each time a thread completes its task, the counter is decremented by one; when it reaches zero, all tasks are done and the threads waiting on the latch resume execution.
For a fuller introduction, see the article 併發工具類 CountDownLatch.
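A minimal runnable example of the pattern just described: the main thread waits until three "service" threads have counted the latch down.

import java.util.concurrent.CountDownLatch;

public class CountDownLatchDemo {
    public static void main(String[] args) throws InterruptedException {
        final int services = 3;
        final CountDownLatch latch = new CountDownLatch(services);
        for (int i = 0; i < services; i++) {
            final int id = i;
            new Thread(() -> {
                System.out.println("service " + id + " started");
                latch.countDown(); // one task finished
            }).start();
        }
        latch.await(); // blocks until the count reaches zero
        System.out.println("all services started");
    }
}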
2.2 Load the keystore (secure settings): if the keystore file does not exist, create and save it; if it exists, decrypt and upgrade it.
2.3 Create an Environment object from the settings gathered so far.
2.4 LogConfigurator sets up Log4j logging.
2.5 Check whether the pid file exists and create it if not.
About pid files:
(1) Content: a pid file is a text file with a single line recording the process ID; you can view it with cat.
(2) Purpose: to prevent multiple copies of a process from starting. Only the process that obtains the write lock (F_WRLCK) on the pid file (at a fixed path with a fixed name) can start normally and write its own PID into the file; redundant instances of the same program exit automatically. A locking sketch follows below.
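The following sketch illustrates the locking scheme described in (2) using java.nio file locks. Note that this is a sketch of the general pid-file technique, not how ES's own PidFile class works (ES simply writes the PID and removes the file on exit); the path is hypothetical.

import java.io.IOException;
import java.lang.management.ManagementFactory;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;

public class PidFileDemo {
    public static void main(String[] args) throws IOException {
        Path pidFile = Paths.get("/tmp/demo.pid"); // hypothetical fixed path
        FileChannel channel = FileChannel.open(pidFile,
                StandardOpenOption.CREATE, StandardOpenOption.WRITE);
        FileLock lock = channel.tryLock(); // rough analogue of F_WRLCK
        if (lock == null) {
            System.err.println("another instance is already running");
            System.exit(1);
        }
        // On HotSpot the runtime name has the form "pid@hostname".
        String pid = ManagementFactory.getRuntimeMXBean().getName().split("@")[0];
        channel.truncate(0);
        channel.write(StandardCharsets.UTF_8.encode(pid + "\n"));
        // hold the lock for the lifetime of the process
    }
}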
2.6 Check that the expected Lucene version matches the version of the actual Lucene jar files, throwing an exception on mismatch.
2.7 Set the handler for uncaught exceptions via Thread.setDefaultUncaughtExceptionHandler.
The Thread API provides UncaughtExceptionHandler, which lets you detect that a thread has terminated due to an uncaught exception.
See also 朱小廝's article JAVA多線程之UncaughtExceptionHandler——處理非正常的線程停止.
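A small standalone example of this API:

public class UncaughtHandlerDemo {
    public static void main(String[] args) {
        // Called for any thread that terminates due to an uncaught exception.
        Thread.setDefaultUncaughtExceptionHandler((thread, throwable) ->
                System.err.println("thread [" + thread.getName() + "] died: " + throwable));
        new Thread(() -> {
            throw new IllegalStateException("boom");
        }, "worker").start();
    }
}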
INSTANCE.setup(true, environment);
3.1 spawner.spawnNativeControllers(environment);
This iterates over the modules: it reads the module information from every folder under the modules directory into PluginInfo objects and spawns a native controller for the modules that need one, deciding via Files.isRegularFile(spawnPath).
It attempts to spawn the native controller daemon for a given module. The spawned process remains connected to this JVM through its stdin, stdout, and stderr streams, but references to those streams are not available to code outside this package.
3.2 initializeNatives(Path tmpFile, boolean mlockAll, boolean systemCallFilter, boolean ctrlHandler) initializes native resources:
Throw an exception if running as the root user;
Try to install the system call filter;
Call mlockall if configured;
Register a Windows close-event handler;
Initialize the Lucene random seed.
This step uses the Natives class:
Natives is a wrapper class that checks at startup whether the classes required for native method calls are available; if they are not, it avoids executing the code that would load them.
3.3 Add a hook via Runtime.getRuntime().addShutdownHook to close the necessary IO streams, logger contexts, configurators, and so on when ES exits.
3.4 Use JarHell to check for duplicate jar files.
3.5 Install the SecurityManager:
// install SM after natives, shutdown hooks, etc.
Security.configure(environment, BootstrapSettings.SECURITY_FILTER_BAD_DEFAULTS_SETTING.get(settings));
Creating the Node
node = new Node(environment) {
    @Override
    protected void validateNodeBeforeAcceptingRequests(
        final BootstrapContext context,
        final BoundTransportAddress boundTransportAddress, List<BootstrapCheck> checks) throws NodeValidationException {
        BootstrapChecks.check(context, boundTransportAddress, checks);
    }
};
4.1 The constructor code (the first half) is pasted in the original post; it is omitted here for length (see the full article via 閱讀原文).
The main operations performed here are:
Set the Lifecycle state to INITIALIZED
Create a NodeEnvironment object holding node environment information, such as the paths of the various data files
Read JVM information
Create the PluginsService object; during construction it reads and loads all modules and plugins
Create the final Environment object
Create the ThreadPool; most of the later components provide their services through threads managed by this pool
Create the NodeClient node client
PluginsService and ThreadPool deserve a closer look.
PluginsService
The parameters passed when constructing this class are shown above.
The constructor loads all modules:
Set<Bundle> seenBundles = new LinkedHashSet<>();
List<PluginInfo> modulesList = new ArrayList<>();
Set<Bundle> modules = getModuleBundles(modulesDirectory);
for (Bundle bundle : modules) {
    modulesList.add(bundle.plugin);
}
seenBundles.addAll(modules);

/** Get bundles for plugins installed in the given modules directory. */
static Set<Bundle> getModuleBundles(Path modulesDirectory) throws IOException {
    return findBundles(modulesDirectory, "module").stream().flatMap(b -> b.bundles().stream()).collect(Collectors.toSet());
}
Bundle is an inner class ("a 'bundle' is a group of plugins in a single classloader").
PluginInfo is "an in-memory representation of the plugin descriptor", i.e. the in-memory class that describes a plugin.
The actual descriptor-reading code:
/**
 * Reads the plugin descriptor file.
 *
 * @param path the path to the root directory for the plugin
 * @return the plugin info
 * @throws IOException if an I/O exception occurred reading the plugin descriptor
 */
public static PluginInfo readFromProperties(final Path path) throws IOException {
    final Path descriptor = path.resolve(ES_PLUGIN_PROPERTIES);
    final Map<String, String> propsMap;
    {
        final Properties props = new Properties();
        try (InputStream stream = Files.newInputStream(descriptor)) {
            props.load(stream);
        }
        propsMap = props.stringPropertyNames().stream().collect(Collectors.toMap(Function.identity(), props::getProperty));
    }
    final String name = propsMap.remove("name");
    if (name == null || name.isEmpty()) {
        throw new IllegalArgumentException(
                "property [name] is missing in [" + descriptor + "]");
    }
    final String description = propsMap.remove("description");
    if (description == null) {
        throw new IllegalArgumentException(
                "property [description] is missing for plugin [" + name + "]");
    }
    final String version = propsMap.remove("version");
    if (version == null) {
        throw new IllegalArgumentException(
                "property [version] is missing for plugin [" + name + "]");
    }
    final String esVersionString = propsMap.remove("elasticsearch.version");
    if (esVersionString == null) {
        throw new IllegalArgumentException(
                "property [elasticsearch.version] is missing for plugin [" + name + "]");
    }
    final Version esVersion = Version.fromString(esVersionString);
    final String javaVersionString = propsMap.remove("java.version");
    if (javaVersionString == null) {
        throw new IllegalArgumentException(
                "property [java.version] is missing for plugin [" + name + "]");
    }
    JarHell.checkVersionFormat(javaVersionString);
    final String classname = propsMap.remove("classname");
    if (classname == null) {
        throw new IllegalArgumentException(
                "property [classname] is missing for plugin [" + name + "]");
    }
    final String extendedString = propsMap.remove("extended.plugins");
    final List<String> extendedPlugins;
    if (extendedString == null) {
        extendedPlugins = Collections.emptyList();
    } else {
        extendedPlugins = Arrays.asList(Strings.delimitedListToStringArray(extendedString, ","));
    }
    final String hasNativeControllerValue = propsMap.remove("has.native.controller");
    final boolean hasNativeController;
    if (hasNativeControllerValue == null) {
        hasNativeController = false;
    } else {
        switch (hasNativeControllerValue) {
            case "true":
                hasNativeController = true;
                break;
            case "false":
                hasNativeController = false;
                break;
            default:
                final String message = String.format(
                        Locale.ROOT,
                        "property [%s] must be [%s], [%s], or unspecified but was [%s]",
                        "has_native_controller",
                        "true",
                        "false",
                        hasNativeControllerValue);
                throw new IllegalArgumentException(message);
        }
    }
    if (esVersion.before(Version.V_6_3_0) && esVersion.onOrAfter(Version.V_6_0_0_beta2)) {
        propsMap.remove("requires.keystore");
    }
    if (propsMap.isEmpty() == false) {
        throw new IllegalArgumentException("Unknown properties in plugin descriptor: " + propsMap.keySet());
    }
    return new PluginInfo(name, description, version, esVersion, javaVersionString,
            classname, extendedPlugins, hasNativeController);
}
The values of the two constants used above:
public static final String ES_PLUGIN_PROPERTIES = "plugin-descriptor.properties";
public static final String ES_PLUGIN_POLICY = "plugin-security.policy";
From the code above, the module loading process is:
Read the module's descriptor file plugin-descriptor.properties and parse its contents into a Map.
Validate the name, description, version, elasticsearch.version, java.version, classname, extended.plugins, has.native.controller, and requires.keystore entries one by one, throwing an exception if any is missing or malformed.
Construct and return a PluginInfo object from the validated entries.
As an example, the configuration read for the aggs-matrix-stats module is shown below.
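The original screenshot is not reproduced here. As a rough reconstruction (values are approximate; only the keys are guaranteed by the validation code above), a 6.3.2 descriptor for this module would look something like:

# reconstructed example, values approximate
name=aggs-matrix-stats
description=Adds aggregations whose input are a list of numeric fields and output includes a matrix.
version=6.3.2
elasticsearch.version=6.3.2
java.version=1.8
classname=org.elasticsearch.search.aggregations.matrix.MatrixAggregationPlugin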
Plugins are loaded by the same method as modules.
ThreadPool
The ThreadPool constructor:
public ThreadPool(final Settings settings, final ExecutorBuilder<?>... customBuilders) {
    super(settings);
    assert Node.NODE_NAME_SETTING.exists(settings);
    final Map<String, ExecutorBuilder> builders = new HashMap<>();
    final int availableProcessors = EsExecutors.numberOfProcessors(settings);
    final int halfProcMaxAt5 = halfNumberOfProcessorsMaxFive(availableProcessors);
    final int halfProcMaxAt10 = halfNumberOfProcessorsMaxTen(availableProcessors);
    final int genericThreadPoolMax = boundedBy(4 * availableProcessors, 128, 512);
    builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30)));
    builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
    builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200));
    builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000));
    builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16));
    builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings,
            Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
    builders.put(Names.MANAGEMENT, new ScalingExecutorBuilder(Names.MANAGEMENT, 1, 5, TimeValue.timeValueMinutes(5)));
    // no queue as this means clients will need to handle rejections on listener queue even if the operation succeeded
    // the assumption here is that the listeners should be very lightweight on the listeners side
    builders.put(Names.LISTENER, new FixedExecutorBuilder(settings, Names.LISTENER, halfProcMaxAt10, -1));
    builders.put(Names.FLUSH, new ScalingExecutorBuilder(Names.FLUSH, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.REFRESH, new ScalingExecutorBuilder(Names.REFRESH, 1, halfProcMaxAt10, TimeValue.timeValueMinutes(5)));
    builders.put(Names.WARMER, new ScalingExecutorBuilder(Names.WARMER, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.SNAPSHOT, new ScalingExecutorBuilder(Names.SNAPSHOT, 1, halfProcMaxAt5, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FETCH_SHARD_STARTED, new ScalingExecutorBuilder(Names.FETCH_SHARD_STARTED, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    builders.put(Names.FORCE_MERGE, new FixedExecutorBuilder(settings, Names.FORCE_MERGE, 1, -1));
    builders.put(Names.FETCH_SHARD_STORE, new ScalingExecutorBuilder(Names.FETCH_SHARD_STORE, 1, 2 * availableProcessors, TimeValue.timeValueMinutes(5)));
    for (final ExecutorBuilder<?> builder : customBuilders) {
        if (builders.containsKey(builder.name())) {
            throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists");
        }
        builders.put(builder.name(), builder);
    }
    this.builders = Collections.unmodifiableMap(builders);
    threadContext = new ThreadContext(settings);
    final Map<String, ExecutorHolder> executors = new HashMap<>();
    for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) {
        final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings);
        final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
        if (executors.containsKey(executorHolder.info.getName())) {
            throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered");
        }
        logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info));
        executors.put(entry.getKey(), executorHolder);
    }
    executors.put(Names.SAME, new ExecutorHolder(DIRECT_EXECUTOR, new Info(Names.SAME, ThreadPoolType.DIRECT)));
    this.executors = unmodifiableMap(executors);
    this.scheduler = Scheduler.initScheduler(settings);
    TimeValue estimatedTimeInterval = ESTIMATED_TIME_INTERVAL_SETTING.get(settings);
    this.cachedTimeThread = new CachedTimeThread(EsExecutors.threadName(settings, "[timer]"), estimatedTimeInterval.millis());
    this.cachedTimeThread.start();
}
Read this code alongside the documentation: Elasticsearch Reference [6.4] » Modules » Thread Pool, and the apachecn thread-pool translation.
Thread pool types (ThreadPoolType)
fixed: a fixed pool holds a fixed number of threads to handle requests; when no thread is idle, requests wait in a queue. The queue_size parameter controls how many requests may queue while no thread is free.
fixed_auto_queue_size: experimental and subject to change or removal; not covered here.
scaling: a scaling pool holds a dynamic number of threads that varies between the core and max parameters. The keep_alive parameter controls how long an idle thread stays in the pool.
direct: a direct executor does not support shutdown, which means that once in use it stays alive for the lifetime of the node. A java.util.concurrent analogy for fixed and scaling follows below.
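In plain java.util.concurrent terms, fixed and scaling correspond roughly to the following (a hedged analogy only; the real ES builders use their own queue and rejection policy, as shown later with ExecutorScalingQueue and ForceQueuePolicy):

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolTypesDemo {
    public static void main(String[] args) {
        // "fixed": a constant number of threads plus a bounded request queue (queue_size = 200 here)
        ThreadPoolExecutor fixed = new ThreadPoolExecutor(
                8, 8, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(200));

        // "scaling": threads grow from core to max, and idle threads die after keep_alive (30s here)
        ThreadPoolExecutor scaling = new ThreadPoolExecutor(
                4, 128, 30L, TimeUnit.SECONDS, new SynchronousQueue<>());

        fixed.shutdown();
        scaling.shutdown();
    }
}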
Some important thread pools:
generic: for generic requests (e.g. background node discovery); pool type scaling.
index: for index/delete requests; type fixed, size = # of processors, queue size 200, maximum size 1 + # of processors.
search: for count/search/suggest requests; type fixed, size = int((# of processors * 3) / 2) + 1, queue size 1000.
get: for get requests; type fixed, size = # of processors, queue size 1000.
analyze: for analyze requests; type fixed, size 1, queue size 16.
write: for single-document index/delete/update requests and bulk requests; type fixed, size = # of processors, queue size 200, maximum size 1 + # of processors.
snapshot: for snapshot/restore requests; type scaling, keep-alive 5 minutes, maximum size min(5, # of processors / 2).
warmer: for segment warm-up requests; type scaling, keep-alive 5 minutes, maximum size min(5, # of processors / 2).
refresh: for refresh requests; type scaling, keep-alive 5 minutes, maximum size min(10, # of processors / 2).
listener: mainly for Java client code that executes an action with the listener threaded setting enabled; per the constructor above it is built as fixed, with size min(10, # of processors / 2) and no queue bound.
Besides the pools above, the ThreadPool class also contains a CachedTimeThread (caches the system time), an ExecutorService that executes submitted tasks directly on the calling thread, a ThreadContext (the thread context), a ScheduledThreadPoolExecutor (Java task scheduling), and more.
Reference articles: Java併發編程14-ScheduledThreadPoolExecutor詳解 and Java線程池原理分析ScheduledThreadPoolExecutor篇.
For further detail on ScheduledThreadPoolExecutor, consult a book or the official documentation.
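A short usage example of ScheduledThreadPoolExecutor for reference:

import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SchedulerDemo {
    public static void main(String[] args) throws InterruptedException {
        ScheduledThreadPoolExecutor scheduler = new ScheduledThreadPoolExecutor(1);
        // run a task once per second, starting immediately
        scheduler.scheduleAtFixedRate(
                () -> System.out.println("tick " + System.currentTimeMillis()),
                0, 1, TimeUnit.SECONDS);
        TimeUnit.SECONDS.sleep(3);
        scheduler.shutdown();
    }
}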
About the threads
Having covered the pools, let's dig into what an ES thread looks like.
In ScalingExecutorBuilder.build you can see that the ExecutorService object is created by EsExecutors.newScaling:
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit,
        ThreadFactory threadFactory, ThreadContext contextHolder) {
    ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>();
    EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory,
        new ForceQueuePolicy(), contextHolder);
    queue.executor = executor;
    return executor;
}
Looking at the inheritance of EsThreadPoolExecutor, it extends Java's own ThreadPoolExecutor:
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
        BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
        ThreadContext contextHolder) {
    super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    this.name = name;
    this.contextHolder = contextHolder;
}
Back to Node Creation
4.2 Create the various service objects: ResourceWatcherService, NetworkService, ClusterService, IngestService, ClusterInfoService, UsageService, MonitorService, CircuitBreakerService, MetaStateService, IndicesService, MetaDataIndexUpgradeService, TemplateUpgradeService, TransportService, ResponseCollectorService, SearchTransportService, NodeService, SearchService, PersistentTasksClusterService.
The function of each can be roughly guessed from its name; the details require reading the docs and source, and are not explored here for reasons of length.
4.3 ModulesBuilder registers the modules: ScriptModule, AnalysisModule, SettingsModule, pluginModule(s), ClusterModule, IndicesModule, SearchModule, GatewayModule, RepositoriesModule, ActionModule, NetworkModule, DiscoveryModule.
4.4 Guice dependency binding and injection
For background on Guice, see the earlier articles Google Guice 快速入門 and Elasticsearch 中的 Guice.
Almost every Elasticsearch component is managed as a module: Elasticsearch wraps Guice and builds its modules through the ModulesBuilder class (the usual modules were listed in 4.3). A minimal Guice sketch follows the binding code below.
// dependency bindings
modules.add(b -> {
    b.bind(Node.class).toInstance(this);
    b.bind(NodeService.class).toInstance(nodeService);
    b.bind(NamedXContentRegistry.class).toInstance(xContentRegistry);
    b.bind(PluginsService.class).toInstance(pluginsService);
    b.bind(Client.class).toInstance(client);
    b.bind(NodeClient.class).toInstance(client);
    b.bind(Environment.class).toInstance(this.environment);
    b.bind(ThreadPool.class).toInstance(threadPool);
    b.bind(NodeEnvironment.class).toInstance(nodeEnvironment);
    b.bind(ResourceWatcherService.class).toInstance(resourceWatcherService);
    b.bind(CircuitBreakerService.class).toInstance(circuitBreakerService);
    b.bind(BigArrays.class).toInstance(bigArrays);
    b.bind(ScriptService.class).toInstance(scriptModule.getScriptService());
    b.bind(AnalysisRegistry.class).toInstance(analysisModule.getAnalysisRegistry());
    b.bind(IngestService.class).toInstance(ingestService);
    b.bind(UsageService.class).toInstance(usageService);
    b.bind(NamedWriteableRegistry.class).toInstance(namedWriteableRegistry);
    b.bind(MetaDataUpgrader.class).toInstance(metaDataUpgrader);
    b.bind(MetaStateService.class).toInstance(metaStateService);
    b.bind(IndicesService.class).toInstance(indicesService);
    b.bind(SearchService.class).toInstance(searchService);
    b.bind(SearchTransportService.class).toInstance(searchTransportService);
    b.bind(SearchPhaseController.class).toInstance(new SearchPhaseController(settings,
        searchService::createReduceContext));
    b.bind(Transport.class).toInstance(transport);
    b.bind(TransportService.class).toInstance(transportService);
    b.bind(NetworkService.class).toInstance(networkService);
    b.bind(UpdateHelper.class).toInstance(new UpdateHelper(settings, scriptModule.getScriptService()));
    b.bind(MetaDataIndexUpgradeService.class).toInstance(metaDataIndexUpgradeService);
    b.bind(ClusterInfoService.class).toInstance(clusterInfoService);
    b.bind(GatewayMetaState.class).toInstance(gatewayMetaState);
    b.bind(Discovery.class).toInstance(discoveryModule.getDiscovery());
    {
        RecoverySettings recoverySettings = new RecoverySettings(settings, settingsModule.getClusterSettings());
        processRecoverySettings(settingsModule.getClusterSettings(), recoverySettings);
        b.bind(PeerRecoverySourceService.class).toInstance(new PeerRecoverySourceService(settings, transportService,
            indicesService, recoverySettings));
        b.bind(PeerRecoveryTargetService.class).toInstance(new PeerRecoveryTargetService(settings, threadPool,
            transportService, recoverySettings, clusterService));
    }
    httpBind.accept(b);
    pluginComponents.stream().forEach(p -> b.bind((Class) p.getClass()).toInstance(p));
    b.bind(PersistentTasksService.class).toInstance(persistentTasksService);
    b.bind(PersistentTasksClusterService.class).toInstance(persistentTasksClusterService);
    b.bind(PersistentTasksExecutorRegistry.class).toInstance(registry);
});
injector = modules.createInjector();
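For readers unfamiliar with Guice, here is a minimal sketch of the same bind-to-instance pattern (GreetingService is a hypothetical class used only for illustration):

import com.google.inject.AbstractModule;
import com.google.inject.Guice;
import com.google.inject.Injector;

public class GuiceDemo {
    static class GreetingService { // hypothetical service
        String greet() { return "hello"; }
    }

    public static void main(String[] args) {
        Injector injector = Guice.createInjector(new AbstractModule() {
            @Override
            protected void configure() {
                // bind the type to a concrete instance, like the toInstance calls above
                bind(GreetingService.class).toInstance(new GreetingService());
            }
        });
        System.out.println(injector.getInstance(GreetingService.class).greet());
    }
}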
Bootstrap Start
5.1 Obtain the component objects from the injector and call start() on each (which dispatches into each class's doStart method): LifecycleComponent, IndicesService, IndicesClusterStateService, SnapshotsService, SnapshotShardsService, RoutingService, SearchService, MonitorService, NodeConnectionsService, ResourceWatcherService, GatewayService, Discovery, TransportService.
Briefly, the responsibilities of these services:
IndicesService: index management
IndicesClusterStateService: keeps the node's indices in sync with the cluster state
SnapshotsService: responsible for creating snapshots
SnapshotShardsService: runs on data and master nodes and controls the shards of snapshots currently running on those nodes; it is responsible for starting and stopping shard-level snapshots
RoutingService: listens to the cluster state; when it receives a ClusterChangedEvent it validates the cluster state, and the routing table may be updated
SearchService: the search service
MonitorService: monitoring
NodeConnectionsService: responsible for connecting to nodes once they are added to the cluster state, and disconnecting from them when they are removed; it also periodically checks that all connections are still open and restores them when needed. Note that it is not responsible for removing nodes from the cluster when they disconnect or fail to respond to pings; that is done by NodesFaultDetection, and master fault detection by MasterFaultDetection.
ResourceWatcherService: a generic resource watcher service
GatewayService: the gateway
If the node is a master or data node, additional role-specific work is also performed.
5.2 Cluster discovery, monitoring, and related work; start the HttpServerTransport and bind the service ports.
5.3 Start the keep-alive thread (keepAliveThread.start()); as the non-daemon thread waiting on the latch shown earlier, it keeps the process alive until shutdown.
Summary
The process is long, and the functions of many of the later classes remain to be studied; I will fill those in later.
Due to length constraints, some code is not reproduced in full here; for the full article, click [閱讀全文].
Corrections to any misunderstandings are welcome.
For more content, visit my blog: http://laijianfeng.org/