線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。java
java.uitl.concurrent.ThreadPoolExecutor
類是 Executor 框架中最核心的一個類。git
ThreadPoolExecutor 有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義以下:github
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
corePoolSize
:線程池的基本線程數。這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,從這 2 個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立 corePoolSize 個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲 0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到 corePoolSize 後,就會把到達的任務放到緩存隊列當中。maximumPoolSize
:線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界的任務隊列這個參數就沒什麼效果。keepAliveTime
:線程活動保持時間。線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。workQueue
:任務隊列。用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。
threadFactory
:建立線程的工廠。能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。handler
:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是 AbortPolicy,表示沒法處理新任務時拋出異常。如下是 JDK1.5 提供的四種策略。
在 ThreadPoolExecutor 類中有幾個很是重要的方法:數組
execute()
方法其實是 Executor 中聲明的方法,在 ThreadPoolExecutor 進行了具體的實現,這個方法是 ThreadPoolExecutor 的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。submit()
方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現,在 ThreadPoolExecutor 中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和 execute()方法不一樣,它可以返回任務執行的結果,去看 submit()方法的實現,會發現它實際上仍是調用的 execute()方法,只不過它利用了 Future 來獲取任務執行結果(Future 相關內容將在下一篇講述)。shutdown()
和 shutdownNow()
是用來關閉線程池的。咱們可使用 execute
提交任務,可是 execute
方法沒有返回值,因此沒法判斷任務是否被線程池執行成功。緩存
經過如下代碼可知 execute
方法輸入的任務是一個 Runnable 實例。多線程
threadsPool.execute(new Runnable() { @Override public void run() { // TODO Auto-generated method stub } });
咱們也可使用 submit
方法來提交任務,它會返回一個 Future
,那麼咱們能夠經過這個 Future
來判斷任務是否執行成功。併發
經過 Future
的 get
方法來獲取返回值,get
方法會阻塞住直到任務完成。而使用 get(long timeout, TimeUnit unit)
方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。框架
Future<Object> future = executor.submit(harReturnValuetask); try { Object s = future.get(); } catch (InterruptedException e) { // 處理中斷異常 } catch (ExecutionException e) { // 處理沒法執行任務異常 } finally { // 關閉線程池 executor.shutdown(); }
咱們能夠經過調用線程池的 shutdown
或 shutdownNow
方法來關閉線程池,它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的 interrupt 方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow 首先將線程池的狀態設置成 STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而 shutdown 只是將線程池的狀態設置成 SHUTDOWN 狀態,而後中斷全部沒有正在執行任務的線程。異步
只要調用了這兩個關閉方法的其中一個,isShutdown 方法就會返回 true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用 shutdown 來關閉線程池,若是任務不必定要執行完,則能夠調用 shutdownNow。分佈式
JDK 中提供了幾種具備表明性的線程池,這些線程池是基於 ThreadPoolExecutor
的定製化實現。
在實際使用線程池的場景中,咱們每每不是直接使用 ThreadPoolExecutor
,而是使用 JDK 中提供的具備表明性的線程池實例。
建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。
這種類型的線程池特色是:
示例:
public class CachedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 10; i++) { final int index = i; try { Thread.sleep(index * 1000); } catch (InterruptedException e) { e.printStackTrace(); } executorService.execute(() -> System.out.println(Thread.currentThread().getName() + " 執行,i = " + index)); } } }
建立一個指定工做線程數量的線程池。每當提交一個任務就建立一個工做線程,若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。
FixedThreadPool 是一個典型且優秀的線程池,它具備線程池提升程序效率和節省建立線程時所耗的開銷的優勢。可是,在線程池空閒時,即線程池中沒有可運行任務時,它不會釋放工做線程,還會佔用必定的系統資源。
示例:
public class FixedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName() + " 執行,i = " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
建立一個單線程化的 Executor,即只建立惟一的工做者線程來執行任務,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。若是這個線程異常結束,會有另外一個取代它,保證順序執行。單工做線程最大的特色是可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。
示例:
public class SingleThreadExecutorDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 10; i++) { final int index = i; executorService.execute(() -> { try { System.out.println(Thread.currentThread().getName() + " 執行,i = " + index); Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } }); } } }
建立一個線程池,能夠安排任務在給定延遲後運行,或按期執行。
public class ScheduledThreadPoolDemo { private static void delay() { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.schedule(() -> System.out.println(Thread.currentThread().getName() + " 延遲 3 秒"), 3, TimeUnit.SECONDS); } private static void cycle() { ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5); scheduledThreadPool.scheduleAtFixedRate( () -> System.out.println(Thread.currentThread().getName() + " 延遲 1 秒,每 3 秒執行一次"), 1, 3, TimeUnit.SECONDS); } public static void main(String[] args) { delay(); cycle(); } }
線程池的具體實現原理,大體從如下幾個方面講解:
// runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; // Packing and unpacking ctl private static int runStateOf(int c) { return c & ~CAPACITY; }
runState 表示當前線程池的狀態,它是一個 volatile 變量用來保證線程之間的可見性;
下面的幾個 static final 變量表示 runState 可能的幾個取值。
當建立線程池後,初始時,線程池處於 RUNNING 狀態;
RUNNING -> SHUTDOWN
若是調用了 shutdown()方法,則線程池處於 SHUTDOWN 狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢。
(RUNNING or SHUTDOWN) -> STOP
若是調用了 shutdownNow()方法,則線程池處於 STOP 狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務。
SHUTDOWN -> TIDYING
當線程池和隊列都爲空時,則線程池處於 TIDYING 狀態。
STOP -> TIDYING
當線程池爲空時,則線程池處於 TIDYING 狀態。
TIDYING -> TERMINATED
當 terminated() 回調方法完成時,線程池處於 TERMINATED 狀態。
任務執行的核心方法是 execute()
方法。執行步驟以下:
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); int c = ctl.get(); if (workerCountOf(c) < corePoolSize) { if (addWorker(command, true)) return; c = ctl.get(); } if (isRunning(c) && workQueue.offer(command)) { int recheck = ctl.get(); if (! isRunning(recheck) && remove(command)) reject(command); else if (workerCountOf(recheck) == 0) addWorker(null, false); } else if (!addWorker(command, false)) reject(command); }
默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。
在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:
prestartCoreThread():初始化一個核心線程; prestartAllCoreThreads():初始化全部核心線程
public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數是null } public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null ++n; return n; }
在前面咱們屢次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。
workQueue 的類型爲 BlockingQueue,一般能夠取下面三種類型:
當線程池的任務緩存隊列已滿而且線程池中的線程數目達到 maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略
ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉,分別是 shutdown()和 shutdownNow(),其中:
ThreadPoolExecutor 提供了動態調整線程池容量大小的方法:setCorePoolSize()和 setMaximumPoolSize(),
當上述參數從小變大時,ThreadPoolExecutor 進行線程賦值,還可能當即建立新的線程來執行任務。
免費Java資料須要本身領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分佈式等教程,一共30G。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q