構造一個線程池爲何須要幾個參數?若是避免線程池出現OOM? Runnable
和 Callable
的區別是什麼?本文將對這些問題一一解答,同時還將給出使用線程池的常見場景和代碼片斷。html
Java中建立線程池很簡單,只須要調用 Executors
中相應的便捷方法便可,好比 Executors.newFixedThreadPool(int nThreads)
,可是便捷不只隱藏了複雜性,也爲咱們埋下了潛在的隱患(OOM,線程耗盡)。java
Executors
建立線程池便捷方法列表:小程序
方法名 | 功能 |
---|---|
newFixedThreadPool(int nThreads) | 建立固定大小的線程池 |
newSingleThreadExecutor() | 建立只有一個線程的線程池 |
newCachedThreadPool() | 建立一個不限線程數上限的線程池,任何提交的任務都將當即執行 |
小程序使用這些快捷方法沒什麼問題,對於服務端須要長期運行的程序,建立線程池應該直接使用 ThreadPoolExecutor
的構造方法。沒錯,上述 Executors
方法建立的線程池就是 ThreadPoolExecutor
。api
Executors
中建立線程池的快捷方法,其實是調用了 ThreadPoolExecutor
的構造方法(定時任務使用的是 ScheduledThreadPoolExecutor
),該類構造方法參數列表以下:服務器
// Java線程池的完整構造函數public ThreadPoolExecutor( int corePoolSize, // 線程池長期維持的線程數,即便線程處於Idle狀態,也不會回收。 int maximumPoolSize, // 線程數的上限 long keepAliveTime, TimeUnit unit, // 超過corePoolSize的線程的idle時長, // 超過這個時間,多餘的線程會被回收。 BlockingQueue<Runnable> workQueue, // 任務的排隊隊列 ThreadFactory threadFactory, // 新線程的產生方式 RejectedExecutionHandler handler) // 拒絕策略
居然有7個參數,很無奈,構造一個線程池確實須要這麼多參數。這些參數中,比較容易引發問題的有 corePoolSize
, maximumPoolSize
, workQueue
以及 handler
:oracle
corePoolSize
和 maximumPoolSize
設置不當會影響效率,甚至耗盡線程;less
workQueue
設置不當容易致使OOM;ide
handler
設置不當會致使提交任務時拋出異常。函數
正確的參數設置方式會在下文給出。ui
If fewer than corePoolSize threads are running, the Executor always prefers adding a new thread rather than queuing. If corePoolSize or more threads are running, the Executor always prefers queuing a request rather than adding a new thread. If a request cannot be queued, a new thread is created unless this would exceed maximumPoolSize, in which case, the task will be rejected.
corePoolSize -> 任務隊列 -> maximumPoolSize -> 拒絕策略
能夠向線程池提交的任務有兩種: Runnable
和 Callable
,兩者的區別以下:
方法簽名不一樣, void Runnable.run()
, V Callable.call() throws Exception
是否容許有返回值, Callable
容許有返回值
是否容許拋出異常, Callable
容許拋出異常。
Callable
是JDK1.5時加入的接口,做爲 Runnable
的一種補充,容許有返回值,容許拋出異常。
提交方式 | 是否關心返回結果 |
---|---|
Future<T> submit(Callable<T> task) |
是 |
void execute(Runnable command) |
否 |
Future<?> submit(Runnable task) |
否,雖然返回Future,可是其get()方法老是返回null |
不要使用 Executors.newXXXThreadPool()
快捷方法建立線程池,由於這種方式會使用×××的任務隊列,爲避免OOM,咱們應該使用 ThreadPoolExecutor
的構造方法手動指定隊列的最大長度:
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), // 使用有界隊列,避免OOM new ThreadPoolExecutor.DiscardPolicy());
任務隊列總有佔滿的時候,這是再 submit()
提交新的任務會怎麼樣呢? RejectedExecutionHandler
接口爲咱們提供了控制方式,接口定義以下:
public interface RejectedExecutionHandler { void rejectedExecution(Runnable r, ThreadPoolExecutor executor); }
線程池給咱們提供了幾種常見的拒絕策略:
拒絕策略 | 拒絕行爲 |
---|---|
AbortPolicy | 拋出RejectedExecutionException |
DiscardPolicy | 什麼也不作,直接忽略 |
DiscardOldestPolicy | 丟棄執行隊列中最老的任務,嘗試爲當前提交的任務騰出位置 |
CallerRunsPolicy | 直接由提交任務者執行這個任務 |
線程池默認的拒絕行爲是 AbortPolicy
,也就是拋出 RejectedExecutionHandler
異常,該異常是非受檢異常,很容易忘記捕獲。若是不關心任務被拒絕的事件,能夠將拒絕策略設置成 DiscardPolicy
,這樣多餘的任務會悄悄的被忽略。
ExecutorService executorService = new ThreadPoolExecutor(2, 2, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(512), new ThreadPoolExecutor.DiscardPolicy());// 指定拒絕策略
線程池的處理結果、以及處理過程當中的異常都被包裝到 Future
中,並在調用 Future.get()
方法時獲取,執行過程當中的異常會被包裝成 ExecutionException
, submit()
方法自己不會傳遞結果和任務執行過程當中的異常。獲取執行結果的代碼能夠這樣寫:
ExecutorService executorService = Executors.newFixedThreadPool(4); Future<Object> future = executorService.submit(new Callable<Object>() { @Override public Object call() throws Exception { throw new RuntimeException("exception in call~");// 該異常會在調用Future.get()時傳遞給調用者 } }); try { Object result = future.get(); } catch (InterruptedException e) { // interrupt} catch (ExecutionException e) { // exception in Callable.call() e.printStackTrace(); }
上述代碼輸出相似以下:
int poolSize = Runtime.getRuntime().availableProcessors() * 2; BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(512); RejectedExecutionHandler policy = new ThreadPoolExecutor.DiscardPolicy(); executorService = new ThreadPoolExecutor(poolSize, poolSize, 0, TimeUnit.SECONDS, queue, policy);
過 submit()
向線程池提交任務後會返回一個 Future
,調用 V Future.get()
方法可以阻塞等待執行結果, V get(long timeout, TimeUnit unit)
方法能夠指定等待的超時時間。
若是向線程池提交了多個任務,要獲取這些任務的執行結果,能夠依次調用 Future.get()
得到。但對於這種場景,咱們更應該使用 ExecutorCompletionService ,該類的 take()
方法老是阻塞等待某一個任務完成,而後返回該任務的 Future
對象。向 CompletionService
批量提交任務後,只需調用相同次數的 CompletionService.take()
方法,就能獲取全部任務的執行結果,獲取順序是任意的,取決於任務的完成順序:
void solve(Executor executor, Collection<Callable<Result>> solvers) throws InterruptedException, ExecutionException { CompletionService<Result> ecs = new ExecutorCompletionService<Result>(executor);// 構造器 for (Callable<Result> s : solvers)// 提交全部任務 ecs.submit(s); int n = solvers.size(); for (int i = 0; i < n; ++i) {// 獲取每個完成的任務 Result r = ecs.take().get(); if (r != null) use(r); } }
V Future.get(long timeout, TimeUnit unit)
方法能夠指定等待的超時時間,超時未完成會拋出 TimeoutException
。
等待多個任務完成,並設置最大等待時間,能夠經過 CountDownLatch 完成:
public void testLatch(ExecutorService executorService, List<Runnable> tasks) throws InterruptedException{ CountDownLatch latch = new CountDownLatch(tasks.size()); for(Runnable r : tasks){ executorService.submit(new Runnable() { @Override public void run() { try{ r.run(); }finally { latch.countDown();// countDown } } }); } latch.await(10, TimeUnit.SECONDS); // 指定超時時間 }
以運營一家裝修公司作個比喻。公司在辦公地點等待客戶來提交裝修請求;公司有固定數量的正式工以維持運轉;旺季業務較多時,新來的客戶請求會被排期,好比接單後告訴用戶一個月後才能開始裝修;當排期太多時,爲避免用戶等過久,公司會經過某些渠道(好比人才市場、熟人介紹等)僱傭一些臨時工(注意,招聘臨時工是在排期排滿以後);若是臨時工也忙不過來,公司將決定再也不接收新的客戶,直接拒單。
線程池就是程序中的「裝修公司」,代勞各類髒活累活。上面的過程對應到線程池上:
// Java線程池的完整構造函數public ThreadPoolExecutor( int corePoolSize, // 正式工數量 int maximumPoolSize, // 工人數量上限,包括正式工和臨時工 long keepAliveTime, TimeUnit unit, // 臨時工不務正業的最長時間,超過這個時間將被解僱 BlockingQueue<Runnable> workQueue, // 排期隊列 ThreadFactory threadFactory, // 招人渠道 RejectedExecutionHandler handler) // 拒單方式
Executors
爲咱們提供了構造線程池的便捷方法,對於服務器程序咱們應該杜絕使用這些便捷方法,而是直接使用線程池 ThreadPoolExecutor
的構造方法,避免×××隊列可能致使的OOM以及線程個數限制不當致使的線程數耗盡等問題。 ExecutorCompletionService
提供了等待全部任務執行結束的有效方式,若是要設置等待的超時時間,則能夠經過 CountDownLatch
完成。