本文主要研究一下Elasticsearch的EsThreadPoolExecutorjava
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecutor.javagit
public class EsThreadPoolExecutor extends ThreadPoolExecutor { private final ThreadContext contextHolder; private volatile ShutdownListener listener; private final Object monitor = new Object(); /** * Name used in error reporting. */ private final String name; final String getName() { return name; } EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, ThreadContext contextHolder) { this(name, corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, new EsAbortPolicy(), contextHolder); } @SuppressForbidden(reason = "properly rethrowing errors, see EsExecutors.rethrowErrors") EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; } public void shutdown(ShutdownListener listener) { synchronized (monitor) { if (this.listener != null) { throw new IllegalStateException("Shutdown was already called on this thread pool"); } if (isTerminated()) { listener.onTerminated(); } else { this.listener = listener; } } shutdown(); } @Override protected synchronized void terminated() { super.terminated(); synchronized (monitor) { if (listener != null) { try { listener.onTerminated(); } finally { listener = null; } } } } public interface ShutdownListener { void onTerminated(); } @Override public void execute(Runnable command) { command = wrapRunnable(command); try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } } @Override protected void afterExecute(Runnable r, Throwable t) { super.afterExecute(r, t); EsExecutors.rethrowErrors(unwrap(r)); assert assertDefaultContext(r); } private boolean assertDefaultContext(Runnable r) { try { assert contextHolder.isDefaultContext() : "the thread context is not the default context and the thread [" + Thread.currentThread().getName() + "] is being returned to the pool after executing [" + r + "]"; } catch (IllegalStateException ex) { // sometimes we execute on a closed context and isDefaultContext doen't bypass the ensureOpen checks // this must not trigger an exception here since we only assert if the default is restored and // we don't really care if we are closed if (contextHolder.isClosed() == false) { throw ex; } } return true; } /** * Returns a stream of all pending tasks. This is similar to {@link #getQueue()} but will expose the originally submitted * {@link Runnable} instances rather than potentially wrapped ones. */ public Stream<Runnable> getTasks() { return this.getQueue().stream().map(this::unwrap); } @Override public final String toString() { StringBuilder b = new StringBuilder(); b.append(getClass().getSimpleName()).append('['); b.append("name = ").append(name).append(", "); if (getQueue() instanceof SizeBlockingQueue) { @SuppressWarnings("rawtypes") SizeBlockingQueue queue = (SizeBlockingQueue) getQueue(); b.append("queue capacity = ").append(queue.capacity()).append(", "); } appendThreadPoolExecutorDetails(b); /* * ThreadPoolExecutor has some nice information in its toString but we * can't get at it easily without just getting the toString. */ b.append(super.toString()).append(']'); return b.toString(); } /** * Append details about this thread pool to the specified {@link StringBuilder}. All details should be appended as key/value pairs in * the form "%s = %s, " * * @param sb the {@link StringBuilder} to append to */ protected void appendThreadPoolExecutorDetails(final StringBuilder sb) { } protected Runnable wrapRunnable(Runnable command) { return contextHolder.preserveContext(command); } protected Runnable unwrap(Runnable runnable) { return contextHolder.unwrap(runnable); } }
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/XRejectedExecutionHandler.javagithub
public interface XRejectedExecutionHandler extends RejectedExecutionHandler { /** * The number of rejected executions. */ long rejected(); }
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsAbortPolicy.javaapp
public class EsAbortPolicy implements XRejectedExecutionHandler { private final CounterMetric rejected = new CounterMetric(); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof AbstractRunnable) { if (((AbstractRunnable) r).isForceExecution()) { BlockingQueue<Runnable> queue = executor.getQueue(); if (!(queue instanceof SizeBlockingQueue)) { throw new IllegalStateException("forced execution, but expected a size queue"); } try { ((SizeBlockingQueue) queue).forcePut(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("forced execution, but got interrupted", e); } return; } } rejected.inc(); throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } @Override public long rejected() { return rejected.count(); } }
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.javaelasticsearch
static class ForceQueuePolicy implements XRejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { try { // force queue policy should only be used with a scaling queue assert executor.getQueue() instanceof ExecutorScalingQueue; executor.getQueue().put(r); } catch (final InterruptedException e) { // a scaling queue never blocks so a put to it can never be interrupted throw new AssertionError(e); } } @Override public long rejected() { return 0; } }
elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/AbstractRunnable.javaide
public abstract class AbstractRunnable implements Runnable { /** * Should the runnable force its execution in case it gets rejected? */ public boolean isForceExecution() { return false; } @Override public final void run() { try { doRun(); } catch (Exception t) { onFailure(t); } finally { onAfter(); } } /** * This method is called in a finally block after successful execution * or on a rejection. */ public void onAfter() { // nothing by default } /** * This method is invoked for all exception thrown by {@link #doRun()} */ public abstract void onFailure(Exception e); /** * This should be executed if the thread-pool executing this action rejected the execution. * The default implementation forwards to {@link #onFailure(Exception)} */ public void onRejection(Exception e) { onFailure(e); } /** * This method has the same semantics as {@link Runnable#run()} * @throws InterruptedException if the run method throws an InterruptedException */ protected abstract void doRun() throws Exception; }