在ElasticSearch 線程池類型分析之SizeBlockingQueue這篇文章中分析了ES的fixed類型的線程池。本文分析scaling類型的線程池,以及該線程池所使用的任務隊列:ExecutorScalingQueue
從ThreadPool類中可看出,scaling線程池主要用來執行ES的系統操做:FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...而fixed類型的線程池則執行用戶發起的操做:SEARCH、INDEX、GET、WRITE。系統操做有什麼特色呢?系統操做請求量小、可容忍必定的延時。從線程池的角度看,執行系統操做的任務不會被線程池的拒絕策略拒絕,而這正是由ExecutorScalingQueue任務隊列和ForceQueuePolicy拒絕策略實現的。html
org.elasticsearch.common.util.concurrent.EsExecutors.newScalingjava
public static EsThreadPoolExecutor newScaling(String name, int min, int max, long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory, ThreadContext contextHolder) { ExecutorScalingQueue<Runnable> queue = new ExecutorScalingQueue<>(); EsThreadPoolExecutor executor = new EsThreadPoolExecutor(name, min, max, keepAliveTime, unit, queue, threadFactory, new ForceQueuePolicy(), contextHolder); queue.executor = executor; return executor; }
線程池對象是 EsThreadPoolExecutor、任務隊列是 ExecutorScalingQueue、拒絕策略是 ForceQueuePolicyelasticsearch
ForceQueuePolicy和ExecutorScalingQueue都是org.elasticsearch.common.util.concurrent.EsExecutors.EsExecutors 的內部類。EsExecutors是一個工具類,用來建立ThreadPoolExecutor對象。ide
org.elasticsearch.common.util.concurrent.EsExecutors.newScaling
org.elasticsearch.common.util.concurrent.EsExecutors.newFixed
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed
再加上 private static final ExecutorService DIRECT_EXECUTOR_SERVICE = new AbstractExecutorService()...
ES中全部的線程池對象都由EsExecutors建立了。工具
當向 EsThreadPoolExecutor 提交任務時,若是觸發了拒絕策略,則會執行以下的rejectedExecution方法:將任務再添加到任務隊列中。ui
/** * A handler for rejected tasks that adds the specified element to this queue, * waiting if necessary for space to become available. */ static class ForceQueuePolicy implements XRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // force queue policy should only be used with a scaling queue assert executor.getQueue() instanceof ExecutorScalingQueue; //將被"拒絕"的任務再put到任務隊列中 executor.getQueue().put(r); } catch (final InterruptedException e) { // a scaling queue never blocks so a put to it can never be interrupted throw new AssertionError(e); } } //由於任務不會被拒絕,因此這裏的被拒絕的任務計數老是返回0 @Override public long rejected() { return 0; } }
ExecutorScalingQueue 繼承了LinkedTransferQueue,因此是一個無界隊列。它和 SizeBlockingQueue 所不一樣的是:SizeBlockingQueue的容量是有限制的,而ExecutorScalingQueue沒有長度限制,這意味着能夠將任意多個任務提交到 ExecutorScalingQueue中排隊等待,這與它一塊兒搭配使用的拒絕策略ForceQueuePolicy是吻合的。同時,這也代表FLUSH、REFRESH、SNAPSHOT等這些操做都不會被拒絕,不過這些操做的執行頻率都很低。
試想,對於SEARCH(搜索請求)、INDEX(索引文檔請求)、WRITE(添加文檔請求)這些由用戶觸發的操做,可能QPS會很是大,而REFRESH(刷新段segment)、FLUSH這樣的操做是系統層面的操做,執行頻率很低。所以分開交由不一樣的線程池處理是很是有必要的,這樣就能夠爲線程池配置不一樣的特色(有界隊列、無界隊列)的任務隊列以及拒絕處理策略了。this
在任務入隊列時,ExecutorScalingQueue的offer方法先判斷線程池中是否有空閒線程,如有空閒線程,tryTransfer方法會當即成功返回true,任務直接交由線程處理而不須要入隊列再排隊等待了。
這裏也能夠看出: LinkedBlockingQueue 與 LinkedTransferQueue 的區別,我想這也是爲何ES選擇LinkedTransferQueue做爲任務隊列的緣由之一吧。若線程池中沒有空閒的線程,再判斷線程池中當前已有線程數量是否達到了最大線程數量(max pool size),若未達到,則新建線程來處理任務,不然任務就進入隊列排隊等待處理,而因爲ExecutorScalingQueue是個無界隊列,沒有長度限制,而REFRESH這樣的操做又沒有低響應時間要求,所以長時間排隊也可以接受。編碼
/** * ExecutorScalingQueue 必須與 ForceQueuePolicy 拒絕策略搭配使用. * * 採用 ExecutorScalingQueue 做爲任務隊列的線程池它的 core pool size 和 max pool size 能夠不相等 * 當不斷地向線程池提交任務,線程的個數達到了core pool size但還沒有達到 max pool size時, left大於0成立,返回false * 觸發 ThreadPoolExecutor#execute方法中if語句 workQueue.offer(command) 爲false,從而致使if語句不成立 * 因而執行 addWorker 方法建立新線程來執行任務,若是 addWorker 不當心失敗了,會執行 rejected(command),可是這個任務是不能 * 被拒絕的,由於咱們只是想讓 線程池 優先建立 max pool size個線程來處理任務. * 因而採用 ForceQueuePolicy 保證任務必定是提交到隊列裏,從而保證任務"不被拒絕" * @param e * @return */ static class ExecutorScalingQueue<E> extends LinkedTransferQueue<E> { ThreadPoolExecutor executor; ExecutorScalingQueue() { } @Override public boolean offer(E e) { // first try to transfer to a waiting worker thread //若是線程池中有空閒的線程,tryTransfer會當即成功,直接將任務交由線程處理(省去了任務的排隊過程) if (!tryTransfer(e)) { // check if there might be spare capacity in the thread // pool executor int left = executor.getMaximumPoolSize() - executor.getCorePoolSize(); if (left > 0) { //線程池當前已有的線程數量還沒有達到 max pool size, 返回false, 觸發ThreadPoolExecutor的addWorker方法被調用,從而建立新線程 // reject queuing the task to force the thread pool // executor to add a worker if it can; combined // with ForceQueuePolicy, this causes the thread // pool to always scale up to max pool size and we // only queue when there is no spare capacity return false; } else { //線程池當前已有的線程數量 已是 max pool size了, 任務入隊列排隊等待 return super.offer(e); } } else { return true; } } }
本文分析了 ES中FLUSH、FORCE_MERGE、REFRESH、SNAPSHOT...操做所使用的線程池及其任務隊列、拒絕策略。理解線程池的實現原理有助於各類操做的調優,有時候寫數據到ES或執行大量的查詢請求時,可能會發現ES的日誌裏面有一些操做被拒絕的提示,這時,就能針對性地去調整線程池的配置了。
不論是refresh刷新segment,仍是 snapshot 快照備份,這些操做可理解爲"系統操做",這與用戶操做(search、get)是有區別的:write/get 須要良好的響應時間,這意味着任務不能長時間排隊過久。write/get 請求量可能很是大、QPS很是高,須要一些限制,因此這也是爲何它們的任務隊列容量是固定的,當wirte/get的請求量大處處理不過來時,就會觸發拒絕策略,任務被拒絕執行了。而對於refresh這類操做,執行不是太頻繁,有些系統操做還很重要,這種任務提交時就不能被拒絕,所以ForcePolicy是一個很好的選擇。從這裏也能夠看出:在一個大系統裏面,有各類類型的操做,所以有必要使用多個線程池來分別處理這些操做。而如何協調統一管理多個線程池(EsExecutors類、ExecutorBuilder類),及時回收空閒線程,設置合適的任務隊列長度(各類類型的任務隊列:ExecutorScalingQueue、SizeBlockingQueue、ResizableBlockingQueue),將全部的任務處理操做都統一到一套代碼流程邏輯(AbstractRunnable類、EsThreadPoolExecutor類的doExecute()方法)下執行,這些都須要很強的編碼能力。
最後,提一下search操做,很特殊。ES主要是用來作搜索的,那麼負責執行search操做的線程池是如何實現的呢?它又採用了什麼任務隊列呢?它的拒絕策略又是什麼呢?提早透露一下:search操做的線程池的任務隊列可動態調整任務隊列的長度,而且以一種十分巧妙的方式統計每一個任務的執行時間。讀完源碼以後,感嘆這些代碼的設計思路是那麼優美。spa
參考文章:
ElasticSearch 線程池類型分析之SizeBlockingQueue線程
原文:https://www.cnblogs.com/hapjin/p/11005676.html