線程池?面試?看這篇就夠了!

ThreadPool
<!-- more -->java

本文原創地址, 個人博客https://jsbintask.cn/2019/03/10/jdk/jdk8-threadpool/(食用效果最佳),轉載請註明出處!

前言

在實際工做中,線程是一個咱們常常要打交道的角色,它能夠幫咱們靈活利用資源,提高程序運行效率。可是咱們今天不是探討線程!咱們今天來聊聊另外一個與線程息息相關的角色:線程池.本篇文章的目的就是全方位的解析線程池的做用,以及jdk中的接口,實現以及原理,另外對於某些重要概念,將從源碼的角度探討。
tip:本文較長,建議先碼後看。編程

線程池介紹

首先咱們看一段建立線程而且運行的經常使用代碼:數組

for (int i = 0; i < 100; i++) {
    new Thread(() -> {
        System.out.println("run thread->" + Thread.currentThread().getName());
        //to do something, send email, message, io operator, network...
    }).start();
}

上面的代碼很容易理解,咱們爲了異步,或者效率考慮,將某些耗時操做放入一個新線程去運行,可是這樣的代碼卻存在這樣的問題:安全

  1. 建立銷燬線程資源消耗; 咱們使用線程的目的本是出於效率考慮,能夠爲了建立這些線程卻消耗了額外的時間資源,對於線程的銷燬一樣須要系統資源。
  2. cpu資源有限,上述代碼建立線程過多,形成有的任務不能即時完成,響應時間過長。
  3. 線程沒法管理,無節制地建立線程對於有限的資源來講彷佛成了「得不償失」的一種做用。

手動建立執行線程存在以上問題,而線程池就是用來解決這些問題的。怎麼解決呢?咱們能夠先粗略的定義一下線程池:多線程

線程池是一組已經建立好的,一直在等待任務執行的線程的集合。

由於線程池中線程是已經建立好的,因此對於任務的執行不會消耗掉額外的資源,線程池中線程個數由咱們自定義添加,可相對於資源,資源任務作出調整,對於某些任務,若是線程池還沒有執行,可手動取消,線程任務變得可以管理!
因此,線程池的做用以下:併發

  1. 下降資源消耗。經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  2. 提升響應速度。當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  3. 提升線程的可管理性。

jdk線程池詳解

上面咱們已經知道了線程池的做用,而對於這樣一個好用,重要的工具,jdk固然已經爲咱們提供了實現,這也是本篇文章的重點。
在jdk中,關於線程池的接口,類都定義在juc(java.util.concurrent)包中,這是jdk專門爲咱們提供用於併發編程的包,固然,本篇文章咱們只介紹與線程池有關的接口和類,首先咱們看下重點要學習的接口和類:
ThreadPool
如圖所示,咱們將一一講解這6個類的做用而且分析。異步

Executor

首先咱們須要瞭解就是Executor接口,它有一個方法,定義以下:
ThreadPool
Executor自jdk1.5引入,這個接口只有一個方法execute聲明,它的做用以及定義以下:接收一個任務(Runnable)而且執行。注意:同步執行仍是異步執行都可
由它的定義咱們就知道,它是一個線程池最基本的做用。可是在實際使用中,咱們經常使用的是另一個功能更多的子類ExecutorService工具

ExecutorService

ThreadPool
這個接口繼承自Executor,它的方法定義就豐富多了,能夠關閉,提交Future任務,批量提交任務,獲取執行結果等,咱們一一講解下每一個方法做用聲明:學習

  1. void shutdown(): 「優雅地」關閉線程池,爲何是「優雅地」呢?由於這個線程池在關閉前會先等待線程池中已經有的任務執行完成,通常會配合方法awaitTermination一塊兒使用,調用該方法後,線程池中不能再加入新的任務。
  2. List<Runnable> shutdownNow();: 「嘗試」終止正在執行的線程,返回在正在等待的任務列表,調用這個方法後,會調用正在執行線程的interrupt()方法,因此若是正在執行的線程若是調用了sleep,join,await等方法,會拋出InterruptedException異常。
  3. boolean awaitTermination(long timeout, TimeUnit unit): 該方法是一個阻塞方法,參數分別爲時間和時間單位。這個方法通常配合上面兩個方法以後調用。若是先調用shutdown方法,全部任務執行完成返回true,超時返回false,若是先調用的是shutdownNow方法,正在執行的任務所有完成true,超時返回false。
  4. boolean isTerminated();: 調用方法1或者2後,若是全部人物所有執行完畢則返回true,也就是說,就算全部任務執行完畢,可是不是先調用1或者2,也會返回false。
  5. <T> Future<T> submit(Callable<T> task);: 提交一個可以返回結果的Callable任務,返回任務結果抽象對象是Future,調用Future.get()方法能夠阻塞等待獲取執行結果,例如:

result = exec.submit(aCallable).get();,提交一個任務而且一直阻塞知道該任務執行完成獲取到返回結果。this

  1. <T> Future<T> submit(Runnable task, T result);: 提交一個Runnable任務,執行成功後調用Future.get()方法返回的是result(這是什麼騷操做?)。
  2. Future<?> submit(Runnable task);:和6不一樣的是調用Future.get()方法返回的是null(這又是什麼操做?)。
  3. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks): 提交一組任務,而且返回每一個任務執行結果的抽象對象List<Future<T>>,Future做用同上,值得注意的是:

當調用其中任一Future.isDone()(判斷任務是否完成,正常,異常終止都算)方法時,必須等到全部任務都完成時才返回true,簡單說:所有任務完成纔算完成

  1. <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit): 同方法8,多了一個時間參數,不一樣的是:若是超時,Future.isDone()一樣返回true。
  2. <T> T invokeAny(Collection<? extends Callable<T>> tasks):這個看名字和上面對比就容易理解了,返回第一個正常完成的任務地執行結果,後面沒有完成的任務將被取消。
  3. <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit):同10相比,多了一個超時參數。不一樣的是:在超時時間內,一個任務都沒有完成,將拋出TimeoutException

到如今,咱們已經知道了一個線程池基本的全部方法,知道了每一個方法的做用,接下來咱們就來看看具體實現,首先咱們研究下ExecutorService的具體實現抽象類:AbstractExecutorService

AbstractExecutorService

ThreadPool
AbstractExecutorService是一個抽象類,繼承自ExecutorService,它實現了ExecutorService接口的submit, invokeAll, invokeAny方法,主要用於將ExecutorService的公共實現封裝,方便子類更加方便使用,接下來咱們看看具體實現:

1. submit方法:

public Future<?> submit(Runnable task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}
  • 判空
  • 利用task構建一個Future的子類RunnableFuture,最後返回
  • 執行這個任務(execute方法聲明在Executor接口中,因此也是交由子類實現)。

execute方法交由子類實現了,這裏咱們主要分析newTaskFor方法,看它是如何構建Future對象的:
首先,RunnableFuture接口定義以下:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    void run();
}

他就是Future和Runnable的組合,它的實現是FutureTask
ThreadPool

2. invokeAll方法:

public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
        if (tasks == null)
            throw new NullPointerException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
        boolean done = false;  // ①
        try {
            for (Callable<T> t : tasks) {  // ②
                RunnableFuture<T> f = newTaskFor(t);
                futures.add(f);
                execute(f);
            }
            for (int i = 0, size = futures.size(); i < size; i++) {
                Future<T> f = futures.get(i);    // ③
                if (!f.isDone()) {
                    try {
                        f.get();
                    } catch (CancellationException ignore) {
                    } catch (ExecutionException ignore) {
                    }
                }
            }
            done = true;   //  ④
            return futures;
        } finally {
            if (!done)     //   ⑤
                for (int i = 0, size = futures.size(); i < size; i++)
                    futures.get(i).cancel(true);
        }
}
  1. 聲明一個flag判斷全部任務是否所有完成
  2. 調用newTaskFor方法構建RunnableFuture對象,循環調用execute方法添加每個任務。
  3. 遍歷每一個任務結果,判斷是否執行完成,沒有完成調用 get()阻塞方法等待完成。
  4. 全部任務所有完成,將flag設置成true。
  5. 出現異常,還有任務沒有完成,全部任務取消:Future.cancel()(實際是調用執行線程的interrupt方法。

上面代碼分析和咱們一開始講解ExecutorServiceinvokeAll一致。

3. invokeAny方法

ThreadPool
invokeAny實際調用doInvokeAny:

private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                              boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
        if (tasks == null)
            throw new NullPointerException();
        int ntasks = tasks.size();
        if (ntasks == 0)
            throw new IllegalArgumentException();
        ArrayList<Future<T>> futures = new ArrayList<Future<T>>(ntasks);
        ExecutorCompletionService<T> ecs =     // ①
            new ExecutorCompletionService<T>(this);

        try {
            ExecutionException ee = null;
            final long deadline = timed ? System.nanoTime() + nanos : 0L;
            Iterator<? extends Callable<T>> it = tasks.iterator();
            
            futures.add(ecs.submit(it.next()));       // ②
            --ntasks;
            int active = 1;

            for (;;) {
                Future<T> f = ecs.poll();    //  ③
                if (f == null) {
                    if (ntasks > 0) {
                        --ntasks;
                        futures.add(ecs.submit(it.next()));
                        ++active;
                    }
                    else if (active == 0)
                        break;
                    else if (timed) {
                        f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
                        if (f == null)
                            throw new TimeoutException();
                        nanos = deadline - System.nanoTime();
                    }
                    else                  //  ④
                        f = ecs.take();
                }
                if (f != null) {           // ⑤
                    --active;
                    try {
                        return f.get();
                    } catch (ExecutionException eex) {
                        ee = eex;
                    } catch (RuntimeException rex) {
                        ee = new ExecutionException(rex);
                    }
                }
            }

            if (ee == null)       
                ee = new ExecutionException();
            throw ee;

        } finally {
            for (int i = 0, size = futures.size(); i < size; i++)      //  ⑥
                futures.get(i).cancel(true);
        }
    }
  1. 聲明一個ExecutorCompletionService ecs,這個對象實際是一個任務執行結果阻塞隊列和線程池的結合,因此它能夠加入任務,執行任務,將任務執行結果加入阻塞隊列。
  2. 向ecs添加tasks中的第一個任務而且執行。
  3. 從ecs的阻塞隊列中取出第一個(隊頭),若是爲null(不爲null跳到註釋⑤),說明一個任務都還沒執行完成,繼續添加任務。
  4. 若是全部任務都被添加了,阻塞等待任務的執行結果,知道有任一任務執行完成。
  5. 若是取到了某個任務的執行結果,直接返回。
  6. 取消全部還沒執行的任務。

上面代碼分析和咱們一開始講解ExecutorServiceinvokeAny一致。 到如今,咱們已經分析完了AbstractExecutorService中的公共的方法,接下來就該研究最終的具體實現了:ThreadPoolExecutor

ThreadPoolExecutor

ThreadPoolExecutor繼承自AbstractExecutorService,它是線程池的具體實現:
ThreadPool
咱們首先分析下構造方法:`public ThreadPoolExecutor(int corePoolSize,

int maximumPoolSize,
                                       long keepAliveTime,
                                       TimeUnit unit,
                                       BlockingQueue<Runnable> workQueue,
                                       ThreadFactory threadFactory,
                                       RejectedExecutionHandler handler)`。

corePoolSize:核心線程數,maximumPoolSize:線程池最大容許線程數,workQueue:任務隊列,threadFactory:線程建立工廠,handler: 任務拒絕策,keepAliveTime, unit:等待時長,它們的具體做用以下:
ThreadPool
提交一個task(Runnable)後(執行execute方法),檢查總線程數是否小於corePoolSize,小於等於則使用threadFactory直接建立一個線程執行任務,大於則再次檢查線程數量是否等於maximumPoolSize,等於則直接執行handler拒絕策略,小於則判斷workQueue是否已經滿了,沒滿則將任務加入等待線程執行,滿了則使用threadFactory建立新線程執行隊頭任務。
經過流程圖咱們知道每一個參數做用,這裏值得注意的是,若是咱們將某些參數特殊化,則能夠獲得特殊的線程池:

  1. corePoolSize=maximuPoolSize,咱們能夠建立一個線程池線程數量固定的任務。
  2. maximumPoolSize設置的足夠大(Integer.MAX_VALUE),能夠無限制的加入任務。
  3. workQueue設置的足夠大,線程池中的數量不會超過corePoolSize,此時maximumPoolSize參數無用。
  4. corePoolSize=0,線程池一旦空閒(超過期間),線程都將被回收。
  5. 咱們上面知道,若是多餘的空閒線程空閒時間超過keepAliveTime*unit,這些線程將被回收。咱們能夠經過方法allowCoreThreadTimeOut使這個參數對線程池中全部線程都有效果。
  6. workQueue通常有三種實現:
  • SynchronousQueue,這是一個空隊列,不會保存提交的task(添加操做必須等待另外的移除操做)。
  • ArrayBlockingQueue,數組實現的丟列,能夠指定隊列的長度。
  • LinkedBlockingQueue, 鏈表實現的隊列,因此理論上能夠無限大,也能夠指定鏈表長度。
  1. 而RejectedExecutionHandler通常由四種實現:
  • AbortPolicy, 直接拋出RejectedExecutionException,這是線程池中的默認實現
  • DiscardPolicy,什麼都不作
  • DiscardOldestPolicy,丟棄workQueue隊頭任務,加入新任務
  • CallerRunsPolicy,直接在調用者的線程執行任務

最後,咱們再分析下ThreadPoolExecutor核心方法execute

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        
        int c = ctl.get();   // ①
        if (workerCountOf(c) < corePoolSize) {   // ②
            if (addWorker(command, true))
                return;
            c = ctl.get();     // ③
        }
        if (isRunning(c) && workQueue.offer(command)) {    // ④
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))   // ⑤
                reject(command);
            else if (workerCountOf(recheck) == 0)    // ⑥
                addWorker(null, false);
        }
        else if (!addWorker(command, false))    // ⑦
            reject(command);
    }
  1. 獲取線程池中的線程數量
  2. 線程池中線程數量小於corePoolSize,直接調用addWorker添加新線程執行任務返回。
  3. 由於多線程的關係,上一步可能調用addWorker失敗(其它線程建立了,數以數量已經超過了),重啓獲取線程數量。
  4. 向workQueue添加添加任務,若是添加成功,double獲取線程數量,添加失敗,走到步驟⑦
  5. double檢查後發現線程池已經關閉或者數量超出,回滾已經添加的任務(remove(command))而且執行拒絕策略。
  6. double檢查經過,添加一個新線程。
  7. 再次添加線程,失敗則調用拒絕策略。

好了,到如今jdk中的線程池核心的實現,策略,分析咱們已經分析完成了。接下來我咱們就來看看關於線程池的另外的一些擴展,也就是圖上的剩下的接口和類:
ThreadPool

ScheduledExecutorService

ScheduledExecutorService繼承自ExecutorService,ExecutorService的分析上面咱們已經知道了,咱們來看看它擴展了哪些方法:
ThreadPool
這個接口做爲線程池的定義主要增長了能夠定時執行任務(執行一次)和按期執行任務(重複執行),咱們來一一簡述下每一個方法的做用。

  1. public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit);: 這個方法用於定時執行任務command,延遲的時間爲delay*unit,它返回一個ScheduledFuture對象用於獲取執行結果或者剩餘延時,調用Future.get()方法將阻塞當前線程最後返回null。

ThreadPool

  1. public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit);:同上,不一樣的是,調用Future.get()方法將返回執行的結果,而不是null。
  2. public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period,TimeUnit unit);: 重複執行任務command,第一次執行時間爲initialDelay延遲後,之後的執行時間將在initialDelay + period * n,unit表明時間單位,值得注意的是,若是某次執行出現異常,後面該任務就不會再執行。或者經過返回對象Future手動取消,後面也將再也不執行。
  3. public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,long initialDelay,long delay, TimeUnit unit);: 效果同上,不一樣點:若是command耗時爲 y,則上面的計算公式爲initialDelay + period * n + y,也就是說,它的定時時間會加上任務耗時,而上面的方法則是一個固定的頻率,不會算上任務執行時間!

這是它擴展的四個方法,其中須要注意的是scheduleAtFixedRate和scheduleWithFixedDelay的細微差異,最後,咱們來看下它的實現類:ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor類,實現了ScheduledExecutorService接口,上面均已經分析。
ThreadPool
它的構造器以下:
ThreadPool
看起來比它的父類構造器簡潔,主要由於它的任務隊列workQueue是默認的(DelayedWorkQueue),而且最大的線程數爲最大值。接着咱們看下DelayedWorkQueue實現:
ThreadPool
它內部使用數組維護了一個二叉樹,提升了任務查找時間,而之因此ScheduledThreadPoolExecutor可以實現延時的關鍵也在於DelayedWorkQueue的take()方法:

public RunnableScheduledFuture<?> take() throws InterruptedException {
            final ReentrantLock lock = this.lock;
            lock.lockInterruptibly();
            try {
                for (;;) {    // ①
                    RunnableScheduledFuture<?> first = queue[0];
                    if (first == null)
                        available.await();
                    else {
                        long delay = first.getDelay(NANOSECONDS);
                        if (delay <= 0)
                            return finishPoll(first);
                        first = null; // don't retain ref while waiting
                        if (leader != null)
                            available.await();
                        else {
                            Thread thisThread = Thread.currentThread();
                            leader = thisThread;
                            try {
                                available.awaitNanos(delay);
                            } finally {
                                if (leader == thisThread)
                                    leader = null;
                            }
                        }
                    }
                }
            } finally {
                if (leader == null && queue[0] != null)
                    available.signal();
                lock.unlock();
            }
        }
  1. 工做線程調用take方法獲取剩餘任務。
  2. 檢查這個任務是否已經到了執行時間。
  3. 未到執行時間,await等待。
  4. 本身喚醒,進入循環再次計算時間。

好了,到目前爲止jdk中關於線程池的6個核心類已經所有分析完畢了。接下來還有最後一個小問題,咱們手動建立線程池參數也太了,不論是ThreadPoolExecutor仍是ScheduledThreadPoolExecutor,這對於用戶來講彷佛並不太友好,固然,jdk已經想到了這個問題,因此,咱們最後再介紹一個建立這些線程池的工具類:Executors:

Executors

它的主要工具方法以下:
ThreadPool
比起手動建立,它幫咱們加了不少默認值,用起來固然就方便多了,好比說newFixedThreadPool
ThreadPool
建立一個線程數固定的線程池,其實就是核心線程數等於最大線程數,和咱們一開始分析的結果同樣。
值得注意的是:爲了咱們的程序安全可控性考慮,咱們應該儘可能考慮手動建立線程池,知曉每個參數的做用,下降不穩定性!

總結

本次,咱們首先從代碼出發,分析了線程池給咱們帶來的好處以及直接使用線程的弊端,接着引出了jdk中的已經實現了的線程池。而後重點分析了jdk中關於線程池的六個最重要的接口和類,而且從源碼角度講解了關鍵點實現,最後,處於方便考慮,咱們還知道jdk給咱們留了一個建立線程池的工具類,簡化了手動建立線程池的步驟。
真正作到了知其然,知其因此然

關注我,這裏只有乾貨!

相關文章
相關標籤/搜索