深刻淺出Java線程池ThreadPoolExecutor

簡書江溢Jonny,轉載請註明原創出處,謝謝!java

本文基於JDK1.7的源碼進行分析並解讀。咱們將以一個簡單的案例開始本話題的討論,而且在文章的結尾,筆者將會給出一些經驗之談,避免後來者踩坑。編程

若是你喜歡我,能夠關注個人公衆號~更多幹貨~ 緩存

ThreadPoolExecutor是JUC提供的一類線程池工具,也是Java語言中應用場景最多的併發框架,能夠說,幾乎全部須要異步或者併發執行的,均可以使用Java線程池。那麼首先,咱們一塊兒來比較一下「單純使用線程的方案」和「使用ThreadPoolExecutor線程池的方案」,在解決問題上有什麼區別吧。安全

案例:抄寫員

在中世紀,有一種叫作抄寫員(Scribe)的工做,他們的職責就好像是複印機,抄寫一本又一本書。假如這個時候有一個抄寫員工做室,只有2個抄寫員,他們要抄寫10本書。bash

咱們在本例中分別「本身寫線程管理」和「由ThreadPoolExecutor作線程管理」服務器

public static class Book {
 
    private static AtomicInteger id = new AtomicInteger(0); // 書名生成器
    private String bookName; // 書名
     
    public void copy() { // 抄寫書籍
        System.out.println("start copy " + bookName);
        try {
            Thread.sleep(100L); // sleep 100ms
        } catch (Exception e) {
            // ignore
        }
        System.out.println("end copy " + bookName);
    }
 
    public Book() {
        bookName = "book-" + String.valueOf(id.incrementAndGet()); // 書名自動生成
    }
}
複製代碼

本身實現線程管理

// 提早準備好十本書
final BlockingQueue<Book> books = new LinkedBlockingDeque<Book>(10);
for (int i = 0; i < 10; i++) {
    try {
        books.put(new Book());
    } catch (Exception e) {
        // ignore
    }
}
 
 
System.out.println("start work...");
// 建立兩個書籍抄寫員線程
Thread[] scribes = new Thread[2];
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
    scribes[scribeIndex] = new Thread(new Runnable() {
        public void run() {
            for (; ; ) {
                if (Thread.currentThread().isInterrupted()) {
                    System.out.println("time arrives, stop writing...");
                }
                try {
                    Book currentBook = books.poll(5, TimeUnit.SECONDS);
                    currentBook.copy();
                } catch (Exception e) {
                    System.out.println("time arrives, stop writing...");
                    return;
                }
            }
        }
    });
    scribes[scribeIndex].setDaemon(false); // 設置爲非守護線程
    scribes[scribeIndex].start();
}
 
// 工做已經安排下去了,安心等待就行了
try {
    Thread.sleep(10000l);
} catch (Exception e) {
    // ignore
}
 
// 時間到了,提醒兩個抄寫員中止抄寫
for (int scribeIndex = 0; scribeIndex < 2; scribeIndex++) {
    scribes[scribeIndex].interrupt();
}
 
System.out.println("end work...");
複製代碼

寫了一大堆代碼來完成上述的功能,讓咱們一塊兒來看看用了ThreadPoolExecutor是怎麼完成的。併發

System.out.println("start work...");
ExecutorService executorService = Executors.newFixedThreadPool(2);
for (int i = 0; i < 10; i ++) {
    executorService.submit(new Runnable() {
        public void run() {
            new Book().copy();
        }
    });
}
 
// 工做已經安排下去了,安心等待就行了
try {
    Thread.sleep(10000l);
} catch (Exception e) {
    // ignore
}
 
executorService.shutdownNow();
System.out.println("end work...");
複製代碼

整個流程很是清晰,分別是:任務編寫線程建立線程啓動終止線程框架

可是不少時候,問題並不只限於上述的內容。異步

開發者困境

最先的併發編程的開發者不少事情都須要親力親爲,而經過使用Java線程池,能夠完成如下工做:函數

1)線程管理,線程的建立、啓動、銷燬等工做;

2)線程複用,線程的建立是會給服務器帶來必定開銷的,如何減小頻繁重複建立線程的開銷;

3)彈性伸縮,服務器一般有高峯期也有低峯期,線程池是否能夠彈性伸縮,好比線程建立成功後長時間不使用是否能夠回收,以減小系統資源的浪費,或者線程池的容量是否能夠隨時增加;

4)拒絕策略,線程數量有限而須要處理的任務不少,超出系統承載範圍的任務是拒絕仍是阻塞等待;

5)異常處理,線程在執行過程當中可能遇到異常或者錯誤,開發者如何正確應對這些異常或者錯誤;

6)任務分配,任務的分配是基於先入先出仍是基於某種優先級的策略。

等等如是,不一而足,這個時候咱們就要介紹Doug Lea大神開發的ThreadPoolExecutor線程池框架,看看大神是怎麼解決上面這些問題的。

ThreadPoolExecutor源碼簡析

首先,在解讀源碼以前,要引入ThreadPoolExecutor的一些重要概念

生命週期

在ThreadPoolExecutor線程池的設計中,把整個任務執行框架線程池劃分爲5個生命週期:

RUNNING:容許接收新任務而且處理隊列中的任務

SHUTDOWN:再也不接收新的任務,僅消化完隊列中的任務

STOP:不只再也不接收新的任務,連隊列中的任務都再也不消化處理了,而且嘗試中斷正在執行任務的線程

TIDYING:全部任務被終止了,工做線程數workCount也被設爲0,線程的狀態也被設爲TIDYING,並開始調用鉤子函數terminated()

TERMINATED:鉤子函數terminated()執行完畢

各個生命週期的轉化圖以下:

線程池的生命週期

從圖中能夠看到,整個生命週期的變化是不可逆的。

狀態字

ThreadPoolExecutor把線程池狀態和線程池容量打包成一個int型變量,以下圖所示

狀態字

線程池狀態位

狀態 高位值枚舉 正負性
RUNNING 111 負數(-536870912)
SHUTDOWN 000 0
STOP 001 正數(536870912)
TIDYING 010 正數(1073741824)
TERMINATED 011 正數(1610612736)

所以在狀態值的排布上能夠知道 TERMINATED > TIDYING > STOP >SHUTDOWN > RUNNING

ThreadPoolExecutor中的代碼以下所示:

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 
// 狀態字的高比特位存放線程池狀態信息
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;
 
// 打包/提取狀態字信息
private static int runStateOf(int c) { return c & ~CAPACITY; }
private static int workerCountOf(int c) { return c & CAPACITY; }
  
// 判斷當前線程池是否正在執行
private static boolean isRunning(int c) {
    return c < SHUTDOWN;
}
複製代碼

線程池主要執行流程

線程池主要執行流程

代碼介紹

首先,咱們建立一個線程池。

1.線程池建立
ExecutorService executorService = Executors.newFixedThreadPool(2);
複製代碼

這裏使用了Executors提供的工廠方法,能夠建立如下四種類型線程池:

newFixedThreadPool。該方法將用於建立一個固定大小的線程池(此時corePoolSize = maxPoolSize),每提交一個任務就建立一個線程池,直到線程池達到最大數量,線程池的規模在此後不會發生任何變化;

newCachedThreadPool。該方法建立了一個可緩存的線程池,(此時corePoolSize = 0,maxPoolSize = Integer.MAX_VALUE),空閒線程超過60秒就會被自動回收,該線程池存在的風險是,若是服務器應用達到請求高峯期時,會不斷建立新的線程,直到內存耗盡;

newSingleThreadExecutor。該方法建立了一個單線程的線程池,該線程池按照任務在隊列中的順序串行執行(如:FIFOLIFO、優先級);

newScheduledThreadPool。該方法建立了一個固定長度的線程池,能夠以延遲或者定時的方式執行任務;

二、任務提交

任務提交的大概邏輯以下:

1)當線程池小於corePoolSize時,新提交任務將建立一個新線程執行任務,即便此時線程池中存在空閒線程;

2)當線程池達到corePoolSize時,新提交任務將被放入workQueue中,等待線程池中任務調度執行;

3)當workQueue已滿,且maximumPoolSize > corePoolSize時,新提交任務會建立新線程執行任務;

4)當提交任務數超過maximumPoolSize時,新提交任務由RejectedExecutionHandler處理;

5)當線程池中超過corePoolSize線程,空閒時間達到keepAliveTime時,關閉空閒線程;

那麼接下來咱們看看源代碼是怎麼實現上面的描述的

線程池建立成功之後,咱們提交任務到線程池中:

executorService.submit(new Runnable() {
    public void run() {
        new Book().copy();
    }
});
複製代碼

submit到線程池之後:

public <T> Future<T> submit(Runnable task, T result) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task, result);// 包裝出一個新的任務
    execute(ftask); // 線程池的入口
    return ftask;
}
複製代碼

能夠看到ThreadPoolExecutor的入口方法就是execute(Runnable commad)。該方法的執行邏輯以下:

int c = ctl.get();
// 1. 若是當前線程池中線程總數少於核心線程數,則添加新線程到線程池中,
// 而且由新線程執行剛提交進來的任務
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
  
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    // 2. 可能剛纔在建立新線程成功的同時,線程池被關閉了,所以須要double-check,
    // 若是此時線程池已經被關閉了,那麼回滾剛纔被添加進來的任務
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
// 3. 若是此時核心線程數(corePoolSize)已經滿了,而且任務隊列也滿了,
// 嘗試增長線程到maximumPoolSize大小,若是仍然失敗,執行拒絕策略
else if (!addWorker(command, false))
    reject(command);
複製代碼

在上面的代碼裏面,ctl.get()方法、workerCountOf()、以及isRunning() 方法都是對以前提到的狀態字進行讀寫的操做,這部分咱們就再也不展開給讀者看了,有興趣的讀者能夠本身瞭解一下。

接下來,咱們看看addWorker都作了什麼工做:

private boolean addWorker(Runnable firstTask, boolean core) {
    // 這部分省略的代碼都是對狀態字進行修改,添加並建立線程以前,
    // 須要遞增work記數(此時須要線程安全地操做)
    ...
 
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        ...
        w = new Worker(firstTask); // 此處封裝出了一個新的Work,這個類咱們稍後會介紹
        final Thread t = w.thread;
        if (t != null) {
            ...
             
                // 得到線程池狀態,若是線程池已經被關閉了,則再也不建立新的線程
                int c = ctl.get();
                int rs = runStateOf(c);
 
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    ...
                    workerAdded = true;
            ...
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 若是任務啓動或者提交到線程池失敗,
            // 則執行回滾操做(從工做線程池中移除失敗添加的worker、減小狀態字中的任務計數)
            addWorkerFailed(w);
    }
    return workerStarted;
}

複製代碼
三、任務執行

任務執行在Worker類中,而Worker類是一個繼承了Runnable接口的類。

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    ...
    public void run() {
        runWorker(this);
    }
    ...
}
複製代碼

能夠看到Worker類中調用了外部的runWorker()方法。所以能夠了解到,任務執行的主要邏輯,就是在外部的runWorker()方法中執行的

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    ...
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) { // 循環讀取任務
            ...
            try {
                beforeExecute(wt, task); // 用戶實現的回調方法,任務啓動前
                Throwable thrown = null;
                try {
                    task.run();// 任務執行
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown); // 用戶實現的回調方法,任務執行後
                }
            } finally {
                task = null;
                w.completedTasks++;
                ...
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

複製代碼

beforeExecute和afterExecute是兩個鉤子方法,在裏面指定了當線程開始執行和完成執行之後執行的動做,須要開發者實現。

另外須要注意的還有runWorker方法內調用的getTask()方法,在該方法內部,若是發生如下狀況將會返回null,終止工做線程的執行循環: 1)當前線程數即將超過maxPoolSize 2)線程池被關閉 3)當前線程數大於corePoolSize且小於maxPoolSize,並關切從BlockingQueue取數據超過了超時時間(默認60秒)

代碼實現以下:

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?
 
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
 
        // 校驗當前線程池狀態
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }
 
        boolean timed;      // Are workers subject to culling?
 
        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;
 
            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
 
        try {
            // 若是線程超過指定時間內(默認60秒)沒有獲取到任務,說明有線程即將過時
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

複製代碼
四、任務拒絕

若是線程被提交到線程池時,當前線程池出現如下狀況的任一一種狀況: 1)線程池任務隊列已經滿了 2)線程池被關閉了(調用了shutdown函數或者shutdownNow函數) 都將會調用提早設置好的回調策略,ThreadPoolExecutor中總共提供了四種策略:

1)AbortPolicy(停止):該策略將會直接拋出RejectedExecutionException異常,調用者將會得到異常;

2)DiscardPolicy(拋棄):使用該策略,線程池將會悄悄地丟棄這個任務而不被調用者知道;

3)CallerRunsPolicy(調用者運行):該策略既不會拋棄任務也不會拋出異常,而是將這個任務退回給調用者,從而下降新任務的流量;

4)DiscardOldestPolicy(拋棄最舊的):該策略將會拋棄下一個即將輪到執行的任務,那麼「拋棄最舊」的將致使拋棄優先級最高的任務,所以最好不要把「拋棄最舊的」飽和策略和優先級隊列放在一塊兒使用; 這裏,代碼實現咱們將只展現**CallerRunsPolicy(調用者運行)**策略:

public static class CallerRunsPolicy implements RejectedExecutionHandler {
    /** * Creates a {@code CallerRunsPolicy}. */
    public CallerRunsPolicy() { }
     
    // 策略實現
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        if (!e.isShutdown()) {
            r.run();
        }
    }
}

複製代碼

固然開發者也能夠選擇,根據業務需求,定義本身的飽和策略。

五、線程池銷燬

ThreadPoolExecutor提供了兩種方法銷燬線程池,分別是shutdown()shutdownNow()

shutdown()方法僅僅是把線程池的狀態置爲SHUTDOWN,而且拒絕以後嘗試提交進來的全部請求,可是已經在任務隊列裏的任務會仍然會正常消費。

shutdownNow()方法的表現顯得更加簡單粗暴,它會強行關閉ExecutorService,也會嘗試取消正在執行的任務,而且返回全部已經提交但還沒有開始的任務,開發者能夠將這些任務寫入日誌保存起來以便以後進行處理,另外嘗試取消正在執行的任務僅僅是嘗試對執行線程進行中斷,具體的線程響應中斷策略須要用戶本身編寫。代碼實現以下:

public List<Runnable> shutdownNow() {
    List<Runnable> tasks;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        checkShutdownAccess();
        advanceRunState(STOP);
        interruptWorkers();
        tasks = drainQueue();
    } finally {
        mainLock.unlock();
    }
    tryTerminate();
    return tasks;
}
複製代碼

當心踩坑:線程池經驗談

不要使用ThreadLocal

不要在ThreadPoolExecutor線程池中使用ThreadLocal,由於在ThreadPoolExecutor中,線程是複用的,所以在這裏使用ThreadLocal會被多個task共享,所以可能會帶來髒數據污染。須要當心使用

合理設置corePoolSize的值

以一段代碼爲例:

// 10個線程,由於任務多,這裏用LinkedBlockingQueue
private static final LinkedBlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();
private static final ExecutorService service = new ThreadPoolExecutor(0, 10,
                                                60L, TimeUnit.SECONDS, queue
);
複製代碼

代碼中的corePoolSize=0,也就是核心線程數是1,若是任務數多於10個,那麼會先建立maximumPoolSize個線程執行,其他的任務加入 queue 中等待執行。

而在ThreadPoolExecutor的實現中,當workQueue已滿,且maximumPoolSize>corePoolSize時,新提交任務會建立新線程執行任務。

所以,queue 是不會滿的,那麼永遠不會有maximumPoolSize個線程被建立,也就是說咱們的任務一直仍是一個線程在跑,沒法達到能夠同時使用多個線程的預期。

線程中斷

雖然ThreadPoolExecutor提供了shutdownNow()方法,在調用該方法後會嘗試中斷全部線程,可是該中斷並不能保證線程必定會就此終止,所以,須要開發者實現線程中斷的策略。關於這部分的內容,在Doug Lea的《Java Concurrency In Practice》的7.1.2節已經進行了完整的討論,筆者在這裏就再也不贅述了。

finalize函數

尤爲須要注意的是,ThreadPoolExecutor有一個finalize函數,具體實現以下:

protected void finalize() {
    shutdown();
}
複製代碼

在該方法中調用了shutdown()函數,所以,若是你並非真正但願中止線程池的執行,那麼就不要讓線程池離開你代碼的做用範圍。

我是江溢Jonny。

相關文章
相關標籤/搜索