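At the end of the previous article, "ElasticSearch thread pool type analysis: ExecutorScalingQueue", I touched on some implementation details of the thread pool that handles ES search operations. This article analyzes the thread pool behind the SEARCH operation.
The SEARCH thread pool is built in the constructor of the ThreadPool class: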
builders.put(Names.SEARCH, new AutoQueueAdjustingExecutorBuilder(settings, Names.SEARCH, searchThreadPoolSize(availableProcessors), 1000, 1000, 1000, 2000));
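The core thread count of the SEARCH thread pool depends on the number of CPUs on the machine hosting the ES node. The capacity of its task queue can be adjusted dynamically, and the initial capacity is 1000. The concrete implementation class of the SEARCH thread pool is QueueResizingEsThreadPoolExecutor, its task queue is a ResizableBlockingQueue, and its rejection policy is EsAbortPolicy. ResizableBlockingQueue extends SizeBlockingQueue and adds the ability to resize the queue capacity at runtime. For SizeBlockingQueue, see the article "ElasticSearch thread pool type analysis: SizeBlockingQueue".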
org.elasticsearch.common.util.concurrent.EsExecutors.newAutoQueueFixed
ResizableBlockingQueue<Runnable> queue =
        new ResizableBlockingQueue<>(ConcurrentCollections.<Runnable>newBlockingQueue(), initialQueueCapacity);
return new QueueResizingEsThreadPoolExecutor(name, size, size, 0, TimeUnit.MILLISECONDS,
        queue, minQueueSize, maxQueueSize, TimedRunnable::new, frameSize, targetedResponseTime,
        threadFactory, new EsAbortPolicy(), contextHolder);
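Every submitted Runnable is wrapped in a TimedRunnable so that its timing can be measured. When the TimedRunnable is constructed, this.creationTimeNanos = System.nanoTime(); records the creation time of the task. finishTimeNanos - startTimeNanos is the execution time of the task, and startTimeNanos - creationTimeNanos is the time it spent waiting in the queue. This makes it possible to record both the queue time and the execution time of every Runnable, which is a neat design.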
org.elasticsearch.common.util.concurrent.TimedRunnable
// Constructor of TimedRunnable
TimedRunnable(final Runnable original) {
    this.original = original;
    this.creationTimeNanos = System.nanoTime();
}

@Override
public void doRun() {
    try {
        // time at which the task starts executing
        startTimeNanos = System.nanoTime();
        // the actual work of the task
        original.run();
    } finally {
        // time at which the task finishes executing
        finishTimeNanos = System.nanoTime();
    }
}
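To make the three timestamps concrete, here is a minimal standalone sketch of the same idea. The class name TimedTaskSketch and its accessor names are made up for this illustration and are not the Elasticsearch API; the "queue time" is simulated with a sleep between construction and run().

```java
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for TimedRunnable; not the Elasticsearch class.
final class TimedTaskSketch implements Runnable {
    private final Runnable original;
    private final long creationTimeNanos;
    private long startTimeNanos = -1;
    private long finishTimeNanos = -1;

    TimedTaskSketch(Runnable original) {
        this.original = original;
        // The clock starts as soon as the wrapper is created (i.e. at submission).
        this.creationTimeNanos = System.nanoTime();
    }

    @Override
    public void run() {
        startTimeNanos = System.nanoTime();
        try {
            original.run();
        } finally {
            finishTimeNanos = System.nanoTime();
        }
    }

    long queueNanos() { return startTimeNanos - creationTimeNanos; }
    long executionNanos() { return finishTimeNanos - startTimeNanos; }
    long totalNanos() { return finishTimeNanos - creationTimeNanos; }

    static void sleepQuietly(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        TimedTaskSketch task = new TimedTaskSketch(() -> sleepQuietly(20));
        sleepQuietly(10); // simulated time spent waiting in a queue
        task.run();
        System.out.println("queued ms:   " + TimeUnit.NANOSECONDS.toMillis(task.queueNanos()));
        System.out.println("executed ms: " + TimeUnit.NANOSECONDS.toMillis(task.executionNanos()));
    }
}
```

The total time is by construction the sum of queue time and execution time, which is the relationship the afterExecute code later relies on.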
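Next, let me look in detail at how the execution time of a Runnable submitted to the pool is measured. Start with the constructor parameters of QueueResizingEsThreadPoolExecutor, in particular the runnableWrapper parameter, which I think of as a piece of "processing logic".
From the first code snippet in this article (new QueueResizingEsThreadPoolExecutor), we can see that TimedRunnable::new is assigned to runnableWrapper. Since runnableWrapper is a java.util.function.Function, calling java.util.function.Function.apply runs that processing logic, that is, it creates a new TimedRunnable object. As the TimedRunnable constructor shows, the creation time of the task is recorded at that point.
The reason for going into this much detail: the ES source makes heavy use of functional interfaces and lambda expressions. When I first read the source, I could never tell where the "processing logic" represented by a lambda was actually executed. Once I became familiar with this style, I understood how much flexibility it adds to the code.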
// runnableWrapper is declared as the functional interface Function: it takes a Runnable,
// applies the wrapping logic, and returns a Runnable
private final Function<Runnable, Runnable> runnableWrapper;
private final ResizableBlockingQueue<Runnable> workQueue;

QueueResizingEsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
                                  ResizableBlockingQueue<Runnable> workQueue, int minQueueSize, int maxQueueSize,
                                  Function<Runnable, Runnable> runnableWrapper, final int tasksPerFrame,
                                  TimeValue targetedResponseTime, ThreadFactory threadFactory,
                                  XRejectedExecutionHandler handler, ThreadContext contextHolder) {
    super(name, corePoolSize, maximumPoolSize, keepAliveTime, unit,
            workQueue, threadFactory, handler, contextHolder);
    this.runnableWrapper = runnableWrapper;
    this.workQueue = workQueue;
    this.tasksPerFrame = tasksPerFrame;
    this.startNs = System.nanoTime();
    this.minQueueSize = minQueueSize;
    this.maxQueueSize = maxQueueSize;
    this.targetedResponseTimeNanos = targetedResponseTime.getNanos();
    this.executionEWMA = new ExponentiallyWeightedMovingAverage(EWMA_ALPHA, 0);
    logger.debug(
            "thread pool [{}] will adjust queue by [{}] when determining automatic queue size",
            getName(), QUEUE_ADJUSTMENT_AMOUNT);
}
When a task is submitted, the doExecute() method of QueueResizingEsThreadPoolExecutor runs:
@Override
protected void doExecute(final Runnable command) {
    // we are submitting a task, it has not yet started running (because super.excute() has not
    // been called), but it could be immediately run, or run at a later time. We need the time
    // this task entered the queue, which we get by creating a TimedRunnable, which starts the
    // clock as soon as it is created.
    // apply() triggers TimedRunnable::new, which creates the TimedRunnable object
    super.doExecute(this.runnableWrapper.apply(command));
}
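The same wrap-on-submit pattern can be reproduced with a plain JDK ThreadPoolExecutor. The sketch below is an assumption-laden illustration, not ES code: WrappingExecutorSketch and Stamped are hypothetical names, and it overrides execute() (the JDK hook) where ES overrides its own doExecute().

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;

// Hypothetical executor that wraps every submitted task via a Function.
final class WrappingExecutorSketch extends ThreadPoolExecutor {

    // Hypothetical wrapper that stamps the submission time.
    static final class Stamped implements Runnable {
        final Runnable original;
        final long createdNanos = System.nanoTime();

        Stamped(Runnable original) { this.original = original; }

        @Override
        public void run() { original.run(); }
    }

    private final Function<Runnable, Runnable> wrapper;
    final AtomicLong completed = new AtomicLong();
    final AtomicLong totalNanos = new AtomicLong();

    WrappingExecutorSketch(int threads, Function<Runnable, Runnable> wrapper) {
        super(threads, threads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>());
        this.wrapper = wrapper;
    }

    @Override
    public void execute(Runnable command) {
        // apply() is where the constructor reference (Stamped::new) actually runs.
        super.execute(wrapper.apply(command));
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        if (r instanceof Stamped) {
            // Queue time plus execution time, measured from submission.
            totalNanos.addAndGet(System.nanoTime() - ((Stamped) r).createdNanos);
            completed.incrementAndGet();
        }
    }

    // Submits `tasks` no-op tasks and returns how many afterExecute observed.
    static long runDemo(int tasks) {
        WrappingExecutorSketch pool = new WrappingExecutorSketch(2, Stamped::new);
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> { });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return pool.completed.get();
    }

    public static void main(String[] args) {
        System.out.println("tasks observed in afterExecute: " + runDemo(5));
    }
}
```

Passing the wrapper in as a Function keeps the executor independent of the concrete wrapper type, which is the flexibility described above.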
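We can now record the timing of every task, but what capacity should the task queue have? This is determined by Little's law from queueing theory. Search for "Little's law" if you want the background.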
/**
 * Calculate Little's Law (L), which is the "optimal" queue size for a particular task rate (lambda) and targeted response time.
 *
 * @param lambda the arrival rate of tasks in nanoseconds
 * @param targetedResponseTimeNanos nanoseconds for the average targeted response rate of requests
 * @return the optimal queue size for the give task rate and targeted response time
 */
static int calculateL(final double lambda, final long targetedResponseTimeNanos) {
    assert targetedResponseTimeNanos > 0 : "cannot calculate for instantaneous requests";
    // L = λ * W
    return Math.toIntExact((long)(lambda * targetedResponseTimeNanos));
}
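As a worked example of L = λ * W, the sketch below plugs in made-up numbers: 2000 tasks whose accumulated time is 4 seconds, with a 1 second target response time. The class name LittlesLawSketch is hypothetical, and the calculateLambda body shown here (tasks divided by total nanoseconds) is my assumption about how the rate is derived, not a quote from the ES source.

```java
// Hypothetical worked example of Little's law; numbers are illustrative only.
final class LittlesLawSketch {

    // Assumed definition: tasks per nanosecond over one frame.
    static double calculateLambda(int totalTasks, long totalFrameNanos) {
        return (double) totalTasks / totalFrameNanos;
    }

    // L = lambda * W; Math.toIntExact throws ArithmeticException on int overflow.
    static int calculateL(double lambda, long targetedResponseTimeNanos) {
        return Math.toIntExact((long) (lambda * targetedResponseTimeNanos));
    }

    public static void main(String[] args) {
        long oneSecond = 1_000_000_000L;
        double lambda = calculateLambda(2000, 4 * oneSecond); // 500 tasks per second
        int optimal = calculateL(lambda, oneSecond);
        // Roughly 500; floating-point truncation can shave off one.
        System.out.println("optimal queue size is about " + optimal);
    }
}
```

The ArithmeticException thrown by Math.toIntExact on overflow is the one the afterExecute code catches and logs instead of resizing the queue.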
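Little's law takes two parameters: lambda (λ), the task rate, and W, the average response time.
In ES, this average response time can be set in the configuration file. If it is not set, it defaults to 1s. The code below, from the constructor of AutoQueueAdjustingExecutorBuilder, configures the response time with a default of 1s: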
final String targetedResponseTimeKey = settingsKey(prefix, "target_response_time");
this.targetedResponseTimeSetting = Setting.timeSetting(targetedResponseTimeKey,
        TimeValue.timeValueSeconds(1), TimeValue.timeValueMillis(10), Setting.Property.NodeScope);
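The number of executed tasks and their total elapsed time are collected in the afterExecute method. The custom ES thread pool overrides ThreadPoolExecutor.afterExecute, which is called automatically each time a task in the pool finishes, so it is the natural place for this kind of post-processing.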
@Override
protected void afterExecute(Runnable r, Throwable t) {
    // when overriding afterExecute, call super.afterExecute first
    super.afterExecute(r, t);
    // A task has been completed, it has left the building. We should now be able to get the
    // total time as a combination of the time in the queue and time spent running the task. We
    // only want runnables that did not throw errors though, because they could be fast-failures
    // that throw off our timings, so only check when t is null.
    // only tasks of type TimedRunnable are counted and timed
    assert r instanceof TimedRunnable : "expected only TimedRunnables in queue";
    // elapsed time of this single task (queue time plus execution time)
    final long taskNanos = ((TimedRunnable) r).getTotalNanos();
    // total elapsed time of all tasks (sum of the per-task elapsed times)
    final long totalNanos = totalTaskNanos.addAndGet(taskNanos);

    // execution time of this single task (its elapsed time minus its queue time)
    final long taskExecutionNanos = ((TimedRunnable) r).getTotalExecutionNanos();
    assert taskExecutionNanos >= 0 : "expected task to always take longer than 0 nanoseconds, got: " + taskExecutionNanos;
    executionEWMA.addValue(taskExecutionNanos);

    // tasksPerFrame defaults to 2000; the queue capacity is adjusted once per batch of
    // tasksPerFrame completed tasks
    if (taskCount.incrementAndGet() == this.tasksPerFrame) {
        final long endTimeNs = System.nanoTime();
        // how long the pool has been running since startNs (the pool has a lifecycle too,
        // not just the Runnable tasks)
        final long totalRuntime = endTimeNs - this.startNs;
        // Reset the start time for all tasks. At first glance this appears to need to be
        // volatile, since we are reading from a different thread when it is set, but it
        // is protected by the taskCount memory barrier.
        // See: https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/atomic/package-summary.html
        startNs = endTimeNs;

        // Calculate the new desired queue size
        try {
            // compute lambda: the tasksPerFrame tasks took totalNanos in total,
            // so lambda can be read as the processing rate
            final double lambda = calculateLambda(tasksPerFrame, Math.max(totalNanos, 1L));
            // ideal queue capacity according to Little's law (the maximum length the queue may reach)
            final int desiredQueueSize = calculateL(lambda, targetedResponseTimeNanos);
            // current capacity of the task queue
            final int oldCapacity = workQueue.capacity();

            if (logger.isDebugEnabled()) {
                final long avgTaskTime = totalNanos / tasksPerFrame;
                logger.debug("[{}]: there were [{}] tasks in [{}], avg task time [{}], EWMA task execution [{}], " +
                                "[{} tasks/s], optimal queue is [{}], current capacity [{}]",
                        getName(),
                        tasksPerFrame,
                        TimeValue.timeValueNanos(totalRuntime),
                        TimeValue.timeValueNanos(avgTaskTime),
                        TimeValue.timeValueNanos((long)executionEWMA.getAverage()),
                        String.format(Locale.ROOT, "%.2f", lambda * TimeValue.timeValueSeconds(1).nanos()),
                        desiredQueueSize,
                        oldCapacity);
            }

            // Adjust the queue size towards the desired capacity using an adjust of
            // QUEUE_ADJUSTMENT_AMOUNT (either up or down), keeping in mind the min and max
            // values the queue size can have.
            // move the capacity from oldCapacity to newCapacity; the queue is not resized
            // straight to desiredQueueSize
            final int newCapacity =
                    workQueue.adjustCapacity(desiredQueueSize, QUEUE_ADJUSTMENT_AMOUNT, minQueueSize, maxQueueSize);
            if (oldCapacity != newCapacity && logger.isDebugEnabled()) {
                logger.debug("adjusted [{}] queue size by [{}], old capacity: [{}], new capacity: [{}]", getName(),
                        newCapacity > oldCapacity ? QUEUE_ADJUSTMENT_AMOUNT : -QUEUE_ADJUSTMENT_AMOUNT,
                        oldCapacity, newCapacity);
            }
        } catch (ArithmeticException e) {
            // There was an integer overflow, so just log about it, rather than adjust the queue size
            logger.warn(() -> new ParameterizedMessage(
                            "failed to calculate optimal queue size for [{}] thread pool, " +
                            "total frame time [{}ns], tasks [{}], task execution time [{}ns]",
                            getName(), totalRuntime, tasksPerFrame, totalNanos),
                    e);
        } finally {
            // Finally, decrement the task count and time back to their starting values. We
            // do this at the end so there is no concurrent adjustments happening. We also
            // decrement them instead of resetting them back to zero, as resetting them back
            // to zero causes operations that came in during the adjustment to be uncounted
            int tasks = taskCount.addAndGet(-this.tasksPerFrame);
            assert tasks >= 0 : "tasks should never be negative, got: " + tasks;

            if (tasks >= this.tasksPerFrame) {
                // Start over, because we can potentially reach a "never adjusting" state,
                //
                // consider the following:
                // - If the frame window is 10, and there are 10 tasks, then an adjustment will begin. (taskCount == 10)
                // - Prior to the adjustment being done, 15 more tasks come in, the taskCount is now 25
                // - Adjustment happens and we decrement the tasks by 10, taskCount is now 15
                // - Since taskCount will now be incremented forever, it will never be 10 again,
                //   so there will be no further adjustments
                logger.debug(
                        "[{}]: too many incoming tasks while queue size adjustment occurs, resetting measurements to 0", getName());
                // once the adjustment is done, reset the total task time to 1 so the next round can start
                totalTaskNanos.getAndSet(1);
                taskCount.getAndSet(0);
                startNs = System.nanoTime();
            } else {
                // Do a regular adjustment
                totalTaskNanos.addAndGet(-totalNanos);
            }
        }
    }
}
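The code comments above roughly describe how the capacity of the task queue is adjusted dynamically. Below are a few more details that help in understanding the whole adjustment process.
For a description of the thread pool states, see the source comments of java.util.concurrent.ThreadPoolExecutor. In the RUNNING state, the pool accepts newly submitted tasks and processes tasks already waiting in the queue. In the SHUTDOWN state, it no longer accepts new tasks but still processes the tasks already waiting in the queue. In the STOP state, it accepts no new tasks, does not process queued tasks, and interrupts the tasks that are currently running. So to shut down a thread pool "correctly", it should be done in steps. Here is how ES shuts down the thread pool that handles scheduled tasks: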
org.elasticsearch.threadpool.Scheduler.terminate
static boolean terminate(ScheduledThreadPoolExecutor scheduledThreadPoolExecutor, long timeout, TimeUnit timeUnit) {
    // call shutdown() first: the pool stops accepting newly submitted tasks
    scheduledThreadPoolExecutor.shutdown();
    // wait with a timeout: returns true if all queued and running tasks finish within timeout, false otherwise
    if (awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit)) {
        return true;
    }
    // last resort: tasks are still running after awaitTermination timed out,
    // so call shutdownNow to interrupt them and stop the pool
    scheduledThreadPoolExecutor.shutdownNow();
    return awaitTermination(scheduledThreadPoolExecutor, timeout, timeUnit);
}
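In this three-step shutdown (shutdown first, then awaitTermination, and shutdownNow only at the end), awaitTermination acts as a buffer that reduces, as far as possible, the uncertainty that shutting down the pool introduces into task results. The JDK source of java.util.concurrent.ScheduledThreadPoolExecutor.shutdownNow shows why it is better not to call shutdownNow right away and to shut the pool down in steps instead: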
/**
 * Attempts to stop all actively executing tasks, halts the
 * processing of waiting tasks, and returns a list of the tasks
 * that were awaiting execution.
 *
 * <p>This method does not wait for actively executing tasks to
 * terminate. Use {@link #awaitTermination awaitTermination} to
 * do that.
 *
 * <p>There are no guarantees beyond best-effort attempts to stop
 * processing actively executing tasks. This implementation
 * cancels tasks via {@link Thread#interrupt}, so any task that
 * fails to respond to interrupts may never terminate.
 *
 * @return list of tasks that never commenced execution.
 *         Each element of this list is a {@link ScheduledFuture},
 *         including those tasks submitted using {@code execute},
 *         which are for scheduling purposes used as the basis of a
 *         zero-delay {@code ScheduledFuture}.
 * @throws SecurityException {@inheritDoc}
 */
public List<Runnable> shutdownNow() {
    return super.shutdownNow();
}
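shutdownNow attempts to stop all actively executing tasks ("stop all actively executing tasks") and halts the processing of waiting tasks ("halts the processing of waiting tasks"). The waiting tasks here are the tasks still sitting in the task queue that have not started running, not tasks run by threads in the WAITING state of java.lang.Thread.State. They are removed from the queue without being executed, and shutdownNow returns them to the caller ("returns a list of the tasks that were awaiting execution").
shutdownNow does not wait for running tasks to finish. It requests that they stop by interrupting their threads ("This method does not wait for actively executing tasks to terminate"). Because some tasks may ignore or even swallow the interrupt request, it can only make a best-effort attempt to stop them. A task that never responds to the interrupt may never finish ("so any task that fails to respond to interrupts may never terminate").
The takeaway for our own code: when implementing Runnable, the logic in run() should respond to interruption. It should not catch every exception, merely print it, and never propagate it.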
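The two points above, staged shutdown and interrupt-aware tasks, can be tried together with plain JDK classes. This is a minimal sketch under my own assumptions: GracefulShutdownSketch and its method names are hypothetical, and it uses a generic ExecutorService where the ES code uses a ScheduledThreadPoolExecutor.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: a task that honors interrupts plus a staged shutdown.
final class GracefulShutdownSketch {

    // Loops until interrupted; never swallows the interrupt.
    static Runnable interruptibleLoop() {
        return () -> {
            while (!Thread.currentThread().isInterrupted()) {
                try {
                    Thread.sleep(50); // stands in for one unit of blocking work
                } catch (InterruptedException e) {
                    // Restore the flag so callers up the stack can see it, then stop.
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        };
    }

    // shutdown -> awaitTermination -> shutdownNow -> awaitTermination
    static boolean terminate(ExecutorService pool, long timeout, TimeUnit unit) {
        pool.shutdown(); // stop accepting new tasks, let queued ones run
        try {
            if (pool.awaitTermination(timeout, unit)) {
                return true;
            }
            pool.shutdownNow(); // last resort: interrupt running tasks
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            pool.shutdownNow();
            Thread.currentThread().interrupt();
            return false;
        }
    }

    static boolean runDemo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        pool.execute(interruptibleLoop());
        // The first wait times out because the loop never ends on its own;
        // shutdownNow then interrupts it and the second wait succeeds.
        return terminate(pool, 200, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println("terminated cleanly: " + runDemo());
    }
}
```

If the catch block in interruptibleLoop only logged the exception and kept looping, the second awaitTermination would time out as well, which is exactly the "may never terminate" case from the Javadoc.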
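The queue capacity is not adjusted after every single task, since that would clearly be too expensive. Instead, the adjustment happens once after each batch of tasks. A batch is 2000 tasks by default, and the batch size is controlled by the tasksPerFrame variable.
The capacity is also not moved straight to the ideal value computed by Little's law (desiredQueueSize). Each adjustment is bounded: the capacity changes by at most QUEUE_ADJUSTMENT_AMOUNT.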
if (optimalCapacity > capacity + adjustmentAmount) {
    // adjust up
    final int newCapacity = Math.min(maxCapacity, capacity + adjustmentAmount);
    this.capacity = newCapacity;
    return newCapacity;
} else if (optimalCapacity < capacity - adjustmentAmount) {
    // adjust down
    final int newCapacity = Math.max(minCapacity, capacity - adjustmentAmount);
    this.capacity = newCapacity;
    return newCapacity;
} else {
    return this.capacity;
}
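To see how the bounded step plays out over several frames, the following sketch repeats the same branch logic in a tiny standalone class. CapacityStepSketch is a hypothetical name, and the min and max bounds in the demo (500 and 2000) are made-up values chosen so the steps are visible.

```java
// Hypothetical holder for a capacity that moves toward a target in fixed steps.
final class CapacityStepSketch {
    private int capacity;

    CapacityStepSketch(int initialCapacity) {
        this.capacity = initialCapacity;
    }

    // Moves at most `step` toward `optimal`, clamped to [min, max];
    // targets within one step of the current capacity cause no change.
    synchronized int adjustCapacity(int optimal, int step, int min, int max) {
        if (optimal > capacity + step) {
            capacity = Math.min(max, capacity + step);
        } else if (optimal < capacity - step) {
            capacity = Math.max(min, capacity - step);
        }
        return capacity;
    }

    public static void main(String[] args) {
        CapacityStepSketch queue = new CapacityStepSketch(1000);
        // Target 1500 with step 50: the capacity creeps up one step per frame.
        for (int frame = 1; frame <= 3; frame++) {
            System.out.println("frame " + frame + ": " + queue.adjustCapacity(1500, 50, 500, 2000));
        }
    }
}
```

Stepping gradually keeps one unusually fast or slow frame from swinging the queue capacity all the way to a new value.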
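This article documents the source-level implementation of the SEARCH thread pool in ES 6.3.2. A search request issued by a user is wrapped as a SEARCH operation. SEARCH tasks are handled by QueueResizingEsThreadPoolExecutor, whose task queue is a ResizableBlockingQueue. ResizableBlockingQueue wraps a LinkedTransferQueue but adds a capacity limit.
As a steady stream of search requests is processed, the capacity of the task queue can be adjusted dynamically. The rejection policy of the SEARCH thread pool is EsAbortPolicy: when search requests arrive faster than the pool can handle them, they are rejected.
Wrapping each Runnable in a TimedRunnable makes it possible to measure the execution time and queue time of every search task. These statistics are gathered in the pool's afterExecute() method.
This article also looked at how to shut down a thread pool correctly, and at the uncertainty that an improper shutdown introduces into task results. After reading the source of the ES thread pool module, my understanding of thread pools is much deeper. A later article will analyze how ES implements the thread pool for scheduled and periodic tasks, which can be used for periodic ping commands (heartbeats between nodes) and other communication between ES nodes. If anything above is wrong, corrections are welcome.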
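This concludes the source analysis of the ES thread pool module. Overall, ES manages its thread pools centrally. Consider a large system with all kinds of complex operations: should thread pools be scattered throughout the code, or created at startup and then managed in one place?
Also, because the JDK's java.util.concurrent.Future#get() has to block to obtain a task's result, and java.util.concurrent.Future#get(long, java.util.concurrent.TimeUnit) blocks as well (with a timeout), retrieving the result of a submitted task means waiting. Is there a way to get the result without blocking? This is where the Listener mechanism comes in. A Listener is simply a piece of processing logic: how to handle a given result (the result of a completed Runnable/Callable). The idea is that once the Runnable (or Callable) has finished and produced a result, the Listener is called back to process it. That way there is no need to block in java.util.concurrent.Future#get() until the result arrives and only then run the logic that handles it.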
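A minimal sketch of the callback idea with plain JDK classes follows. ResultListener, executeAsync and ListenerSketch are hypothetical names invented for this illustration; they are not the ES listener API. The CountDownLatch exists only so the demo can observe the callback before returning.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical callback-style submission; the caller never calls Future#get().
final class ListenerSketch {

    interface ResultListener<T> {
        void onResponse(T result);
        void onFailure(Exception e);
    }

    // Runs the task on the pool and hands the outcome to the listener.
    static <T> void executeAsync(ExecutorService pool, Callable<T> task, ResultListener<T> listener) {
        pool.execute(() -> {
            T result;
            try {
                result = task.call();
            } catch (Exception e) {
                listener.onFailure(e);
                return;
            }
            listener.onResponse(result);
        });
    }

    static String runDemo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1); // demo-only synchronization
        executeAsync(pool, () -> "hits: 42", new ResultListener<String>() {
            @Override
            public void onResponse(String result) {
                seen.set(result);
                done.countDown();
            }

            @Override
            public void onFailure(Exception e) {
                seen.set("failed: " + e);
                done.countDown();
            }
        });
        // The submitting thread is free to do other work here.
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        pool.shutdown();
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

The result-handling logic runs on the pool thread as soon as the result exists, so no thread sits parked in get() waiting for it.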
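Speaking of asynchronous callbacks, ES has another kind of thread pool that can run prioritized tasks. Its task queue is java.util.concurrent.PriorityBlockingQueue and its implementation is org.elasticsearch.common.util.concurrent.PrioritizedEsThreadPoolExecutor. This pool is mainly used to apply ES cluster state updates. Going one step further, org.elasticsearch.cluster.service.TaskBatcher wraps PrioritizedEsThreadPoolExecutor to process prioritized tasks in batches. When a new index is created or a shard is relocated, the cluster state is updated, and an org.elasticsearch.cluster.service.MasterService.Batcher.UpdateTask is created. UpdateTask wraps an org.elasticsearch.cluster.ClusterStateTaskListener instance, so that after the Runnable has run, the Listener is used for the notification callback. Multiple UpdateTask instances are submitted to the PrioritizedEsThreadPoolExecutor, which is how cluster state updates are carried out. In addition, both the core pool size and the max pool size of PrioritizedEsThreadPoolExecutor are set to 1, so tasks submitted to it are executed sequentially by a single thread. This avoids the inconsistencies that concurrent state updates could cause, and it does so without using locks for synchronization, an approach well worth borrowing. How org.elasticsearch.cluster.service.MasterService implements cluster state updates in detail is something I will write about when I have time.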
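The "one thread plus a priority queue" idea can be tried with a plain ThreadPoolExecutor. The sketch below is not PrioritizedEsThreadPoolExecutor: PrioritySingleThreadSketch and PrioritizedTask are hypothetical names, and "lower number runs first" is an assumption of this example. Tasks go in through execute() because the queue needs Comparable elements.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical single-threaded executor that drains its queue by priority.
final class PrioritySingleThreadSketch {

    static final class PrioritizedTask implements Runnable, Comparable<PrioritizedTask> {
        final int priority; // assumption: lower value runs first
        final Runnable body;

        PrioritizedTask(int priority, Runnable body) {
            this.priority = priority;
            this.body = body;
        }

        @Override
        public void run() { body.run(); }

        @Override
        public int compareTo(PrioritizedTask other) {
            return Integer.compare(priority, other.priority);
        }
    }

    static void awaitQuietly(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> runDemo() {
        // core == max == 1: tasks run strictly one at a time, no locks needed between them.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new PriorityBlockingQueue<Runnable>());
        List<String> order = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch gate = new CountDownLatch(1);

        // The first task occupies the only worker so the next three must queue up.
        pool.execute(new PrioritizedTask(0, () -> { awaitQuietly(gate); order.add("blocker"); }));
        pool.execute(new PrioritizedTask(3, () -> order.add("low")));
        pool.execute(new PrioritizedTask(1, () -> order.add("high")));
        pool.execute(new PrioritizedTask(2, () -> order.add("normal")));

        gate.countDown();
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return order;
    }

    public static void main(String[] args) {
        System.out.println(runDemo());
    }
}
```

Although "low" is submitted before "high", the queued tasks leave the PriorityBlockingQueue in priority order, and the single worker guarantees that no two of them ever run concurrently.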
Thread pools created when ES starts:
[2019-08-15T18:30:38,829][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [force_merge], size [1], queue size [unbounded]
[2019-08-15T18:30:44,782][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_started], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,517][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [listener], size [2], queue size [unbounded]
[2019-08-15T18:30:48,535][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [index], size [4], queue size [200]
[2019-08-15T18:30:48,536][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [refresh], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [generic], core [4], max [128], keep alive [30s]
[2019-08-15T18:30:48,537][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [rollup_indexing], size [4], queue size [4]
[2019-08-15T18:30:48,538][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [warmer], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,551][DEBUG][o.e.c.u.c.QueueResizingEsThreadPoolExecutor] thread pool [debug_node/search] will adjust queue by [50] when determining automatic queue size
[2019-08-15T18:30:48,552][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [search], size [7], queue size [1k]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [flush], core [1], max [2], keep alive [5m]
[2019-08-15T18:30:48,553][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [fetch_shard_store], core [1], max [8], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [management], core [1], max [5], keep alive [5m]
[2019-08-15T18:30:48,554][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [get], size [4], queue size [1k]
[2019-08-15T18:30:48,555][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [analyze], size [1], queue size [16]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [write], size [4], queue size [200]
[2019-08-15T18:30:48,556][DEBUG][o.e.t.ThreadPool ] [debug_node] created thread pool: name [snapshot], core [1], max [2], keep alive [5m]
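Finally, a few thoughts on reading source code. When you start reading the source of a system, you have usually used it already, learned some of its features along the way, and now want to understand the principles behind it. Faced with a system of several hundred thousand lines of code, where should you start? I think the following points are worth considering: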
Original: https://www.cnblogs.com/hapjin/p/11011712.html