在項目中,避免不了要使用多線程,爲了不資源浪費和線程數的不可控而出現未知的問題,咱們通常都會使用線程池;java
JDK中給咱們提供了多種能夠當即使用的建立線程池的方法,其都是基於ThreadPoolExecutor建立的線程池,因此ThreadPoolExecutor是線程池的基礎,咱們主要分析一下ThreadPoolExecutor的使用,以後再分析一下快速建立線程池的方法優劣,再最後分析一下多線程的更加靈活運用;數組
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
上邊的是AllArgsConstructor,一共有七個參數;緩存
沒有任何容量的隊列,能夠理解爲容量爲0的隊列,當處理任務的線程數量大於等於corePoolSize時,新進任務會直接建立線程執行,若線程數量等於maximumPoolSizes 則會拋出RejectedExecutionException異常;bash
無界隊列,內部經過Node鏈表實現,若使用這種隊列則線程池中線程最大數量爲corePoolSize,參數maximumPoolSizes是無用的,由於超過corePoolSize的任務都會被放進queue中,且queue無界,不會觸發corePoolSize以外的線程建立;服務器
無界隊列並不表明真的無界,只是說明該隊列可支持無線長度,該隊列支持一個有參構造,可設置隊列長度,這樣maximumPoolSizes就不會失效了;多線程
有界隊列,必須設置一個固定容量,所以稱之爲有界,內部數組實現,可定義公平與非公平策略;併發
A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.app
當隊列和線程都處於Full狀態時,新進任務的處理策略,有以下3個默認給出的實現策略供咱們使用,固然也能夠自定義,只要實現其less
rejectedExecution(Runnable r, ThreadPoolExecutor e)方法就能夠了;ide
查看其實現:
/** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); }
可知不處理任務,始終拋出一個RejectedExecutionException異常;
該策略可很好的控制服務的線程數量和隊列的容量,但應該catch異常信息返回狀態碼,例如在app請求服務的時候返回服務繁忙請稍後再試的提示;
其rejectedExecution方法實現爲空,從其註釋也能夠看出,該策略默默的丟棄了新進任務,沒有任何提示及異常;
因爲會致使任務丟失且不可感知,所以應該在特定的場景下使用;
/** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } }
刪除隊列中最頭部的任務,而後將新進任務插入隊列尾部;
該策略會致使任務丟失,與2.3.2同樣,除非特定場景不然我的不建議使用;
/** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } }
若是線程池沒有掛掉,則使用新進任務的線程直接執行任務,而非等待使用線程池中的線程;
此種策略應該在服務內線程數量可控的範圍內,或在咱們很瞭解服務的線程使用狀況下使用;
若短期內有大量的新任務產生,此策略會致使服務內線程數目飆升,與咱們使用線程池的初衷不符;
建立固定容量的線程池:
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
經過內部實現咱們看到也是利用ThreadPoolExecutor建立,只不過是默認了部分入參而已.
corePoolSize 和 maximumPoolSize 均爲int nThreads,從而限制了線程的數量,採用無界隊列LinkedBlockingQueue使沒有得到線程資源的任務所有進入隊列等待,任務不會丟失;參數keepAliveTime默認爲0,由於沒有多於核心線程數的線程被建立,因此無需設置此值;
固定線程池能夠很好的控制服務中線程的數量,很好的避免的線程數量激增,控制了CPU的佔用率,但也會帶來另外的問題,如果隊列中有大量的任務阻塞,勢必會致使內存飆升;所以,固定線程池適用於任務併發數量可控的,短期內不會有大量任務提交的場景;
若在短期內有大量任務併發,可是每一個任務的運算不會佔用很長時間,能夠考慮下面的線程池 : ExecutorService newCachedThreadPool();
緩存線程池 :
public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
corePoolSize設置爲0,即沒有核心線程數量,全部的線程都是用完後超過keepAliveTime時間後就銷燬;maximumPoolSize 設置爲Integer.MAX_VALUE,基本能夠理解爲無上限,阻塞隊列採用同步隊列SynchronousQueue,全部任務即時提交線程執行,即不會有任務被阻塞在隊列中.
該線程池適用於短期內有任務併發,但任務都是在短期內能夠處理完畢的;maximumPoolSize的值保證了全部任務都能被線程或新建立線程當即處理,keepAliveTime = 60L使得大量線程在處理完當下任務時能夠保持活躍等待下一個任務到來,避免每次都會新建立線程帶來的開銷,在支持建立大量線程的狀況下有保證了線程不會被浪費,當線程空閒時間到達指定時間後銷燬,又避免了大量線程同時存在,控制的線程的數量;
對於任務處理時間長的場景,線程佔用時間過長,每次新進任務都會建立新的線程,線程數會上升,該線程池就不適用了須要考慮其餘的線程池;
單線程線程池;採用無界隊列的核心線程和最大線程都是1的線程池,全部任務會被串行的執行;
支持延遲執行任務的線程池,maximumPoolSize爲Integer.MAX_VALUE;可用於定時任務的執行;還有不少靈活的用法,詳細的能夠點擊直接看第二節;
工做竊取線程池,參數爲指定併發等級,默認爲服務器CPU的數量;該線程池內部基於ForkJoinPool,具體使用請點擊連接跳轉;
下面提供一個自定義線程池可直接使用,須要結合項目實際狀況適當修改:
package com.river.thread; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; public enum ContextThreadPool { /** * 該類的一個實例,經過枚舉類實現單例模式 */ INSTANCE; public ThreadPoolExecutor getThreadPool(){ return ThreadPoolHolder.pool; } private static class ThreadPoolHolder{ /** * 阻塞隊列的容量 */ private final static int CAPACITY = 500; private static ThreadPoolExecutor pool ; /** * 獲取處理器數目 */ private static int availableProcessors = Runtime.getRuntime().availableProcessors(); /** * 基於LinkedBlockingQueue的容量爲{@link CAPACITY} */ private static BlockingQueue queue = new LinkedBlockingQueue(CAPACITY); static { pool = new ThreadPoolExecutor( availableProcessors * 2, availableProcessors * 4 + 1, 0, TimeUnit.MILLISECONDS, queue, new ThreadFactory() { private AtomicInteger count = new AtomicInteger(0); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); String threadName = EnvirmentThreadPool.class.getSimpleName() + "-thread-" + count.addAndGet(1); thread.setName(threadName); return thread; } }, //自定義線程池FULL時的策略,新的任務阻塞在隊列外面; (r, executor) -> { try { queue.put(r); } catch (InterruptedException e) { e.printStackTrace(); } } ); } } }
工具調用:
ContextThreadPool.INSTANCE.getThreadPool();
如今咱們擁有了線程池,接下來就須要向線程池提交任務,目前有兩種方式:
<T> Future<T> submit(Callable<T> task)
前者定義在Executor中,用於任務無返回值的使用,後者定義在ExecutorService中,能夠拿到任務的結果Future;
咱們都知道,咱們建立線程有幾種方式,其中之一之二就是繼承Runnable接口和Callable接口,普通使用沒有什麼區別,可是在線程執行結果的獲取上就體現出來了;
@FunctionalInterface public interface Runnable { //void返回 public abstract void run(); }
@FunctionalInterface public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
能夠經過Future.上面的方法獲取線程返回值,有時候任務執行的時間比較長,在咱們獲取結果的時候尚未執行完畢,所以一般調用
boolean isDone();來判斷任務是否執行完畢;
package com.river.thread; import lombok.extern.slf4j.Slf4j; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.Future; import java.util.concurrent.ThreadPoolExecutor; @Slf4j public class FutureTest { public static void main(String[] args) throws ExecutionException, InterruptedException { ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool(); //正常這裏我會用lamdba表達式去寫,爲了明瞭接口實現 Future<String> result = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { return "123"; } }); while (true){ if (result.isDone()){ System.out.println(result.get()); break; } log.info("not finish"); } } }
日誌輸出:
2018-09-17 17:47:37.352 myAppName [main] INFO com.river.thread.FutureTest - not finish 123
這裏看到第一次獲取結果是沒有獲取到的,第二次就獲取到了;
接下來咱們使用待超時的get()方法獲取結果:
public static void main(String[] args) throws ExecutionException, InterruptedException, TimeoutException { ThreadPoolExecutor threadPool = ContextThreadPool.INSTANCE.getThreadPool(); Future<String> result = threadPool.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return "123"; } }); log.info("get result"); log.info(result.get(3, TimeUnit.SECONDS)); }
2018-09-17 17:52:43.348 myAppName [main] INFO com.river.thread.FutureTest - get result 2018-09-17 17:52:45.349 myAppName [main] INFO com.river.thread.FutureTest - 123
能夠看到2s中以後獲取到了執行結果,若是線程執行時間超過獲取時間呢?
咱們將sleep參數改爲了5000,
2018-09-17 17:55:14.380 myAppName [main] INFO com.river.thread.FutureTest - get result Exception in thread "main" java.util.concurrent.TimeoutException at java.util.concurrent.FutureTask.get(FutureTask.java:205) at com.river.thread.FutureTest.main(FutureTest.java:21)
能夠看到拋出了異常;
可是當咱們不嘗試get結果的時候,異常是不會被拋出來的,也就是說,Future有持有異常的能力;咱們能夠經過在任務執行完畢後catch該異常,從而執行相應的處理辦法;
一般狀況下,咱們會向線程池提交一個任務集合,將result保存在集合中,最後在遍歷集合中的執行結果來獲得最終的結果;