首先,咱們爲何須要線程池?
讓咱們先來了解下什麼是 對象池 技術。某些對象(好比線程,數據庫鏈接等),它們建立的代價是很是大的 —— 相比於通常對象,它們建立消耗的時間和內存都很大(並且這些對象銷燬的代價比通常對象也大)。因此,若是咱們維護一個 池,每次使用完這些對象以後,並不銷燬它,而是將其放入池中,下次須要使用時就直接從池中取出,即可以免這些對象的重複建立;同時,咱們能夠固定 池的大小,好比設置池的大小爲 N —— 即池中只保留 N 個這類對象 —— 當池中的 N 個對象都在使用中的時候,爲超出數量的請求設置一種策略,好比 排隊等候 或者 直接拒絕請求 等,從而避免頻繁的建立此類對象。
線程池 即對象池的一種(池中的對象爲線程 Thread
),相似的還有 數據庫鏈接池(池中對象爲數據庫鏈接 Connection
)。合理利用線程池可以帶來三個好處(參考文末的 References[1]):html
本文只介紹 Java 中線程池的基本使用,不會過多的涉及到線程池的原理。若是有興趣的讀者須要深刻理解線程池的實現原理,能夠參考文末的 References。java
JDK 中線程池的基礎架構以下:
數據庫
執行器 Executor
是頂級接口,只包含了一個 execute
方法,用來執行一個 Runnable
任務:
segmentfault
執行器服務 ExecutorService
接口繼承了 Executor
接口,ExecutorService
是全部線程池的基礎接口,它定義了 JDK 中線程池應該實現的基本方法:
緩存
線程池執行器 ThreadPoolExecutor
是基礎線程池的核心實現,而且能夠經過定製 ThreadPoolExecutor
的構造參數或者繼承 ThreadPoolExecutor
,實現本身的線程池;多線程
ScheduledThreadPoolExecutor
繼承自 ThreadPoolExecutor
,是能執行週期性任務或定時任務的線程池;架構
ForkJoinPool
是 JDK1.7 時添加的類,做爲對 Fork/Join 型線程池的實現。ide
本文只介紹 ThreadPoolExecutor
線程池的使用,ScheduledThreadPoolExecutor
和 ForkJoinPool
會在以後的文章中介紹。工具
查看 ThreadPoolExecutor
的源碼可知,在 ThreadPoolExecutor
的內部,將每一個池中的線程包裝爲了一個 Worker
:性能
而後在 ThreadPoolExecutor
中定義了一個 HashSet<Worker>
,做爲 「池」:
設置一個合適的線程池(即自定義 ThreadPoolExecutor
)是比較麻煩的,所以 JDK 經過 Executors
這個工廠類爲咱們提供了一些預先定義好的線程池:
一、固定大小的線程池
建立一個包含 nThreads 個工做線程的線程池,這 nThreads 個線程共享一個無界隊列(即不限制大小的隊列);當新任務提交到線程池時,若是當前沒有空閒線程,那麼任務將放入隊列中進行等待,直到有空閒的線程來從隊列中取出該任務並運行。
(經過 Runtime.getRuntime().availableProcessors()
能夠得到當前機器可用的處理器個數,對於計算密集型的任務,固定大小的線程池的 nThreads 設置爲這個值時,通常能得到最大的 CPU 使用率)
二、單線程線程池
建立一個只包含一個工做線程的線程池,它的功能能夠簡單的理解爲 即 newFixedThreadPool
方法傳入參數爲 1 的狀況。可是與 newFixedThreadPool(1)
不一樣的是,若是線程池中這個惟一的線程意外終止,線程池會建立一個新線程繼續執行以後的任務。
三、可緩存線程的線程池
建立一個可緩存線程的線程池。當新任務提交到線程池時,若是當前線程池中有空閒線程可用,則使用空閒線程來運行任務,不然新建一個線程來運行該任務,並將該線程添加到線程池中;並且該線程池會終止並移除那些超過 60 秒未被使用的空閒線程。因此這個線程池表現得就像緩存,緩存的資源爲線程,緩存的超時時間爲 60 秒。根據 JDK 的文檔,當任務的運行時間都較短的時候,該線程池有利於提升性能。
咱們看到每一個構造線程池的工廠方法都有一個帶 ThreadFactory
的重載形式。ThreadFactory
即線程池用來新建線程的工廠,每次線程池須要新建一個線程時,調用的就是這個 ThreadFactory
的 newThread
方法:
(若是不提供自定義的 ThreadFactory
,那麼使用的就是 DefaultThreadFactory
—— Executors
內定義的內部類)
好比咱們要爲線程池中的每一個線程提供一個特定的名字,那麼咱們就能夠自定義 ThreadFactory
並重寫其 newThread
方法:
public class SimpleThreadFactory implements ThreadFactory { private AtomicInteger id = new AtomicInteger(1); @Override public Thread newThread(Runnable r) { Thread thread = new Thread(r); thread.setName("Test_Thread-" + id.getAndIncrement()); return thread; } }
經過 JDK 的源碼咱們能夠知道,以上三種線程池的實現都是基於 ThreadPoolExecutor
:
下面咱們來看一下線程池的基礎接口 ExecutorService
中每一個方法的含義。
首先是從 Executor
接口繼承到的 execute
方法:
使用該方法即將一個 Runnable
任務交給線程池去執行。
submit
方法:
submit
方法會提交一個任務去給線程池執行,該任務能夠是帶返回結果的 Callable<V>
任務,也能夠是一開始就指定結果的 Runnable
任務,或者不帶結果的 Runnable
任務(此時即一開始指定結果爲 null
)。submit
方法會返回一個與所提交任務相關聯的 Future<V>
。經過 上一篇文章 咱們能夠知道,Future<V>
的 get
方法能夠等待任務執行完畢並返回結果。因此經過 Future<V>
,咱們能夠與已經提交到線程池的任務進行交互。submit
提交任務及任務運行過程大體以下:
Runnable
或者 Callable<V>
任務;newTaskFor
方法構造出 FutureTask<V>
;(由 上一篇文章 可知,FutureTask<V>
實現了 Runnable
和 Future<V>
兩個接口,從而 FutureTask<V>
能夠做爲 Runnable
交給 Worker
(Thread
)去運行,也能夠做爲一個 Future<V>
與任務交互)
![newTaskFor 方法][19]
execute
方法將 FutureTask<V>
交給當前的 Worker
去運行,並將 FutureTask<V>
以 Future<V>
返回;
Worker
執行任務(即運行 run
方法),在任務完成後,爲 Future<V>
(FutureTask<V>
) 設置結果 —— 設置結果以前,調用 Future<V>
的 get
方法會讓調用線程處於阻塞狀態;Future<V>
的 get
方法,得到任務的結果。invokeAll
方法:
invokeAll
方法能夠一次執行多個任務,但它並不一樣等於屢次調用 submit
方法。submit
方法是非阻塞的,每次調用 submit
方法提交任務到線程池以後,會當即返回與任務相關聯的 Future<V>
,而後當前線程繼續向後執行;
而 invokeAll
方法是阻塞的,只有當提交的多個任務都執行完畢以後,invokeAll
方法纔會返回,執行結果會以List<Future<V>>
返回,該 List<Future<V>>
中的每一個 Future<V>
是和提交任務時的 Collection<Callable<V>>
中的任務 Callable<V>
一 一對應的。帶 timeout 參數的 invokeAll
就是設置一個超時時間,若是超過這個時間 invokeAll
中提交的全部任務還有沒所有執行完,那麼沒有執行完的任務會被取消(中斷),以後一樣以一個 List<Future<V>>
返回執行的結果。
invokeAny
方法:
invokeAny
方法也是阻塞的,與 invokeAll
方法的不一樣之處在於,當所提交的一組任務中的任何一個任務完成以後,invokeAny
方法便會返回(返回的結果即是那個已經完成的任務的返回值),而其餘任務會被取消(中斷)。
舉一個 invokeAny
使用的例子:電腦有 C、D、E、F 四個盤,咱們須要找一個文件,可是咱們不知道這個文件位於哪一個盤中,咱們即可以使用 invokeAny
,並提交四個任務(對應於四個線程)分別查找 C、D、E、F 四個盤,若是哪一個線程找到了這個文件,那麼此時 invokeAny
便中止阻塞並返回結果,同時取消其餘任務。
shutdown
方法:
shutdown
方法的做用是向線程池發送關閉的指令。一旦在線程池上調用 shutdown
方法以後,線程池便不能再接受新的任務;若是此時還向線程池提交任務,那麼將會拋出 RejectedExecutionException
異常。以後線程池不會馬上關閉,直到以前已經提交到線程池中的全部任務(包括正在運行的任務和在隊列中等待的任務)都已經處理完成,纔會關閉。
shutdownNow
方法:
與 shutdown
不一樣,shutdownNow
會當即關閉線程池 —— 當前在線程池中運行的任務會所有被取消,而後返回線程池中全部正在等待的任務。
(值得注意的是,咱們 必須顯式的關閉線程池,不然線程池不會本身關閉)
awaitTermination
方法:
awaitTermination
能夠用來判斷線程池是否已經關閉。調用 awaitTermination
以後,在 timeout 時間內,若是線程池沒有關閉,則阻塞當前線程,不然返回 true
;當超過 timeout 的時間後,若線程池已經關閉則返回 true
,不然返回 false
。該方法通常這樣使用:
shutdown
方法向線程池發送關閉的指令;awaitTermination
來檢測到線程池是否已經關閉,能夠得知線程池中全部的任務是否已經執行完畢;awaitTermination
方法的線程中止阻塞,並返回 true
;isShutdown()
方法,若是線程池已經調用 shutdown
或者 shutdownNow
,則返回 true
,不然返回 false
;
isTerminated()
方法,若是線程池已經調用 shutdown
而且線程池中全部的任務已經執行完畢,或者線程池調用了 shutdownNow
,則返回 true
,不然返回 false
。
經過以上介紹,咱們已經瞭解了 ExecutorService
中全部方法的功能,如今讓咱們來實踐 ExecutorService
的功能。
咱們繼續使用 上一篇文章 的兩個例子中的任務,首先是任務類型爲 Runnable
的狀況:
import java.util.*; import java.util.concurrent.*; public class RunnableTest { public static void main(String[] args) throws Exception { System.out.println("使用線程池運行 Runnable 任務:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池 List<AccumRunnable> tasks = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumRunnable task = new AccumRunnable(i * 10 + 1, (i + 1) * 10); tasks.add(task); threadPool.execute(task); // 讓線程池執行任務 task } threadPool.shutdown(); // 向線程池發送關閉的指令,等到已經提交的任務都執行完畢以後,線程池會關閉 threadPool.awaitTermination(1, TimeUnit.HOURS); // 等待線程池關閉,等待的最大時間爲 1 小時 int total = 0; for (AccumRunnable task : tasks) { total += task.getResult(); // 調用在 AccumRunnable 定義的 getResult 方法得到返回的結果 } System.out.println("Total: " + total); } static final class AccumRunnable implements Runnable { private final int begin; private final int end; private int result; public AccumRunnable(int begin, int end) { this.begin = begin; this.end = end; } @Override public void run() { result = 0; try { for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } } catch (InterruptedException ex) { ex.printStackTrace(System.err); } System.out.printf("(%s) - 運行結束,結果爲 %d\n", Thread.currentThread().getName(), result); } public int getResult() { return result; } } }
運行結果:
能夠看到 NetBeans 給出的運行時間爲 2 秒 —— 由於每一個任務須要 1 秒的時間,而線程池中的線程個數固定爲 5 個,因此線程池最多同時處理 5 個任務,於是 10 個任務總共須要 2 秒的運行時間。
任務類型爲 Callable
:
import java.util.*; import java.util.concurrent.*; public class CallableTest { public static void main(String[] args) throws Exception { System.out.println("使用線程池運行 Callable 任務:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池 List<Future<Integer>> futures = new ArrayList<>(10); for (int i = 0; i < 10; i++) { AccumCallable task = new AccumCallable(i * 10 + 1, (i + 1) * 10); Future<Integer> future = threadPool.submit(task); // 提交任務 futures.add(future); } threadPool.shutdown(); // 向線程池發送關閉的指令,等到已經提交的任務都執行完畢以後,線程池會關閉 int total = 0; for (Future<Integer> future : futures) { total += future.get(); // 阻塞,直到任務結束,返回任務的結果 } System.out.println("Total: " + total); } static final class AccumCallable implements Callable<Integer> { private final int begin; private final int end; public AccumCallable(int begin, int end) { this.begin = begin; this.end = end; } @Override public Integer call() throws Exception { int result = 0; for (int i = begin; i <= end; i++) { result += i; Thread.sleep(100); } System.out.printf("(%s) - 運行結束,結果爲 %d\n", Thread.currentThread().getName(), result); return result; } } }
運行結果:
改寫上面的代碼,使用 invokeAll
來直接執行一組任務:
public static void main(String[] args) throws Exception { System.out.println("使用線程池 invokeAll 運行一組 Callable 任務:"); ExecutorService threadPool = Executors.newFixedThreadPool(5); // 建立大小固定爲 5 的線程池 List<AccumCallable> tasks = new ArrayList<>(10); // tasks 爲一組任務 for (int i = 0; i < 10; i++) { tasks.add(new AccumCallable(i * 10 + 1, (i + 1) * 10)); } List<Future<Integer>> futures = threadPool.invokeAll(tasks); // 阻塞,直到全部任務都運行完畢 int total = 0; for (Future<Integer> future : futures) { total += future.get(); // 返回任務的結果 } System.out.println("Total: " + total); threadPool.shutdown(); // 向線程池發送關閉的指令 }
運行結果:
線程池是很強大並且很方便的工具,它提供了對線程進行統一的分配和調控的各類功能。自 JDK1.5 時 JDK 添加了線程池的功能以後,通常狀況下更推薦使用線程池來編寫多線程程序,而不是直接使用 Thread
。
(invokeAny
也是很實用的方法,請有興趣的讀者本身實踐)
References: