Java併發系列 — 線程池

Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。java

在開發過程當中,合理地使用線程池可以帶來3個好處:git

  • 下降資源消耗:經過重複利用已建立的線程下降線程建立和銷燬形成的消耗。github

  • 提升響應速度:當任務到達時,任務能夠不須要等到線程建立就能當即執行。數據庫

  • 提升線程的可管理性:線程是稀缺資源,若是無限制地建立,不只會消耗系統資源,還會下降系統的穩定性,使用線程池能夠進行統一分配、調優和監控。可是,要作到合理利用線程池,必須對其實現原理了如指掌。數組

如何建立線程池

一、使用Executors工廠類提供的靜態方法來建立線程池,方法以下:服務器

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>());
}

public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                    60L, TimeUnit.SECONDS,
                                    new SynchronousQueue<Runnable>());
}
複製代碼

二、經過ThreadPoolExecutor的構造函數建立,代碼以下:多線程

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            threadFactory, defaultHandler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) {
    this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
            Executors.defaultThreadFactory(), handler);
}

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}
複製代碼

參數說明:併發

  • 一、corePoolSize框架

    線程池中的核心線程數,當提交一個任務時,線程池建立一個新線程執行任務,直到當前線程數等於 corePoolSize;若是當前線程數爲corePoolSize,繼續提交的任務被保存到阻塞隊列中,等待被執行;若是執行了線程池的prestartAllCoreThreads()方法,線程池會提早建立並啓動全部核心線程。異步

  • 二、maximumPoolSize

    線程池中容許的最大線程數。若是當前阻塞隊列滿了,且繼續提交任務,則建立新的線程執行任務,前提是當前線程數小於maximumPoolSize。

  • 三、keepAliveTime

    線程空閒時的存活時間,即當線程沒有任務執行時,繼續存活的時間;默認狀況下,該參數只在線程數大於corePoolSize時纔有用。

  • 四、unit

    keepAliveTime的單位

  • 五、workQueue

    用來保存等待被執行的任務的阻塞隊列,且任務必須實現Runable接口。

    在JDK中提供了以下阻塞隊列: ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;

    LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量一般要高於ArrayBlockingQuene;

    SynchronousQuene:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene;

    priorityBlockingQuene:具備優先級的無界阻塞隊列;

  • 六、threadFactory

    建立線程的工廠,經過自定義的線程工廠能夠給每一個新建的線程設置一個具備識別度的線程名。

  • 七、handler

    線程池的飽和策略,當阻塞隊列滿了,且沒有空閒的工做線程,若是繼續提交任務,必須採起一種策略處理該任務,線程池提供了4種策略:

    1. AbortPolicy:直接拋出異常,默認策略;
    2. CallerRunsPolicy:用調用者所在的線程來執行任務;
    3. DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
    4. DiscardPolicy:直接丟棄任務;

    固然也能夠根據應用場景實現RejectedExecutionHandler接口,自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

使用示例

public class ThreadPoolExecutorExample {

    // 一、經過threadPoolExecutor的構造函數建立線程池
    private static ThreadPoolExecutor threadPoolExecutor = 
            new ThreadPoolExecutor(5, 10, 30, TimeUnit.SECONDS,
                                    new ArrayBlockingQueue<Runnable>(10));


    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 二、使用execute方法執行沒有返回結果的任務
        threadPoolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                new Task().doSomething();
            }
        });

        // 三、使用submit方法執行有返回結果的任務且須要實現Callable接口
        Future future = threadPoolExecutor.submit(new Callable<Integer>() {
              @Override
              public Integer call() throws Exception {
                  return new Task().doOtherthing();
              }
          }
        );
        System.out.println(future.get());
    }
}

class Task {

    public void doSomething() {
        System.out.println("doSomeThing ...");
    }

    public int doOtherthing() {
        System.out.println("doOtherthing ..., and return 10.");
        return 10;
    }
}
複製代碼

execute方法和submit方法的區別

  • execute方法

    用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功。

  • submit方法

    用於提交須要返回值的任務。線程池會返回一個future類型的對象,經過這個future對象能夠判斷任務是否執行成功。get()方法會阻塞當前線程直到任務完成;get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候有可能任務沒有執行完。

ThreadPoolExecutor的實現原理

當向線程池提交一個任務以後,線程池是如何處理這個任務的呢?線程池的主要處理流程以下:

ThreadPoolExecutor的實現原理

  1. 若是當前運行的線程小於corePoolSize,則建立新線程來執行;
  2. 若是運行的線程等於或多於corePoolSize,則將任務加入BlockingQueue;
  3. 若是沒法將任務加入BlockingQueue(隊列已滿),則建立新的線程來處理任務;
  4. 若是建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用handler.rejectedExecution(command, this)方法。

ThreadPoolExecutor採起上述步驟的整體設計思路,是爲了在執行executor()方法時,儘量地避免獲取全局鎖(那將會是一個嚴重的可伸縮瓶頸)。在ThreadPoolExecutor完成預熱以後(當前運行的線程數大於等於corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

ThreadPoolExecutor源碼分析

類結構圖

ThreadPoolExecutor類結構圖.png

核心變量與方法(狀態轉換)

// 初始化狀態和數量,狀態爲RUNNING,線程數爲0
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 前3位表示狀態,全部線程數佔29位
private static final int COUNT_BITS = Integer.SIZE - 3;
// 線程池容量大小爲 1 << 29 - 1
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// 線程池狀態
// RUNNING狀態:11100000000000000000000000000000(前3位爲111)
private static final int RUNNING    = -1 << COUNT_BITS;
// SHUTDOWN狀態:00000000000000000000000000000000(前3位爲000)
private static final int SHUTDOWN   =  0 << COUNT_BITS;
// STOP狀態:00100000000000000000000000000000(前3位爲001)
private static final int STOP       =  1 << COUNT_BITS;
// TIDYING狀態:01000000000000000000000000000000(前3位爲010)
private static final int TIDYING    =  2 << COUNT_BITS;
// TERMINATED狀態:01100000000000000000000000000000(前3位爲011)
private static final int TERMINATED =  3 << COUNT_BITS;

// 獲得狀態
private static int runStateOf(int c) { return c & ~CAPACITY; }
// 獲得線程數
private static int workerCountOf(int c) { return c & CAPACITY; }
private static int ctlOf(int rs, int wc) { return rs | wc; }

複製代碼

ThreadPoolExecutor線程池有5個狀態,分別是:

  1. RUNNING:能夠接受新的任務,也能夠處理阻塞隊列裏的任務。
  2. SHUTDOWN:不接受新的任務,可是能夠處理阻塞隊列裏的任務。
  3. STOP:不接受新的任務,不處理阻塞隊列裏的任務,中斷正在處理的任務。
  4. TIDYING:過渡狀態,也就是說全部的任務都執行完了,當前線程池已經沒有有效的線程,這個時候線程池的狀態將會TIDYING,而且將要調用terminated方法。
  5. TERMINATED:終止狀態。terminated方法調用完成之後的狀態。

線程池的狀態轉換過程:

  • RUNNING -> SHUTDOWN On invocation of shutdown(), perhaps implicitly in finalize()
  • (RUNNING or SHUTDOWN) -> STOP On invocation of shutdownNow()
  • SHUTDOWN -> TIDYING When both queue and pool are empty
  • STOP -> TIDYING When pool is empty
  • TIDYING -> TERMINATED When the terminated() hook method has completed

Threads waiting in awaitTermination() will return when the state reaches TERMINATED.

核心方法

execute方法

使用ThreadPoolExecutor執行任務的時候,可使用execute或submit方法,submit方法以下:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
複製代碼

經過源碼可知submit方法一樣也是由execute()完成的,execute()方法源碼以下:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // 過程1
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }

    // 過程2
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // 再次檢查線程池狀態
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // 線程池中沒有可用的工做線程時 
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false)) // 過程3
        // 過程4
        reject(command);
}
複製代碼

addWorker方法

addWorker方法的主要工做就是建立一個工做線程執行任務,代碼以下:

/* * firstTask參數:用於指定新增的線程執行的第一個任務。 * core爲true:表示在新增線程時會判斷當前活動線程數是否少於corePoolSize, * false表示新增線程前須要判斷當前活動線程數是否少於maximumPoolSize。 */
private boolean addWorker(Runnable firstTask, boolean core) {
    retry:
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        /** * 只有當下面兩種狀況會繼續執行,其餘直接返回false(添加失敗) * 一、rs == RUNNING * 二、rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty() *(執行了shutdown方法,可是阻塞隊列還有任務沒有執行) */
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
                firstTask == null &&
                ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 判斷工做線程的數量是否超過線程池的限制
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // workerCount加1成功,跳出兩層循壞。
            if (compareAndIncrementWorkerCount(c))
                break retry;
            /** * 能執行到這裏,都是由於多線程競爭,只有兩種狀況 * 一、workCount發生變化,compareAndIncrementWorkerCount失敗, * 這種狀況不須要從新獲取ctl,繼續for循環便可。 * 二、runState發生變化,可能執行了shutdown或者shutdownNow, * 這種狀況從新走retry,取得最新的ctl並判斷狀態。 */
            c = ctl.get();  // Re-read ctl
            if (runStateOf(c) != rs)
                continue retry;
        }
    }

    // worker是否執行標識
    boolean workerStarted = false;
    // worker是否添加成功標識
    boolean workerAdded = false;
    // 保存建立的worker變量
    Worker w = null;
    try {
        w = new Worker(firstTask);
        final Thread t = w.thread;
        // 檢查線程是否建立成功
        if (t != null) {
            // 加鎖
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // 加鎖成功,從新檢查線程池的狀態 
                int rs = runStateOf(ctl.get());

                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    // 將w存儲到workers容器中
                    workers.add(w);
                    int s = workers.size();
                    if (s > largestPoolSize)
                        largestPoolSize = s;
                    // 添加成功標識
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
                // 執行任務
                t.start();
                workerStarted = true;
            }
        }
    } finally {
        if (! workerStarted)
            // 失敗回退,從 wokers 移除 w, 線程數減一,嘗試結束線程池(調用tryTerminate 方法)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼

Worker類

在分析t.start()以前,須要瞭解Worker類。其源碼以下:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {

    // 工做線程
    final Thread thread;
    // 初始化任務
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;

    Worker(Runnable firstTask) {
        // 禁止中斷,直到runWorker
        setState(-1);
        this.firstTask = firstTask;
        // 很重要,worker實例被包裝成thread執行的任務。
        // 這樣t.start啓動後,將運行Worker的run方法。
        this.thread = getThreadFactory().newThread(this);
    }

    public void run() {
        runWorker(this);
    }

    // 實現AQS的相關方法
    protected boolean isHeldExclusively() {
        return getState() != 0;
    }

    protected boolean tryAcquire(int unused) {
        if (compareAndSetState(0, 1)) {
            setExclusiveOwnerThread(Thread.currentThread());
            return true;
        }
        return false;
    }

    protected boolean tryRelease(int unused) {
        setExclusiveOwnerThread(null);
        setState(0);
        return true;
    }

    public void lock() { acquire(1); }
    public boolean tryLock() { return tryAcquire(1); }
    public void unlock() { release(1); }
    public boolean isLocked() { return isHeldExclusively(); }

    void interruptIfStarted() {
        Thread t;
        if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
            try {
                t.interrupt();
            } catch (SecurityException ignore) {
            }
        }
    }
}
複製代碼
runWorker方法
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    Runnable task = w.firstTask;
    w.firstTask = null;
    w.unlock(); // allow interrupts
    boolean completedAbruptly = true;
    try {
        // 自旋操做,獲取隊列中的任務
        while (task != null || (task = getTask()) != null) {
            // 加鎖的做用是線程池關閉時,防止正在執行工做線程被中斷。
            w.lock();
             /* * 在執行任務以前先作一些處理。 * 1. 若是線程池已經處於STOP狀態而且當前線程沒有被中斷,中斷線程。 * 2. 若是線程池還處於RUNNING或SHUTDOWN狀態,而且當前線程已經被中斷了, * 從新檢查一下線程池狀態,若是處於* * STOP狀態而且沒有被中斷,那麼中斷線程。 */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                    (Thread.interrupted() &&
                    runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();

            try {
                // hook method
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                     // 真正的開始執行任務,調用的是run方法,而不是start方法。
                    // 這裏run的時候可能會被中斷,好比線程池調用了shutdownNow方法
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    // hook method
                    afterExecute(task, thrown);
                }
            } finally {
                task = null;
                w.completedTasks++;
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 回收woker
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼
getTask方法
private Runnable getTask() {

    boolean timedOut = false;

    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);

        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
            decrementWorkerCount();
            return null;
        }

        int wc = workerCountOf(c);

        // 計算從隊列獲取任務的方式( poll or take)
        boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

        // 當工做線程超過其最大值或者timed = true時其workQueue.isEmpty()時,返回null。
        // 這意味爲該worker將被回收。
        if ((wc > maximumPoolSize || (timed && timedOut))
            && (wc > 1 || workQueue.isEmpty())) {
            if (compareAndDecrementWorkerCount(c))
                return null;
            continue;
        }

        try {
            Runnable r = timed ?
                workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                workQueue.take();
            if (r != null)
                return r;
            timedOut = true;
        } catch (InterruptedException retry) {
            timedOut = false;
        }
    }
}
複製代碼

由上可知,當allowCoreThreadTimeOut爲true時,若是隊列長時間沒有任務,工做線程最終都會被銷燬。

其餘知識點

關閉線程池

能夠經過調用線程池的shutdownshutdownNow方法來關閉線程池。它們的原理是遍歷線程池中的工做線程,而後逐個調用線程的interrupt方法來中斷線程,因此沒法響應中斷的任務可能永遠沒法終止。

可是它們存在必定的區別,shutdownNow首先將線程池的狀態設置成STOP,而後嘗試中止全部的正在執行或暫停任務的線程,並返回等待執行任務的列表,而shutdown只是將線程池的狀態設置成SHUTDOWN狀態,而後中斷全部沒有正在執行任務的線程。

只要調用了這兩個關閉方法中的任意一個,isShutdown方法就會返回true。當全部的任務都已關閉後,才表示線程池關閉成功,這時調用isTerminaed方法會返回true。

合理地配置線程池

要想合理地配置線程池,就必須首先分析任務特性,能夠從如下幾個角度來分析。

  • 任務的性質:CPU密集型任務、IO密集型任務和混合型任務。

  • 任務的優先級:高、中和低。

  • 任務的執行時間:長、中和短。

  • 任務的依賴性:是否依賴其餘系統資源,如數據庫鏈接。

性質不一樣的任務能夠用不一樣規模的線程池分開處理:

1)CPU密集型任務應配置儘量小的線程,如配置N cpu +1個線程的線程池。

2)IO密集型任務線程並非一直在執行任務,則應配置儘量多的線程,如2*N cpu 。

3)混合型的任務,若是能夠拆分,將其拆分紅一個CPU密集型任務和一個IO密集型任務,只要這兩個任務執行的時間相差不是太大,那麼分解後執行的吞吐量將高於串行執行的吞吐量。若是這兩個任務執行時間相差太大,則不必進行分解。

優先級不一樣的任務可使用優先級隊列PriorityBlockingQueue來處理。它可讓優先級高的任務先獲得執行,須要注意的是若是一直有優先級高的任務提交到隊列裏,那麼優先級低的任務可能永遠不能執行。

執行時間不一樣的任務能夠交給不一樣規模的線程池來處理,或者也可使用優先級隊列,讓執行時間短的任務先執行。

依賴數據庫鏈接池的任務,由於線程提交SQL後須要等待數據庫返回結果,若是等待的時間越長CPU空閒時間就越長,那麼線程數應該設置越大,這樣才能更好的利用CPU。

能夠經過Runtime.getRuntime().availableProcessors()方法得到當前設備的CPU個數

建議使用有界隊列,有界隊列能增長系統的穩定性和預警能力,能夠根據須要設大一點,好比幾千。有一次咱們組使用的後臺任務線程池的隊列和線程池全滿了,不斷的拋出拋棄任務的異常,經過排查發現是數據庫出現了問題,致使執行SQL變得很是緩慢,由於後臺任務線程池裏的任務全是須要向數據庫查詢和插入數據的,因此致使線程池裏的工做線程所有阻塞住,任務積壓在線程池裏。若是當時咱們設置成無界隊列,線程池的隊列就會愈來愈多,有可能會撐滿內存,致使整個系統不可用,而不僅是後臺任務出現問題。固然咱們的系統全部的任務是用的單獨的服務器部署的,而咱們使用不一樣規模的線程池跑不一樣類型的任務,可是出現這樣問題時也會影響到其餘任務。

線程池監控

若是在系統中大量使用線程池,則有必要對線程池進行監控,方便在出現問題時,能夠根據線程池的使用情況快速定位問題。能夠經過線程池提供的參數進行監控,在監控線程池的時機可使用如下屬性:

  • taskCount:線程池須要執行的任務數量。
  • completedTaskCount:線程池在運行過程當中已完成的任務數量,小於或等於taskCount。
  • largestPoolSize:線程池裏曾經建立過的最大線程數量。經過這個數據能夠直到線程池是否曾經滿過。
  • getPoolSize:線程池的線程數量。
  • getActiveCount:獲取活動的線程數。

經過擴展線程池進行監控。能夠經過繼承線程池來自定義線程池,重寫線程池的beforeExecuteafterExecuteterminated方法,也能夠在任務執行前、執行後和線程池關閉以前執行一些代碼來進行監控。

參考資料

  1. Java線程池ThreadPoolExecutor源碼分析

  2. 【細談Java併發】談談線程池:ThreadPoolExecutor


若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【牛覓技術】,查閱更多精彩歷史!!!

相關文章
相關標籤/搜索