儘管前面寫好幾篇ES線程池分析的文章(見文末參考連接),但都不太滿意。但從ES的線程池中瞭解到了很多JAVA線程池的使用技巧,因而忍不住再寫一篇(ES6.3.2版本的源碼)。文中給出的每一個代碼片段,都標明瞭這些代碼是來自哪一個類的哪一個方法。
ElasticSearch裏面一共有四種類型的線程池,源碼:ThreadPool.ThreadPoolTypehtml
DIRECT("direct"), FIXED("fixed"), FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), SCALING("scaling");
GET、SEARCH、WRITE、INDEX、FLUSH...等各類操做是交由這些線程池實現的。爲何定義不一樣類型的線程池呢?舉個最簡單的例子:程序裏面有IO密集型任務,也有CPU密集型任務,這些任務都提交到一個線程池中執行?仍是根據任務的執行特色將CPU密集型的任務都提交到一個線程池,IO密集型任務都提交到另外一個線程池執行?java
不一樣種類的操做(INDEX、SEARCH...)交由不一樣類型的線程池執行是有不少好處的:node
再來講一下ES中的線程池都是如何建立的?
ES節點啓動時,執行Node類的構造方法 :org.elasticsearch.node.Node.Node(org.elasticsearch.env.Environment, java.util.Collection<java.lang.Class<? extends org.elasticsearch.plugins.Plugin>>)
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
new ThreadPool對象,從這裏開始建立線程池。看懂了ThreadPool類,就理解了ES線程池的一半。多線程
每一個操做都有一個線程池,每一個線程池都有一個相應的 ExecutorBuilder 對象,線程池都是經過ExecutorBuilder類的build()方法建立的。
在org.elasticsearch.threadpool.ThreadPool.ThreadPool的構建函數裏面建立各類ExecutorBuilder對象。能夠看出:INDEX操做的線程池的 ExecutorBuilder對象實際類型是FixedExecutorBuilder異步
builders.put(Names.GENERIC, new ScalingExecutorBuilder(Names.GENERIC, 4, genericThreadPoolMax, TimeValue.timeValueSeconds(30))); builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true)); builders.put(Names.WRITE, new FixedExecutorBuilder(settings, Names.WRITE, "bulk", availableProcessors, 200)); builders.put(Names.GET, new FixedExecutorBuilder(settings, Names.GET, availableProcessors, 1000)); builders.put(Names.ANALYZE, new FixedExecutorBuilder(settings, Names.ANALYZE, 1, 16)); builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
如上代碼所示,雖然ES爲咱們內置好了許多線程池(GENERIC、INDEX、WRITE、GET...),但還能夠自定義 ExecutorBuilder對象,建立自定義的線程池。全部的ExecutorBuilder對象建立完畢後,保存到一個HashMap裏面。elasticsearch
for (final ExecutorBuilder<?> builder : customBuilders) { if (builders.containsKey(builder.name())) { throw new IllegalArgumentException("builder with name [" + builder.name() + "] already exists"); } builders.put(builder.name(), builder); }
最後,遍歷builders 這個HashMap 取出 ExecutorBuilder對象,調用它的build()方法建立線程池ide
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) { final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); //這裏執行build方法建立線程池 final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); if (executors.containsKey(executorHolder.info.getName())) { throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered"); } logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); executors.put(entry.getKey(), executorHolder); }
建立INDEX操做的線程池須要指定任務隊列,這個任務隊列就是:SizeBlockingQueue。固然了,也有一些其餘操做(好比GET操做)的線程池的任務隊列也是SizeBlockingQueue。
下面參數可看出:該任務隊列的長度爲200,org.elasticsearch.threadpool.ThreadPool.ThreadPool的構造方法:函數
builders.put(Names.INDEX, new FixedExecutorBuilder(settings, Names.INDEX, availableProcessors, 200, true));
前面已經提到了,每一個線程池都由ExecutorBuilder的build方法建立的。具體到INDEX操做的線程池,它的ExecutorBuilder實例對象是: FixedExecutorBuilder對象,在ExecutorBuilder 保存一些線程池參數信息:(core pool size、max pool size、queue size...)性能
final ExecutorService executor = EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
若是queue_size配置爲 -1,那就是一個無界隊列(LinkedTransferQueue)。咱們是能夠修改線程池配置參數的:關於線程池隊列長度的配置信息參考:官方文檔threadpool
而INDEX操做對應的線程池的任務隊列長度爲200,所以下面代碼建立了一個長度爲200的 SizeBlockingQueue,在代碼最後一行,爲該線程池指定的拒絕策略是 EsAbortPolicyui
public static EsThreadPoolExecutor newFixed(String name, int size, int queueCapacity, ThreadFactory threadFactory, ThreadContext contextHolder) { BlockingQueue<Runnable> queue; if (queueCapacity < 0) { queue = ConcurrentCollections.newBlockingQueue(); } else { queue = new SizeBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), queueCapacity); } return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder); }
下面開始分析SizeBlockingQueue的源碼:
通常在自定義線程池時,要麼是直接 new ThreadPoolExecutor,要麼是繼承ThreadPoolExecutor,在建立ThreadPoolExecutor對象時須要指定線程池的配置參數。好比,線程池的核心線程數(core pool size),最大線程數,任務隊列,拒絕策略。這裏我想提一下拒絕策略,由於某些ES的操做具備"強制"執行的特色:若是某個任務被標記爲強制執行,那麼向線程池提交該任務時,就不能拒絕它。是否是很厲害?想一想,線程池是如何作到的?
下面舉個例子:
//建立任務隊列,這裏沒有指定任務隊列的長度,那麼這就是一個無界隊列 private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(); //建立線程工廠,由它來建立線程 private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("thread-%d").setUncaughtExceptionHandler(exceptionHandler).build(); //建立線程池,核心線程數爲4,最大線程數爲16 private ThreadPoolExecutor executor = new ThreadPoolExecutor(4, 16, 1, TimeUnit.DAYS, taskQueue, threadFactory, rejectExecutionHandler);
這裏建立的線程池,它的線程數量永遠不可能達到最大線程數量16,爲何?由於咱們的任務隊列是一個無界隊列,當向線程池中提交任務時,LinkedBlockingQueue.offer方法不會返回false。而在JDK源碼java.util.concurrent.ThreadPoolExecutor.execute中,當任務入隊列失敗返回false時,纔有可能觸發addWork建立新線程。這個時候,你可能會說:在 new LinkedBlockingQueue的時候指定隊列長度不就完了?好比這樣指定隊列長度爲1024
private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1024);
可是,有沒有一種方法,可以作到:當core pool size 個核心線程數處理不過來時,先讓線程池的線程數量建立到最大值(max pool size),而後,若還有任務提交到線程池,則讓任務排隊等待處理?SizeBlockingQueue 重寫了BlockingQueue的offer方法,實現了這個功能。
另外,我再反問一下?如何肯定1024就是一個合適的隊列容量?萬一提交任務速度很快,一會兒任務隊列就滿了,長度1024就會致使大量的任務被拒絕。
ES中的 ResizableBlockingQueue 實現了一種可動態調整隊列長度的任務隊列,有興趣的能夠去研究一下。
SizeBlockingQueue 封裝了 LinkedTransferQueue,而 LinkedTransferQueue 是一個無界隊列,與LinkedBlockingQueue不一樣的是,LinkedTransferQueue的構造方法是不能指定任務隊列的長度(capacity)的。所以,SizeBlockingQueue定義一個capacity屬性提供了隊列有界的功能。
好,來看看SizeBlockingQueue是如何重寫offer方法的:org.elasticsearch.common.util.concurrent.SizeBlockingQueue.offer(E)
@Override public boolean offer(E e) { while (true) { //獲取當前任務隊列的長度,即:當前任務隊列裏面有多少個任務正在排隊等待執行 final int current = size.get(); //若是正在等待排隊的任務數量大於等於任務隊列長度的最大值(容量), //返回false 就有可能 觸發 java.util.concurrent.ThreadPoolExecutor.addWorker 調用建立新線程 if (current >= capacity()) { return false; } //當前正在排隊的任務數量還沒有超過隊列的最大長度,使用CAS 先將任務隊列長度加1,[CAS的經典用法] if (size.compareAndSet(current, 1 + current)) { break; } } //將任務添加到隊列 boolean offered = queue.offer(e); if (!offered) { //若是未添加成功,再把數量減回去便可 size.decrementAndGet(); } return offered; }
上面,就是經過先判斷當前排隊的任務是否小於任務隊列的最大長度(容量) 來實現:優先建立線程數量到 max pool size。下面來模擬一下使用 SizeBlockingQueue 時處理任務的步驟: 根據前面的介紹:線程池 core pool size=4,max pool size=16,taskQueue 是 SizeBlockingQueue,任務隊列的最大長度是200
1,提交1-4號 四個任務給線程池,線程池建立4個線程處理這些任務
2,1-4號 四個任務正在執行中...此時又提交了8個任務到線程池
3,這時,線程池是再繼續建立8個線程,處理 5-12號任務。此時,線程池中一共有4+8=12個線程,小於max pool size
4,假設 1-12號任務都正在處理中,此時又提交了8個任務到線程池
5,這時,線程池會再建立4個新線程處理其中的13-16號 這4個任務,線程數量已經達到max pool size,不能再建立新線程了,還有4個任務(17-20號)入隊列排隊等待。
有沒有興趣模擬一下使用LinkedBlockingQueue做爲任務隊列時,線程池又是如何處理這一共提交的20個任務的?
最後來分析一下 SizeBlockingQueue 如何支持:當向線程池提交任務時,若是任務被某種拒絕策略拒絕了,若是這種任務又很重要,那能不能強制將該任務提交到線程池的任務隊列中呢?
這裏就涉及到:在建立線程池時,爲線程池配置了何種拒絕策略了。下面以INDEX操做的線程池爲例說明:
在org.elasticsearch.common.util.concurrent.EsExecutors.newFixed 中:可知該線程池所使用的拒絕策略是:EsAbortPolicy
return new EsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS, queue, threadFactory, new EsAbortPolicy(), contextHolder);
看 EsAbortPolicy 的源碼:org.elasticsearch.common.util.concurrent.EsAbortPolicy.rejectedExecution
if (r instanceof AbstractRunnable) { //判斷該任務是否是一個 可強制提交的任務 if (((AbstractRunnable) r).isForceExecution()) { BlockingQueue<Runnable> queue = executor.getQueue(); if (!(queue instanceof SizeBlockingQueue)) { throw new IllegalStateException("forced execution, but expected a size queue"); } //是一個可強制提交的任務,而且 線程池的任務隊列是 SizeBlockingQueue時,強制提交任務 try { ((SizeBlockingQueue) queue).forcePut(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("forced execution, but got interrupted", e); } return; } } rejected.inc(); //任務被拒絕且未能強制執行, 拋出EsRejectedExecutionException異常後,會被 EsThreadPoolExecutor.doExecute catch, 進行相應的處理 throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
AbstractRunnable 是提交的Runnable任務,只要Runnable任務的 isForceExecution()返回true,就代表這個任務須要「強制提交」。關於AbstractRunnable,可參考:Elasticsearch中各類線程池分析
那爲何只有當任務隊列是 SizeBlockingQueue 時,才能夠強制提交呢?這很好理解:首先SizeBlockingQueue它封裝了LinkedTransferQueue,LinkedTransferQueue本質上是一個無界隊列,實際上能夠添加無窮多個任務(不考慮OOM),只不過是用 capacity 屬性限制了隊列的長度而已。
若是,任務隊列是 new LinkedBlockingQueue<>(1024)
,確定是不能支持強制提交的,由於當LinkedBlockingQueue長度達到1024後,再提交任務,直接返回false了。從這裏也能夠借鑑ES線程池任務隊列的設計方式,應用到項目中去。
綜上:只有Runnable任務 isForceExecution返回true,而且線程池的任務隊列是SizeBlockingQueue時,向線程池提交任務時,老是能提交成功(強制執行機制保證)。其餘狀況下,任務被拒絕時,會拋出EsRejectedExecutionException異常。
強制提交,把任務添加到任務隊列 SizeBlockingQueue 中,源碼以下:
org.elasticsearch.common.util.concurrent.SizeBlockingQueue.forcePut
/** * Forces adding an element to the queue, without doing size checks. */ public void forcePut(E e) throws InterruptedException { size.incrementAndGet(); try { queue.put(e); } catch (InterruptedException ie) { size.decrementAndGet(); throw ie; } }
ES會爲每種操做建立一個線程池,本文基於INDEX操做分析了ES中線程池的任務隊列SizeBlockingQueue。對於 INDEX 操做而言,它的線程池是由org.elasticsearch.threadpool.FixedExecutorBuilder 的build方法建立的,線程池的最大核心線程數和最大線程數相同,使用的任務隊列是 SizeBlockingQueue,長度爲200,拒絕策略是:org.elasticsearch.common.util.concurrent.EsAbortPolicy。
爲何要爲不一樣的操做分配不一樣的線程池呢?
假設 index 操做 和 snapshot 操做使用同一個線程池,若是某節點發生故障,index操做被阻塞了,而 Client發起的索引文檔操做的 QPS又很高,就很容易影響 snapshot 服務了。
SizeBlockingQueue 本質上是一個 LinkedTransferQueue,其實ES中全部的任務隊列都是封裝LinkedTransferQueue實現的,並無使用LinkedBlockingQueue。
ES中的全部任務(Runnable)都是基於org.elasticsearch.common.util.concurrent.AbstractRunnable這個抽象類封裝的,固然有一些任務是經過Lambda表達式的形式提交的。任務的具體處理邏輯在 org.elasticsearch.common.util.concurrent.AbstractRunnable#doRun 方法中,任務執行完成由onAfter()處理,執行出現異常由onFailure()處理。線程池的 org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor#doExecute 方法 裏面就是整個任務的處理流程:
protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } }
最後,ES的線程池模塊代碼主要在 org.elasticsearch.threadpool 和 org.elasticsearch.common.util.concurrent 包下。整體來講,threadpool模塊相比於ES的其餘模塊,是一個小模塊,代碼不算複雜。可是threadpool又很重要,由於它是其餘模塊執行邏輯的基礎,threadpool 再配上異步執行機制,是ES源碼中其餘操做的源碼實現思路。