合理利用線程池可以帶來三個好處。java
第一:下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。git
第二:提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。github
第三:提升線程的可管理性。線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如執掌。數據庫
咱們能夠經過ThreadPoolExecutor來建立一個線程池數組
1 new ThreadPoolExecutor(corePoolSize, maximumPoolSize, keepAliveTime, milliseconds,runnableTaskQueue, handler);
建立一個線程池須要輸入幾個參數:緩存
咱們可使用execute提交的任務,可是execute方法沒有返回值,因此沒法判斷任務是否被線程池執行成功。經過如下代碼可知execute方法輸入的任務是一個Runnable類的實例。服務器
1 threadsPool.execute(new Runnable() { 2 @Override 3 public void run() { 4 // TODO Auto-generated method stub 5 } 6 });
咱們也可使用submit 方法來提交任務,它會返回一個future,那麼咱們能夠經過這個future來判斷任務是否執行成功,經過future的get方法來獲取返回值,get方法會阻塞住直到任務完成,而使用get(long timeout, TimeUnit unit)方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。併發
1 Future<Object> future = executor.submit(harReturnValuetask); 2 try { 3 Object s = future.get(); 4 } catch (InterruptedException e) { 5 // 處理中斷異常 6 } catch (ExecutionException e) { 7 // 處理沒法執行任務異常 8 } finally { 9 // 關閉線程池 10 executor.shutdown(); 11 }
咱們能夠經過調用線程池的shutdown或shutdownNow方法來關閉線程池,它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。ide
只要調用了這兩個關閉方法的其中一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用shutdown來關閉線程池,若是任務不必定要執行完,則能夠調用shutdownNow。源碼分析
流程分析: 線程池的主要工做流程以下圖:
從上圖咱們能夠看出,當提交一個新任務到線程池時,線程池的處理流程以下:
源碼分析。上面的流程分析讓咱們很直觀的瞭解了線程池的工做原理,讓咱們再經過源代碼來看看是如何實現的。線程池執行任務的方法以下:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 //若是線程數小於基本線程數,則建立線程並執行當前任務 5 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { 6 //如線程數大於等於基本線程數或線程建立失敗,則將當前任務放到工做隊列中。 7 if (runState == RUNNING && workQueue.offer(command)) { 8 if (runState != RUNNING || poolSize == 0) 9 ensureQueuedTaskHandled(command); 10 } 11 //若是線程池不處於運行中或任務沒法放入隊列,而且當前線程數量小於最大容許的線程數量, 12 則建立一個線程執行任務。 13 else if (!addIfUnderMaximumPoolSize(command)) 14 //拋出RejectedExecutionException異常 15 reject(command); // is shutdown or saturated 16 } 17 }
工做線程。線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會無限循環獲取工做隊列裏的任務來執行。咱們能夠從Worker的run方法裏看到這點:
1 public void run() { 2 try { 3 Runnable task = firstTask; 4 firstTask = null; 5 while (task != null || (task = getTask()) != null) { 6 runTask(task); 7 task = null; 8 } 9 } finally { 10 workerDone(this); 11 } 12 }
要想合理的配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來進行分析:
任務的性質:CPU密集型任務,IO密集型任務和混合型任務。
任務的優先級:高,中和低。
任務的執行時間:長,中和短。
任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。
任務性質不一樣的任務能夠用不一樣規模的線程池分開處理。CPU密集型任務配置儘量小的線程,如配置Ncpu+1個線程的線程池。IO密集型任務則因爲線程並非一直在執行任務,則配置儘量多的線程,如2*Ncpu。混合型的任務,若是能夠拆分,則將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐率要高於串行執行的吞吐率,若是這兩個任務執行時間相差太大,則不必進行分解。咱們能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數。
優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。
執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。
依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。
建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,任務積壓在線程池裏。若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。
經過線程池提供的參數進行監控。線程池裏有一些屬性在監控線程池的時候可使用
經過擴展線程池進行監控。經過繼承線程池並重寫線程池的beforeExecute,afterExecute和terminated方法,咱們能夠在任務執行前,執行後和線程池關閉前幹一些事情。如監控任務的平均執行時間,最大執行時間和最小執行時間等。這幾個方法在線程池裏是空方法。以下:
1 protected void beforeExecute(Thread t, Runnable r) { }
在dubbo-common 模塊的threadpool包下體現,以下圖所示:
com.alibaba.dubbo.common.threadpool.ThreadPool ,線程池接口。代碼以下:
1 //@SPI("fixed")註解,Dubbo SPI擴展點,默認爲"fixed"。 2 @SPI("fixed") 3 public interface ThreadPool { 4 /** 5 * @Adaptive({Constants.THREADPOOL_KEY}) 註解,基於Dubbo SPI Adaptive機制,加載對應的線程池實現,使用URL.threadpool屬性。 6 * getExecutor(url)方法,得到對應的線程池的執行器 7 * 8 */ 9 @Adaptive({Constants.THREADPOOL_KEY}) 10 Executor getExecutor(URL url); 11 12 }
com.alibaba.dubbo.common.threadpool.support.fixed.FixedThreadPool ,實現ThreadPool接口,固定大小線程池,啓動時創建線程,不關閉,一直持有。代碼以下:
1 public class FixedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //線程名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //線程數 8 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); 9 //隊列數 10 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 11 //建立執行器 12 return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 13 /** 14 * 根據不一樣的隊列數,使用不一樣的隊列實現: 15 * queues == 0,SynchronousQueue對象。 16 * queues < 0,LinkedBlockingQueue對象。 17 * queues > 0,帶隊列數的LinkedBlockingQueue對象。 18 */ 19 queues == 0 ? new SynchronousQueue<Runnable>() : 20 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 21 : new LinkedBlockingQueue<Runnable>(queues)), 22 /** 23 * 建立NamedThreadFactory對象,用於生成線程名 24 * 建立AbortPolicyWithReport對象,用於當任務添加到線程池中被拒絕時。 25 */ 26 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 27 } 28 }
推薦閱讀:
com.alibaba.dubbo.common.threadpool.support.cached.CachedThreadPool ,實現ThreadPool接口,緩存線程池,空閒必定時長,自動刪除,須要時重建。代碼以下:
1 public class CachedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //線程池名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //核心線程數 8 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); 9 //最大線程數 10 int threads = url.getParameter(Constants.THREADS_KEY, Integer.MAX_VALUE); 11 //隊列數 12 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 13 //線程存活時長 14 int alive = url.getParameter(Constants.ALIVE_KEY, Constants.DEFAULT_ALIVE); 15 //建立執行器 16 return new ThreadPoolExecutor(cores, threads, alive, TimeUnit.MILLISECONDS, 17 queues == 0 ? new SynchronousQueue<Runnable>() : 18 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 19 : new LinkedBlockingQueue<Runnable>(queues)), 20 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 21 } 22 }
com.alibaba.dubbo.common.threadpool.support.limited.LimitedThreadPool ,實現ThreadPool接口,可伸縮線程池,但池中的線程池只會增加不會收縮。只增加不收縮的目的是爲了不收縮時忽然來了大流量引發的性能問題。代碼以下:
1 public class LimitedThreadPool implements ThreadPool { 2 3 @Override 4 public Executor getExecutor(URL url) { 5 //線程池名 6 String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME); 7 //核心線程數 8 int cores = url.getParameter(Constants.CORE_THREADS_KEY, Constants.DEFAULT_CORE_THREADS); 9 //最大線程數 10 int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS); 11 //隊列數 12 int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES); 13 /** 14 * 和CachedThreadPool實現是基本一致的,差別點在alive == Integer.MAX_VALUE,空閒時間無限大,即不會刪除。 15 */ 16 return new ThreadPoolExecutor(cores, threads, Long.MAX_VALUE, TimeUnit.MILLISECONDS, 17 queues == 0 ? new SynchronousQueue<Runnable>() : 18 (queues < 0 ? new LinkedBlockingQueue<Runnable>() 19 : new LinkedBlockingQueue<Runnable>(queues)), 20 new NamedInternalThreadFactory(name, true), new AbortPolicyWithReport(name, url)); 21 } 22 23 }
com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport ,實現 java.util.concurrent.ThreadPoolExecutor.AbortPolicy,拒絕策略實現類。打印JStack,分析線程狀態 代碼以下:
1 /** 2 * AbortPolicyWithReport實現自ThreadPoolExecutor.AbortPolicy,拒絕策略實現類, 3 * 打印JStack,分析線程狀態。 4 */ 5 public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy { 6 7 8 protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class); 9 /** 10 * 線程名 11 */ 12 private final String threadName; 13 14 /** 15 * URL 對象 16 */ 17 private final URL url; 18 19 /** 20 * 最後打印時間 21 */ 22 private static volatile long lastPrintTime = 0; 23 24 /** 25 * 信號量,大小爲1。 26 */ 27 private static Semaphore guard = new Semaphore(1); 28 29 public AbortPolicyWithReport(String threadName, URL url) { 30 this.threadName = threadName; 31 this.url = url; 32 } 33 34 @Override 35 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 36 /** 37 * 打印告警日誌 38 */ 39 String msg = String.format("Thread pool is EXHAUSTED!" + 40 " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," + 41 " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!", 42 threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(), 43 e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(), 44 url.getProtocol(), url.getIp(), url.getPort()); 45 logger.warn(msg); 46 // 打印 JStack,分析線程狀態。 47 dumpJStack(); 48 //拋出 RejectedExecutionException 異常 49 throw new RejectedExecutionException(msg); 50 } 51 52 private void dumpJStack() { 53 long now = System.currentTimeMillis(); 54 //每 10 分鐘,打印一次。 55 //dump every 10 minutes 56 if (now - lastPrintTime < 10 * 60 * 1000) { 57 return; 58 } 59 //得到信號量 60 if (!guard.tryAcquire()) { 61 return; 62 } 63 //建立線程池,後臺執行打印JStack 64 Executors.newSingleThreadExecutor().execute(new Runnable() { 65 @Override 66 public void run() { 67 //得到路徑 68 String dumpPath = url.getParameter(Constants.DUMP_DIRECTORY, System.getProperty("user.home")); 69 70 SimpleDateFormat sdf; 71 //得到系統 72 String OS = System.getProperty("os.name").toLowerCase(); 73 74 // window system don't support ":" in file name 75 if(OS.contains("win")){ 76 sdf = new SimpleDateFormat("yyyy-MM-dd_HH-mm-ss"); 77 }else { 78 sdf = new SimpleDateFormat("yyyy-MM-dd_HH:mm:ss"); 79 } 80 81 String dateStr = sdf.format(new Date()); 82 //得到輸出流 83 FileOutputStream jstackStream = null; 84 try { 85 jstackStream = new FileOutputStream(new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr)); 86 //打印JStack 87 JVMUtil.jstack(jstackStream); 88 } catch (Throwable t) { 89 logger.error("dump jstack error", t); 90 } finally { 91 //釋放信號量 92 guard.release(); 93 //釋放輸出流 94 if (jstackStream != null) { 95 try { 96 jstackStream.flush(); 97 jstackStream.close(); 98 } catch (IOException e) { 99 } 100 } 101 } 102 //記錄最後打印時間 103 lastPrintTime = System.currentTimeMillis(); 104 } 105 }); 106 107 } 108 109 }
推薦閱讀: