線程池源碼分析

1、問什麼要使用線程池?

 1.下降系統資源消耗 java

 2.提升線程可控性。緩存

 2、如何建立使用的線程池?

 jdk8提供了五種建立線程池的方法: 

 1.建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。安全

public static ExecutorService newFixedThreadPool(int nThreads) {      return new ThreadPoolExecutor(
         nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, 
         new LinkedBlockingQueue<Runnable>());  } 複製代碼

 2.jdk8新增的,會根據所需的併發數來動態建立和關閉線程。能合理的使用CPU進行對任務進行併發操做,因此適合使用在很耗時的任務。bash

注意返回的是ForkJoinPool 併發

public static ExecutorService newWorkStealingPool(int parallelism) {
      return new ForkJoinPool (
          parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); 
} 複製代碼

什麼是ForkJoinPool?
 public ForkJoinPool(int parallelism, ForkJoinWorkerThreadFactory factory, 
                     UncaughtExceptionHandler handler, boolean asyncMode) { 
          this(checkParallelism(parallelism), checkFactory(factory), handler, 
        asyncMode ? FIFO_QUEUE : LIFO_QUEUE, "ForkJoinPool-" + nextPoolId() + "-worker-");             checkPermission(); } 複製代碼


使用一個無限隊列來保存所須要執行的任務,能夠傳入線程的數量;不傳入,則默認使用當前計算機中可用的CPU數量;使用分治法來解決問題,使用 fork()和join()來進行調用。框架

 3.建立一個可緩存的線程池,可靈活回收空閒線程,若無可回收,則新建線程。 async

public static ExecutorService newCachedThreadPool() {       return new ThreadPoolExecutor(
       0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());  } 複製代碼

 4.建立一個單線程的線程池。 oop

public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (
        new ThreadPoolExecutor(
        1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); }複製代碼

 5.建立一個定長線程池,支持定時及週期性任務執行。源碼分析

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
    return new ScheduledThreadPoolExecutor(corePoolSize);  } 複製代碼

3、源碼分析

Executor結構:
ui


 Executor【接口】 一個運行新任務的簡單接口 

public interface Executor{
     void execute(Runnable command); 
} 複製代碼

 ExecutorService【接口】 擴展了Executor接口

添加了一些用來管理執行生命週期和任務生命週期的方法。


 AbstractExecutorService【抽象類】

對ExecutorService接口的抽象類實現。不是咱們分析的重點。 複製代碼

 ThreadPoolExecutor【類】 

Java線程池的核心實現 複製代碼

 4、ThreadPoolExecutor

屬性解釋:

// AtomicInteger是原子類  ctlOf()返回值爲RUNNING;private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));// 高3位表示線程狀態private static final int COUNT_BITS = Integer.SIZE - 3;// 低29位表示workerCount容量private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits// 能接收任務且能處理阻塞隊列中的任務private static final int RUNNING    = -1 << COUNT_BITS;// 不能接收新任務,但能夠處理隊列中的任務。private static final int SHUTDOWN   =  0 << COUNT_BITS;// 不接收新任務,不處理隊列任務。private static final int STOP       =  1 << COUNT_BITS;// 全部任務都終止private static final int TIDYING    =  2 << COUNT_BITS;// 什麼都不作private static final int TERMINATED =  3 << COUNT_BITS;// 存聽任務的阻塞隊列private final BlockingQueue<Runnable> workQueue;複製代碼

值得注意的是狀態值越大線程越不活躍。


線程池狀態的轉換模型:     

  

構造器:       

public ThreadPoolExecutor(int corePoolSize,//線程池初始啓動時線程的數量                          int maximumPoolSize,//最大線程數量                          long keepAliveTime,//空閒線程多久關閉?                          TimeUnit unit,// 計時單位                          BlockingQueue<Runnable> workQueue,//聽任務的阻塞隊列                          ThreadFactory threadFactory,//線程工廠                          RejectedExecutionHandler handler// 拒絕策略) {    if (corePoolSize < 0 ||        maximumPoolSize <= 0 ||        maximumPoolSize < corePoolSize ||        keepAliveTime < 0)        throw new IllegalArgumentException();    if (workQueue == null || threadFactory == null || handler == null)        throw new NullPointerException();    this.acc = System.getSecurityManager() == null ?            null :            AccessController.getContext();    this.corePoolSize = corePoolSize;    this.maximumPoolSize = maximumPoolSize;    this.workQueue = workQueue;    this.keepAliveTime = unit.toNanos(keepAliveTime);    this.threadFactory = threadFactory;    this.handler = handler;}複製代碼

在向線程池提交任務時,會經過兩個方法:execute和submit。

【本次只講execute方法,submit、Future、Callable一塊兒講】

execute方法:

public void execute(Runnable command) {    if (command == null)        throw new NullPointerException();    // clt記錄着runState和workerCount    int c = ctl.get();    //workerCountOf方法取出低29位的值,表示當前活動的線程數    //而後拿線程數和 核心線程數作比較    if (workerCountOf(c) < corePoolSize) {        // 若是活動線程數<核心線程數        // 添加到        //addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷        if (addWorker(command, true))            // 若是成功則返回            return;        // 若是失敗則從新獲取 runState和 workerCount        c = ctl.get();    }    // 若是當前線程池是運行狀態而且任務添加到隊列成功    if (isRunning(c) && workQueue.offer(command)) {        // 從新獲取 runState和 workerCount        int recheck = ctl.get();        // 若是不是運行狀態而且         if (! isRunning(recheck) && remove(command))            reject(command);        else if (workerCountOf(recheck) == 0)            //第一個參數爲null,表示在線程池中建立一個線程,但不去啓動            // 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize            addWorker(null, false);    }    //再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize    else if (!addWorker(command, false))        //若是失敗則拒絕該任務        reject(command);}複製代碼

總結一下它的工做流程:

  1. 當workerCount < corePoolSize,建立線程執行任務。
  2. 當workerCount >= corePoolSize && 阻塞隊列workQueue未滿,把新的任務放入阻塞隊列。
  3. 當workQueue已滿,而且workerCount >= corePoolSize,而且workerCount < maximumPoolSize,建立線程執行任務。
  4. 當workQueue已滿,workerCount >= maximumPoolSize,採起拒絕策略,默認拒絕策略是直接拋異常。


經過上面的execute方法能夠看出,最主要的邏輯仍是在addWorker方法中實現的。


addWorker方法:

主要工做就是在線程池中建立一個新的線程並執行。複製代碼

參數定義:

  •  firstTask : the task the new thread should run first(or null if none).(指定新增線程執行的第一個任務或者不執行任務)
  • core :if true use corePoolSize as bound,else maximumPoolSize.(core若是爲true則使用corePoolSize綁定,不然爲maximumPoolSize。)此處使用布爾指示符而不是值,以確保在檢查其餘狀態後讀取新值。

private boolean addWorker(Runnable firstTask, boolean core) {    retry:    for (;;) {        int c = ctl.get();        //  獲取運行狀態        int rs = runStateOf(c);        // Check if queue empty only if necessary.        // 若是狀態值 >= SHUTDOWN (不接新任務&不處理隊列任務)        // 而且 若是 !(rs爲SHUTDOWN 且 firsTask爲空 且 阻塞隊列不爲空)        if (rs >= SHUTDOWN &&            ! (rs == SHUTDOWN &&               firstTask == null &&               ! workQueue.isEmpty()))            // 返回false            return false;        for (;;) {            //獲取線程數wc            int wc = workerCountOf(c);            // 若是wc大與容量 || core若是爲true表示根據corePoolSize來比較,不然爲maximumPoolSize            if (wc >= CAPACITY ||                wc >= (core ? corePoolSize : maximumPoolSize))                return false;            // 增長workerCount(原子操做)            if (compareAndIncrementWorkerCount(c))                // 若是增長成功,則跳出                break retry;            // wc增長失敗,則再次獲取runState            c = ctl.get();  // Re-read ctl            // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回從新執行            if (runStateOf(c) != rs)                continue retry;            // else CAS failed due to workerCount change; retry inner loop        }    }    boolean workerStarted = false;    boolean workerAdded = false;    Worker w = null;    try {        // 根據firstTask來建立Worker對象        w = new Worker(firstTask);        // 根據worker建立一個線程        final Thread t = w.thread;        if (t != null) {            // new一個鎖            final ReentrantLock mainLock = this.mainLock;            // 加鎖            mainLock.lock();            try {                // Recheck while holding lock.                // Back out on ThreadFactory failure or if                // shut down before lock acquired.                // 獲取runState                int rs = runStateOf(ctl.get());                // 若是rs小於SHUTDOWN(處於運行)或者(rs=SHUTDOWN && firstTask == null)                // firstTask == null證實只新建線程而不執行任務                if (rs < SHUTDOWN ||                    (rs == SHUTDOWN && firstTask == null)) {                    // 若是t活着就拋異常                    if (t.isAlive()) // precheck that t is startable                        throw new IllegalThreadStateException();                    // 不然加入worker(HashSet)                    //workers包含池中的全部工做線程。僅在持有mainLock時訪問。                    workers.add(w);                    // 獲取工做線程數量                    int s = workers.size();                    //largestPoolSize記錄着線程池中出現過的最大線程數量                    if (s > largestPoolSize)                        // 若是 s比它還要大,則將s賦值給它                        largestPoolSize = s;                    // worker的添加工做狀態改成true                        workerAdded = true;                }            } finally {                mainLock.unlock();            }            // 若是worker的添加工做完成            if (workerAdded) {                // 啓動線程                t.start();                // 修改線程啓動狀態                workerStarted = true;            }        }    } finally {        if (! workerStarted)            addWorkerFailed(w);    }    // 返回線啓動狀態    return workerStarted;複製代碼

爲何須要持有mainLock?

由於workers是HashSet類型的,不能保證線程安全。

w = new Worker(firstTask);如何理解呢

Work.java

private final class Worker    extends AbstractQueuedSynchronizer    implements Runnable複製代碼

能夠看到它集成AQS併發框架還發現了Runnable。證實它仍是一個線程任務類。那咱們調用t.start()事實上就是調用了該類重寫的run方法。

Worker爲何使用ReentrantLock來實現呢?

tryAcquire方法它是不容許重入的,而ReentrantLock是容許重入的。對於線程來講,若是線程正在執行是不容許其餘鎖重入進來的。

線程只須要兩個狀態,一個是獨佔鎖,代表是正在執行任務;一個是不加鎖,代表是空閒狀態。

public void run(){
    runnWorker(this);
}複製代碼

run方法有調用了runnWorker方法:

final void runWorker(Worker w) {    // 拿到當前線程    Thread wt = Thread.currentThread();    // 拿到當前任務    Runnable task = w.firstTask;    // 將Worker.firstTask置空 而且釋放鎖    w.firstTask = null;    w.unlock(); // allow interrupts    boolean completedAbruptly = true;    try {        // 若是task或者getTask不爲空,則一直循環        while (task != null || (task = getTask()) != null) {            // 加鎖            w.lock();            // If pool is stopping, ensure thread is interrupted;            // if not, ensure thread is not interrupted.  This            // requires a recheck in second case to deal with            // shutdownNow race while clearing interrupt            //  return ctl.get() >= stop             // 若是線程池狀態>=STOP 或者 (線程中斷且線程池狀態>=STOP)且當前線程沒有中斷            // 其實就是保證兩點:            // 1. 線程池沒有中止            // 2. 保證線程沒有中斷            if ((runStateAtLeast(ctl.get(), STOP) ||                 (Thread.interrupted() &&                  runStateAtLeast(ctl.get(), STOP))) &&                !wt.isInterrupted())                // 中斷當前線程                wt.interrupt();            try {                // 空方法                beforeExecute(wt, task);                Throwable thrown = null;                try {                    // 執行run方法(Runable對象)                    task.run();                } catch (RuntimeException x) {                    thrown = x; throw x;                } catch (Error x) {                    thrown = x; throw x;                } catch (Throwable x) {                    thrown = x; throw new Error(x);                } finally {                    afterExecute(task, thrown);                }            } finally {                // 執行完後, 將task置空, 完成任務++, 釋放鎖                task = null;                w.completedTasks++;                w.unlock();            }        }        completedAbruptly = false;    } finally {        // 退出工做        processWorkerExit(w, completedAbruptly);    }複製代碼

總結一下runWorker方法的執行流程:

  1. while循環中,不斷的經過getTask方法從workerQueue中獲取任務
  2. 若是線程池正在中止,則中斷線程。不然調用3.
  3. 調用task.run()執行任務。
  4. 若是task爲null則跳出循環,執行processWorkExit()方法,銷燬線程workers.remove(w).

這個流程圖很是經典:


除此以外,ThreadPoolExecutor還提供了tryAcquire、tryRelease、shutdown、shutdownNow、tryTerminate、等涉及的一系列線程狀態更改的方法。


在runWorker方法中,爲何要在執行任務的時候對每一個工做線程都加鎖呢?

shutdown方法與getTask方法存在競爭條件。

相關文章
相關標籤/搜索