當面試官問線程池時,你應該知道些什麼?

概述

什麼是線程池?

線程池是一種多線程處理形式,處理過程當中將任務添加到隊列,而後在建立線程後自動啓動這些任務。java

爲何要用線程池?

  • 下降資源消耗
    • 經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。
  • 提升響應速度
    • 當任務到達時,任務能夠不須要等到線程建立就能當即執行。
  • 提升線程的可管理性
    • 線程是稀缺資源,若是無限制的建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一的分配,調優和監控。可是要作到合理的利用線程池,必須對其原理了如指掌。

Executor 框架

簡介

semaphore

  • Executor:一個接口,其定義了一個接收 Runnable 對象的方法 executor,其方法簽名爲 executor(Runnable command),
  • ExecutorService:是一個比 Executor 使用更普遍的子類接口,其提供了生命週期管理的方法,以及可跟蹤一個或多個異步任務執行情況返回 Future 的方法。
  • AbstractExecutorService:ExecutorService 執行方法的默認實現。
  • ScheduledExecutorService:一個可定時調度任務的接口。
  • ScheduledThreadPoolExecutor:ScheduledExecutorService 的實現,一個可定時調度任務的線程池。
  • ThreadPoolExecutor:線程池,能夠經過調用 Executors 如下靜態工廠方法來建立線程池並返回一個 ExecutorService 對象。

ThreadPoolExecutor

java.uitl.concurrent.ThreadPoolExecutor 類是 Executor 框架中最核心的一個類。git

ThreadPoolExecutor 有四個構造方法,前三個都是基於第四個實現。第四個構造方法定義以下:github

public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {

參數說明

  • corePoolSize:線程池的基本線程數。這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了 prestartAllCoreThreads()或者 prestartCoreThread()方法,從這 2 個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立 corePoolSize 個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲 0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到 corePoolSize 後,就會把到達的任務放到緩存隊列當中。
  • maximumPoolSize:線程池容許建立的最大線程數。若是隊列滿了,而且已建立的線程數小於最大線程數,則線程池會再建立新的線程執行任務。值得注意的是若是使用了無界的任務隊列這個參數就沒什麼效果。
  • keepAliveTime:線程活動保持時間。線程池的工做線程空閒後,保持存活的時間。因此若是任務不少,而且每一個任務執行的時間比較短,能夠調大這個時間,提升線程的利用率。
  • unit:參數 keepAliveTime 的時間單位,有 7 種取值。可選的單位有天(DAYS),小時(HOURS),分鐘(MINUTES),毫秒(MILLISECONDS),微秒(MICROSECONDS, 千分之一毫秒)和毫微秒(NANOSECONDS, 千分之一微秒)。
  • workQueue:任務隊列。用於保存等待執行的任務的阻塞隊列。 能夠選擇如下幾個阻塞隊列。
    • ArrayBlockingQueue:是一個基於數組結構的有界阻塞隊列,此隊列按 FIFO(先進先出)原則對元素進行排序。
    • LinkedBlockingQueue:一個基於鏈表結構的阻塞隊列,此隊列按 FIFO (先進先出) 排序元素,吞吐量一般要高於 ArrayBlockingQueue。靜態工廠方法 Executors.newFixedThreadPool()使用了這個隊列。
    • SynchronousQueue:一個不存儲元素的阻塞隊列。每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於 LinkedBlockingQueue,靜態工廠方法 Executors.newCachedThreadPool 使用了這個隊列。
    • PriorityBlockingQueue:一個具備優先級的無限阻塞隊列。
  • threadFactory:建立線程的工廠。能夠經過線程工廠給每一個建立出來的線程設置更有意義的名字。
  • handler:飽和策略。當隊列和線程池都滿了,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略默認狀況下是 AbortPolicy,表示沒法處理新任務時拋出異常。如下是 JDK1.5 提供的四種策略。
    • AbortPolicy:直接拋出異常。
    • CallerRunsPolicy:只用調用者所在線程來運行任務。
    • DiscardOldestPolicy:丟棄隊列裏最近的一個任務,並執行當前任務。
    • DiscardPolicy:不處理,丟棄掉。
    • 固然也能夠根據應用場景須要來實現 RejectedExecutionHandler 接口自定義策略。如記錄日誌或持久化不能處理的任務。

重要方法

在 ThreadPoolExecutor 類中有幾個很是重要的方法:數組

  • execute() 方法其實是 Executor 中聲明的方法,在 ThreadPoolExecutor 進行了具體的實現,這個方法是 ThreadPoolExecutor 的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
  • submit() 方法是在 ExecutorService 中聲明的方法,在 AbstractExecutorService 就已經有了具體的實現,在 ThreadPoolExecutor 中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和 execute()方法不一樣,它可以返回任務執行的結果,去看 submit()方法的實現,會發現它實際上仍是調用的 execute()方法,只不過它利用了 Future 來獲取任務執行結果(Future 相關內容將在下一篇講述)。
  • shutdown() 和 shutdownNow() 是用來關閉線程池的。

向線程池提交任務

咱們可使用 execute 提交任務,可是 execute 方法沒有返回值,因此沒法判斷任務是否被線程池執行成功。緩存

經過如下代碼可知 execute 方法輸入的任務是一個 Runnable 實例。多線程

threadsPool.execute(new Runnable() {
            @Override
            public void run() {
                // TODO Auto-generated method stub
            }
        });

咱們也可使用 submit 方法來提交任務,它會返回一個 Future ,那麼咱們能夠經過這個 Future 來判斷任務是否執行成功。併發

經過 Future 的 get 方法來獲取返回值,get 方法會阻塞住直到任務完成。而使用 get(long timeout, TimeUnit unit) 方法則會阻塞一段時間後當即返回,這時有可能任務沒有執行完。框架

Future<Object> future = executor.submit(harReturnValuetask);
try {
     Object s = future.get();
} catch (InterruptedException e) {
    // 處理中斷異常
} catch (ExecutionException e) {
    // 處理沒法執行任務異常
} finally {
    // 關閉線程池
    executor.shutdown();
}

線程池的關閉

咱們能夠經過調用線程池的 shutdown 或 shutdownNow 方法來關閉線程池,它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的 interrupt 方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。可是它們存在必定的區別,shutdownNow 首先將線程池的狀態設置成 STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而 shutdown 只是將線程池的狀態設置成 SHUTDOWN 狀態,而後中斷全部沒有正在執行任務的線程。異步

只要調用了這兩個關閉方法的其中一個,isShutdown 方法就會返回 true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用 isTerminaed 方法會返回 true。至於咱們應該調用哪種方法來關閉線程池,應該由提交到線程池的任務特性決定,一般調用 shutdown 來關閉線程池,若是任務不必定要執行完,則能夠調用 shutdownNow。分佈式

Executors

JDK 中提供了幾種具備表明性的線程池,這些線程池是基於 ThreadPoolExecutor 的定製化實現。

在實際使用線程池的場景中,咱們每每不是直接使用 ThreadPoolExecutor ,而是使用 JDK 中提供的具備表明性的線程池實例。

newCachedThreadPool

建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。

這種類型的線程池特色是:

  • 工做線程的建立數量幾乎沒有限制(其實也有限制的,數目爲 Interger.MAX_VALUE), 這樣可靈活的往線程池中添加線程。
  • 若是長時間沒有往線程池中提交任務,即若是工做線程空閒了指定的時間(默認爲 1 分鐘),則該工做線程將自動終止。終止後,若是你又提交了新的任務,則線程池從新建立一個工做線程。
  • 在使用 CachedThreadPool 時,必定要注意控制任務的數量,不然,因爲大量線程同時運行,頗有會形成系統癱瘓。

示例:

public class CachedThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            try {
                Thread.sleep(index * 1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            executorService.execute(() -> System.out.println(Thread.currentThread().getName() + " 執行,i = " + index));
        }
    }
}

newFixedThreadPool

建立一個指定工做線程數量的線程池。每當提交一個任務就建立一個工做線程,若是工做線程數量達到線程池初始的最大數,則將提交的任務存入到池隊列中。

FixedThreadPool 是一個典型且優秀的線程池,它具備線程池提升程序效率和節省建立線程時所耗的開銷的優勢。可是,在線程池空閒時,即線程池中沒有可運行任務時,它不會釋放工做線程,還會佔用必定的系統資源。

示例:

public class FixedThreadPoolDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(3);
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 執行,i = " + index);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

newSingleThreadExecutor

建立一個單線程化的 Executor,即只建立惟一的工做者線程來執行任務,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。若是這個線程異常結束,會有另外一個取代它,保證順序執行。單工做線程最大的特色是可保證順序地執行各個任務,而且在任意給定的時間不會有多個線程是活動的。

示例:

public class SingleThreadExecutorDemo {
    public static void main(String[] args) {
        ExecutorService executorService = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int index = i;
            executorService.execute(() -> {
                try {
                    System.out.println(Thread.currentThread().getName() + " 執行,i = " + index);
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}

newScheduleThreadPool

建立一個線程池,能夠安排任務在給定延遲後運行,或按期執行。

public class ScheduledThreadPoolDemo {

    private static void delay() {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.schedule(() -> System.out.println(Thread.currentThread().getName() + " 延遲 3 秒"), 3,
                TimeUnit.SECONDS);
    }

    private static void cycle() {
        ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);
        scheduledThreadPool.scheduleAtFixedRate(
                () -> System.out.println(Thread.currentThread().getName() + " 延遲 1 秒,每 3 秒執行一次"), 1, 3,
                TimeUnit.SECONDS);
    }

    public static void main(String[] args) {
        delay();
        cycle();
    }
}

源碼

線程池的具體實現原理,大體從如下幾個方面講解:

  1. 線程池狀態
  2. 任務的執行
  3. 線程池中的線程初始化
  4. 任務緩存隊列及排隊策略
  5. 任務拒絕策略
  6. 線程池的關閉
  7. 線程池容量的動態調整

線程池狀態

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }

runState 表示當前線程池的狀態,它是一個 volatile 變量用來保證線程之間的可見性;

下面的幾個 static final 變量表示 runState 可能的幾個取值。

當建立線程池後,初始時,線程池處於 RUNNING 狀態;

RUNNING -> SHUTDOWN

若是調用了 shutdown()方法,則線程池處於 SHUTDOWN 狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢。

(RUNNING or SHUTDOWN) -> STOP

若是調用了 shutdownNow()方法,則線程池處於 STOP 狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務。

SHUTDOWN -> TIDYING

當線程池和隊列都爲空時,則線程池處於 TIDYING 狀態。

STOP -> TIDYING

當線程池爲空時,則線程池處於 TIDYING 狀態。

TIDYING -> TERMINATED

當 terminated() 回調方法完成時,線程池處於 TERMINATED 狀態。

任務的執行

任務執行的核心方法是 execute() 方法。執行步驟以下:

  1. 若是少於 corePoolSize 個線程正在運行,嘗試使用給定命令做爲第一個任務啓動一個新線程。對 addWorker 的調用會自動檢查 runState 和 workerCount,從而防止在不該該的狀況下添加線程。
  2. 若是任務排隊成功,仍然須要仔細檢查是否應該添加一個線程(由於現有的線程自上次檢查以來已經死亡)或者自從進入方法後,線程池就關閉了。因此咱們從新檢查狀態,若是有必要的話,在線程池中止狀態時回滾隊列,若是沒有線程的話,就開始一個新的線程。
  3. 若是任務排隊失敗,那麼咱們嘗試添加一個新的線程。若是失敗了,說明線程池已經關閉了,或者已經飽和了,因此拒絕這個任務。
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();

    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

線程池中的線程初始化

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

prestartCoreThread():初始化一個核心線程; prestartAllCoreThreads():初始化全部核心線程

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
}

public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n;
    return n;
}

任務緩存隊列及排隊策略

在前面咱們屢次提到了任務緩存隊列,即 workQueue,它用來存放等待執行的任務。

workQueue 的類型爲 BlockingQueue,一般能夠取下面三種類型:

  1. ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
  2. LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲 Integer.MAX_VALUE;
  3. SynchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

任務拒絕策略

當線程池的任務緩存隊列已滿而且線程池中的線程數目達到 maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略

  • ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出 RejectedExecutionException 異常。
  • ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
  • ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
  • ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

線程池的關閉

ThreadPoolExecutor 提供了兩個方法,用於線程池的關閉,分別是 shutdown()和 shutdownNow(),其中:

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

線程池容量的動態調整

ThreadPoolExecutor 提供了動態調整線程池容量大小的方法:setCorePoolSize()和 setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

當上述參數從小變大時,ThreadPoolExecutor 進行線程賦值,還可能當即建立新的線程來執行任務。

免費Java資料須要本身領取,涵蓋了Java、Redis、MongoDB、MySQL、Zookeeper、Spring Cloud、Dubbo高併發分佈式等教程,一共30G。
傳送門:https://mp.weixin.qq.com/s/JzddfH-7yNudmkjT0IRL8Q

相關文章
相關標籤/搜索