【死磕JDK源碼】ThreadPoolExecutor源碼保姆級詳解

位運算表示線程池狀態,由於位運算是改變當前值的一種高效手段。java

屬性

線程池狀態

Integer 有32位:緩存

  • 最左邊3位表示線程池狀態,可表示從0至7的8個不一樣數值
  • 最右邊29位表工做線程數
private static final int COUNT_BITS = Integer.SIZE - 3;
複製代碼

線程池的狀態用高3位表示,其中包括了符號位。五種狀態的十進制值按從小到大依次排序爲:bash

RUNNING < SHUTDOWN < STOP < TIDYING <TERMINATED
複製代碼

這就能經過比較值的大小來肯定線程池的狀態。markdown

源碼中常常出現經過isRunning判斷 併發

000-1111111111111111111111111; 相似於子網掩碼,用於與運算。獲得左3位 or 右29位 框架

  • 111 - 0000000000000000000000000000(十進制: -536, 870, 912);

該狀態表示:線程池能接受新任務 異步

  • 000 - 0000000000000000000000000(十進制: 0);

再也不接受新任務,但可繼續執行隊列中的任務 ide

  • 001 - 00000000000000000000000000(十進制: 536,870, 912);

全面拒絕,並中斷正在處理的任務 函數

  • 010 - 00000000000000000000000000.(十進制值: 1, 073, 741, 824);

全部任務已經被終止 oop

  • 101 - 000000000000000000000000000(十進制值: 1, 610,612, 736)

已清理完現場

與運算

000 - 11111111111111111111 位掩碼,用於後續的掩碼參與計算。

private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
複製代碼

好比:001 - 000000000000000000000100011 表 35 個工做線程。 掩碼取反:111 - 00000000000000000000000,即獲得左邊3位001; 表示線程池當前處於STOP狀態

private static int runStateOf(int c) { return c & ~CAPACITY; }
複製代碼

同理掩碼 000 - 11111111111111111111,獲得右29位,即工做線程數:

private static int workerCountOf(int c) { return c & CAPACITY; }
複製代碼

把左3位與右29位或運算,合併成一個值

private static int ctlOf(int rs, int wc) { return rs | wc; }
複製代碼

線程池狀態控制器,保存了線程池的狀態、工做線程數

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
複製代碼

咱們都知道Executor接口有且只有一個方法execute(),經過參數傳入待執行線程的對象。

execute

線程池執行任務

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /** * 分 3 步: * 2. 若是一個任務能夠成功排隊,那麼仍需double-check咱們是否應該添加一個線程(由於自上次檢查以來現有線程已死亡)或池在進入此方法後關閉.因此咱們從新檢查狀態,並在必要時回滾.若是中止,或者若是沒有,則啓動一個新線程 * 3. 若是咱們沒法將任務排隊,則嘗試添加一個新線程。若是失敗,咱們知道咱們已經關閉或飽和,所以拒絕該任務 */
    int c = ctl.get();

    // 1. 若工做線程數 < 核心線程數,則建立新線程並執行當前任務
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
	        return;
         // 若建立失敗,爲防止外部已經在線程池中加入新任務,在此從新獲取一下
         c = ctl.get();
     }
複製代碼

execute方法在不一樣的階段有三次addWorker的嘗試動做。

// 若 工做線程數 >=核心線程數 或線程建立失敗,則將當前任務放到工做隊列中
// 只有線程池處於 RUNNING 態,才執行後半句 : 置入隊列
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    
    // 只有線程池處於 RUNNING 態,才執行後半句 : 置入隊列
    if (! isRunning(recheck) && remove(command))
        reject(command);
    // 若以前的線程已被消費完,新建一個線程
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
// 核心線程和隊列都已滿,嘗試建立一個新線程
}
else if (!addWorker(command, false))
    // 拋出RejectedExecutionException異常
    // 若 addWorker 返回是 false,即建立失敗,則喚醒拒絕策略.
    reject(command);
}
複製代碼

發生拒絕的理由有兩個 ( 1 )線程池狀態爲非RUNNING狀態 (2)等待隊列已滿。

下面繼續分析addWorker

addWorker 源碼解析

原子性地檢查 runState 和 workerCount,經過返回 false 來防止在不該該添加線程時出現誤報。

根據當前線程池狀態,檢查是否能夠添加新的線程:

  • 若可

則建立並啓動任務;若一切正常則返回true;

  • 返回false的可能緣由:
  1. 線程池沒有處RUNNING
  2. 線程工廠建立新的任務線程失敗

參數

  • firstTask

外部啓動線程池時須要構造的第一個線程,它是線程的母體

  • core

新增工做線程時的判斷指標 - true 須要判斷當前RUNNING態的線程是否少於corePoolsize - false 須要判斷當前RUNNING態的線程是否少於maximumPoolsize

JDK8源碼

private boolean addWorker(Runnable firstTask, boolean core) {
	// 1. 不須要任務預約義的語法標籤,響應下文的continue retry
	// 快速退出多層嵌套循環
    retry:
    // 外自旋,判斷線程池的運行狀態
    for (;;) {
        int c = ctl.get();
        int rs = runStateOf(c);
		// 2. 若RUNNING態,則條件爲false,不執行後面判斷
		// 若STOP及以上狀態,或firstTask初始線程非空,或隊列爲空
		// 都會直接返回建立失敗
        // Check if queue empty only if necessary.
        if (rs >= SHUTDOWN &&
            ! (rs == SHUTDOWN &&
               firstTask == null &&
               ! workQueue.isEmpty()))
            return false;

        for (;;) {
            int wc = workerCountOf(c);
            // 若超過最大容許線程數,則不能再添加新線程
            if (wc >= CAPACITY ||
                wc >= (core ? corePoolSize : maximumPoolSize))
                return false;
            // 3. 將當前活動線程數+1
            if (compareAndIncrementWorkerCount(c))
                break retry;
			// 線程池狀態和工做線程數是可變化的,需常常讀取最新值
            c = ctl.get();  // Re-read ctl
            // 若已關閉,則再次從retry 標籤處進入,在第2處再作判斷(第4處)
            if (runStateOf(c) != rs)
                continue retry;
            //若是線程池仍是RUNNING態,說明僅僅是第3處失敗
//繼續循環執行(第5外) 
            // else CAS failed due to workerCount change; retry inner loop
        }
    }
    
	// 開始建立工做線程
    boolean workerStarted = false;
    boolean workerAdded = false;
    Worker w = null;
    try {
        // 利用Worker 構造方法中的線程池工廠建立線程,並封裝成工做線程Worker對象
        // 和 AQS 有關!!!
        w = new Worker(firstTask);
        // 6. 注意這是Worker中的屬性對象thread
        final Thread t = w.thread;
        if (t != null) {
        	// 在進行ThreadpoolExecutor的敏感操做時
        	// 都須要持有主鎖,避免在添加和啓動線程時被幹擾
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                // Recheck while holding lock.
                // Back out on ThreadFactory failure or if
                // shut down before lock acquired.
                int rs = runStateOf(ctl.get());
				// 當線程池狀態爲RUNNING 或SHUTDOWN
				// 且firstTask 初始線程爲空時
                if (rs < SHUTDOWN ||
                    (rs == SHUTDOWN && firstTask == null)) {
                    if (t.isAlive()) // precheck that t is startable
                        throw new IllegalThreadStateException();
                    workers.add(w);
                    int s = workers.size();
                    // 整個線程池在運行期間的最大併發任務個數
                    if (s > largestPoolSize)
                    	// 更新爲工做線程的個數
                        largestPoolSize = s;
                    // 新增工做線程成功 
                    workerAdded = true;
                }
            } finally {
                mainLock.unlock();
            }
            if (workerAdded) {
            	// 看到親切迷人的start方法了!
            	// 這並不是線程池的execute 的command 參數指向的線程
                t.start();
                workerStarted = true;
            }
        }
    } finally {
	    // 線程啓動失敗,把剛纔第3處加,上的工做線程計數再減-回去
        if (! workerStarted)
            addWorkerFailed(w);
    }
    return workerStarted;
}
複製代碼
  • 第1處,配合循環語句出現的標籤,相似於goto語法做用

label 定義時,必須把標籤和冒號的組合語句牢牢相鄰定義在循環體以前,不然編譯報錯。目的是在實現多重循環時可以快速退出到任何一層。出發點彷佛很是貼心,但在大型軟件項目中,濫用標籤行跳轉的後果將是沒法維護的!

workerCount 加1成功後,直接退出兩層循環。

  • 第2處,這樣的表達式不利於閱讀,應如是

  • 第3處,與第1處的標籤呼應,AtomicInteger對象的加1操做是原子性的;

break retry表 直接跳出與retry 相鄰的這個循環體

  • 第4處,此continue跳轉至標籤處,繼續執行循環.

若是條件爲false,則說明線程池還處於運行狀態,即繼續在for(;)循環內執行.

  • 第5處,compareAndIncrementWorkerCount 方法執行失敗的機率很是低.

即便失敗,再次執行時成功的機率也是極高的,相似於自旋原理. 這裏是先加1,建立失敗再減1,這是輕量處理併發建立線程的方式; 若是先建立線程,成功再加1,當發現超出限制後再銷燬線程,那麼這樣的處理方式明顯比前者代價要大.

  • 第6處,Worker 對象是工做線程的核心類實現,部分源碼以下

它實現了Runnable接口,並把本對象做爲參數輸入給run()中的runWorker (this); 因此內部屬性線程threadstart的時候,即會調用runWorker.

t 究竟是誰?

setState(-1)是爲什麼

設置個簡單的狀態,檢查狀態以防止中斷。在調用中止線程池時會判斷state 字段,決定是否中斷之。

總結

線程池的相關源碼比較精煉,還包括線程池的銷燬、任務提取和消費等,與線程狀態圖同樣,線程池也有本身獨立的狀態轉化流程,本節再也不展開。 總結一下,使用線程池要注意以下幾點: (1)合理設置各種參數,應根據實際業務場景來設置合理的工做線程數。 (2)線程資源必須經過線程池提供,不容許在應用中自行顯式建立線程。 (3)建立線程或線程池時請指定有意義的線程名稱,方便出錯時回溯。

線程池不容許直接使用Executors,而應該經過ThreadPoolExecutor建立,這樣的處理方式能更加明確線程池的運行規則,規避資源耗盡的風險。 這些方法最終都調用了ThreadPoolExecutor和ScheduledThreadPoolExecutor的構造函數 而ScheduledThreadPoolExecutor繼承自ThreadPoolExecutor

0.2 ThreadPoolExecutor 自定義線程池

[外鏈圖片轉存失敗,源站可能有防盜鏈機制,建議將圖片保存下來直接上傳(img-5A6eRvc8-1570557031390)(uploadfiles.nowcoder.com/images/2019… "圖片標題")] 它們都是某種線程池,能夠控制線程建立,釋放,並經過某種策略嘗試複用線程去執行任務的一個管理框架

,所以最終全部線程池的構造函數都調用了Java5後推出的ThreadPoolExecutor的以下構造函數

Java默認提供的線程池

Java中的線程池是運用場景最多的併發框架,幾乎全部須要異步或併發執行任務的程序均可以使用線程池。

new Thread的弊端:

  • 每次new Thread新建對象,性能差
  • 線程缺少統一管理,可能無限制的新建線程,相互競爭,有可能佔用過多系統資源致使死機或OOM
  • 缺乏更多功能,如更多執行、按期執行、線程中斷

而使用線程的好處:

  • 重用存在的線程,減小對象建立、消亡的開銷,性能佳
  • 可有效控制最大併發線程數,提升系統資源利用率,同時能夠避免過多資源競爭,避免阻塞
  • 提供定時執行、按期執行、單線程、併發數控制等功能

咱們只須要將待執行的方法放入 run 方法中,將 Runnable 接口的實現類交給線程池的 execute 方法,做爲他的一個參數,好比:

Executor e = Executors.newSingleThreadExecutor();           
e.execute(new Runnable(){ //匿名內部類 public void run(){ 
//須要執行的任務 
} 
}); 
複製代碼

線程池原理 - 任務execute過程

  • 流程圖

  • 示意圖

ThreadPoolExecutor執行execute():

  • 若當前運行的線程少於corePoolSize,則建立新線程來執行任務(該步須要獲取全局鎖)
  • 若運行的線程多於或等於corePoolSize,且工做隊列沒滿,則將新提交的任務存儲在工做隊列裏。即,將任務加入BlockingQueue
  • 若沒法將任務加入BlockingQueue,且沒達到線程池最大數量,則建立新的線程來處理任務(該步須要獲取全局鎖)
  • 若建立新線程將使當前運行的線程超出maximumPoolSize,任務將被拒絕,並調用RejectedExecutionHandler.rejectedExecution()

採起上述思路,是爲了在執行execute()時,儘量避免獲取全局鎖。在ThreadPoolExecutor完成預熱後(當前運行的線程數≥corePoolSize),幾乎全部的execute()方法調用都是執行步驟2,而步驟2不須要獲取全局鎖。

實例

  • 結果

源碼分析

/** * 檢查是否能夠根據當前池狀態和給定的邊界(核心或最大) * 添加新工做線程。若是是這樣,工做線程數量會相應調整,若是可能的話,一個新的工做線程建立並啓動 * 將firstTask做爲其運行的第一項任務。 * 若是池已中止此方法返回false * 若是線程工廠在被訪問時未能建立線程,也返回false * 若是線程建立失敗,或者是因爲線程工廠返回null,或者因爲異常(一般是在調用Thread.start()後的OOM)),咱們乾淨地回滾。 */
    private boolean addWorker(Runnable firstTask, boolean core) { 
    /** * Check if queue empty only if necessary. * * 若是線程池已關閉,並知足如下條件之一,那麼不建立新的 worker: * 1. 線程池狀態大於 SHUTDOWN,也就是 STOP, TIDYING, 或 TERMINATED * 2. firstTask != null * 3. workQueue.isEmpty() * 簡單分析下: * 狀態控制的問題,當線程池處於 SHUTDOWN ,不容許提交任務,可是已有任務繼續執行 * 當狀態大於 SHUTDOWN ,不容許提交任務,且中斷正在執行任務 * 多說一句:若線程池處於 SHUTDOWN,但 firstTask 爲 null,且 workQueue 非空,是容許建立 worker 的 * */
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                // 若是成功,那麼就是全部建立線程前的條件校驗都知足了,準備建立線程執行任務
                // 這裏失敗的話,說明有其餘線程也在嘗試往線程池中建立線程
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                // 因爲有併發,從新再讀取一下 ctl
                c = ctl.get();  // Re-read ctl
                // 正常若是是 CAS 失敗的話,進到下一個裏層的for循環就能夠了
                // 可若是是由於其餘線程的操做,致使線程池的狀態發生了變動,若有其餘線程關閉了這個線程池
                // 那麼須要回到外層的for循環
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

     /* * * 到這裏,咱們認爲在當前這個時刻,能夠開始建立線程來執行任務 */
         
        // worker 是否已經啓動
        boolean workerStarted = false;
        // 是否已將這個 worker 添加到 workers 這個 HashSet 中
        boolean workerAdded = false;
        Worker w = null;
        try {
           // 把 firstTask 傳給 worker 的構造方法
            w = new Worker(firstTask);
            // 取 worker 中的線程對象,Worker的構造方法會調用 ThreadFactory 來建立一個新的線程
            final Thread t = w.thread;
            if (t != null) {
               //先加鎖
                final ReentrantLock mainLock = this.mainLock;
                // 這個是整個類的全局鎖,持有這個鎖才能讓下面的操做「瓜熟蒂落」,
                // 由於關閉一個線程池須要這個鎖,至少我持有鎖的期間,線程池不會被關閉
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    // 小於 SHUTTDOWN 即 RUNNING
                    // 若是等於 SHUTDOWN,不接受新的任務,可是會繼續執行等待隊列中的任務
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        // worker 裏面的 thread 不能是已啓動的
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        // 加到 workers 這個 HashSet 中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
               // 若添加成功
                if (workerAdded) {
                    // 啓動線程
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            // 若線程沒有啓動,作一些清理工做,若前面 workCount 加了 1,將其減掉
            if (! workerStarted)
                addWorkerFailed(w);
        }
        // 返回線程是否啓動成功
        return workerStarted;
    }
複製代碼

看下 addWorkFailed workers 中刪除掉相應的 worker,workCount 減 1
private void addWor

記錄 workers 中的個數的最大值,由於 workers 是不斷增長減小的,經過這個值能夠知道線程池的大小曾經達到的最大值

worker 中的線程 start 後,其 run 方法會調用 runWorker 繼續往下看 runWorker

// worker 線程啓動後調用,while 循環(即自旋!)不斷從等待隊列獲取任務並執行
// worker 初始化時,可指定 firstTask,那麼第一個任務也就能夠不須要從隊列中獲取
final void runWorker(Worker w) {
    Thread wt = Thread.currentThread();
    // 該線程的第一個任務(如有)
    Runnable task = w.firstTask;
    w.firstTask = null;
    // 容許中斷
    w.unlock(); 
 
    boolean completedAbruptly = true;
    try {
        // 循環調用 getTask 獲取任務
        while (task != null || (task = getTask()) != null) {
            w.lock();          
            // 若線程池狀態大於等於 STOP,那麼意味着該線程也要中斷
              /** * 若線程池STOP,請確保線程 已被中斷 * 若是沒有,請確保線程未被中斷 * 這須要在第二種狀況下進行從新檢查,以便在關中斷時處理shutdownNow競爭 */
            if ((runStateAtLeast(ctl.get(), STOP) ||
                 (Thread.interrupted() &&
                  runStateAtLeast(ctl.get(), STOP))) &&
                !wt.isInterrupted())
                wt.interrupt();
            try {
                // 這是一個鉤子方法,留給須要的子類實現
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    // 到這裏終於能夠執行任務了
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    // 這裏不容許拋出 Throwable,因此轉換爲 Error
                    thrown = x; throw new Error(x);
                } finally {
                    // 也是一個鉤子方法,將 task 和異常做爲參數,留給須要的子類實現
                    afterExecute(task, thrown);
                }
            } finally {
                // 置空 task,準備 getTask 下一個任務
                task = null;
                // 累加完成的任務數
                w.completedTasks++;
                // 釋放掉 worker 的獨佔鎖
                w.unlock();
            }
        }
        completedAbruptly = false;
    } finally {
        // 到這裏,須要執行線程關閉
        // 1. 說明 getTask 返回 null,也就是說,這個 worker 的使命結束了,執行關閉
        // 2. 任務執行過程當中發生了異常
        // 第一種狀況,已經在代碼處理了將 workCount 減 1,這個在 getTask 方法分析中說
        // 第二種狀況,workCount 沒有進行處理,因此須要在 processWorkerExit 中處理
        processWorkerExit(w, completedAbruptly);
    }
}
複製代碼

看看 getTask()

// 此方法有三種可能
// 1. 阻塞直到獲取到任務返回。默認 corePoolSize 以內的線程是不會被回收的,它們會一直等待任務
// 2. 超時退出。keepAliveTime 起做用的時候,也就是若是這麼多時間內都沒有任務,那麼應該執行關閉
// 3. 若是發生瞭如下條件,須返回 null
// 池中有大於 maximumPoolSize 個 workers 存在(經過調用 setMaximumPoolSize 進行設置)
// 線程池處於 SHUTDOWN,並且 workQueue 是空的,前面說了,這種再也不接受新的任務
// 線程池處於 STOP,不只不接受新的線程,連 workQueue 中的線程也再也不執行
private Runnable getTask() {
    boolean timedOut = false; // Did the last poll() time out?

   for (;;) {
   // 容許核心線程數內的線程回收,或當前線程數超過了核心線程數,那麼有可能發生超時關閉
 
            // 這裏 break,是爲了避免往下執行後一個 if (compareAndDecrementWorkerCount(c))
            // 兩個 if 一塊兒看:若是當前線程數 wc > maximumPoolSize,或者超時,都返回 null
            // 那這裏的問題來了,wc > maximumPoolSize 的狀況,爲何要返回 null?
            // 換句話說,返回 null 意味着關閉線程。
            // 那是由於有可能開發者調用了 setMaximumPoolSize 將線程池的 maximumPoolSize 調小了
        
            // 若是此 worker 發生了中斷,採起的方案是重試
            // 解釋下爲何會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,
            // 若是開發者將 maximumPoolSize 調小了,致使其小於當前的 workers 數量,
            // 那麼意味着超出的部分線程要被關閉。從新進入 for 循環,天然會有部分線程會返回 null
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                // CAS 操做,減小工做線程數
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
            // 若是此 worker 發生了中斷,採起的方案是重試
            // 解釋下爲何會發生中斷,這個讀者要去看 setMaximumPoolSize 方法,
            // 若是開發者將 maximumPoolSize 調小了,致使其小於當前的 workers 數量,
            // 那麼意味着超出的部分線程要被關閉。從新進入 for 循環,天然會有部分線程會返回 null
                timedOut = false;
            }
        }
}
複製代碼

到這裏,基本上也說完了整個流程,回到 execute(Runnable command) 方法,看看各個分支,我把代碼貼過來一下:

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        //表示 「線程池狀態」 和 「線程數」 的整數
        int c = ctl.get();
        // 若是當前線程數少於核心線程數,直接添加一個 worker 執行任務,
        // 建立一個新的線程,並把當前任務 command 做爲這個線程的第一個任務(firstTask)
        if (workerCountOf(c) < corePoolSize) {
        // 添加任務成功,即結束
        // 執行的結果,會包裝到 FutureTask 
        // 返回 false 表明線程池不容許提交任務
            if (addWorker(command, true))
                return;
           
            c = ctl.get();
        }
        // 到這說明,要麼當前線程數大於等於核心線程數,要麼剛剛 addWorker 失敗
 
        // 若是線程池處於 RUNNING ,把這個任務添加到任務隊列 workQueue 中
        if (isRunning(c) && workQueue.offer(command)) {
            /* 若任務進入 workQueue,咱們是否須要開啓新的線程 * 線程數在 [0, corePoolSize) 是無條件開啓新線程的 * 若線程數已經大於等於 corePoolSize,則將任務添加到隊列中,而後進到這裏 */
            int recheck = ctl.get();
            // 若線程池不處於 RUNNING ,則移除已經入隊的這個任務,而且執行拒絕策略
            if (! isRunning(recheck) && remove(command))
                reject(command);
            // 若線程池仍是 RUNNING ,且線程數爲 0,則開啓新的線程
            // 這塊代碼的真正意圖:擔憂任務提交到隊列中了,可是線程都關閉了
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        // 若 workQueue 滿,到該分支
        // 以 maximumPoolSize 爲界建立新 worker,
        // 若失敗,說明當前線程數已經達到 maximumPoolSize,執行拒絕策略
        else if (!addWorker(command, false))
            reject(command);
    }
複製代碼

工做線程:線程池建立線程時,會將線程封裝成工做線程Worker,Worker在執行完任務後,還會循環獲取工做隊列裏的任務來執行.咱們能夠從Worker類的run()方法裏看到這點

public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);
        }
    }
 boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);

            final Thread t = w.thread;
            if (t != null) {
               //先加鎖
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }
複製代碼

線程池中的線程執行任務分兩種狀況

  • 在execute()方法中建立一個線程時,會讓這個線程執行當前任務
  • 這個線程執行完上圖中 1 的任務後,會反覆從BlockingQueue獲取任務來執行

線程池的使用

向線程池提交任務

可使用兩個方法向線程池提交任務

execute()

用於提交不須要返回值的任務,因此沒法判斷任務是否被線程池執行成功.經過如下代碼可知execute()方法輸入的任務是一個Runnable類的實例.

threadsPool.execute(new Runnable() {
            @Override
            public void run() {
                   // TODO Auto-generated method stub
            }
        });
複製代碼

從運行結果能夠看出,單線程池中的線程是順序執行的。固定線程池(參數爲2)中,永遠最多隻有兩個線程併發執行。緩存線程池中,全部線程都併發執行。 第二個例子,測試單線程調度線程池和固定調度線程池。

public class ScheduledThreadPoolExam {
    public static void main(String[] args) {
        //first test for singleThreadScheduledPool
        ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
        //second test for scheduledThreadPool
// ScheduledExecutorService scheduledPool = Executors.newScheduledThreadPool(2);
        for (int i = 0; i < 5; i++) {
            scheduledPool.schedule(new TaskInScheduledPool(i), 0, TimeUnit.SECONDS);
        }
        scheduledPool.shutdown();
    }
}

class TaskInScheduledPool implements Runnable {
    private final int id;

    TaskInScheduledPool(int id) {
        this.id = id;
    }

    @Override
    public void run() {
        try {
            for (int i = 0; i < 5; i++) {
                System.out.println("TaskInScheduledPool-["+id+"] is running phase-"+i);
                TimeUnit.SECONDS.sleep(1);
            }
            System.out.println("TaskInScheduledPool-["+id+"] is over");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
複製代碼

從運行結果能夠看出,單線程調度線程池和單線程池相似,而固定調度線程池和固定線程池相似。 總結:

  • 若是沒有特殊要求,使用緩存線程池老是合適的;
  • 若是隻能運行一個線程,就使用單線程池。
  • 若是要運行調度任務,則按需使用調度線程池或單線程調度線程池
  • 若是有其餘特殊要求,則能夠直接使用ThreadPoolExecutor類的構造函數來建立線程池,並本身給定那五個參數。

submit()

用於提交須要返回值的任務。線程池會返回一個future類型對象,經過此對象能夠判斷任務是否執行成功。 並可經過get()獲取返回值,get()會阻塞當前線程直到任務完成。而使用get(long timeout,TimeUnit unit)方法則會阻塞當前線程一段時間後當即返回,這時候可能任務沒有執行完。

Future<Object> future = executor.submit(harReturnValuetask);
     try {
         Object s = future.get();
     } catch (InterruptedException e) {
         // 處理中斷異常
     } catch (ExecutionException e) {
         // 處理沒法執行任務異常
     } finally {
         // 關閉線程池
         executor.shutdown();
     }
複製代碼
相關文章
相關標籤/搜索