在當今計算機的CPU計算速度很是快的狀況下,爲了可以充分利用CPU性能提升程序運行效率咱們在程序中使用了線程。可是在高併發狀況下會頻繁的建立和銷燬線程,這樣就變相的阻礙了程序的執行速度,因此爲了管理線程資源和減小線程建立以及銷燬的性能消耗就引入了線程池。數據庫
corePoolSize:線程池在建立完時,裏面並無線程,只有當任務到來時再去建立線程。緩存
maxPoolSize:線程池可能會在覈心線程數的基礎上額外增長一些線程,可是線程數量的上限是maxPoolSize
。好比第一天執行的任務很是多,次日執行的任務很是少,可是有了maxPoolSize
參數,就能夠加強任務處理的靈活性。bash
corePoolSize
即便線程沒有在執行任務,也會建立新的線程。corePoolSize
,但小於maxPoolSize
則將任務放入隊列。maxPoolSize
,則建立新的線程運行任務。maxPoolSize
,則拒絕該任務。執行流程:服務器
corePoolSize
和maxPoolSize
設置爲相同的值,那麼就會建立固定大小的線程池。maxPoolSize
參數設置爲很大的值,例如Integer.MAX_VALUE
,能夠容許線程池容納任意數量的併發任務。corePoolSize
的線程,因此若是使用了無界隊列(如:LinkedBlockingQueue
)就不會建立到超過corePoolSize
的線程數。若是線程池當前的線程數大於corePoolSize
,那麼若是多餘的線程的空閒時間大於keepAliveTime
,它們就會被終止。網絡
keepAliveTime
參數的使用能夠減小線程數過多冗餘時的資源消耗。併發
新的線程由ThreadFactory
建立,默認使用Executors.defaultThreadFactory()
,建立出來的線程都在同一個線程組,擁有一樣的NORM_PRIORITY
優先級而且都不是守護線程。若是本身指定ThreadFactory
,那麼就能夠改變線程名、線程組、優先級、是不是守護線程等。一般狀況下直接使用defaultThreadFactory
就行。ide
SynchronousQueue
):任務很少時,只須要用隊列進行簡單的任務中轉,這種隊列沒法存儲任務,在使用這種隊列時,須要將maxPoolSize
設置的大一點。LinkedBlockingQueue
):若是使用無界隊列看成workQueue
,將maxQueue
設置的多大都沒有用,使用無界隊列的優勢是能夠防止流量突增,缺點是若是處理任務的速度跟不上提交任務的速度,這樣就會致使無界隊列中的任務愈來愈多,從而致使OOM
異常。ArrayBlockingQueue
):使用有界隊列能夠設置隊列大小,讓線程池的maxPoolSize
有意義。手動建立更好,由於這樣可讓咱們更加了解線程池的運行規則,避免資源耗盡的風險。函數
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
newFixedThreadPool
線程池經過傳入相同的corePoolSize
和maxPoolSize
能夠保證線程數量固定,0L
的keepAliveTime
表示時刻被銷燬,workQueue
使用的是無界隊列。這樣潛在的問題就是當處理任務的速度趕不上任務提交的速度的時候,就可能會讓大量任務堆積在workQueue
中,從而引起OOM
異常。高併發
/**
* 演示newFixedThreadPool線程池OOM問題
*/
public class FixedThreadPoolOOM {
private static ExecutorService executorService = Executors.newFixedThreadPool(1);
public static void main(String[] args) {
for (int i = 0; i < Integer.MAX_VALUE; i++) {
executorService.execute(new SubThread());
}
}
}
class SubThread implements Runnable {
@Override
public void run() {
try {
//延長任務時間
Thread.sleep(1000000000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
複製代碼
更改JVM
參數工具
運行結果
使用線程池打印線程名
public class SingleThreadExecutor {
public static void main(String[] args) {
ExecutorService executorService = Executors.newSingleThreadExecutor();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
複製代碼
newSingleThreadExecutor
源碼
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
複製代碼
從源碼能夠看出newSingleThreadExecutor
和newFixedThreadPool
基本相似,不一樣的只是corePoolSize
和maxPoolSize
的值,因此newSingleThreadExecutor
也存在內存溢出問題。
newCachedThreadPool
也被稱爲可緩存線程池,它是一個
無界線程池,具備
自動回收多餘線程的功能。
public class CachedThreadPool {
public static void main(String[] args) {
ExecutorService executorService = Executors.newCachedThreadPool();
for (int i = 0; i < 1000; i++) {
executorService.execute(new Task());
}
}
}
複製代碼
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
複製代碼
newCachedThreadPool
的maxPoolSize
設置的值爲Integer.MAX_VALUE
,因此可能會致使線程被無限建立,最終致使OOM
異常。
該線程池支持週期性任務的執行
public class ScheduledThreadPoolTest {
public static void main(String[] args) {
ScheduledExecutorService scheduledExecutorService =
Executors.newScheduledThreadPool(10);
// scheduledExecutorService.schedule(new Task(), 5, TimeUnit.SECONDS);
scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
}
}
複製代碼
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
new DelayedWorkQueue());
}
複製代碼
根據業務場景不一樣,本身設置線程池參數,例如內存有多大,本身取線程名子等。
線程數=CPU核心數 × (1+平均等待時間/平均工做時間)
corePoolSize
和maxPoolSize
,以固定的線程數來執行任務corePoolSize
和maxPoolSize
默認都是1,全程只以1條線程執行任務corePoolSize
是經過手動傳入的,它的maxPoolSize
爲Integer.MAX_VALUE
,而且具備自動回收線程的功能。由於這兩個線程池的核心線程數和最大線程數都是相同的,也就沒法預估任務量,因此須要在自身進行改進,就使用了無界隊列。
由於緩存線程池的最大線程數是「無上限」的,每當任務來的時候直接建立線程進行執行就行了,因此不須要使用隊列來存儲任務。這樣避免了使用隊列進行任務的中轉,提升了執行效率。
由於ScheduledThreadPool
是延遲任務線程池,因此使用延遲隊列有利於對執行任務的時間作延遲。
workStealingPool
適用於執行產生子任務的環境,例如進行二叉樹的遍歷。workStealingPool
具備竊取能力。shutdown()
方法不必定會當即中止,這個方法僅僅是初始整個關閉過程。由於線程池中的線程有可能正在運行,而且隊列中也有待處理的任務,不可能說停就停。因此每當調用該方法時,線程池會把正在執行的任務和隊列中等待的任務都執行完畢再關閉,而且在此期間若是接收到新的任務會被拒絕。/**
* 演示關閉線程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
executorService.shutdown();
//再次提交任務
executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
複製代碼
/**
* 演示關閉線程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
//再次提交任務
// executorService.execute(new ShutDownTask());
}
}
class ShutDownTask implements Runnable {
@Override
public void run() {
try {
Thread.sleep(500);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName());
}
}
複製代碼
/**
* 演示關閉線程池
*/
public class ShutDown {
public static void main(String[] args) throws InterruptedException {
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 1000; i++) {
executorService.execute(new ShutDownTask());
}
Thread.sleep(1500);
System.out.println(executorService.isShutdown());
executorService.shutdown();
System.out.println(executorService.isShutdown());
System.out.println(executorService.isTerminated());
//再次提交任務
// executorService.execute(new ShutDownTask());
}
}
複製代碼
isTerminated
方法的地方休眠10s
//在3s後判斷線程池是否被終止,返回boolean值
System.out.println(executorService.awaitTermination(3L, TimeUnit.SECONDS));
複製代碼
Executor
關閉時,新提交的任務會被拒絕。Executor
對最大線程數和工做隊列容量使用有限邊界而且已經飽和時。總結:第四種拒絕策略相對於前三種更加「機智」一些,能夠避免前面三種策略產生的損失。在第四種策略下能夠下降提交的速度,達到負反饋的效果。
/**
* 演示每一個任務執行的先後放鉤子函數
*/
public class PauseableThreadPool extends ThreadPoolExecutor {
private boolean isPaused;
private final ReentrantLock lock = new ReentrantLock();
private Condition unPaused = lock.newCondition();
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
}
public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
TimeUnit unit, BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory, RejectedExecutionHandler handler) {
super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
}
@Override
protected void beforeExecute(Thread t, Runnable r) {
super.beforeExecute(t, r);
lock.lock();
try {
while (isPaused) {
unPaused.await();
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
private void pause() {
lock.lock();
try {
isPaused = true;
} finally {
lock.unlock();
}
}
public void resume() {
lock.lock();
try {
isPaused = false;
//喚醒所有
unPaused.signalAll();
} finally {
lock.unlock();
}
}
public static void main(String[] args) throws InterruptedException {
PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L,
TimeUnit.SECONDS, new LinkedBlockingQueue<>());
Runnable runnable = new Runnable() {
@Override
public void run() {
System.out.println("我被執行");
try {
Thread.sleep(10);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
};
for (int i = 0; i < 10000; i++) {
pauseableThreadPool.execute(runnable);
}
Thread.sleep(1500);
pauseableThreadPool.pause();
System.out.println("線程池被暫停了");
Thread.sleep(1500);
pauseableThreadPool.resume();
System.out.println("線程池被恢復了");
}
}
複製代碼
Executor
,是Executor
的子接口,在接口內部增長了一些新的方法,例如第6小節講到的幾個方法利用相同線程執行不一樣任務
public void execute(Runnable command) {
// 判斷任務是否爲空,爲空就拋出異常
if (command == null)
throw new NullPointerException();
int c = ctl.get();
// 若是當前線程數小於核心線程數,就增長Worker
if (workerCountOf(c) < corePoolSize) {
// command就是任務,點擊addWorker方法
// 第二個參數用於判斷當前線程數是否小於核心線程數
if (addWorker(command, true))
return;
c = ctl.get();
}
// 此時線程數大於等於核心線程數
// 判斷線程池是否是正在運行並將任務放到工做隊列中
if (isRunning(c) && workQueue.offer(command)) {
// 再次檢查線程狀態
int recheck = ctl.get();
// 若是線程不是正在運行的,就刪除掉任務而且拒絕
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0) //這裏用於避免已經提交的任務沒有線程進行執行
addWorker(null, false);
}
// 若是任務沒法添加或者大於最大線程數就拒絕任務
else if (!addWorker(command, false))
reject(command);
}
複製代碼
由於要查看的是Worker
因此進入到addWorker()
方法後點擊Worker
類查看runWorker()
方法
w = new Worker(firstTask);
複製代碼
private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
複製代碼
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
// 獲取到任務
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
// 只要任務不爲空或者可以獲取到任務就執行下面的方法
while (task != null || (task = getTask()) != null) {
w.lock();
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
// task是一個Runnable類型,調用run()方法就是運行線程
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
複製代碼
總結:核心原理就是獲取到task
,若是task
不爲空就調用run()
方法,這樣就實現了線程的複用,達到讓相同的線程執行不一樣任務的目的。
shutdownNow()
帶來的效果workerCount
爲零時,線程會轉換到TIDYING狀態,並將運行terminate()
鉤子方法terminate()
運行完成