ElasticSearch裏面各類操做都是基於線程池+回調實現的,因此這篇文章記錄一下java.util.concurrent
涉及線程池實現和ElasticSearch中如何自定義本身的線程池的。由於咱們本身開發寫代碼,也常常會用到線程池,通常不多有機會本身去擴充實現一個本身的線程池,好比下面是我常常用的套路,其中SidSearchExceptionHandler
和SidSearchRejectExecutionHandler
都只是簡單地記錄日誌。html
//任務隊列 private BlockingQueue<Runnable> taskQueue = new LinkedBlockingQueue<>(1024); //線程在執行過程當中的異常處理器 private SidSearchExceptionHandler exceptionHandler = new SidSearchExceptionHandler(); //向線程池提交任務時,拒絕策略 private SidSearchRejectExecutionHandler rejectExecutionHandler = new SidSearchRejectExecutionHandler(); //藉助Guava包中的ThreadFactoryBuild建立線程工廠(主要是方便指定線程的名稱,debug起來清晰) private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build(); //建立線程池 private ThreadPoolExecutor executor = new ThreadPoolExecutor(nThreads, nThreads, 1, TimeUnit.DAYS, taskQueue, threadFactory, rejectExecutionHandler);
好比下面這個自定義線程執行時異常處理策略,在線程執行過程當中拋出異常時,只是簡單地打印日誌:java
public class SidSearchExceptionHandler implements Thread.UncaughtExceptionHandler { public static final Logger logger = LoggerFactory.getLogger(SidSearchExceptionHandler.class); @Override public void uncaughtException(Thread t, Throwable e) { logger.error("sid search thread pool execution error,thread name:{},cause:{},msg:{}", t.getName(), e.getCause(), e.getMessage()); } }
所以,看ES自定義的線程池實現,看下大神們是如何繼承ThreadPoolExecutor,定義異常處理策略的。node
想要執行:任務、或者叫業務邏輯的載體是:經過定義一個類,implements Runnable接口,Override Runnable接口的run()方法,在run()方法裏面寫業務邏輯處理代碼(好比將數據寫入到數據庫、向ElasticSearch提交查詢請求……)數據庫
執行 java.util.concurrent.Executor
的 execute(Runnable runnable)方法,就能提交任務,線程池中某個具體的線程會執行提交的任務。併發
當全部的任務執行完成後,線程池是否要關閉?若是須要執行可返回結果的任務怎麼辦?因而乎ExecutorService 就擴展Executor接口:public interface ExecutorService extends Executor
,提供了這些功能。elasticsearch
相比於ExecutorService
,ThreadPoolExecutor
添加了兩個方法,這樣能夠在任務執行前和執行完成後作一些處理。ide
protected void beforeExecute(Thread t, Runnable r) { } protected void afterExecute(Runnable r, Throwable t) { }
ElasticSearch中的EsThreadPoolExecutor.java
就實現了這兩個方法。工具
而真正的任務執行是在ThreadPoolExecutor的內部類Worker中run()方法實現oop
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { // 接受一個Runnable任務,而後執行ThreadFactory newThread()建立線程執行任務 Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } /** Delegates main run loop to outer runWorker */ public void run() { runWorker(this); } }
Work implements Runnable,調用ThreadPoolExecutor的 final void runWorker(Worker w)
執行任務。ui
來看一下runWorker方法中的部分代碼:
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; try { //任務執行前作一些處理 beforeExecute(wt, task); Throwable thrown = null; try { //任務執行 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { //任務執行完成後作一些處理 afterExecute(task, thrown); }
任務是由具體的線程來執行的,所以還須要考慮線程是如何建立的。ThreadFactory
定義了建立線程池的方法newThread
public interface ThreadFactory { /** * Constructs a new {@code Thread}. Implementations may also initialize * priority, name, daemon status, {@code ThreadGroup}, etc. * * @param r a runnable to be executed by new thread instance * @return constructed thread, or {@code null} if the request to * create a thread is rejected */ Thread newThread(Runnable r); }
在Executors
工具類裏面定義了具體的工廠類,用來建立線程池
/** * The default thread factory */ static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
EsExecutors
的內部類EsThreadFactory
static class EsThreadFactory implements ThreadFactory { final ThreadGroup group; final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; EsThreadFactory(String namePrefix) { this.namePrefix = namePrefix; SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); } @Override public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0); t.setDaemon(true); return t; } }
線程組、線程數量、線程名稱
在建立線程時,通常會爲之指定一個線程執行的異常處理策略。驚奇的是EsThreadFactory
裏面並無顯示地定義線程執行時的異常處理策略(可能在其餘代碼中,經過匿名內部類的方式定義了異常處理策略吧)。而是使用ThreadGroup中定義的默認異常處理策略:
public class ThreadGroup implements Thread.UncaughtExceptionHandler {
若是要自定義線程執行過程當中出現異常的處理策略,只須要 implements Thread.UncaughtExceptionHandler
而且重寫它的uncaughtException(Thread t, Throwable e)
方法便可。若是未提供線程執行過程當中出現異常的處理策略,那麼就使用該默認的異常處理策略。
看java.lang.ThreadGroup
裏面的uncaughtException(Thread t, Throwable e)
方法的註釋:
Called by the Java Virtual Machine when a thread in this thread group stops because of an uncaught exception, and the thread does not have a specific Thread.UncaughtExceptionHandler installed. The uncaughtException method of ThreadGroup does the following: If this thread group has a parent thread group, the uncaughtException method of that parent is called with the same two arguments. Otherwise, this method checks to see if there is a Thread.getDefaultUncaughtExceptionHandler default uncaught exception handler installed, and if so, its uncaughtException method is called with the same two arguments.
若是在建立線程工廠的時候指定了UncaughtExceptionHandler,經過Thread.getDefaultUncaughtExceptionHandler 就能獲取到。
//在建立線程工廠時調用setUncaughtExceptionHandler方法設置一個自定義的:UncaughtExceptionHandler //若在線程執行過程當中出現了異常,那麼 exceptionHandler 對象的uncaughtException(Thread t, Throwable e) 方 //法就會被調用 private ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("audio-%d").setUncaughtExceptionHandler(exceptionHandler).build();
Otherwise, this method determines if the Throwable argument is an instance of ThreadDeath. If so, nothing special is done. Otherwise, a message containing the thread's name, as returned from the thread's Thread.getName method, and a stack backtrace,using the Throwable's Throwable.printStackTrace method, is printed to the System err
當未指定異常處理器時,若參數Throwable e
是一個ThreadDeath對象,那麼什麼也不作。
若是參數Throwable e
不是一個ThreadDeath對象,那麼就會經過方法Throwable.printStackTrac
打印異常
public class EsThreadPoolExecutor extends ThreadPoolExecutor { private final ThreadContext contextHolder; private volatile ShutdownListener listener;
A ThreadContext is a map of string headers and a transient map of keyed objects that are associated with a thread. It allows to store and retrieve header information across method calls, network calls as well as threads spawned from a thread that has a ThreadContext associated with.
從它的構造方法中可看出,多了個ThreadContext
(多了保存一些線程執行上下文信息的功能)
EsThreadPoolExecutor(String name, int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, XRejectedExecutionHandler handler, ThreadContext contextHolder) { super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler); this.name = name; this.contextHolder = contextHolder; }
再看EsThreadPoolExecutor Override ThreadPoolExecutor 的execute()方法:
@Override public void execute(final Runnable command) { doExecute(wrapRunnable(command)); } protected void doExecute(final Runnable command) { try { super.execute(command); } catch (EsRejectedExecutionException ex) { if (command instanceof AbstractRunnable) { // If we are an abstract runnable we can handle the rejection // directly and don't need to rethrow it. try { ((AbstractRunnable) command).onRejection(ex); } finally { ((AbstractRunnable) command).onAfter(); } } else { throw ex; } } }
doExecute()先執行super.execute(command);
在這裏面有任務拒絕策略的檢查邏輯,若是任務被拒絕了,就會調用EsAbortPolicy
的rejectedExecution()
if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) //調用拒絕策略 reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); }
/** * Invokes the rejected execution handler for the given command. * Package-protected for use by ScheduledThreadPoolExecutor. */ final void reject(Runnable command) { //handler 就是在new ThreadPoolExecutor對象 時傳遞的 RejectedExecutionHandler對象 handler.rejectedExecution(command, this); }
而後能夠在doExecute()裏面多作一些額外的處理:((AbstractRunnable) command).onRejection(ex);
任務被拒絕以後發個消息通知啥的。
ElasticSearch中的拒絕策略實現EsAbortPolicy
:
public class EsAbortPolicy implements XRejectedExecutionHandler { private final CounterMetric rejected = new CounterMetric(); @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { if (r instanceof AbstractRunnable) { //判斷任務是否要強制執行 if (((AbstractRunnable) r).isForceExecution()) { BlockingQueue<Runnable> queue = executor.getQueue(); //建立ThreadPoolExecutor指定的 任務隊列 類型是SizeBlockingQueue if (!(queue instanceof SizeBlockingQueue)) { throw new IllegalStateException("forced execution, but expected a size queue"); } try { //儘管任務執行失敗了,仍是再一次把它提交到任務隊列,這樣拒絕的任務又能夠有執行機會了 ((SizeBlockingQueue) queue).forcePut(r); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException("forced execution, but got interrupted", e); } return; } } rejected.inc(); throw new EsRejectedExecutionException("rejected execution of " + r + " on " + executor, executor.isShutdown()); } @Override public long rejected() { return rejected.count(); } }
public interface XRejectedExecutionHandler extends RejectedExecutionHandler { /** * The number of rejected executions. */ long rejected(); }
XRejectedExecutionHandler
統計任務被拒絕的次數。用的是java.util.concurrent.atomic.LongAdder
,又發現了一個新的計數器:關於LongAdder與AtomicLong的對比
看完這個實現,是否是下次也能夠模仿實現:當向 線程池 提交任務被拒絕了,也可以失敗重試^~^
前面講了這麼多,都是在對比ElasticSearch中的線程池與JDK併發包中的線程池背後執行的一些原理。ElasticSearch中的自定義線程池就是基於JDK併發包中的線程池實現的。
下面來正式分析下ElasticSearch源碼中線程池建立流程。
在節點啓動過程中,org.elasticsearch.node.Node.java
開始建立線程池:
final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0]));
看ThreadPool源碼:裏面有不少實例變量,以下:
public class ThreadPool extends AbstractComponent implements Scheduler, Closeable { private Map<String, ExecutorHolder> executors = new HashMap<>(); static final ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService(); private final Map<String, ExecutorBuilder> builders; private final ThreadContext threadContext; private final ScheduledThreadPoolExecutor scheduler;
好比說:ExecutorService DIRECT_EXECUTOR = EsExecutors.newDirectExecutorService();
就是一個線程池。還有一些線程池是經過ExecutorBuilder
來建立的(Map<String, ExecutorBuilder> builders
)
線程池類型:ThreadPool的內部類ThreadPoolType
public enum ThreadPoolType { DIRECT("direct"), FIXED("fixed"), FIXED_AUTO_QUEUE_SIZE("fixed_auto_queue_size"), SCALING("scaling");
一個HashMap存儲線程池名稱,以及相應的類型。
static { HashMap<String, ThreadPoolType> map = new HashMap<>(); map.put(Names.SAME, ThreadPoolType.DIRECT); map.put(Names.GENERIC, ThreadPoolType.SCALING); map.put(Names.LISTENER, ThreadPoolType.FIXED); map.put(Names.GET, ThreadPoolType.FIXED); map.put(Names.ANALYZE, ThreadPoolType.FIXED); map.put(Names.INDEX, ThreadPoolType.FIXED); map.put(Names.WRITE, ThreadPoolType.FIXED); map.put(Names.SEARCH, ThreadPoolType.FIXED_AUTO_QUEUE_SIZE); map.put(Names.MANAGEMENT, ThreadPoolType.SCALING); map.put(Names.FLUSH, ThreadPoolType.SCALING); map.put(Names.REFRESH, ThreadPoolType.SCALING); map.put(Names.WARMER, ThreadPoolType.SCALING); map.put(Names.SNAPSHOT, ThreadPoolType.SCALING); map.put(Names.FORCE_MERGE, ThreadPoolType.FIXED); map.put(Names.FETCH_SHARD_STARTED, ThreadPoolType.SCALING); map.put(Names.FETCH_SHARD_STORE, ThreadPoolType.SCALING); THREAD_POOL_TYPES = Collections.unmodifiableMap(map); }
而真正建立線程池的代碼,是在ThreadPool的構造方法中的for循環final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext);
,這行語句的build方法。
for (@SuppressWarnings("unchecked") final Map.Entry<String, ExecutorBuilder> entry : builders.entrySet()) { final ExecutorBuilder.ExecutorSettings executorSettings = entry.getValue().getSettings(settings); final ExecutorHolder executorHolder = entry.getValue().build(executorSettings, threadContext); if (executors.containsKey(executorHolder.info.getName())) { throw new IllegalStateException("duplicate executors with name [" + executorHolder.info.getName() + "] registered"); } logger.debug("created thread pool: {}", entry.getValue().formatInfo(executorHolder.info)); executors.put(entry.getKey(), executorHolder); }
前面枚舉類 ThreadPoolType 中有四種類型的線程池,對應着上圖的三個ExecutorBuild類,看org.elasticsearch.threadpool.FixedExecutorBuilder
的build方法:建立線程池須要參數FixedExecutorSettings,須要保存線程上下文 ThreadContext
@Override ThreadPool.ExecutorHolder build(final FixedExecutorSettings settings, final ThreadContext threadContext) { int size = settings.size; int queueSize = settings.queueSize; final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name())); final ExecutorService executor = EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext); final String name; if ("write".equals(name()) && Booleans.parseBoolean(System.getProperty("es.thread_pool.write.use_bulk_as_display_name", "false"))) { name = "bulk"; } else { name = name(); } final ThreadPool.Info info = new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); return new ThreadPool.ExecutorHolder(executor, info); }
其中的這兩行代碼:
final ThreadFactory threadFactory = EsExecutors.daemonThreadFactory(EsExecutors.threadName(settings.nodeName, name()));
構建線程工廠。
final ExecutorService executor = EsExecutors.newFixed(settings.nodeName + "/" + name(), size, queueSize, threadFactory, threadContext);
構建線程池。
至此,ElasticSearch構建線程池整個流程就是這樣了。
構建出來的線程池被封裝在ThreadPool.ExecutorHolder
類中new ThreadPool.ExecutorHolder(executor, info);
final ThreadPool.Info info = new ThreadPool.Info(name, ThreadPool.ThreadPoolType.FIXED, size, size, null, queueSize < 0 ? null : new SizeValue(queueSize)); return new ThreadPool.ExecutorHolder(executor, info);
當全部的線程池構造完成後,在節點啓動過程當中初始化各類服務時,new 這些對象時,都須要傳一個ThreadPool 參數,各個服務就可使用線程池來執行任務了。org.elasticsearch.node.Node.java
中代碼:
//構造好各類線程池 final ThreadPool threadPool = new ThreadPool(settings, executorBuilders.toArray(new ExecutorBuilder[0])); //clusterService 用到了threadPool final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool, ClusterModule.getClusterStateCustomSuppliers(clusterPlugins)); //monitorService 用到了threadPool final MonitorService monitorService = new MonitorService(settings, nodeEnvironment, threadPool, clusterInfoService); //actionModule ActionModule actionModule = new ActionModule(false, settings, clusterModule.getIndexNameExpressionResolver(), settingsModule.getIndexScopedSettings(), settingsModule.getClusterSettings(), settingsModule.getSettingsFilter(), threadPool, pluginsService.filterPlugins(ActionPlugin.class), client, circuitBreakerService, usageService); //...在new 不少其餘 XXXService時,都須要傳一個ThreadPool參數。
所以,能夠說ThreadPool在ElasticSearch各類操做中無處不在。哈哈。
這篇文章寫得有點亂,主要兩個方面:一個是JDK包中原生線程池相關功能介紹,而後對比ElasticSearch中如何實現自定義的線程池。分析了ElasticSearch中自定義線程池任務提交時的拒絕策略和線程執行過程當中拋出異常時的異常處理策略。而後大概分析下ElasticSearch中線程池的建立流程:從org.elasticsearch.node.Node
開始:
主要涉及到如下類:
org.elasticsearch.common.util.concurrent.EsThreadPoolExecutor
org.elasticsearch.threadpool.ExecutorBuilder
的三個子類:
org.elasticsearch.threadpool.FixedExecutorBuilder
org.elasticsearch.threadpool.AutoQueueAdjustingExecutorBuilder
org.elasticsearch.threadpool.ScalingExecutorBuilder
org.elasticsearch.threadpool.ThreadPool