[10]elasticsearch源碼深刻分析——線程池的封裝

本篇爲elasticsearch源碼分析系列文章的第十篇,本篇延續上一篇ElasticSearch的Plugin引出的內容,進行各類Plugin中線程池的分析。服務器

上篇講到了ElasticSearch中插件的基本概念,以及Node實例化中涉及到的PluginService初始化編碼,本篇將會繼續研究Node實例化的過程當中PluginsService發揮的做用,也就是經過PluginsService中的參數構建線程池框架。框架

線程池在什麼時候初始化

當Node完成了PluginsService的構造後,緊接會經過getExecutorBuilders方法取得線程池的Executor構造器列表,代碼以下:elasticsearch

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings)
複製代碼

此時PluginsService對象中已經有了須要加載的全部plugin了,包含modules路徑和plugins路徑中的全部組件,這裏統稱爲plugin。以下圖所示總共是包含了13個已加載的Plugin,分別是modules路徑中的默認必須加載的12個和Plugins路徑中的自定義安裝的1個(ICU分詞器)。以下圖所示源碼分析

路徑中的plugin對象

內存中的plugin對象

構建線程池框架

初始化ExecutorBuilder集合

Node實例化過程當中,經過代碼:fetch

List<ExecutorBuilder<?>> executorBuilders = pluginsService.getExecutorBuilders(settings);
複製代碼

查找到自定義的線程池Executor構建器。再得到自定義線程池構建器集合後,開始構建線程池(ThreadPool)。優化

ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
複製代碼

首先經過代碼得到處理器CPU的數量,ui

Runtime.getRuntime().availableProcessors()
複製代碼

固然這個值是能夠被Setting中設置的變量processors來覆蓋的。這個變量在代碼中被標記爲availableProcessors。而後建立變量編碼

  • halfProcMaxAt5,這個變量的意思是availableProcessors的一半,但最大不超過5。
  • halfProcMaxAt10,這個變量的意思是availableProcessors的一半,但最大不超過10。

這兩個變量在後面建立各類線程池構造器中反覆用到。spa

在肯定了可以使用的處理器數量後,就能肯定線程池的最小值(genericThreadPoolMax),ElasticSearch中是肯定爲:可用CPU處理器數量的4倍,且固定範圍爲最小128,最大爲512插件

因而可知若是用通常服務器的話,線程池上限最終會被肯定爲128,能夠說仍是比較高的設定了。

接下來開始構造執行不一樣操做時線程池Executor,ElasticSearch中把各個操做的Executor構造爲Map,Map<String, ExecutorBuilder>,下面是各個Executor對象的解釋:

  • 普通操做的Executor:構建一個可伸縮的Executor構建器,value爲ScalingExecutorBuilder對象。接收參數和對應操做以下:

    • name:線程池執行者的名稱,也就是generic
    • core:線程池中線程的最小值,固定爲4。將thread_pool.generic.core的設爲這個值。
    • max:線程池中線程的最大值,對應上面提到的genericThreadPoolMax,在本機跑的結果是128
    • keepAlive:超過4個線程後,線程保持活躍的時間。這個值固定爲30秒。這個參數被設定爲變量thread_pool.generic.keep_alive
  • 索引操做的Executor:構建一個固定的Executor構建器。key爲index,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.index.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是idnex
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.index.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.index.queue_size的值爲200,注意這個值固定爲200
  • 批處理操做的Executor:構建一個固定的Executor構建器。key爲bulk,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.bulk.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是bulk
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.bulk.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.bulk.queue_size的值爲200,注意這個值固定爲200
  • get操做的Executor:構建一個固定的Executor構建器。key爲get,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.get.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是get
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.get.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.get.queue_size的值爲1000,注意這個值固定爲1000
  • 查詢操做的Executor:構建一個根據利特爾法則自動擴展長度的Executor構建器,這個構建器的邏輯與其餘構建器不一樣,也顯得比較複雜,也說明了對於查詢操做,ElasticSearch作了特殊的優化。key爲search,value爲AutoQueueAdjustingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.search.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是search
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.search.size的值爲size的值,本機跑的結果是7
    • initialQueueSize:初始化隊列的大小,固定設置爲1000,造配置變量thread_pool.search.queue_size的值爲200
    • minQueueSize:隊列的最小長度,固定設置爲1000設定配置變量thread_pool.search.min_queue_size的值爲1000
    • maxQueueSize:隊列的最大長度,固定設置爲1000,設定配置變量thread_pool.search.max_queue_size的值爲1000
    • frameSize:隊列的步進長度,固定設置爲2000,構造配置變量thread_pool.search.auto_queue_frame_size的值爲200,注意這個值固定爲200
    • thread_pool.search.target_response_time針對search操做的相應被設置爲1S,
  • 管理操做的Executor:構建一個可伸縮的Executor構建器。key爲management,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.management.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是management
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.management.size的值爲size的值,本機跑的結果是1
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.management.queue_size的值爲200,注意這個值固定爲200
    • keepAlive:超過1個線程後,線程保持活躍的時間。這個值固定爲5分鐘。這個參數被設定爲變量thread_pool.management.keep_alive
  • 監聽操做的Executor:構建一個固定的Executor構建器。key爲listener,value爲FixedExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.listener.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是listener
    • size:線程的固定大小,上文提到的halfProcMaxAt10,和參數name一塊兒構造配置變量thread_pool.listener.size的值爲size的值,本機跑的結果是2
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.listener.queue_size的值爲**-1**,意思就沒有阻塞隊列。
  • flush操做的Executor:構建一個可伸縮的Executor構建器。key爲flush,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.flush.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是flush
    • size:線程的固定大小,上文提到的halfProcMaxAt5,和參數name一塊兒構造配置變量thread_pool.flush.size的值爲size的值,本機跑的結果是4
    • keepAlive:超過1個線程後,線程保持活躍的時間。這個值固定爲5分鐘。這個參數被設定爲變量thread_pool.management.keep_alive
  • refresh操做的Executor:構建一個可伸縮的Executor構建器。key爲refresh,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.refresh.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是refresh
    • size:線程的固定大小,上文提到的halfProcMaxAt10,和參數name一塊兒構造配置變量thread_pool.refresh.size的值爲size的值,本機跑的結果是4
    • keepAlive:超過1個線程後,線程保持活躍的時間。這個值固定爲5分鐘。這個參數被設定爲變量thread_pool.management.keep_alive
  • warmer操做的Executor:構建一個可伸縮的Executor構建器。key爲warmer,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.warmer.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是warmer
    • size:線程的固定大小,上文提到的halfProcMaxAt5,和參數name一塊兒構造配置變量thread_pool.warmer.size的值爲size的值,本機跑的結果是4
    • keepAlive:超過1個線程後,線程保持活躍的時間。這個值固定爲5分鐘。這個參數被設定爲變量thread_pool.management.keep_alive
  • snapshot操做的Executor:構建一個可伸縮的Executor構建器。key爲snapshot,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.snapshot.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是snapshot
    • size:線程的固定大小,上文提到的halfProcMaxAt5,和參數name一塊兒構造配置變量thread_pool.snapshot.size的值爲size的值,本機跑的結果是4
    • keepAlive:超過1個線程後,線程保持活躍的時間。這個值固定爲5分鐘。這個參數被設定爲變量thread_pool.management.keep_alive
  • 碎片處理操做的Executor:構建一個可伸縮的Executor構建器。key爲fetch_shard_started,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.fetch_shard_started.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是fetch_shard_started
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.fetch_shard_started.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.fetch_shard_started.queue_size的值爲200,注意這個值固定爲200
  • 強制merge操做的Executor:構建一個可伸縮的Executor構建器。key爲force_merge,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.force_merge.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是force_merge
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.force_merge.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.force_merge.queue_size的值爲200,注意這個值固定爲200
  • 獲取碎片操做的Executor:構建一個可伸縮的Executor構建器。key爲fetch_shard_store,value爲ScalingExecutorBuilder對象,接收參數和對應操做以下:

    • settings:Node的配置settings。設定配置變量thread_pool.fetch_shard_store.size的值爲該參數中cpu的數量
    • name:線程池執行者的名稱,也就是fetch_shard_store
    • size:線程的固定大小,和參數name一塊兒構造配置變量thread_pool.fetch_shard_store.size的值爲size的值,本機跑的結果是4
    • queueSize:阻塞隊列的大小,構造配置變量thread_pool.fetch_shard_store.queue_size的值爲200,注意這個值固定爲200

至此就完成了org.elasticsearch.threadpool.ThreadPool對象的建立。

ThreadPool對象的做用

獲得ThreadPool的對象後,經過線程池進行了NodeClient的構建。

client = new NodeClient(settings, threadPool);
複製代碼

ResourceWatcherService對象的構建,

final ResourceWatcherService resourceWatcherService = new ResourceWatcherService(settings, threadPool);
複製代碼

後面還有不少的組件都用到了線程池,好比:

  • IngestService
  • ClusterInfoService
  • MonitorService
  • ActionModule
  • IndicesService
  • NetworkModule
  • TransportService
  • DiscoveryModule
  • NodeService

能夠看出都是ElasticSearch的核心組件,這些組件的功能和原理,我都會在之後的文章中講解,而像ElasticSearch這種存儲搜索系統來講IO操做確定很是頻繁,而線程池是專門致力於解決系統的IO問題,它在這些服務組件中的做用也顯得愈發重要。

利特爾法則

查詢操做中提到的利特爾法則是一種描述穩定系統中,三個變量之間關係的法則。

其中L表示平均請求數量,λ表示請求的頻率,W表示響應請求的平均時間。舉例來講,若是每秒請求數爲10次,每一個請求處理時間爲1秒,那麼在任什麼時候刻都有10個請求正在被處理。回到咱們的話題,就是須要使用10個線程來進行處理。若是單個請求的處理時間翻倍,那麼處理的線程數也要翻倍,變成20個。

理解了處理時間對於請求處理效率的影響以後,咱們會發現,一般理論上限可能不是線程池大小的最佳值。線程池上限還須要參考任務處理時間。

假設JVM能夠並行處理1000個任務,若是每一個請求處理時間不超過30秒,那麼在最壞狀況下,每秒最多隻能處理33.3個請求。然而,若是每一個請求只須要500毫秒,那麼應用程序每秒能夠處理2000個請求。

相關文章
相關標籤/搜索