線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。html
若是併發請求數量不少,但每一個線程執行的時間很短,就會出現頻繁的建立和銷燬線程。如此一來,會大大下降系統的效率,可能頻繁建立和銷燬線程的時間、資源開銷要大於實際工做的所需。java
正是因爲這個問題,因此有必要引入線程池。使用 線程池的好處 有如下幾點:編程
Executor 框架是一個根據一組執行策略調用,調度,執行和控制的異步任務的框架,目的是提供一種將」任務提交」與」任務如何運行」分離開來的機制。數組
Executor 框架核心 API 以下:緩存
Executor
- 運行任務的簡單接口。ExecutorService
- 擴展了 Executor
接口。擴展能力:
ScheduledExecutorService
- 擴展了 ExecutorService
接口。擴展能力:支持按期執行任務。AbstractExecutorService
- ExecutorService
接口的默認實現。ThreadPoolExecutor
- Executor 框架最核心的類,它繼承了 AbstractExecutorService
類。ScheduledThreadPoolExecutor
- ScheduledExecutorService
接口的實現,一個可定時調度任務的線程池。Executors
- 能夠經過調用 Executors
的靜態工廠方法來建立線程池並返回一個 ExecutorService
對象。Executor
接口中只定義了一個 execute
方法,用於接收一個 Runnable
對象。markdown
public interface Executor { void execute(Runnable command); } 複製代碼
ExecutorService
接口繼承了 Executor
接口,它還提供了 invokeAll
、invokeAny
、shutdown
、submit
等方法。多線程
public interface ExecutorService extends Executor { void shutdown(); List<Runnable> shutdownNow(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; } 複製代碼
從其支持的方法定義,不難看出:相比於 Executor
接口,ExecutorService
接口主要的擴展是:併發
sumbit
、invokeAll
、invokeAny
方法中都支持傳入Callable
對象。shutdown
、shutdownNow
、isShutdown
等方法。ScheduledExecutorService
接口擴展了 ExecutorService
接口。框架
它除了支持前面兩個接口的全部能力之外,還支持定時調度線程。異步
public interface ScheduledExecutorService extends ExecutorService { public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit); public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit); public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit); public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit); } 複製代碼
其擴展的接口提供如下能力:
schedule
方法能夠在指定的延時後執行一個 Runnable
或者 Callable
任務。scheduleAtFixedRate
方法和 scheduleWithFixedDelay
方法能夠按照指定時間間隔,按期執行任務。java.uitl.concurrent.ThreadPoolExecutor
類是 Executor
框架中最核心的類。因此,本文將着重講述一下這個類。
ThreadPoolExecutor
有如下重要字段:
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); private static final int COUNT_BITS = Integer.SIZE - 3; private static final int CAPACITY = (1 << COUNT_BITS) - 1; // runState is stored in the high-order bits private static final int RUNNING = -1 << COUNT_BITS; private static final int SHUTDOWN = 0 << COUNT_BITS; private static final int STOP = 1 << COUNT_BITS; private static final int TIDYING = 2 << COUNT_BITS; private static final int TERMINATED = 3 << COUNT_BITS; 複製代碼
參數說明:
ctl
- 用於控制線程池的運行狀態和線程池中的有效線程數量。它包含兩部分的信息:
runState
)workerCount
)ctl
使用了 Integer
類型來保存,高 3 位保存 runState
,低 29 位保存 workerCount
。COUNT_BITS
就是 29,CAPACITY
就是 1 左移 29 位減 1(29 個 1),這個常量表示 workerCount
的上限值,大約是 5 億。RUNNING
- 運行狀態。接受新任務,而且也能處理阻塞隊列中的任務。SHUTDOWN
- 關閉狀態。不接受新任務,但能夠處理阻塞隊列中的任務。
RUNNING
狀態時,調用 shutdown
方法會使線程池進入到該狀態。finalize
方法在執行過程當中也會調用 shutdown
方法進入該狀態。STOP
- 中止狀態。不接受新任務,也不處理隊列中的任務。會中斷正在處理任務的線程。在線程池處於 RUNNING
或 SHUTDOWN
狀態時,調用 shutdownNow
方法會使線程池進入到該狀態。TIDYING
- 整理狀態。若是全部的任務都已終止了,workerCount
(有效線程數) 爲 0,線程池進入該狀態後會調用 terminated
方法進入 TERMINATED
狀態。TERMINATED
- 已終止狀態。在 terminated
方法執行完後進入該狀態。默認 terminated
方法中什麼也沒有作。進入 TERMINATED
的條件以下:
RUNNING
狀態;TIDYING
狀態或 TERMINATED
狀態;SHUTDOWN
而且 workerQueue
爲空;workerCount
爲 0;TIDYING
狀態成功。ThreadPoolExecutor
有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義以下:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { 複製代碼
參數說明:
corePoolSize
- 核心線程數量。當有新任務經過 execute
方法提交時 ,線程池會執行如下判斷:
corePoolSize
,則建立新線程來處理任務,即便線程池中的其餘線程是空閒的。corePoolSize
且小於 maximumPoolSize
,則只有當 workQueue
滿時才建立新的線程去處理任務;corePoolSize
和 maximumPoolSize
相同,則建立的線程池的大小是固定的。這時若是有新任務提交,若 workQueue
未滿,則將請求放入 workQueue
中,等待有空閒的線程去從 workQueue
中取任務並處理;maximumPoolSize
,這時若是 workQueue
已經滿了,則使用 handler
所指定的策略來處理任務;corePoolSize
=> workQueue
=> maximumPoolSize
。maximumPoolSize
- 最大線程數量。
keepAliveTime
:線程保持活動的時間。
corePoolSize
的時候,若是這時沒有新的任務提交,核心線程外的線程不會當即銷燬,而是會等待,直到等待的時間超過了 keepAliveTime
。unit
- keepAliveTime
的時間單位。有 7 種取值。可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。workQueue
- 等待執行的任務隊列。用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。
ArrayBlockingQueue
- 有界阻塞隊列。
LinkedBlockingQueue
- 無界阻塞隊列。
Integer.MAX_VALUE
。ArrayBlockingQueue
。LinkedBlockingQueue
意味着: maximumPoolSize
將不起做用,線程池能建立的最大線程數爲 corePoolSize
,由於任務等待隊列是無界隊列。Executors.newFixedThreadPool
使用了這個隊列。SynchronousQueue
- 不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
LinkedBlockingQueue
。Executors.newCachedThreadPool
使用了這個隊列。PriorityBlockingQueue
- 具備優先級的無界阻塞隊列。threadFactory
- 線程工廠。能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。handler
- 飽和策略。它是 RejectedExecutionHandler
類型的變量。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。線程池支持如下策略:
AbortPolicy
- 丟棄任務並拋出異常。這也是默認策略。DiscardPolicy
- 丟棄任務,但不拋出異常。DiscardOldestPolicy
- 丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)。CallerRunsPolicy
- 只用調用者所在的線程來運行任務。RejectedExecutionHandler
接口來定製處理策略。如記錄日誌或持久化不能處理的任務。默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。
提交任務可使用 execute
方法,它是 ThreadPoolExecutor
的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
execute
方法工做流程以下:
workerCount < corePoolSize
,則建立並啓動一個線程來執行新提交的任務;workerCount >= corePoolSize
,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;workerCount >= corePoolSize && workerCount < maximumPoolSize
,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;workerCount >= maximumPoolSize
,而且線程池內的阻塞隊列已滿,則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。在 ThreadPoolExecutor
類中還有一些重要的方法:
submit
- 相似於 execute
,可是針對的是有返回值的線程。submit
方法是在 ExecutorService
中聲明的方法,在 AbstractExecutorService
就已經有了具體的實現。ThreadPoolExecutor
直接複用 AbstractExecutorService
的 submit
方法。shutdown
- 不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務。
SHUTDOWN
狀態;interruptIdleWorkers
方法請求中斷全部空閒的 worker;tryTerminate
嘗試結束線程池。shutdownNow
- 當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務。與 shutdown
方法相似,不一樣的地方在於:
STOP
;isShutdown
- 調用了 shutdown
或 shutdownNow
方法後,isShutdown
方法就會返回 true。isTerminaed
- 當全部的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed
方法會返回 true。setCorePoolSize
- 設置核心線程數大小。setMaximumPoolSize
- 設置最大線程數大小。getTaskCount
- 線程池已經執行的和未執行的任務總數;getCompletedTaskCount
- 線程池已完成的任務數量,該值小於等於 taskCount
;getLargestPoolSize
- 線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過,也就是達到了 maximumPoolSize
;getPoolSize
- 線程池當前的線程數量;getActiveCount
- 當前線程池中正在執行任務的線程數量。public class ThreadPoolExecutorDemo { public static void main(String[] args) { ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 500, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()); for (int i = 0; i < 100; i++) { threadPoolExecutor.execute(new MyThread()); String info = String.format("線程池中線程數目:%s,隊列中等待執行的任務數目:%s,已執行玩別的任務數目:%s", threadPoolExecutor.getPoolSize(), threadPoolExecutor.getQueue().size(), threadPoolExecutor.getCompletedTaskCount()); System.out.println(info); } threadPoolExecutor.shutdown(); } static class MyThread implements Runnable { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } } } 複製代碼
JDK 的 Executors
類中提供了幾種具備表明性的線程池,這些線程池 都是基於 ThreadPoolExecutor
的定製化實現。
在實際使用線程池的場景中,咱們每每不是直接使用 ThreadPoolExecutor
,而是使用 JDK 中提供的具備表明性的線程池實例。
建立一個單線程的線程池。
只會建立惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。 若是這個惟一的線程由於異常結束,那麼會有一個新的線程來替代它 。
單工做線程最大的特色是:可保證順序地執行各個任務。
示例:
public class SingleThreadExecutorDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newSingleThreadExecutor(); for (int i = 0; i < 100; i++) { executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } }); } executorService.shutdown(); } } 複製代碼
建立一個固定大小的線程池。
每次提交一個任務就會新建立一個工做線程,若是工做線程數量達到線程池最大線程數,則將提交的任務存入到阻塞隊列中。
FixedThreadPool
是一個典型且優秀的線程池,它具備線程池提升程序效率和節省建立線程時所耗的開銷的優勢。可是,在線程池空閒時,即線程池中沒有可運行任務時,它不會釋放工做線程,還會佔用必定的系統資源。
示例:
public class FixedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newFixedThreadPool(3); for (int i = 0; i < 100; i++) { executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } }); } executorService.shutdown(); } } 複製代碼
建立一個可緩存的線程池。
CachedThreadPool
時,必定要注意控制任務的數量,不然,因爲大量線程同時運行,頗有會形成系統癱瘓。示例:
public class CachedThreadPoolDemo { public static void main(String[] args) { ExecutorService executorService = Executors.newCachedThreadPool(); for (int i = 0; i < 100; i++) { executorService.execute(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } }); } executorService.shutdown(); } } 複製代碼
建立一個大小無限的線程池。此線程池支持定時以及週期性執行任務的需求。
public class ScheduledThreadPoolDemo { public static void main(String[] args) { schedule(); scheduleAtFixedRate(); } private static void schedule() { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); for (int i = 0; i < 100; i++) { executorService.schedule(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } }, 1, TimeUnit.SECONDS); } executorService.shutdown(); } private static void scheduleAtFixedRate() { ScheduledExecutorService executorService = Executors.newScheduledThreadPool(5); for (int i = 0; i < 100; i++) { executorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { System.out.println(Thread.currentThread().getName() + " 執行"); } }, 1, 1, TimeUnit.SECONDS); } executorService.shutdown(); } } 複製代碼