細說線程池---高級篇

線程源碼分析

上一篇中已經講了線程池的原理。這一次來講說源碼執行過程。建議先看看細說線程池---入門篇
細說線程池---中級篇java

依然使用newFixedThreadPool()方法建立線程池。併發

看源碼從execute(Runnable runable)開始。app

   public void execute(Runnable command) {       if (command == null)           throw new NullPointerException();       int c = ctl.get();       ///1.當前池中線程比核心數少,新建一個線程執行任務       //workerCountOf計算出線程個數       if (workerCountOf(c) < corePoolSize) {           //線程池中建立一個線程worker           if (addWorker(command, true))               return;           c = ctl.get();       }       2.核心池已滿,但任務隊列未滿,添加到隊列中       if (isRunning(c) && workQueue.offer(command)) {           int recheck = ctl.get();           //任務成功添加到隊列之後,再次檢查是否須要添加新的線程,           // 由於已存在的線程可能被銷燬了           if (! isRunning(recheck) && remove(command))               若是線程池處於非運行狀態,               // 而且把當前的任務從任務隊列中               //移除成功,則拒絕該任務               reject(command);           若是以前的線程已被銷燬完,新建一個線程           else if (workerCountOf(recheck) == 0)               addWorker(null, false);       }       3.核心池已滿,隊列已滿,試着建立一個新線程       else if (!addWorker(command, false))           若是建立新線程失敗了,說明線程池被關閉或者線程池徹底滿了, 拒絕任務           reject(command);   }

ctl 的做用ide

在線程池中,ctl 貫穿在線程池的整個生命週期中函數

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING,0));

它是一個原子類,主要做用是用來保存線程數量和線程池的狀態。咱們來分析一下這段代碼,其實比較有意思,他用到了位運算一個 int 數值是 32 個 bit 位,這裏採用高 3 位來保存運行狀態,低 29 位來保存線程數量。咱們來分析默認狀況下,也就是 ctlOf(RUNNING)運行狀態,調用了 ctlOf(int rs,int wc)方法;其中oop

private static int ctlOf(int rs, int wc) { return rs | wc; }

其中 RUNNING =-1 << COUNT_BITS ;-1 左移 29 位,源碼分析

-1 的二進制是 32 個 1(1111 1111 11111111 1111 1111 1111 1111);

-1 的二進制計算方法原碼是 1000…001 . 高位 1 表示符號位。而後對原碼取反,高位不變獲得 1111…110而後對反碼進行+1 ,也就是補碼操做, 最後獲得 1111…1111ui

那麼-1 <<左移 29 位, 也就是 【111】 表示;rs | wc 。二進制的 111 | 000 。this

獲得的結果仍然是 111。spa

那麼同理可得其餘的狀態的 bit 位表示

//32-3private static final int COUNT_BITS = Integer.SIZE - 3;//將 1 的二進制向右位移 29 位,再減 1 表示最大線程容量private static final int CAPACITY = (1 << COUNT_BITS) - 1;// 運行狀態保存在 int 值的高 3 位 ( 全部數值左移 29 位 )// 接收新任務,並執行隊列中的任務private static final int RUNNING = -1 << COUNT_BITS;// 不接收新任務,可是執行隊列中的任務private static final int SHUTDOWN = 0 << COUNT_BITS;// 不接收新任務,不執行隊列中的任務,中斷正在執行中的任務private static final int STOP = 1 << COUNT_BITS;// 全部的任務都已結束,線程數量爲 0,處於該狀態的線程池即將調用 terminated()方法private static final int TIDYING = 2 << COUNT_BITS;// terminated()方法執行完成private static final int TERMINATED = 3 << COUNT_BITS;

線程池狀態變化圖

addWorker

再回到上面源碼中,當線程池中線程數小於核心線程數的時候:會調用 addWorker,顧名思義,其實就是要建立一個工做線程。咱們來看看源碼的實現源碼比較長,看起來比較唬人,其實就作了兩件事。

  1. 才用循環 CAS操做來將線程數加 1

  2. 新建一個線程並啓用

 private boolean addWorker(Runnable firstTask, boolean core) {       //goto 語句,避免死循環       retry:       for (;;) {           int c = ctl.get();           int rs = runStateOf(c);           //若是線程處於非運行狀態,而且 rs 不等於 SHUTDOWN 且 firstTask 不等於空且且          // workQueue 爲空,直接返回 false (表示不可添加 work 狀態)         //  1.  線程池已經 shutdown 後,還要添加新的任務,拒絕          // 2.  (第二個判斷) SHUTDOWN 狀態不接受新任務,但仍然會執行已經加入任務隊列的任          // 務,因此當進入 SHUTDOWN 狀態,而傳進來的任務爲空,而且任務隊列不爲空的時候,是容許添加          // 新線程的 , 若是把這個條件取反,就表示不容許添加 worker           if (rs >= SHUTDOWN &&                   ! (rs == SHUTDOWN &&                           firstTask == null &&                           ! workQueue.isEmpty()))               return false;           for (;;) { //自旋               int wc = workerCountOf(c);//得到 Worker 工做線程數               //若是工做線程數大於默認容量大小或者大於核心線程數大小,               // 則直接返回 false 表示不能再添加 worker。               if (wc >= CAPACITY ||                       wc >= (core ? corePoolSize : maximumPoolSize))                   return false;               //經過 cas 來增長工做線程數,               if (compareAndIncrementWorkerCount(c))               //若是 cas 失敗,則直接重試               break retry;               // 再次獲取 ctl 的值               c = ctl.get();               //這裏若是不想等,說明線程的狀態發生了變化,繼續重試               if (runStateOf(c) != rs)               continue retry;           }       }       //上面這段代碼主要是對 worker 數量作原子+1 操做,       // 下面的邏輯纔是正式構建一個 worker       boolean workerStarted = false; //工做線程是否啓動的標識       boolean workerAdded = false; //工做線程是否已經添加成功的標識       Worker w = null;       try {           //構建一個 Worker,這個 worker 是什麼呢?           //咱們 能夠看到構造方法裏面傳入了一個 Runnable 對象           w = new Worker(firstTask);           final Thread t = w.thread; //從 worker 對象中取出線程           if (t != null) {               final ReentrantLock mainLock = this.mainLock;               mainLock.lock(); //這裏有個重入鎖,避免併發問題               try {                   int rs = runStateOf(ctl.get());                  //只有當前線程池是正在運行狀態,[或是 SHUTDOWN                   // 且 firstTask 爲空],才能添加到 workers 集合中                   if (rs < SHUTDOWN ||                           (rs == SHUTDOWN && firstTask == null)) {                   //任務剛封裝到 work 裏面,還沒 start,你封裝的線程就是 alive,                   // 幾個意思?確定是要拋異常出去的                       if (t.isAlive()) // precheck that t is startable                           throw new IllegalThreadStateException();                       workers.add(w); //將新建立的 Worker 添加到 workers 集合中                       int s = workers.size();                     //若是集合中的工做線程數大於最大線程數,                     // 這個最大線程數表示線程池曾經出現過的最大線程數                       if (s > largestPoolSize)                           largestPoolSize = s; //更新線程池出現過的最大線程數                       workerAdded = true;//表示工做線程建立成功了                   }               } finally {                   mainLock.unlock(); //釋放鎖               }               if (workerAdded) {//若是 worker 添加成功                   t.start();//啓動線程                   workerStarted = true;               }           }       } finally {           if (! workerStarted)               //若是添加失敗,就須要作一件事,就是遞減實際工做線               //程數(還記得咱們最開始的時候增長了工做線程數嗎)               addWorkerFailed(w);       }       //返回結果       return workerStarted;   }

Worker說明

咱們發現 addWorker 方法只是構造了一個 Worker,而且把 firstTask 封裝到 worker 中,它是作什麼的呢?咱們來看看

  1. 每一個 worker,都是一條線程,同時裏面包含了一個 firstTask,即初始化時要被首先執行的任務.

  2. 最終執行任務的,是 runWorker()方法Worker 類繼承了 AQS,並實現了 Runnable 接口,注意其中的 firstTask 和 thread 屬性:firstTask 用它來保存傳入的任務;thread 是在調用構造方法時經過 ThreadFactory 來建立的線程,是用來處理任務的線程。

在調用構造方法時,須要傳入任務,這裏經過 getThreadFactory().newThread(this);來新建一個線程,newThread 方法傳入的參數是 this,由於 Worker 自己繼承了 Runnable 接口,也就是一個線程,因此一個 Worker 對象在啓動的時候會調用 Worker 類中的 run 方法。Worker 繼承了 AQS,使用 AQS 來實現獨佔鎖的功能。爲何不使用 ReentrantLock 來實現呢?能夠看到 tryAcquire 方法,它是不容許重入的,而 ReentrantLock 是容許重入的:lock 方法一旦獲取了獨佔鎖,表示當前線程正在執行任務中;那麼它會有如下幾個做用

  1. 若是正在執行任務,則不該該中斷線程;

  2. 若是該線程如今不是獨佔鎖的狀態,也就是空閒的狀態,說明它沒有在處理任務,這時能夠對該線程進行中斷;

  3. 線程池在執行 shutdown 方法或 tryTerminate 方法時會調用 interruptIdleWorkers 方法來中斷空閒的線程,interruptIdleWorkers 方法會使用 tryLock 方法來判斷線程池中的線程是不是空閒狀態

  4. 之因此設置爲不可重入,是由於咱們不但願任務在調用像 setCorePoolSize 這樣的線程池控制方法時從新獲取鎖,這樣會中斷正在運行的線程

//繼承了AQS還實現了Runnableprivate final class Worker           extends AbstractQueuedSynchronizer           implements Runnable   {       /**         * This class will never be serialized, but we provide a         * serialVersionUID to suppress a javac warning.         */       private static final long serialVersionUID = 6138294804551838833L;       //注意了,這纔是真正執行task的線程,從構造函數可知是由        //ThreadFactury 建立的       final Thread thread;       //這就是須要執行的 task       Runnable firstTask;       /** 每一個線程完成任務的計數器*/       volatile long completedTasks;       /**         * Creates with given first task and thread from ThreadFactory.         * @param firstTask the first task (null if none)         */       Worker(Runnable firstTask) {            初始狀態 -1, 防止在調用 runWorker() ,           // 也就是真正執行task前中斷thread 。           setState(-1);             this.firstTask = firstTask;           this.thread = getThreadFactory().newThread(this);       }       /** Delegates main run loop to outer runWorker  */       public void run() {           runWorker(this);       }       // Lock methods       //       // The value 0 represents the unlocked state.       // The value 1 represents the locked state.       protected boolean isHeldExclusively() {           return getState() != 0;       }       protected boolean tryAcquire(int unused) {           if (compareAndSetState(0, 1)) {               setExclusiveOwnerThread(Thread.currentThread());               return true;           }           return false;       }       protected boolean tryRelease(int unused) {           setExclusiveOwnerThread(null);           setState(0);           return true;       }       public void lock()        { acquire(1); }       public boolean tryLock()  { return tryAcquire(1); }       public void unlock()      { release(1); }       public boolean isLocked() { return isHeldExclusively(); }       void interruptIfStarted() {           Thread t;           if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {               try {                   t.interrupt();               } catch (SecurityException ignore) {               }           }       }   }

addWorkerFailed方法

addWorker方法中,若是添加 Worker 而且啓動線程失敗,則會作失敗後的處理。這個方法主要作兩件事

  1. 若是 worker 已經構造好了,則從 workers 集合中移除這個 worker

  2. 原子遞減核心線程數(由於在 addWorker 方法中先作了原子增長)

  3. 嘗試結束線程池

   private void addWorkerFailed(java.util.concurrent.ThreadPoolExecutor.Worker w) {       final ReentrantLock mainLock = this.mainLock;       //上鎖       mainLock.lock();       try {           if (w != null)               workers.remove(w);           decrementWorkerCount();           tryTerminate();       } finally {           //釋放鎖           mainLock.unlock();       }   }

runWorker 方法

前面已經瞭解了 ThreadPoolExecutor的核心方法 addWorker,主要做用是增長工做線程,而 Worker 簡單理解其實就是一個線程,裏面從新了 run 方法,這塊是線程池中執行任務的真正處理邏輯,也就是 runWorker方法,這個方法主要作幾件事:

  1. 若是 task 不爲空,則開始執行 task

  2. 若是 task 爲空,則經過 getTask()再去取任務,並賦值給 task,若是取到的 Runnable 不爲空,則執行該任務

  3. 執行完畢後,經過 while 循環繼續 getTask()取任務

  4. 若是 getTask()取到的任務依然是空,那麼整個runWorker()方法執行完畢

相關文章
相關標籤/搜索