無限制建立線程的不足:html
1) 線程生命週期開銷高;java
2) 資源消耗大,尤爲是內存。若是可運行的線程數量多於可用處理器的數量,那麼有些線程將閒置。大量空閒的線程佔用許多內存,給垃圾回收器帶來壓力(頻繁 stop the world)。因此,若是已經擁有足夠多的線程使全部CPU保持忙碌狀態,那麼建立再多的線程反而會下降性能。web
3) 穩定性。可建立線程的數量存在必定限制。每一個都會維護兩個執行棧,一個用於java代碼,另外一個用於原生代碼。一般JVM在默認狀況下生成一個複合棧,大約爲0.5MB。若是無限制地建立線程,破壞了系統對線程的限制,就極可能拋出OutOfMemoryError異常,使得系統處於不穩定狀態。編程
public class ExecutorTest { private static final int NUMBERS = 100; private static final Executor EXECUTOR = Executors.newFixedThreadPool(NUMBERS); public void processRequst() throws IOException { ServerSocket serverSocket = new ServerSocket(80); while (true) { final Socket conn = serverSocket.accept(); Runnable task = new Runnable() { @Override public void run() { System.out.println("-----------process request----------"); } }; EXECUTOR.execute(task); } } }
線程池經過調用Executors的靜態工廠方法的四種建立方式:緩存
1) newFixedThreadPool。建立一個固定長度的線程池,每當提交一個任務時就建立一個線程,直到達到線程池的最大數量,若是某個線程發生異常而結束那麼線程池會補充一個線程。服務器
/** * Creates a thread pool that reuses a fixed number of threads * operating off a shared unbounded queue. At any point, at most * {@code nThreads} threads will be active processing tasks. * If additional tasks are submitted when all threads are active, * they will wait in the queue until a thread is available. * If any thread terminates due to a failure during execution * prior to shutdown, a new one will take its place if needed to * execute subsequent tasks. The threads in the pool will exist * until it is explicitly {@link ExecutorService#shutdown shutdown}. * * @param nThreads the number of threads in the pool * @return the newly created thread pool * @throws IllegalArgumentException if {@code nThreads <= 0} */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); }
2) newCachedThreadPool。建立一個可緩存的線程池。若是線程池的當前規模超過了處理需求時,將回收空閒的線程,而當需求增長時,則會添加新的線程,線程池的規模不存在任何限制。網絡
/** * Creates a thread pool that creates new threads as needed, but * will reuse previously constructed threads when they are * available. These pools will typically improve the performance * of programs that execute many short-lived asynchronous tasks. * Calls to {@code execute} will reuse previously constructed * threads if available. If no existing thread is available, a new * thread will be created and added to the pool. Threads that have * not been used for sixty seconds are terminated and removed from * the cache. Thus, a pool that remains idle for long enough will * not consume any resources. Note that pools with similar * properties but different details (for example, timeout parameters) * may be created using {@link ThreadPoolExecutor} constructors. * * @return the newly created thread pool */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }
3) newSingleThreadExecutor。這是一個單線程的executor,它建立單個工做者線程來執行任務,若是這個線程異常結束,會建立另外一個線程替代。該方式能確保依照任務在隊列中的順序來串行執行(FIFO,LIFO,優先級)。多線程
/** * Creates an Executor that uses a single worker thread operating * off an unbounded queue. (Note however that if this single * thread terminates due to a failure during execution prior to * shutdown, a new one will take its place if needed to execute * subsequent tasks.) Tasks are guaranteed to execute * sequentially, and no more than one task will be active at any * given time. Unlike the otherwise equivalent * {@code newFixedThreadPool(1)} the returned executor is * guaranteed not to be reconfigurable to use additional threads. * * @return the newly created single-threaded Executor */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }
4) newScheduledThreadPool。建立一個固定長度的線程池,而且以延遲或定時的方式執行任務併發
/** * Creates a thread pool that can schedule commands to run after a * given delay, or to execute periodically. * @param corePoolSize the number of threads to keep in the pool, * even if they are idle * @return a newly created scheduled thread pool * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } /** * Creates a new {@code ScheduledThreadPoolExecutor} with the * given core pool size. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @throws IllegalArgumentException if {@code corePoolSize < 0} */ public ScheduledThreadPoolExecutor(int corePoolSize) { super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS, new DelayedWorkQueue()); }
建立線程池的方法最終都是建立了一個ThreadPoolExecutors實例,該類的構造方法以下app
/** * Creates a new {@code ThreadPoolExecutor} with the given initial * parameters. * * @param corePoolSize the number of threads to keep in the pool, even * if they are idle, unless {@code allowCoreThreadTimeOut} is set * @param maximumPoolSize the maximum number of threads to allow in the * pool * @param keepAliveTime when the number of threads is greater than * the core, this is the maximum time that excess idle threads * will wait for new tasks before terminating. * @param unit the time unit for the {@code keepAliveTime} argument * @param workQueue the queue to use for holding tasks before they are * executed. This queue will hold only the {@code Runnable} * tasks submitted by the {@code execute} method. * @param threadFactory the factory to use when the executor * creates a new thread * @param handler the handler to use when execution is blocked * because the thread bounds and queue capacities are reached * @throws IllegalArgumentException if one of the following holds:<br> * {@code corePoolSize < 0}<br> * {@code keepAliveTime < 0}<br> * {@code maximumPoolSize <= 0}<br> * {@code maximumPoolSize < corePoolSize} * @throws NullPointerException if {@code workQueue} * or {@code threadFactory} or {@code handler} is null */ public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
參數說明(摘抄自JDK1.6參考文檔)
1) 核心和最大池大小(corePoolSize和maximumPoolSize)
ThreadPoolExecutor 將根據 corePoolSiz和 maximumPoolSize設置的邊界自動調整池大小。當新任務在方法 execute(java.lang.Runnable) 中提交時,若是運行的線程少於 corePoolSize,則建立新線程來處理請求,即便其餘輔助線程是空閒的。若是運行的線程多於 corePoolSize 而少於 maximumPoolSize,則僅當隊列滿時才建立新線程。若是設置的 corePoolSize 和 maximumPoolSize 相同,則建立了固定大小的線程池。若是將 maximumPoolSize 設置爲基本的無界值(如 Integer.MAX_VALUE),則容許池適應任意數量的併發任務。在大多數狀況下,核心和最大池大小僅基於構造來設置,不過也可使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 進行動態更改。
2) 保持活動時間(keepAliveTime)
若是池中當前有多於 corePoolSize 的線程,則這些多出的線程在空閒時間超過 keepAliveTime 時將會終止。這提供了當池處於非活動狀態時減小資源消耗的方法。若是池後來變爲活動,則能夠建立新的線程。也可使用方法 setKeepAliveTime(long, java.util.concurrent.TimeUnit) 動態地更改此參數。默認狀況下,保持活動策略只在有多於 corePoolSizeThreads 的線程時應用。可是隻要 keepAliveTime 值非 0,allowCoreThreadTimeOut(boolean) 方法也可將此超時策略應用於核心線程。
TimeUnit爲超時時間單位。
3)阻塞隊列(BlockingQueue)
全部 BlockingQueue 均可用於傳輸和保持提交的任務。可使用此隊列與池大小進行交互:
排隊有三種通用策略:
4) 建立新線程(ThreadFactory)
使用 ThreadFactory 建立新線程。若是沒有另外說明,則在同一個 ThreadGroup 中一概使用 Executors.defaultThreadFactory() 建立線程,而且這些線程具備相同的 NORM_PRIORITY 優先級和非守護進程狀態。經過提供不一樣的 ThreadFactory,能夠改變線程的名稱、線程組、優先級、守護進程狀態,等等。若是從 newThread 返回 null 時 ThreadFactory 未能建立線程,則執行程序將繼續運行,但不能執行任何任務。
5) 被拒絕的任務(RejectedExecutionHandler)
當 Executor 已經關閉,而且 Executor 將有限邊界用於最大線程和工做隊列容量,且已經飽和時,在方法 execute(java.lang.Runnable) 中提交的新任務將被拒絕。在以上兩種狀況下,execute 方法都將調用其 RejectedExecutionHandler 的 RejectedExecutionHandler.rejectedExecution(java.lang.Runnable, java.util.concurrent.ThreadPoolExecutor) 方法。下面提供了四種預約義的處理程序策略:
定義和使用其餘種類的 RejectedExecutionHandler 類也是可能的,但這樣作須要很是當心,尤爲是當策略僅用於特定容量或排隊策略時。
咱們知道,JVM只有在全部線程所有終止後纔會退出。全部,若是咱們沒法正確地關閉Executor,JVM將沒法結束。爲了解決執行服務的生命週期問題,ExecutorService繼承Executor接口,添加了一些用於生命週期管理的方法。ExecutorService的生命週期有三種狀態:運行、關閉和已終止。
public interface ExecutorService extends Executor { /** * Initiates an orderly shutdown in which previously submitted * tasks are executed, but no new tasks will be accepted. * Invocation has no additional effect if already shut down. * 執行平緩的關閉過程,再也不接受新的任務,同時等待已經提交的任務執行完成——包括哪些還未開始執行的任務 */ void shutdown(); /** * Attempts to stop all actively executing tasks, halts the * processing of waiting tasks, and returns a list of the tasks * that were awaiting execution. * 嘗試取消全部運行中的任務,而且再也不啓動隊列中還沒有開始執行的任務 */ List<Runnable> shutdownNow(); /** * Returns {@code true} if this executor has been shut down. * 查詢 ExecutorService 是否已經關閉 */ boolean isShutdown(); /** * Returns {@code true} if all tasks have completed following shut down. * Note that {@code isTerminated} is never {@code true} unless * either {@code shutdown} or {@code shutdownNow} was called first. * * @return {@code true} if all tasks have completed following shut down * * 查詢 ExecutorService 是否已經終止 */ boolean isTerminated(); /** * Blocks until all tasks have completed execution after a shutdown * request, or the timeout occurs, or the current thread is * interrupted, whichever happens first. * * 等待ExecutorService進入終止狀態 */ boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; /*其餘用於提交任務的方法……*/ }
採用future和callable攜帶任務結果,長時間執行的任務能夠先進行計算,在以後經過future取得計算結果。
示例程序
/* * 網頁數據加載並行的可能性,假設網頁只包含文本和圖像兩種數據 */ public class CompletionTest { class ImageData{ // 屬性.... } // 從序列化的數據中加載圖像 public ImageData loadFrom(String source) { return new ImageData(); } /** * 單線程模式 * @param sequence 序列化後的網頁數據 */ public void loadWithSingleThread(CharSequence source) { System.out.print("加載文本"); List<ImageData> list = new ArrayList<>(); // 從 source 解析圖像數據並加入到 list 中 ..... // loadImage(source) for (ImageData imageData : list) { System.out.println("圖像加載完成" + imageData); } } private final ExecutorService executorService = Executors.newFixedThreadPool(10); /* * 單線程加載CPU利用率低,若是程序依賴於長時間的 io(當前從網絡加載圖像就是)那麼將很費時間 * 結合 futureTask 預加載圖像 */ public void loadInFuture(CharSequence source) { Callable<List<ImageData>> task = new Callable<List<ImageData>>() { @Override public List<ImageData> call() throws Exception { List<ImageData> result = new ArrayList<>(); /* * 加載圖像數據到 result * loadImageFrom source to list..... */ return result; } }; Future<List<ImageData>> future = executorService.submit(task); /* * loadText from source..... */ System.out.println("loading text"); try { List<ImageData> imageDatas = future.get(); for (ImageData imageData : imageDatas) { System.out.println("圖像數據" + imageData); } } catch (InterruptedException e) { // 拋出中斷異常,從新設置線程的中斷狀態 Thread.currentThread().interrupt(); // 中斷了,結果已不須要 future.cancel(true); } catch (ExecutionException e) { e.printStackTrace(); } } /* * 採用 future 來預加載圖像在必定程度上提供了併發性,但在本問題中效率仍比較低,由於咱們採用的是一次性加載完圖像 * 再返回,而相對於加載文原本說,圖像加載速度要低不少,在本問題中幾乎能夠說效率與串行差異不大,那怎麼改進? * 爲每一圖片設置一個相應的 future計算任務,而後循環操做,每計算完就直接加載,那樣用戶看到的頁面是一張張加載 * 出來的,這可行,但比較繁瑣,咱們能夠直接使用CompletionService。 * 結合 completionService 加載圖像 */ public void loadWithCompeletionSevice(CharSequence source) { List<String> imageInfo = new ArrayList<>(); // imageInfo = load from source....... CompletionService<ImageData> service = new ExecutorCompletionService<>(executorService); for (String string : imageInfo) { service.submit(new Callable<CompletionTest.ImageData>() { @Override public ImageData call() throws Exception { ImageData imageData = loadFrom(string); // imageDate = loadFromSource(); return imageData; } }); } // loadText(source) System.out.println("loading text"); try { for (int i = 0; i < imageInfo.size(); i++) { // 在得出結果以前阻塞 Future<ImageData> future = service.take(); ImageData imageData = future.get(); // loading image ... System.out.println("loading image" + imageData); } } catch (InterruptedException e) { Thread.currentThread().interrupt(); } catch (ExecutionException e) { e.printStackTrace(); } } }
有時候,咱們可能沒法在指定的時間內完成某個任務,那麼咱們將不須要它的結果,此時咱們能夠放棄這個任務。例如,一個web應用程序須要從外部的廣告服務器獲取廣告,當若是該應用程序在2秒內得不到響應,那麼將顯示一個默認的廣告,這樣即便沒法獲取廣告信息,也不會下降站點的性能。
程序示例
public class GetAd { class Ad{ /* * 屬性.... */ } private final ExecutorService execute = Executors.newFixedThreadPool(100); private final Ad DEFAULT_AD = new Ad(); private final long TIME_LOAD = 2000; public void loadAd() { long endTime = System.currentTimeMillis() + TIME_LOAD; Future<Ad> future = execute.submit(new FetchAdTask()); // loading page text..... Ad ad; try { // leftTime 有可能爲負數,但 future.get 方法會把負數視爲0 long leftTime = endTime - System.currentTimeMillis(); ad = future.get(leftTime, TimeUnit.MILLISECONDS); } catch (InterruptedException e) { Thread.currentThread().interrupt(); future.cancel(true); ad = DEFAULT_AD; } catch (TimeoutException e) { future.cancel(true); ad = DEFAULT_AD; } catch (ExecutionException e) { ad = DEFAULT_AD; } System.out.println("load complete" + ad); } class FetchAdTask implements Callable<Ad>{ @Override public Ad call() throws Exception { // load ad from ad server return new Ad(); } } }
總結自《java併發編程實戰》