線程池 掌握治理線程的法寶

1.爲何須要線程池

在當今計算機的CPU計算速度很是快的狀況下,爲了可以充分利用CPU性能提升程序運行效率咱們在程序中使用了線程。可是在高併發狀況下會頻繁的建立和銷燬線程,這樣就變相的阻礙了程序的執行速度,因此爲了管理線程資源和減小線程建立以及銷燬的性能消耗就引入了線程池。web

2.什麼場景下適合使用線程池

當服務器接收到大量任務時,若是使用線程池能夠大量減小線程的建立與銷燬次數,從而提高程序執行效率
在實際開發中,若是須要建立5個以上的線程,那麼就可使用線程池來管理數據庫

3.線程池參數介紹以及特色

file
https://user-gold-cdn.xitu.io...segmentfault

3.1 corePoolSize和maxPoolSize

corePoolSize:線程池在建立完時,裏面並無線程,只有當任務到來時再去建立線程。緩存

maxPoolSize:線程池可能會在覈心線程數的基礎上額外增長一些線程,可是線程數量的上限是maxPoolSize。好比第一天執行的任務很是多,次日執行的任務很是少,可是有了maxPoolSize參數,就能夠加強任務處理的靈活性。服務器

3.2 添加線程的規則

當線程數量小於corePoolSize即便線程沒有在執行任務,也會建立新的線程。
若是線程數量等於(或大於)corePoolSize,但小於maxPoolSize則將任務放入隊列。
若是隊列已滿,而且線程數小於maxPoolSize,則建立新的線程運行任務。
若是隊列已滿,而且線程數大於或等於maxPoolSize,則拒絕該任務。
執行流程:
file
https://user-gold-cdn.xitu.io...網絡

3.3 增減線程的特色

將corePoolSize和maxPoolSize設置爲相同的值,那麼就會建立固定大小的線程池。
線程池但願保持更少的線程數,而且只有在負載變得很大時纔會增長它。
若是將線程池的maxPoolSize參數設置爲很大的值,例如Integer.MAX_VALUE,能夠容許線程池容納任意數量的併發任務。
只有在隊列滿了的時候纔會去建立大於corePoolSize的線程,因此若是使用了無界隊列(如:LinkedBlockingQueue)就不會建立到超過corePoolSize的線程數。併發

3.4 keepAliveTime

若是線程池當前的線程數大於corePoolSize,那麼若是多餘的線程的空閒時間大於keepAliveTime,它們就會被終止。ide

keepAliveTime參數的使用能夠減小線程數過多冗餘時的資源消耗。函數

3.5 threadFactory

新的線程由ThreadFactory建立,默認使用Executors.defaultThreadFactory(),建立出來的線程都在同一個線程組,擁有一樣的NORM_PRIORITY優先級而且都不是守護線程。若是本身指定ThreadFactory,那麼就能夠改變線程名、線程組、優先級、是不是守護線程等。一般狀況下直接使用defaultThreadFactory就行。高併發

3.6 workQueue

直接交接(SynchronousQueue):任務很少時,只須要用隊列進行簡單的任務中轉,這種隊列沒法存儲任務,在使用這種隊列時,須要將maxPoolSize設置的大一點。

無界隊列(LinkedBlockingQueue):若是使用無界隊列看成workQueue,將maxQueue設置的多大都沒有用,使用無界隊列的優勢是能夠防止流量突增,缺點是若是處理任務的速度跟不上提交任務的速度,這樣就會致使無界隊列中的任務愈來愈多,從而致使OOM異常。

有界隊列(ArrayBlockingQueue):使用有界隊列能夠設置隊列大小,讓線程池的maxPoolSize有意義。

4.線程池應該手動建立仍是自動建立

手動建立更好,由於這樣可讓咱們更加了解線程池的運行規則,避免資源耗盡的風險。

4.1 直接調用JDK封裝好的線程池會帶來的問題

newFixedThreadPool

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

newFixedThreadPool線程池經過傳入相同的corePoolSize和maxPoolSize能夠保證線程數量固定,0L的keepAliveTime表示時刻被銷燬,workQueue使用的是無界隊列。這樣潛在的問題就是當處理任務的速度趕不上任務提交的速度的時候,就可能會讓大量任務堆積在workQueue中,從而引起OOM異常。

4.2 演示newFixedThreadPool內存溢出問題

/**
 * 演示newFixedThreadPool線程池OOM問題
 */
public class FixedThreadPoolOOM {

    private static ExecutorService executorService = Executors.newFixedThreadPool(1);

    public static void main(String[] args) {
        for (int i = 0; i < Integer.MAX_VALUE; i++) {
            executorService.execute(new SubThread());
        }
    }
}

class SubThread implements Runnable {

    @Override
    public void run() {
        try {
            //延長任務時間
            Thread.sleep(1000000000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

更改JVM參數
file

運行結果
file

4.3 newSingleThreadExecutor

使用線程池打印線程名

public class SingleThreadExecutor {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}

file

查看newSingleThreadExecutor源碼

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

從源碼能夠看出newSingleThreadExecutor和newFixedThreadPool基本相似,不一樣的只是corePoolSize和maxPoolSize的值,因此newSingleThreadExecutor也存在內存溢出問題。

4.4 newCachedThreadPool

newCachedThreadPool也被稱爲可緩存線程池,它是一個無界線程池,具備自動回收多餘線程的功能。
file

public class CachedThreadPool {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new Task());
        }
    }
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

file
newCachedThreadPool的maxPoolSize設置的值爲Integer.MAX_VALUE,因此可能會致使線程被無限建立,最終致使OOM異常。

4.5 newScheduledThreadPool

該線程池支持週期性任務的執行

public class ScheduledThreadPoolTest {
    public static void main(String[] args) {
        ScheduledExecutorService scheduledExecutorService =
                Executors.newScheduledThreadPool(10);
//        scheduledExecutorService.schedule(new Task(), 5, TimeUnit.SECONDS);
        scheduledExecutorService.scheduleAtFixedRate(new Task(), 1, 3, TimeUnit.SECONDS);
    }
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
    super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
          new DelayedWorkQueue());
}

file

4.6 正確的建立線程池的方法

根據業務場景不一樣,本身設置線程池參數,例如內存有多大,本身取線程名子等。

4.7 線程池裏的線程數量設置多少比較合適?

CPU密集型(加密、計算hash等):最佳線程數設置爲CPU核心數的1——2倍。
耗時I/O型(讀寫數據庫、文件、網絡讀寫等):最佳線程數通常會大於CPU核心數不少倍,以JVM監控顯示繁忙狀況爲依據,保證線程空閒能夠銜接上。參考Brain Goezt推薦的計算方法:

線程數=CPU核心數 × (1+平均等待時間/平均工做時間)

5.對比線程池的特色

file

FixedThreadPool:經過手動傳入corePoolSize和maxPoolSize,以固定的線程數來執行任務

SingleThreadExecutor:corePoolSize和maxPoolSize默認都是1,全程只以1條線程執行任務

CachedThreadPool:它沒有須要維護的核心線程數,每當須要線程的時候就進行建立,由於它的線程存活時間是60秒,因此它也憑藉着這個參數實現了自動回收的功能。

ScheduledThreadPool:這個線程池能夠執行定時任務,corePoolSize是經過手動傳入的,它的maxPoolSize爲Integer.MAX_VALUE,而且具備自動回收線程的功能。

5.1 爲何FixedThreadPool和SingleThreadExecutor的Queue是LinkedBlockingQueue?

由於這兩個線程池的核心線程數和最大線程數都是相同的,也就沒法預估任務量,因此須要在自身進行改進,就使用了無界隊列。

5.2 爲何CachedThreadPool使用的Queue是SynchronousQueue?

由於緩存線程池的最大線程數是「無上限」的,每當任務來的時候直接建立線程進行執行就行了,因此不須要使用隊列來存儲任務。這樣避免了使用隊列進行任務的中轉,提升了執行效率。

5.3 爲何ScheduledThreadPool使用延遲隊列DelayedWorkQueue?

由於ScheduledThreadPool是延遲任務線程池,因此使用延遲隊列有利於對執行任務的時間作延遲。

5.4 JDK1.8中加入的workStealingPool

workStealingPool適用於執行產生子任務的環境,例如進行二叉樹的遍歷。
workStealingPool具備竊取能力。
使用時最好不要加鎖,並且不保證執行順序。

6.中止線程池的正確方法

shutdown:調用了shutdown()方法不必定會當即中止,這個方法僅僅是初始整個關閉過程。由於線程池中的線程有可能正在運行,而且隊列中也有待處理的任務,不可能說停就停。因此每當調用該方法時,線程池會把正在執行的任務和隊列中等待的任務都執行完畢再關閉,而且在此期間若是接收到新的任務會被拒絕。

/**
 * 演示關閉線程池
 */
public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        executorService.shutdown();

        //再次提交任務
        executorService.execute(new ShutDownTask());
    }
}

class ShutDownTask implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

file

isShutdown:能夠用於判斷線程池是否被shutdown了

/**
 * 演示關閉線程池
 */
public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        //再次提交任務
//        executorService.execute(new ShutDownTask());
    }
}

class ShutDownTask implements Runnable {

    @Override
    public void run() {
        try {
            Thread.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName());
    }
}

file

isTerminated:能夠判斷線程是否被徹底終止了

/**
 * 演示關閉線程池
 */
public class ShutDown {
    public static void main(String[] args) throws InterruptedException {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        for (int i = 0; i < 1000; i++) {
            executorService.execute(new ShutDownTask());
        }
        Thread.sleep(1500);
        System.out.println(executorService.isShutdown());
        executorService.shutdown();
        System.out.println(executorService.isShutdown());
        System.out.println(executorService.isTerminated());
        //再次提交任務
//        executorService.execute(new ShutDownTask());
    }
}

file
將循環的次數改成100次,而且在第一次調用isTerminated方法的地方休眠10s
file

awaitTermination:傳入等待時間,等待時間達到時判斷是否中止了,主要用於檢測。

//在3s後判斷線程池是否被終止,返回boolean值
System.out.println(executorService.awaitTermination(3L, TimeUnit.SECONDS));

shutdownNow:調用了這個方法時,線程池會當即終止,並返回沒有被處理完的任務。若是須要繼續執行這裏的任務能夠再次讓線程池執行這些返回的任務。

7.任務太多,怎麼拒絕?

7.1 拒絕的時機

當Executor關閉時,新提交的任務會被拒絕。
以及Executor對最大線程數和工做隊列容量使用有限邊界而且已經飽和時。

7.2 拒絕策略

AbortPolicy(中斷策略):直接拋出異常進行拒絕
DiscardPolicy(丟棄策略):不會獲得通知,默默的拋棄掉任務
DiscardOldestPolicy(丟棄最老的):因爲隊列中存儲了不少任務,這個策略會丟棄在隊列中存在時間最久的任務。
CallerRunsPolicy:好比主線程給線程池提交任務,可是線程池已經滿了,在這種策略下會讓提交任務的線程去執行。

總結:第四種拒絕策略相對於前三種更加「機智」一些,能夠避免前面三種策略產生的損失。在第四種策略下能夠下降提交的速度,達到負反饋的效果。

8.使用鉤子爲線程池加點料(可用於日誌記錄)

/**
 * 演示每一個任務執行的先後放鉤子函數
 */
public class PauseableThreadPool extends ThreadPoolExecutor {

    private boolean isPaused;
    private final ReentrantLock lock = new ReentrantLock();
    private Condition unPaused = lock.newCondition();

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }


    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
    }


    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, handler);
    }

    public PauseableThreadPool(int corePoolSize, int maximumPoolSize, long keepAliveTime,
                               TimeUnit unit, BlockingQueue<Runnable> workQueue,
                               ThreadFactory threadFactory, RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        super.beforeExecute(t, r);
        lock.lock();
        try {
            while (isPaused) {
                unPaused.await();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    private void pause() {
        lock.lock();
        try {
            isPaused = true;
        } finally {
            lock.unlock();
        }
    }

    public void resume() {
        lock.lock();
        try {
            isPaused = false;
            //喚醒所有
            unPaused.signalAll();
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        PauseableThreadPool pauseableThreadPool = new PauseableThreadPool(10, 20, 10L,
                TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                System.out.println("我被執行");
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        };
        for (int i = 0; i < 10000; i++) {
            pauseableThreadPool.execute(runnable);
        }
        Thread.sleep(1500);
        pauseableThreadPool.pause();
        System.out.println("線程池被暫停了");
        Thread.sleep(1500);
        pauseableThreadPool.resume();
        System.out.println("線程池被恢復了");
    }
}

file

9.線程池實現原理

9.1 線程池組成部分

線程池管理器
工做線程
任務隊列
任務

9.2 Executor家族

file
Executor:它是一個頂層接口,其餘接口以及類都i繼承或實現於它,包含如下方法:
void execute(Runnable command);
ExecutorService:它繼承於Executor,是Executor的子接口,在接口內部增長了一些新的方法,例如第6小節講到的幾個方法
Executors:這個類是一個工具類,裏面包含一些建立線程池的方法

9.3 線程池實現任務複用的原理

利用相同線程執行不一樣任務

源碼分析

public void execute(Runnable command) {
    // 判斷任務是否爲空,爲空就拋出異常
    if (command == null)
        throw new NullPointerException();
    
    int c = ctl.get();
    // 若是當前線程數小於核心線程數,就增長Worker
    if (workerCountOf(c) < corePoolSize) {
        // command就是任務,點擊addWorker方法
        // 第二個參數用於判斷當前線程數是否小於核心線程數
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 此時線程數大於等於核心線程數
    // 判斷線程池是否是正在運行並將任務放到工做隊列中
    if (isRunning(c) && workQueue.offer(command)) {
        // 再次檢查線程狀態
        int recheck = ctl.get();
        // 若是線程不是正在運行的,就刪除掉任務而且拒絕
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)   //這裏用於避免已經提交的任務沒有線程進行執行
            addWorker(null, false);
    }
    // 若是任務沒法添加或者大於最大線程數就拒絕任務
    else if (!addWorker(command, false))
        reject(command);
}

由於要查看的是Worker因此進入到addWorker()方法後點擊Worker類查看runWorker()方法

w = new Worker(firstTask);

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 獲取到任務
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 只要任務不爲空或者可以獲取到任務就執行下面的方法
        while (task != null || (task = getTask()) != null) {
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // task是一個Runnable類型,調用run()方法就是運行線程
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

總結:核心原理就是獲取到task,若是task不爲空就調用run()方法,這樣就實現了線程的複用,達到讓相同的線程執行不一樣任務的目的。

10.線程池狀態

RUNNING:接受新任務並處理排隊任務
SHUTDOWN:不接受新的任務可是處理排隊任務
STOP:不接受新的任務,也不處理排隊的任務,並中斷正在執行的任務,就是調用shutdownNow()帶來的效果
TIDYING:中文意思是整潔,意思就是說任務都已經終止,workerCount爲零時,線程會轉換到TIDYING狀態,並將運行terminate()鉤子方法
TERMINATED:terminate()運行完成
file

11.使用線程池的注意點

避免任務的堆積(堆積容易產生內存溢出)
避免線程數過多增長(緩存線程池會致使線程數過分增長)
排查線程泄漏(線程已經執行完畢卻沒法被回收)

轉自:https://juejin.im/post/5e1b1f...

相關文章
相關標籤/搜索