本篇爲elasticsearch源碼分析系列文章的第十篇,本篇延續上一篇ElasticSearch的Plugin引出的內容,進行各類Plugin中線程池的分析。服務器
上篇講到了ElasticSearch中插件的基本概念,以及Node實例化中涉及到的PluginService初始化編碼,本篇將會繼續研究Node實例化的過程當中PluginsService發揮的做用,也就是經過PluginsService中的參數構建線程池框架。框架
當Node完成了PluginsService的構造後,緊接會經過getExecutorBuilders
方法取得線程池的Executor構造器列表,代碼以下:elasticsearch
List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings)
複製代碼
此時PluginsService對象中已經有了須要加載的全部plugin了,包含modules
路徑和plugins
路徑中的全部組件,這裏統稱爲plugin。以下圖所示總共是包含了13個已加載的Plugin,分別是modules路徑中的默認必須加載的12個和Plugins路徑中的自定義安裝的1個(ICU分詞器)。以下圖所示源碼分析
Node實例化過程當中,經過代碼:fetch
List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
複製代碼
查找到自定義的線程池Executor構建器。再得到自定義線程池構建器集合後,開始構建線程池(ThreadPool)。優化
ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
複製代碼
首先經過代碼得到處理器CPU的數量,ui
Runtime.getRuntime().availableProcessors()
複製代碼
固然這個值是能夠被Setting中設置的變量processors來覆蓋的。這個變量在代碼中被標記爲availableProcessors。而後建立變量編碼
這兩個變量在後面建立各類線程池構造器中反覆用到。spa
在肯定了可以使用的處理器數量後,就能肯定線程池的最小值(genericThreadPoolMax),ElasticSearch中是肯定爲:可用CPU處理器數量的4倍,且固定範圍爲最小128,最大爲512。插件
因而可知若是用通常服務器的話,線程池上限最終會被肯定爲128,能夠說仍是比較高的設定了。
接下來開始構造執行不一樣操做時線程池Executor,ElasticSearch中把各個操做的Executor構造爲Map,Map<String, ExecutorBuilder>
,下面是各個Executor對象的解釋:
普通操做的Executor:構建一個可伸縮的Executor構建器,value爲ScalingExecutorBuilder對象。接收參數和對應操做以下:
thread_pool.generic.core
的設爲這個值。thread_pool.generic.keep_alive
索引操做的Executor:構建一個固定的Executor構建器。key爲index
,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.index.size
的值爲該參數中cpu的數量thread_pool.index.size
的值爲size的值,本機跑的結果是4。thread_pool.index.queue_size
的值爲200,注意這個值固定爲200。批處理操做的Executor:構建一個固定的Executor構建器。key爲bulk
,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.bulk.size
的值爲該參數中cpu的數量thread_pool.bulk.size
的值爲size的值,本機跑的結果是4。thread_pool.bulk.queue_size
的值爲200,注意這個值固定爲200。get操做的Executor:構建一個固定的Executor構建器。key爲get
,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.get.size
的值爲該參數中cpu的數量thread_pool.get.size
的值爲size的值,本機跑的結果是4。thread_pool.get.queue_size
的值爲1000,注意這個值固定爲1000。查詢操做的Executor:構建一個根據利特爾法則自動擴展長度的Executor構建器,這個構建器的邏輯與其餘構建器不一樣,也顯得比較複雜,也說明了對於查詢操做,ElasticSearch作了特殊的優化。key爲search
,value爲AutoQueueAdjustingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.search.size
的值爲該參數中cpu的數量thread_pool.search.size
的值爲size的值,本機跑的結果是7。thread_pool.search.queue_size
的值爲200thread_pool.search.min_queue_size
的值爲1000thread_pool.search.max_queue_size
的值爲1000thread_pool.search.auto_queue_frame_size
的值爲200,注意這個值固定爲200。thread_pool.search.target_response_time
針對search操做的相應被設置爲1S,管理操做的Executor:構建一個可伸縮的Executor構建器。key爲management
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.management.size
的值爲該參數中cpu的數量thread_pool.management.size
的值爲size的值,本機跑的結果是1。thread_pool.management.queue_size
的值爲200,注意這個值固定爲200。thread_pool.management.keep_alive
。監聽操做的Executor:構建一個固定的Executor構建器。key爲listener
,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.listener.size
的值爲該參數中cpu的數量thread_pool.listener.size
的值爲size的值,本機跑的結果是2。thread_pool.listener.queue_size
的值爲**-1**,意思就沒有阻塞隊列。flush操做的Executor:構建一個可伸縮的Executor構建器。key爲flush
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.flush.size
的值爲該參數中cpu的數量thread_pool.flush.size
的值爲size的值,本機跑的結果是4。thread_pool.management.keep_alive
。refresh操做的Executor:構建一個可伸縮的Executor構建器。key爲refresh
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.refresh.size
的值爲該參數中cpu的數量thread_pool.refresh.size
的值爲size的值,本機跑的結果是4。thread_pool.management.keep_alive
。warmer操做的Executor:構建一個可伸縮的Executor構建器。key爲warmer
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.warmer.size
的值爲該參數中cpu的數量thread_pool.warmer.size
的值爲size的值,本機跑的結果是4。thread_pool.management.keep_alive
。snapshot操做的Executor:構建一個可伸縮的Executor構建器。key爲snapshot
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.snapshot.size
的值爲該參數中cpu的數量thread_pool.snapshot.size
的值爲size的值,本機跑的結果是4。thread_pool.management.keep_alive
。碎片處理操做的Executor:構建一個可伸縮的Executor構建器。key爲fetch_shard_started
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.fetch_shard_started.size
的值爲該參數中cpu的數量thread_pool.fetch_shard_started.size
的值爲size的值,本機跑的結果是4。thread_pool.fetch_shard_started.queue_size
的值爲200,注意這個值固定爲200。強制merge操做的Executor:構建一個可伸縮的Executor構建器。key爲force_merge
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.force_merge.size
的值爲該參數中cpu的數量thread_pool.force_merge.size
的值爲size的值,本機跑的結果是4。thread_pool.force_merge.queue_size
的值爲200,注意這個值固定爲200。獲取碎片操做的Executor:構建一個可伸縮的Executor構建器。key爲fetch_shard_store
,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:
thread_pool.fetch_shard_store.size
的值爲該參數中cpu的數量thread_pool.fetch_shard_store.size
的值爲size的值,本機跑的結果是4。thread_pool.fetch_shard_store.queue_size
的值爲200,注意這個值固定爲200。至此就完成了org.elasticsearch.threadpool.ThreadPool對象的建立。
獲得ThreadPool的對象後,經過線程池進行了NodeClient的構建。
client = new NodeClient(settings, threadPool);
複製代碼
和ResourceWatcherService對象的構建,
final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
複製代碼
後面還有不少的組件都用到了線程池,好比:
能夠看出都是ElasticSearch的核心組件,這些組件的功能和原理,我都會在之後的文章中講解,而像ElasticSearch這種存儲搜索系統來講IO操做確定很是頻繁,而線程池是專門致力於解決系統的IO問題,它在這些服務組件中的做用也顯得愈發重要。
查詢操做中提到的利特爾法則是一種描述穩定系統中,三個變量之間關係的法則。
其中L表示平均請求數量,λ表示請求的頻率,W表示響應請求的平均時間。舉例來講,若是每秒請求數爲10次,每一個請求處理時間爲1秒,那麼在任什麼時候刻都有10個請求正在被處理。回到咱們的話題,就是須要使用10個線程來進行處理。若是單個請求的處理時間翻倍,那麼處理的線程數也要翻倍,變成20個。
理解了處理時間對於請求處理效率的影響以後,咱們會發現,一般理論上限可能不是線程池大小的最佳值。線程池上限還須要參考任務處理時間。
假設JVM能夠並行處理1000個任務,若是每一個請求處理時間不超過30秒,那麼在最壞狀況下,每秒最多隻能處理33.3個請求。然而,若是每一個請求只須要500毫秒,那麼應用程序每秒能夠處理2000個請求。