本文主要分析Dubbo線程池的構建過程,主要介紹官方文檔中有關於ThreadPool的種類:java
各類類型的線程池,內部就是根據規則建立不一樣的ThreadPoolExecutor對象,那咱們先簡單回顧一下線程池的基本知識,其構造方法以下所示:緩存
public ThreadPoolExecutor( int corePoolSize, // 線程池核心線程數、常駐線程數。 int maximumPoolSize, // 線程池中最大線程數量 long keepAliveTime, // 線程保持活躍時間,(若是線程建立,並空閒 //指定值後,線程會被回收,0表示不開啓該特性,其範圍針對 // corePoolSize的線程) TimeUnit unit, // keepAliveTime的時間單位。 BlockingQueue< Runnable> workQueue,// 任務隊列 ThreadFactory threadFactory, // 線程工廠類,通常經過該線程工廠,爲線程命名,以便區分線程。 RejectedExecutionHandler handler) // 拒絕策略。
提交任務流程(線程建立流程)架構
public class FixedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<runnable>() : (queues < 0 ? new LinkedBlockingQueue<runnable>() : new LinkedBlockingQueue<runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
實現要點:併發
public class CachedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<runnable>() : (queues < 0 ? new LinkedBlockingQueue<runnable>() : new LinkedBlockingQueue<runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
實現要點:既然要實現線程能夠被回收,則必然要設置 keepAliveTime。 故對應線程池核心參數設置,對應以下:分佈式
public class LimitedThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, queues == 0 ? new SynchronousQueue<runnable>() : (queues < 0 ? new LinkedBlockingQueue<runnable>() : new LinkedBlockingQueue<runnable>(queues)), new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); } }
就不回收,與cached不一樣的就是 keepAliveTime 的取值不一樣,limited 取值爲:Long.MAX_VALUE,其餘與 cached 相同。ide
其核心實現主要由 TaskQueue、EagerThreadPoolExecutor 共同完成。 首先,咱們關注一下 TaskQueued 的 offer方法。高併發
public boolean offer(Runnable runnable) { if (executor == null) { throw new RejectedExecutionException("The task queue does not have executor!"); } int currentPoolThreadSize = executor.getPoolSize(); // [@1](https://my.oschina.net/u/1198) // have free worker. put task into queue to let the worker deal with task. if (executor.getSubmittedTaskCount() < currentPoolThreadSize) { // @2 return super.offer(runnable); } // return false to let executor create new worker. if (currentPoolThreadSize < executor.getMaximumPoolSize()) { // [@3](https://my.oschina.net/u/2648711) return false; } // currentPoolThreadSize >= max // @4 return super.offer(runnable); }
代碼@1:獲取當前線程池中線程的數量。源碼分析
代碼@2:若是當前已提交到線程池中的任務數量小於當前存在在的線程數,則走默認的提交流程。url
代碼@3:若是當前已提交到線程中的數量大於當前的線程池,併線程池中數量並未達到線程池容許建立的最大線程數時,則返回false,並不入隊,其效果是會建立新的線程來執行。.net
代碼@4:若是當前線程池中的線程已達到容許建立的最大線程數後,走默認的提交任務邏輯。 EagerThreadPoolExecutor#execute
public void execute(Runnable command) { if (command == null) { throw new NullPointerException(); } // do not increment in method beforeExecute! submittedTaskCount.incrementAndGet(); // @1 try { super.execute(command); } catch (RejectedExecutionException rx) { // retry to offer the task into queue. final TaskQueue queue = (TaskQueue) super.getQueue(); try { if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException("Queue capacity is full."); } } catch (InterruptedException x) { submittedTaskCount.decrementAndGet(); throw new RejectedExecutionException(x); } } catch (Throwable t) { // decrease any way submittedTaskCount.decrementAndGet(); // @2 } }
其核心實現邏輯:若是提交任務失敗,則再走一次默認的任務提交流程。 最總後結一下Eager的核心特性。
public class EagerThreadPool implements ThreadPool { @Override public Executor getExecutor(URL url) { String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); // init queue and executor TaskQueue<runnable> taskQueue = new TaskQueue<runnable>(queues <= 0 ? 1 : queues); EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, taskQueue, new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url)); taskQueue.setExecutor(executor); return executor; } }
其核心特性以下:
>做者介紹:丁威,《RocketMQ技術內幕》做者,RocketMQ 社區優秀佈道師、CSDN2019博客之星TOP10,維護公衆號:中間件興趣圈目前已陸續發表源碼分析Java集合、Java 併發包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源碼專欄。能夠點擊連接加入中間件知識星球 ,一塊兒探討高併發、分佈式服務架構,交流源碼。 </runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable></runnable>