ThreadPoolExecutor源碼閱讀

在阿里巴巴的Java開發手冊中看到了線程池比較推薦使用ThreadPoolExecutor,因而每次也都是照葫蘆畫瓢地使用,對於其中的參數(corePoolSize, maximumPoolSize,keepAliveTime , workQueue)等徹底靠着yy去使用。每次用的是時候都感受心慌慌的,總算是找了個時間來真正地去閱讀其源碼。java

四個主要參數

在使用ThreadPoolExecutor的時候,咱們一般會使用它的以下構造函數,(此處未考慮拒絕策略)安全

ThreadPoolExecutor(int corePoolSize,
                   int maximumPoolSize,
                   long keepAliveTime,
                   TimeUnit unit,
                   BlockingQueue<Runnable> workQueue)
複製代碼

在這裏主要有四個參數:核心線程池大小、最大線程池大小、存活時間、工做隊列。其實看到這四個參數我是很懵的,好比,核心線程池與最大線程池之間的區別、工做隊列又是用來作什麼的,存活時間指的是誰的存活時間。在講解源碼以前不妨猜猜。併發

流程總覽

image-20190520193752856

這個流程粗看沒太大問題,可是有一塊一方卻異常突兀、反常識,就是workQueue和maximum的順序,在個人想象中應該是先maximum再workQueue。可是事實上的確是先workQueue,再maximum。能夠嘗試運行下面這段demo,函數

public class ThreadPoolExecutorMain {
    private static final ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 3, 30, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10));

    public static void main(String[] args) {
        for (int i = 1; i <= 20; i++) {
            final int tmp = i;
            pool.execute(() -> {
                try {
                    Thread.sleep(5000);
                    System.out.println(tmp);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
        }
    }
}
複製代碼

在這段demo中,不發生意外的時候,執行順序爲(1,12,13),(2,3,4),(5,6,7),(8,9,10),11,每組內部順序能夠混亂。(注意:在真正使用的時候,咱們須要將ThreadPoolExecutor看成無序的使用)this

源碼解析

execute()

首先直接看execute()方法的源碼spa

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 1. 判斷core是否塞得下
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // 2. 判斷workQueue是否塞得下
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // 3. addWorker中判斷max是否塞得下
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

在這裏ctl是一個設計很是精巧的狀態管理器,它實際上是一個AtomicInteger,它利用int的前三位來存儲當前線程池的狀態(RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED),後29位用來存儲線程數量。線程

在這段代碼中,咱們能夠看到對線程的執行策略分爲了三個部分:1. core部分 2. workQueue部分 3. max部分。其中workQueue部分比較直觀,就是直接調用workQueue.offer(command)將線程加入了待執行隊列。那麼接下來須要關注的是addWorker()方法。設計

addWorker()

private boolean addWorker(Runnable firstTask, boolean core) {
    // 判斷firstTask可否被執行
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&  // SHUTDOWN狀態不會執行新線程,可是能夠執行workQueue中的線程
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||  // 最多支持2^29-1個線程
                wc >= (core ? corePoolSize : maximumPoolSize))  // 此處判斷max是否塞得下
                return false;
            if (compareAndIncrementWorkerCount(c)) // 利用CAS防止併發問題
                break retry;
            c = ctl.get();
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // 執行新的線程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {    // 嘗試添加線程
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) { // SHUTDOWN狀態不會執行新線程,可是能夠執行workQueue中的線程
                    if (t.isAlive())
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();         // 執行線程
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

addWorker()這段代碼看起來比較複雜,可是若是去除掉一些細節和併發安全相關的代碼,總體的代碼邏輯就是判斷線程是否能夠執行,若是能夠執行則新建線程執行。在這段代碼中,咱們能夠看到咱們的線程被封裝到了一個叫作Worker的類中,接下來,咱們繼續探究Worker的源碼。code

Worker

在上面的代碼中咱們能夠看到Worker的執行是經過worker.thread.start()來執行的,先看一下構造函數。cdn

Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
複製代碼

這裏面Worker又做爲了Runnable參數傳給了Worker.thread。那接下來看run()方法

public void run() {
    runWorker(this);
}

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock();
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {   // 獲取Task
            w.lock();
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);  // 空方法,擴展使用
                Throwable thrown = null;
                try {
                    task.run();     // 執行Task
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);   // 空方法,擴展使用
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

這段run()方法能夠看到ThreadPoolExecutor是經過不停地getTask()來複用線程的,可是到這裏,其實我還有一個疑問,就是ThreadPoolExecutor如何保持線程一直處於存活狀態的。那這個問題一樣經過源碼來繼續解讀。

getTask()

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 判斷是否超時消亡
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            // 根據超時設置選擇不一樣的策略獲取Task
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

在這段代碼中咱們能夠看到此處利用workQueue是阻塞隊列的特性來保持core線程一直處於存活狀態(workQueue.take),max線程超時消亡(workQueue.poll)。固然在這段代碼中,咱們發現也能夠經過設置ThreadPoolExecutor的allowCoreThreadTimeOut來使得core線程超時消亡。至於workQueue的內部實現(take和poll)此處就不繼續深究下去了。

總結

至此,咱們已經知道了ThreadPoolExecutor的總體執行流程以及經常使用參數的意義,一樣也清楚了流程總覽中的demo代碼的執行結果爲什麼具備順序性。至於workQueue內部的實現就留到下一次,初步看了一下,感受其內部也有不少很是有意思的東西。

相關文章
相關標籤/搜索