線程池源碼分析-ThreadPoolExecutor

#1 系列目錄安全

該系列打算從一個最簡單的Executor執行器開始一步一步擴展到ThreadPoolExecutor,但願能粗略的描述出線程池的各個實現細節。針對JDK1.7中的線程池多線程

#2 ThreadPoolExecutoroop

從上一篇文章中瞭解到:核心execute(futureTask)方法須要被子類來實現,因此咱們就倆重點看看ThreadPoolExecutor是如何實現這個核心方法的源碼分析

##2.1 ThreadPoolExecutor的參數ui

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
  • int corePoolSize:核心線程數this

  • int maximumPoolSize:最大線程數atom

  • BlockingQueue workQueue:任務隊列.net

  • long keepAliveTime:和TimeUnit unit一塊兒構成線程的最大空閒時間,一旦超過該時間尚未任務處理,該線程就走向結束了。它是針對當前線程數已經超過corePoolSize核心線程數了或者核心線程數也開啓超時策略,即屬性allowCoreThreadTimeOut=true線程

  • ThreadFactory threadFactory:線程工廠設計

  • RejectedExecutionHandler handler:拒絕策略,當任務太多來不及處理,可拒絕該任務

先簡單描述下ThreadPoolExecutor的execute(futureTask)過程的大概狀況:

1 若是當前線程數小於corePoolSize,則直接建立出一個線程,用於執行新加進來的任務

2 若是當前線程數已經超過corePoolSize,則將該任務放到BlockingQueue workQueue任務隊列中,該任務隊列能夠是有限容量也能夠是無限容量的。每一個線程處理完一個任務後,都會不斷的從BlockingQueue workQueue任務隊列中取出任務並執行

3 若是BlockingQueue workQueue是有限容量的,已滿沒法放進新的任務了,若是此時的線程數小於maximumPoolSize,則直接建立一個線程執行該任務

4 若是線程數已達到maximumPoolSize不能再建立線程了,則直接使用RejectedExecutionHandler handler拒絕該任務

##2.2 execute(futureTask)代碼分析

詳細代碼以下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}

咱們來詳細看看每個步驟:

###2.2.1 ctl解析

ctl是ThreadPoolExecutor的一個重要屬性,它記錄着ThreadPoolExecutor的線程數量和線程狀態。

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

Integer有32位,其中前三位用於記錄線程狀態,後29位用於記錄線程的數量。那線程狀態有哪些呢?

//線程數量佔用的位數
private static final int COUNT_BITS = Integer.SIZE - 3;

//最大線程數
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl

//狀態值就是隻關心前三位的值,因此把後29位清0
private static int runStateOf(int c)     { return c & ~CAPACITY; }

//線程數量就是隻關心後29位的值,因此把前3位清0
private static int workerCountOf(int c)  { return c & CAPACITY; }

//兩個數相或
private static int ctlOf(int rs, int wc) { return rs | wc; }

狀態有 RUNNING、SHUTDOWN、STOP、TIDYING、TERMINATED

狀態的詳細信息能夠看下源碼中的文檔說明,這裏再也不詳細說明

###2.2.2 addWorker解析

咱們從上面的execute代碼中能夠看到,當提交一個任務時,當前線程數小於corePoolSize核心線程數的時候,就新添加一個線程,即addWorker(command, true)

咱們來詳細看下上述過程:

private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            if (compareAndIncrementWorkerCount(c))
                break retry;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }
    }

    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        final ReentrantLock mainLock = this.mainLock;
        w = new Worker(firstTask);
        final Thread t = w.thread;
        if (t != null) {
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int c = ctl.get();
                int rs = runStateOf(c);

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}

addWorker有2種狀況,一種就是線程數量不足核心線程數,另外一種就是核心線程數已滿同時任務隊列已滿可是線程數不足最大線程數。上述boolean core就是用來區分上述2種狀況的。

addWorker首先就要對線程數量自增,即ctl的後29爲進行自增。這裏就涉及到多線程問題,爲了解決多線程問題就採用了for循環加CAS來解決,爲何沒有直接用AtomicInteger的incrementAndGet?

incrementAndGet也是內部for循環加CAS,它是要確保必定要自增成功的,而這裏咱們不必定要自增成功,還要判斷當前線程的數量合不合法。 即若是core=true,則當前線程數量不能超過incrementAndGet,若是core=false,則當前線程數量不能超過maximumPoolSize。

若是當前線程數自增成功,下面就須要建立出Worker線程,存放到workers中,以下所述

private final HashSet<Worker> workers = new HashSet<Worker>();

咱們知道HashSet的內部實現就是經過HashMap來實現的,HashMap是線程不安全的,因此在對workers操做的時候必需要要進行加鎖,這就用到了ThreadPoolExecutor的mainLock了

private final ReentrantLock mainLock = new ReentrantLock();

一旦worker新增成功就直接啓動該Worker內部的線程。一旦worker新增失敗則調用addWorkerFailed處理失敗邏輯,什麼狀況下會失敗呢?

  • 建立出Worker對象時,內部分配的線程Thread爲空,這個線程的創造是由線程工廠ThreadFactory負責的創造的,可能爲null

  • 在獲取mainLock以後,發現當前線程池已被標記成大於SHUTDOWN狀態、或者是SHUTDOWN可是firstTask不爲null。當線程狀態大於SHUTDOWN,固然addWorker要失敗。後者怎麼解釋呢?這裏就要詳細解釋下SHUTDOWN狀態,以下所示

    RUNNING:  Accept new tasks and process queued tasks
    SHUTDOWN: Don't accept new tasks, but process queued tasks
    STOP:     Don't accept new tasks, don't process queued tasks,
    	      and interrupt in-progress tasks
    TIDYING:  All tasks have terminated, workerCount is zero,
             the thread transitioning to state TIDYING
           	will run the terminated() hook method
    TERMINATED: terminated() has completed

SHUTDOWN即再也不接收新的task,可是能夠繼續處理隊列中的task。當線程數小於核心線程數的時候,提交的task做爲新建立的Worker的firstTask,即firstTask不爲null。當線程數大於核心線程數後,此時addWorker中建立的Worker的firstTask就是null,它只負責從隊列中取出任務。因此firstTask不爲null的時候就代表是新提交的任務,SHUTDOWN狀態下是不容許新提交任務的,因此這種狀況也要失敗

失敗以後呢?來詳細看失敗的處理邏輯,即在addWorkerFailed

/**
 * Rolls back the worker thread creation.
 * - removes worker from workers, if present
 * - decrements worker count
 * - rechecks for termination, in case the existence of this
 *   worker was holding up termination
 */
private void addWorkerFailed(Worker w) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (w != null)
            workers.remove(w);
        decrementWorkerCount();
        tryTerminate();
    } finally {
        mainLock.unlock();
    }
}
  • 首先仍是先獲取mainLock,而後才能對workers操做

  • 從workers中刪除該Worker,記錄的worker數量自減

  • tryTerminate這個有興趣的能夠詳細研究

###2.2.2 Worker解析

上面咱們看到了僅僅是新增一個Worker,並放到workers中,並啓動它,這裏來詳細看看Worker是如何工做的

Worker概述

  • Worker是ThreadPoolExecutor的內部類
  • Worker繼承了AQS即AbstractQueuedSynchronizer,用於實現鎖的機制,這裏先拋出一個問題:爲何要繼承AQS
  • 實現了Runnable接口,則該Worker就能夠做爲Thread的參數建立出Thread,線程的運行即運行該Worker的run方法,該方法中會不斷的從隊列中取出任務並執行

咱們來詳細看下Worker的run過程

final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        while (task != null || (task = getTask()) != null) {
            w.lock();
            // If pool is stopping, ensure thread is interrupted;
            // if not, ensure thread is not interrupted.  This
            // requires a recheck in second case to deal with
            // shutdownNow race while clearing interrupt
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        processWorkerExit(w, completedAbruptly);
    }
}

該方法就是一個while循環,若是有firstTask就運行firstTask的內容,若是沒有firstTask就從隊列中取出task運行即getTask邏輯。每個task的運行,都有相應的週期方法調用如

beforeExecute(wt, task);
task.run();
afterExecute(task, thrown);

Worker整個運行過程就是從隊列中取出task依次運行上述方法。這裏有3個重要地方要關注:

  • getTask邏輯,咱們從上面看到while循環中一旦getTask爲null就直接退出while循環了,即Worker走向結束了,因此空閒的時候會阻塞在getTask中,一直等到獲取到task或者超時。
  • 爲何每次執行task都要獲取鎖
  • worker的退出,就再也不詳細說明了,各位可自行研究

咱們一一來詳細說明。

getTask邏輯

private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        boolean timed;      // Are workers subject to culling?

        for (;;) {
            int wc = workerCountOf(c);
            timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if (wc <= maximumPoolSize && ! (timedOut && timed))
                break;
            if (compareAndDecrementWorkerCount(c))
                return null;
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
            // else CAS failed due to workerCount change; retry inner loop
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}

從workQueue隊列中獲取task有2種方法,一種就是阻塞式獲取直到有task任務,另外一種就是阻塞必定時間,超時則就直接返回null了。此時返回null意味着Worker就要走向結束了。

當allowCoreThreadTimeOut=true即核心線程也開始timeout計時,或者wc > corePoolSize即當前線程數超過了核心線程數也要開啓計時,獲取task就採用阻塞必定時間獲取,一旦超時即該Worker在keepAliveTime時間內都沒獲取到task即處於空閒狀態,這時候就返回null,即意味着該Worker就走向結束了

其餘狀況就是不用進行線程空閒計時,便可以一直阻塞直到有task來。

接下來一個重點問題就是每次執行task的時候爲何要先獲取鎖?

首先該Worker的run方法只可能被一個線程來運行,即該Worker的run方法不可能出現多線程同時運行的狀況。那就是Worker有一些資源是多個線程共享的,是什麼呢?咱們先來看看Worker繼承AQS即AbstractQueuedSynchronizer的實現狀況

protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
        setExclusiveOwnerThread(Thread.currentThread());
        return true;
    }
    return false;
}

protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
}

public void lock()        { acquire(1); }
public boolean tryLock()  { return tryAcquire(1); }
public void unlock()      { release(1); }
public boolean isLocked() { return isHeldExclusively(); }

這裏就是一個簡單的獨佔鎖,可是重點是不可重入的,重入即當前線程獲取鎖了,還能夠再次獲取鎖,來簡單對比下重入鎖的實現

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

也就是說Worker自己就是一個簡單的獨佔鎖,而且是不可重入的。這個鎖的引入究竟是爲了什麼呢?爲何須要在每一個task執行前都要獲取這個鎖呢?這個當時也沒太理解,可是要想找出這個緣由,能夠有以下思路來找:

就看看哪些地方在調用Worker獲取鎖的方法,獲取鎖的方法有lock、tryLock

最終發現interruptIdleWorkers會調用tryLock方法,以下:

private void interruptIdleWorkers(boolean onlyOne) {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers) {
            Thread t = w.thread;
            if (!t.isInterrupted() && w.tryLock()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                } finally {
                    w.unlock();
                }
            }
            if (onlyOne)
                break;
        }
    } finally {
        mainLock.unlock();
    }
}

該方法就是用於中斷那些空閒的Worker,怎麼判斷一個Worker是否空閒呢?這裏就是使用w.tryLock()是否能獲取鎖來表示一個Worker是否空閒。當Worker在處理任務的時候即不空閒都會獲取lock,因此這裏就是依據Worker的鎖是否被佔用了來斷定一個Worker是否空閒。

如當從新設置一個ThreadPoolExecutor的核心線程數的時候,若是當前線程數大於了新設置的核心線程數,就須要中斷那些空閒的線程

public void setCorePoolSize(int corePoolSize) {
    if (corePoolSize < 0)
        throw new IllegalArgumentException();
    int delta = corePoolSize - this.corePoolSize;
    this.corePoolSize = corePoolSize;
    if (workerCountOf(ctl.get()) > corePoolSize)
        interruptIdleWorkers();
    else if (delta > 0) {
        // We don't really know how many new threads are "needed".
        // As a heuristic, prestart enough new workers (up to new
        // core size) to handle the current number of tasks in
        // queue, but stop if queue becomes empty while doing so.
        int k = Math.min(delta, workQueue.size());
        while (k-- > 0 && addWorker(null, true)) {
            if (workQueue.isEmpty())
                break;
        }
    }
}

此時再想想,爲何必須是非重入鎖呢?首先說說這裏的重入場景:Worker內部的線程不會有重入的問題,始終在執行run方法,Worker自己是一個鎖,也就是其餘外部線程而非Worker內部的線程在使用Worker的鎖的時候是不容許重入的。可是這個不容許重入還有待繼續討論清楚。

#3 結束語

本文重點介紹了ThreadPoolExecutor的幾個參數的概念,以及重點分析了execute一個task的過程,還詳細介紹了Worker的實現細節。仍是有不少代碼值得仔細回味和推敲的。下一篇就開始介紹Executors中常見的幾種線程池形式。

相關文章
相關標籤/搜索