A Look at Elasticsearch's EsThreadPoolExecutor

This article takes a look at Elasticsearch's EsThreadPoolExecutor.

EsThreadPoolExecutor

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.java

public class EsThreadPoolExecutor extends ThreadPoolExecutor {

    private final ThreadContext contextHolder;
    private volatile ShutdownListener listener;

    private final Object monitor = new Object();
    /**
     * Name used in error reporting.
     */
    private final String name;

    final String getName() {
        return name;
    }

    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) {
        this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder);
    }

    @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors")
    EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit,
            BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler,
            ThreadContext contextHolder) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
        this.name = name;
        this.contextHolder = contextHolder;
    }

    public void shutdown(ShutdownListener listener) {
        synchronized (monitor) {
            if (this.listener != null) {
                throw new IllegalStateException("Shutdown was already called on this thread pool");
            }
            if (isTerminated()) {
                listener.onTerminated();
            } else {
                this.listener = listener;
            }
        }
        shutdown();
    }

    @Override
    protected synchronized void terminated() {
        super.terminated();
        synchronized (monitor) {
            if (listener != null) {
                try {
                    listener.onTerminated();
                } finally {
                    listener = null;
                }
            }
        }
    }

    public interface ShutdownListener {
        void onTerminated();
    }

    @Override
    public void execute(Runnable command) {
        command = wrapRunnable(command);
        try {
            super.execute(command);
        } catch (EsRejectedExecutionException ex) {
            if (command instanceof AbstractRunnable) {
                // If we are an abstract runnable we can handle the rejection
                // directly and don't need to rethrow it.
                try {
                    ((AbstractRunnable) command).onRejection(ex);
                } finally {
                    ((AbstractRunnable) command).onAfter();
                }
            } else {
                throw ex;
            }
        }
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        super.afterExecute(r, t);
        EsExecutors.rethrowErrors(unwrap(r));
        assert assertDefaultContext(r);
    }

    private boolean assertDefaultContext(Runnable r) {
        try {
            assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" +
                Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]";
        } catch (IllegalStateException ex) {
            // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks
            // this must not trigger an exception here since we only assert if the default is restored and
            // we don't really care if we are closed
            if (contextHolder.isClosed() == false) {
                throw ex;
            }
        }
        return true;
    }

    /**
     * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted
     * {@link Runnable} instances rather than potentially wrapped ones.
     */
    public Stream<Runnable> getTasks() {
        return this.getQueue().stream().map(this::unwrap);
    }

    @Override
    public final String toString() {
        StringBuilder b = new StringBuilder();
        b.append(getClass().getSimpleName()).append('[');
        b.append("name = ").append(name).append(", ");
        if (getQueue() instanceof SizeBlockingQueue) {
            @SuppressWarnings("rawtypes")
            SizeBlockingQueue queue = (SizeBlockingQueue) getQueue();
            b.append("queue capacity = ").append(queue.capacity()).append(", ");
        }
        appendThreadPoolExecutorDetails(b);
        /*
         * ThreadPoolExecutor has some nice information in its toString but we
         * can't get at it easily without just getting the toString.
         */
        b.append(super.toString()).append(']');
        return b.toString();
    }

    /**
     * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in
     * the form "%s = %s, "
     *
     * @param sb the {@link StringBuilder} to append to
     */
    protected void appendThreadPoolExecutorDetails(final StringBuilder sb) {

    }

    protected Runnable wrapRunnable(Runnable command) {
        return contextHolder.preserveContext(command);
    }

    protected Runnable unwrap(Runnable runnable) {
        return contextHolder.unwrap(runnable);
    }
}
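  • EsThreadPoolExecutor extends ThreadPoolExecutor and provides two constructors. Both require a ThreadContext, and the rejection handler must be an XRejectedExecutionHandler; the shorter constructor defaults it to EsAbortPolicy.
  • It overrides terminated, execute, and afterExecute. terminated invokes listener.onTerminated() if a listener was registered through shutdown(ShutdownListener), then clears the listener. execute catches EsRejectedExecutionException and, when command is an AbstractRunnable, calls its onRejection and then its onAfter; otherwise it rethrows the exception. afterExecute calls EsExecutors.rethrowErrors(unwrap(r)) and then asserts that the thread context is back to the default context.
  • It provides wrapRunnable and unwrap, which delegate to contextHolder.preserveContext and contextHolder.unwrap respectively.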
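
The rejection path in execute is easier to see in isolation. The following is a minimal sketch that uses only JDK classes and assumes a hypothetical RejectionAware interface in place of AbstractRunnable; CallbackExecutor and demo are likewise names invented for this illustration, not Elasticsearch code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionCallbackDemo {

    /** Hypothetical stand-in for the rejection hooks of AbstractRunnable. */
    interface RejectionAware extends Runnable {
        void onRejection(Exception e);
        void onAfter();
    }

    /** Mirrors the shape of the execute() override quoted above, using only JDK types. */
    static class CallbackExecutor extends ThreadPoolExecutor {
        CallbackExecutor() {
            // One thread and a SynchronousQueue: a task submitted while the thread is busy is rejected.
            super(1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(),
                new ThreadPoolExecutor.AbortPolicy());
        }

        @Override
        public void execute(Runnable command) {
            try {
                super.execute(command);
            } catch (RejectedExecutionException ex) {
                if (command instanceof RejectionAware) {
                    // The task can handle its own rejection, so nothing is rethrown.
                    RejectionAware aware = (RejectionAware) command;
                    try {
                        aware.onRejection(ex);
                    } finally {
                        aware.onAfter();
                    }
                } else {
                    throw ex;
                }
            }
        }
    }

    /** Occupies the only worker, submits a second task, and returns the callbacks that fired. */
    static List<String> demo() {
        final List<String> events = new ArrayList<String>();
        final CountDownLatch release = new CountDownLatch(1);
        CallbackExecutor executor = new CallbackExecutor();
        executor.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        // Rejected on the calling thread, so the callbacks run here, synchronously.
        executor.execute(new RejectionAware() {
            @Override public void run() { events.add("run"); }
            @Override public void onRejection(Exception e) { events.add("onRejection"); }
            @Override public void onAfter() { events.add("onAfter"); }
        });
        release.countDown();
        executor.shutdown();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [onRejection, onAfter]
    }
}
```

The rejected task never reaches run(); the caller sees no exception because the task absorbed the rejection through its own callbacks.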

XRejectedExecutionHandler

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.java

public interface XRejectedExecutionHandler extends RejectedExecutionHandler {

    /**
     * The number of rejected executions.
     */
    long rejected();
}
  • XRejectedExecutionHandler extends RejectedExecutionHandler and adds a rejected() method that returns the number of rejected executions. It has two implementations: EsAbortPolicy and ForceQueuePolicy.

EsAbortPolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.java

public class EsAbortPolicy implements XRejectedExecutionHandler {
    private final CounterMetric rejected = new CounterMetric();

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
        if (r instanceof AbstractRunnable) {
            if (((AbstractRunnable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SizeBlockingQueue)) {
                    throw new IllegalStateException("forced execution, but expected a size queue");
                }
                try {
                    ((SizeBlockingQueue) queue).forcePut(r);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("forced execution, but got interrupted", e);
                }
                return;
            }
        }
        rejected.inc();
        throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown());
    }

    @Override
    public long rejected() {
        return rejected.count();
    }
}
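  • EsAbortPolicy implements XRejectedExecutionHandler and tracks rejections in a CounterMetric, whose count rejected() returns. In rejectedExecution, if the runnable is an AbstractRunnable whose isForceExecution() returns true, the executor's queue must be a SizeBlockingQueue (otherwise an IllegalStateException is thrown) and the task is enqueued with forcePut; the method then returns without counting a rejection. In every other case it increments the counter and throws EsRejectedExecutionException.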
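
To make the two branches concrete, here is a rough JDK-only sketch of the same idea. Forceable, SoftBoundedQueue, and CountingAbortPolicy are hypothetical names for this illustration; in particular, SoftBoundedQueue is only a simplified approximation of a capacity-checked queue and is not how SizeBlockingQueue is implemented.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class ForcedAbortPolicyDemo {

    /** Hypothetical marker for tasks that must not be dropped. */
    interface Forceable extends Runnable {
        boolean isForceExecution();
    }

    /** Hypothetical queue with a soft capacity: offer() honours it, forcePut() ignores it. */
    static class SoftBoundedQueue extends LinkedBlockingQueue<Runnable> {
        private final int capacity;

        SoftBoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        @Override
        public boolean offer(Runnable r) {
            // The size check and the insert are not atomic, so concurrent producers may overshoot.
            return size() < capacity && super.offer(r);
        }

        void forcePut(Runnable r) {
            super.offer(r); // the backing queue is unbounded, so this always succeeds
        }
    }

    /** Counts and throws for ordinary tasks, force-queues tasks that ask for it. */
    static class CountingAbortPolicy implements RejectedExecutionHandler {
        private final LongAdder rejected = new LongAdder();

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            if (r instanceof Forceable && ((Forceable) r).isForceExecution()) {
                BlockingQueue<Runnable> queue = executor.getQueue();
                if (!(queue instanceof SoftBoundedQueue)) {
                    throw new IllegalStateException("forced execution, but expected a soft-bounded queue");
                }
                ((SoftBoundedQueue) queue).forcePut(r); // queued past capacity, not counted as rejected
                return;
            }
            rejected.increment();
            throw new RejectedExecutionException("rejected execution of " + r);
        }

        long rejected() {
            return rejected.sum();
        }
    }

    /** Returns {rejected count, queue size} after one plain and one forced overflow task. */
    static long[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        CountingAbortPolicy policy = new CountingAbortPolicy();
        ThreadPoolExecutor executor =
            new ThreadPoolExecutor(1, 1, 0L, TimeUnit.SECONDS, new SoftBoundedQueue(1), policy);
        executor.execute(() -> {          // occupies the only worker
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        executor.execute(() -> { });      // fills the queue
        try {
            executor.execute(() -> { });  // plain task: counted and rejected
        } catch (RejectedExecutionException expected) {
            // the caller sees the rejection
        }
        executor.execute(new Forceable() { // forced task: queued despite the full queue
            @Override public boolean isForceExecution() { return true; }
            @Override public void run() { }
        });
        long[] result = { policy.rejected(), executor.getQueue().size() };
        release.countDown();
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println("rejected=" + r[0] + " queued=" + r[1]); // rejected=1 queued=2
    }
}
```

Only the plain task shows up in the rejection count; the forced task ends up in the queue even though the queue is nominally full.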

ForceQueuePolicy

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java

static class ForceQueuePolicy implements XRejectedExecutionHandler {

        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                // force queue policy should only be used with a scaling queue
                assert executor.getQueue() instanceof ExecutorScalingQueue;
                executor.getQueue().put(r);
            } catch (final InterruptedException e) {
                // a scaling queue never blocks so a put to it can never be interrupted
                throw new AssertionError(e);
            }
        }

        @Override
        public long rejected() {
            return 0;
        }

    }
  • ForceQueuePolicy implements XRejectedExecutionHandler. Its rejectedExecution puts the task back on the executor's queue, which it asserts is an ExecutorScalingQueue, and its rejected() always returns 0.
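
ExecutorScalingQueue is not shown in this article, so the following is only a sketch of the general technique such a policy is usually paired with, under the assumption that the queue refuses offer() unless an idle worker can take the task immediately. HandOffQueue and ForceQueueHandler are hypothetical names built on the JDK's LinkedTransferQueue.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedTransferQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ForceQueueDemo {

    /** Hypothetical scaling queue: offer() succeeds only if an idle worker takes the task at once. */
    static class HandOffQueue extends LinkedTransferQueue<Runnable> {
        @Override
        public boolean offer(Runnable r) {
            // Returning false makes ThreadPoolExecutor try to start another worker.
            return tryTransfer(r);
        }
    }

    /** Queues the task once the pool cannot grow any further. */
    static class ForceQueueHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                executor.getQueue().put(r); // the queue is unbounded, so put() does not block
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new RejectedExecutionException(e);
            }
        }
    }

    /** Submits four blocking tasks to a 1..2 thread pool and returns {pool size, queue size}. */
    static int[] demo() {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
            1, 2, 30L, TimeUnit.SECONDS, new HandOffQueue(), new ForceQueueHandler());
        Runnable blocker = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        for (int i = 0; i < 4; i++) {
            executor.execute(blocker);
        }
        // Two tasks occupy the two workers; the other two were rejected and then queued.
        int[] result = { executor.getPoolSize(), executor.getQueue().size() };
        release.countDown();
        executor.shutdown();
        return result;
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println("poolSize=" + r[0] + " queued=" + r[1]); // poolSize=2 queued=2
    }
}
```

With an ordinary unbounded queue, ThreadPoolExecutor would queue every task after the core thread is busy and never grow to maximumPoolSize. Refusing offer() forces it to try adding a worker first, and the handler catches the tasks that arrive once the pool is at its maximum.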

AbstractRunnable

elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.java

public abstract class AbstractRunnable implements Runnable {

    /**
     * Should the runnable force its execution in case it gets rejected?
     */
    public boolean isForceExecution() {
        return false;
    }

    @Override
    public final void run() {
        try {
            doRun();
        } catch (Exception t) {
            onFailure(t);
        } finally {
            onAfter();
        }
    }

    /**
     * This method is called in a finally block after successful execution
     * or on a rejection.
     */
    public void onAfter() {
        // nothing by default
    }

    /**
     * This method is invoked for all exception thrown by {@link #doRun()}
     */
    public abstract void onFailure(Exception e);

    /**
     * This should be executed if the thread-pool executing this action rejected the execution.
     * The default implementation forwards to {@link #onFailure(Exception)}
     */
    public void onRejection(Exception e) {
        onFailure(e);
    }

    /**
     * This method has the same semantics as {@link Runnable#run()}
     * @throws InterruptedException if the run method throws an InterruptedException
     */
    protected abstract void doRun() throws Exception;
}
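  • AbstractRunnable implements Runnable. Its final run() calls doRun(), passes any Exception to onFailure, and always calls onAfter in a finally block. onRejection forwards to onFailure by default, and isForceExecution (false by default) determines whether execution should be forced when the task is rejected.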
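
The order in which these hooks fire can be traced with a small stand-alone sketch. HookedRunnable below is a hypothetical, simplified analogue of AbstractRunnable written for this illustration; it keeps only the run() template and omits the rejection-related methods.

```java
import java.util.ArrayList;
import java.util.List;

public class TemplateRunnableDemo {

    /** Hypothetical, simplified analogue of the AbstractRunnable class quoted above. */
    abstract static class HookedRunnable implements Runnable {
        @Override
        public final void run() {
            try {
                doRun();
            } catch (Exception e) {
                onFailure(e);
            } finally {
                onAfter();
            }
        }

        protected abstract void doRun() throws Exception;

        public abstract void onFailure(Exception e);

        public void onAfter() {
            // nothing by default
        }
    }

    /** Runs one task on the calling thread and records which hooks fired, in order. */
    static List<String> trace(final boolean fail) {
        final List<String> events = new ArrayList<String>();
        new HookedRunnable() {
            @Override
            protected void doRun() throws Exception {
                events.add("doRun");
                if (fail) {
                    throw new Exception("boom");
                }
            }

            @Override
            public void onFailure(Exception e) {
                events.add("onFailure:" + e.getMessage());
            }

            @Override
            public void onAfter() {
                events.add("onAfter");
            }
        }.run();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(trace(false)); // [doRun, onAfter]
        System.out.println(trace(true));  // [doRun, onFailure:boom, onAfter]
    }
}
```

Because the catch clause covers Exception only, an Error thrown by doRun skips onFailure and propagates, although onAfter still runs.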

Summary

  • EsThreadPoolExecutor extends ThreadPoolExecutor. Its two constructors require a ThreadContext and an XRejectedExecutionHandler, with EsAbortPolicy as the default handler.
  • It overrides terminated (to notify the ShutdownListener), execute (to route EsRejectedExecutionException to the onRejection and onAfter callbacks of an AbstractRunnable), and afterExecute (to call EsExecutors.rethrowErrors(unwrap(r))).
  • XRejectedExecutionHandler extends RejectedExecutionHandler with a rejected() counter and is implemented by EsAbortPolicy and ForceQueuePolicy.
  • EsAbortPolicy force-queues an AbstractRunnable whose isForceExecution() is true through SizeBlockingQueue.forcePut; otherwise it increments its CounterMetric and throws EsRejectedExecutionException.
  • ForceQueuePolicy puts the rejected task back on the ExecutorScalingQueue, and its rejected() returns 0.
  • AbstractRunnable implements Runnable with a final run() that calls doRun, onFailure, and onAfter, and it defines isForceExecution to control whether a rejected task is forced into the queue.
