線程池小結(JDK8)

 一、線程池的好處

  1. 下降資源消耗(重複利用已建立的線程減小建立和銷燬線程的開銷)
  2. 提升響應速度(無須建立線程)
  3. 提升線程的可管理性

二、相關類圖

JDK5之後將工做單元和執行機制分離開來,工做單元包括Runnable和Callable;執行機制由Executor框架提供,管理線程的生命週期,將任務的提交和如何執行進行解耦。Executors是一個快速獲得線程池的工具類,相關的類圖以下所示:java

 

三、Executor框架接口

Executor接口數組

Executor接口只有一個execute方法,用來替代一般建立或啓動線程的方法。框架

public interface Executor {
    void execute(Runnable command);
}

ExecutorService接口異步

ExecutorService接口繼承自Executor接口,加入了關閉方法、submit方法和對Callable、Future的支持。ide

ScheduledExecutorService接口工具

 ScheduledExecutorService擴展ExecutorService接口並加入了對定時任務的支持。oop

四、ThreadPoolExecutor分析

ThreadPoolExecutor繼承自AbstractExecutorService,也是實現了ExecutorService接口。ui

 4.1 內部狀態this

 1     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 2     private static final int COUNT_BITS = Integer.SIZE - 3;
 3     private static final int CAPACITY   = (1 << COUNT_BITS) - 1;
 4 
 5     // runState is stored in the high-order bits
 6     private static final int RUNNING    = -1 << COUNT_BITS;
 7     private static final int SHUTDOWN   =  0 << COUNT_BITS;
 8     private static final int STOP       =  1 << COUNT_BITS;
 9     private static final int TIDYING    =  2 << COUNT_BITS;
10     private static final int TERMINATED =  3 << COUNT_BITS;
11 
12     // Packing and unpacking ctl
13     private static int runStateOf(int c)     { return c & ~CAPACITY; }
14     private static int workerCountOf(int c)  { return c & CAPACITY; }
15     private static int ctlOf(int rs, int wc) { return rs | wc; }

ctl是對線程池的運行狀態(高3位)和線程池中有效線程的數量(低29位)進行控制的一個字段。線程池有五種狀態,分別是:atom

  1. RUNNING:-1 << COUNT_BITS,即高3位爲111,該狀態的線程池會接收新任務,並處理阻塞隊列中的任務;
  2. SHUTDOWN: 0 << COUNT_BITS,即高3位爲000,該狀態的線程池不會接收新任務,但會處理阻塞隊列中的任務;
  3. STOP : 1 << COUNT_BITS,即高3位爲001,該狀態的線程不會接收新任務,也不會處理阻塞隊列中的任務,並且會中斷正在運行的任務;
  4. TIDYING : 2 << COUNT_BITS,即高3位爲010, 全部的任務都已經終止;
  5. TERMINATED: 3 << COUNT_BITS,即高3位爲011, terminated()方法已經執行完成。

 

4.2 構造方法

構造方法有4個,這裏只列出其中最基礎的一個。

 1     public ThreadPoolExecutor(int corePoolSize,
 2                               int maximumPoolSize,
 3                               long keepAliveTime,
 4                               TimeUnit unit,
 5                               BlockingQueue<Runnable> workQueue,
 6                               ThreadFactory threadFactory,
 7                               RejectedExecutionHandler handler) {
 8         if (corePoolSize < 0 ||
 9             maximumPoolSize <= 0 ||
10             maximumPoolSize < corePoolSize ||
11             keepAliveTime < 0)
12             throw new IllegalArgumentException();
13         if (workQueue == null || threadFactory == null || handler == null)
14             throw new NullPointerException();
15         this.corePoolSize = corePoolSize;
16         this.maximumPoolSize = maximumPoolSize;
17         this.workQueue = workQueue;
18         this.keepAliveTime = unit.toNanos(keepAliveTime);
19         this.threadFactory = threadFactory;
20         this.handler = handler;
21     }

構造方法中參數的含義以下:

  • corePoolSize:核心線程數量,線程池中應該常駐的線程數量
  • maximumPoolSize:線程池容許的最大線程數,非核心線程在超時以後會被清除
  • keepAliveTime:線程沒有任務執行時能夠保持的時間
  • unit:時間單位
  • workQueue:阻塞隊列,存儲等待執行的任務。JDK提供了以下4種阻塞隊列:
    • ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO排序任務;
    • LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序任務,吞吐量一般要高於ArrayBlockingQuene;
    • SynchronousQuene:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene;
    • PriorityBlockingQuene:具備優先級的無界阻塞隊列;
  • threadFactory:線程工廠,來建立線程
  • handler:線程池的飽和策略。若是阻塞隊列滿了而且沒有空閒的線程,這時若是繼續提交任務,就須要採起一種策略處理該任務。線程池提供了4種策略:
    • AbortPolicy:直接拋出異常,這是默認策略;
    • CallerRunsPolicy:用調用者所在的線程來執行任務;
    • DiscardOldestPolicy:丟棄阻塞隊列中靠最前的任務,並執行當前任務;
    • DiscardPolicy:直接丟棄任務。

4.3 execute方法

ThreadPoolExecutor.execute(task)實現了Executor.execute(task),用來提交任務,不能獲取返回值,代碼以下:

 1     public void execute(Runnable command) {
 2         if (command == null)
 3             throw new NullPointerException();
 4         /*
 5          * Proceed in 3 steps:
 6          *
 7          * 1. If fewer than corePoolSize threads are running, try to
 8          * start a new thread with the given command as its first
 9          * task.  The call to addWorker atomically checks runState and
10          * workerCount, and so prevents false alarms that would add
11          * threads when it shouldn't, by returning false.
12          *
13          * 2. If a task can be successfully queued, then we still need
14          * to double-check whether we should have added a thread
15          * (because existing ones died since last checking) or that
16          * the pool shut down since entry into this method. So we
17          * recheck state and if necessary roll back the enqueuing if
18          * stopped, or start a new thread if there are none.
19          *
20          * 3. If we cannot queue task, then we try to add a new
21          * thread.  If it fails, we know we are shut down or saturated
22          * and so reject the task.
23          */
24         int c = ctl.get();
25     /*
26      * workerCountOf方法取出低29位的值,表示當前活動的線程數;
27      * 若是當前活動線程數小於corePoolSize,則新建一個線程放入線程池中;
28      * 並把任務添加到該線程中。
29      */
30     
31         if (workerCountOf(c) < corePoolSize) {
32         /*
33          * addWorker中的第二個參數表示限制添加線程的數量是根據corePoolSize來判斷仍是maximumPoolSize來判斷;
34          * 若是爲true,根據corePoolSize來判斷;
35          * 若是爲false,則根據maximumPoolSize來判斷
36          */
37             if (addWorker(command, true))
38                 return;
39         /*
40          * 若是添加失敗,則從新獲取ctl值
41          */
42             c = ctl.get();
43         }
44     /*
45      * 線程池處於RUNNING狀態,把提交的任務成功放入阻塞隊列中
46      */
47         if (isRunning(c) && workQueue.offer(command)) {
48     // 從新獲取ctl值
49             int recheck = ctl.get();
50         // 再次判斷線程池的運行狀態,若是不是運行狀態,因爲以前已經把command添加到workQueue中了,
51         // 這時須要移除該command
52         // 執行事後經過handler使用拒絕策略對該任務進行處理,整個方法返回
53             if (! isRunning(recheck) && remove(command))
54                 reject(command);
55         /*
56          * 獲取線程池中的有效線程數,若是數量是0,則執行addWorker方法
57          * 這裏傳入的參數表示:
58          * 1. 第一個參數爲null,表示在線程池中建立一個線程,但不去啓動;
59          * 2. 第二個參數爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize,添加線程時根據maximumPoolSize來判斷;
60          * 若是判斷workerCount大於0,則直接返回,在workQueue中新增的command會在未來的某個時刻被執行。
61          */
62             else if (workerCountOf(recheck) == 0)
63                 addWorker(null, false);
64         }
65     /*
66      * 若是執行到這裏,有兩種狀況:
67      * 1. 線程池已經不是RUNNING狀態;
68      * 2. 線程池是RUNNING狀態,但workerCount >= corePoolSize而且workQueue已滿。
69      * 這時,再次調用addWorker方法,但第二個參數傳入爲false,將線程池的有限線程數量的上限設置爲maximumPoolSize;
70      * 若是失敗則拒絕該任務
71      */
72         else if (!addWorker(command, false))
73             reject(command);
74     }

若是線程池狀態一直是RUNNING,則執行過程以下:

  1. 若是workerCount < corePoolSize,則建立並啓動一個線程來執行新提交的任務;
  2. 若是workerCount >= corePoolSize,且線程池內的阻塞隊列未滿,則將任務添加到該阻塞隊列中;
  3. 若是workerCount >= corePoolSize && workerCount < maximumPoolSize,且線程池內的阻塞隊列已滿,則建立並啓動一個線程來執行新提交的任務;
  4. 若是workerCount >= maximumPoolSize,而且線程池內的阻塞隊列已滿, 則根據拒絕策略來處理該任務, 默認的處理方式是直接拋異常。

 

4.4 addWorker方法

從executor的方法實現能夠看出,addWorker主要負責建立新的線程並執行任務。線程池建立新線程執行任務時,須要獲取全局鎖:

 

 1     private boolean addWorker(Runnable firstTask, boolean core) {
 2         retry:
 3         for (;;) {
 4             int c = ctl.get();
 5             // 獲取運行狀態
 6             int rs = runStateOf(c);
 7         /*
 8          * 這個if判斷
 9          * 若是rs >= SHUTDOWN,則表示此時再也不接收新任務;
10          * 接着判斷如下3個條件,只要有1個不知足,則返回false:
11          * 1. rs == SHUTDOWN,這時表示關閉狀態,再也不接受新提交的任務,但卻能夠繼續處理阻塞隊列中已保存的任務
12          * 2. firsTask爲空
13          * 3. 阻塞隊列不爲空
14          *
15          * 首先考慮rs == SHUTDOWN的狀況
16          * 這種狀況下不會接受新提交的任務,因此在firstTask不爲空的時候會返回false;
17          * 而後,若是firstTask爲空,而且workQueue也爲空,則返回false,
18          * 由於隊列中已經沒有任務了,不須要再添加線程了
19          */
20             // Check if queue empty only if necessary.
21             if (rs >= SHUTDOWN &&
22                 ! (rs == SHUTDOWN &&
23                    firstTask == null &&
24                    ! workQueue.isEmpty()))
25                 return false;
26 
27             for (;;) {
28                 // 獲取線程數
29                 int wc = workerCountOf(c);
30               // 若是wc超過CAPACITY,也就是ctl的低29位的最大值(二進制是29個1),返回false;
31               // 這裏的core是addWorker方法的第二個參數,若是爲true表示根據corePoolSize來比較,
32               // 若是爲false則根據maximumPoolSize來比較。
33                 if (wc >= CAPACITY ||
34                     wc >= (core ? corePoolSize : maximumPoolSize))
35                     return false;
36                // 嘗試增長workerCount,若是成功,則跳出第一個for循環
37                 if (compareAndIncrementWorkerCount(c))
38                     break retry;
39               // 若是增長workerCount失敗,則從新獲取ctl的值
40                 c = ctl.get();  // Re-read ctl
41               // 若是當前的運行狀態不等於rs,說明狀態已被改變,返回第一個for循環繼續執行
42                 if (runStateOf(c) != rs)
43                     continue retry;
44                 // else CAS failed due to workerCount change; retry inner loop
45             }
46         }
47 
48         boolean workerStarted = false;
49         boolean workerAdded = false;
50         Worker w = null;
51         try {
52             // 根據firstTask來建立Worker對象
53             w = new Worker(firstTask);
54             // 每個Worker對象都會建立一個線程
55             final Thread t = w.thread;
56             if (t != null) {
57                 final ReentrantLock mainLock = this.mainLock;
58                 mainLock.lock();
59                 try {
60                     // Recheck while holding lock.
61                     // Back out on ThreadFactory failure or if
62                     // shut down before lock acquired.
63                     int rs = runStateOf(ctl.get());
64                   // rs < SHUTDOWN表示是RUNNING狀態;
65                   // 若是rs是RUNNING狀態或者rs是SHUTDOWN狀態而且firstTask爲null,向線程池中添加線程。
66                   // 由於在SHUTDOWN時不會在添加新的任務,但仍是會執行workQueue中的任務
67                     if (rs < SHUTDOWN ||
68                         (rs == SHUTDOWN && firstTask == null)) {
69                         if (t.isAlive()) // precheck that t is startable
70                             throw new IllegalThreadStateException();
71                 // workers是一個HashSet
72                         workers.add(w);
73                         int s = workers.size();
74                 // largestPoolSize記錄着線程池中出現過的最大線程數量
75                         if (s > largestPoolSize)
76                             largestPoolSize = s;
77                         workerAdded = true;
78                     }
79                 } finally {
80                     mainLock.unlock();
81                 }
82                 if (workerAdded) {
83             // 啓動線程,執行任務(Worker.thread(firstTask).start());
84             //啓動時會調用Worker類中的run方法,Worker自己實現了Runnable接口,因此一個Worker類型的對象也是一個線程。
85                     t.start();
86                     workerStarted = true;
87                 }
88             }
89         } finally {
90             if (! workerStarted)
91                 addWorkerFailed(w);
92         }
93         return workerStarted;
94     }

4.5 Worker類

線程池中的每個線程被封裝成一個Worker對象,ThreadPool維護的其實就是一組Worker對象。Worker類設計以下:

  1. 繼承了AQS類,用於判斷線程是否空閒以及是否能夠被中斷,能夠方便的實現工做線程的停止操做;
  2. 實現了Runnable接口,能夠將自身做爲一個任務在工做線程中執行;
  3. 當前提交的任務firstTask做爲參數傳入Worker的構造方法;
 1     private final class Worker
 2         extends AbstractQueuedSynchronizer
 3         implements Runnable
 4     {
 5         /**
 6          * This class will never be serialized, but we provide a
 7          * serialVersionUID to suppress a javac warning.
 8          */
 9         private static final long serialVersionUID = 6138294804551838833L;
10 
11         /** Thread this worker is running in.  Null if factory fails. */
12         final Thread thread;
13         /** Initial task to run.  Possibly null. */
14         Runnable firstTask;
15         /** Per-thread task counter */
16         volatile long completedTasks;
17 
18         /**
19          * Creates with given first task and thread from ThreadFactory.
20          * @param firstTask the first task (null if none)
21          */
22         Worker(Runnable firstTask) {
23             setState(-1); // inhibit interrupts until runWorker
24             this.firstTask = firstTask;
25             this.thread = getThreadFactory().newThread(this);
26         }
27 
28         /** Delegates main run loop to outer runWorker  */
29         public void run() {
30             runWorker(this);
31         }
32 
33         // Lock methods
34         //
35         // The value 0 represents the unlocked state.
36         // The value 1 represents the locked state.
37 
38         protected boolean isHeldExclusively() {
39             return getState() != 0;
40         }
41 
42         protected boolean tryAcquire(int unused) {
43             if (compareAndSetState(0, 1)) {
44                 setExclusiveOwnerThread(Thread.currentThread());
45                 return true;
46             }
47             return false;
48         }
49 
50         protected boolean tryRelease(int unused) {
51             setExclusiveOwnerThread(null);
52             setState(0);
53             return true;
54         }
55 
56         public void lock()        { acquire(1); }
57         public boolean tryLock()  { return tryAcquire(1); }
58         public void unlock()      { release(1); }
59         public boolean isLocked() { return isHeldExclusively(); }
60 
61         void interruptIfStarted() {
62             Thread t;
63             if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
64                 try {
65                     t.interrupt();
66                 } catch (SecurityException ignore) {
67                 }
68             }
69         }
70     }

4.6 runWorker方法

 Worker類中的run方法調用了runWorker方法來執行任務,執行過程以下:

  1. 線程啓動以後,經過unlock方法釋放鎖,設置AQS的state爲0,表示運行可中斷;
  2. Worker執行firstTask或從workQueue中獲取任務:
    1. 進行加鎖操做,保證thread不被其餘線程中斷(除非線程池被中斷)
    2. 檢查線程池狀態,假若線程池處於中斷狀態,當前線程將中斷。
    3. 執行beforeExecute
    4. 執行任務的run方法
    5. 執行afterExecute方法
    6. 解鎖操做
 1     final void runWorker(Worker w) {
 2         Thread wt = Thread.currentThread();
 3         // 獲取第一個任務
 4         Runnable task = w.firstTask;
 5         w.firstTask = null;
 6         // 容許中斷
 7         w.unlock(); // allow interrupts
 8         boolean completedAbruptly = true;
 9         try {
10         // 若是task爲空,則經過getTask來獲取任務
11             while (task != null || (task = getTask()) != null) {
12                 w.lock();
13                 // If pool is stopping, ensure thread is interrupted;
14                 // if not, ensure thread is not interrupted.  This
15                 // requires a recheck in second case to deal with
16                 // shutdownNow race while clearing interrupt
17                 if ((runStateAtLeast(ctl.get(), STOP) ||
18                      (Thread.interrupted() &&
19                       runStateAtLeast(ctl.get(), STOP))) &&
20                     !wt.isInterrupted())
21                     wt.interrupt();
22                 try {
23                     beforeExecute(wt, task);
24                     Throwable thrown = null;
25                     try {
26                         task.run();
27                     } catch (RuntimeException x) {
28                         thrown = x; throw x;
29                     } catch (Error x) {
30                         thrown = x; throw x;
31                     } catch (Throwable x) {
32                         thrown = x; throw new Error(x);
33                     } finally {
34                         afterExecute(task, thrown);
35                     }
36                 } finally {
37                     task = null;
38                     w.completedTasks++;
39                     w.unlock();
40                 }
41             }
42             completedAbruptly = false;
43         } finally {
44             processWorkerExit(w, completedAbruptly);
45         }
46     }

4.7 getTask方法

 getTask方法用來從阻塞隊列中取等待的任務

 1     private Runnable getTask() {
 2         boolean timedOut = false; // Did the last poll() time out?
 3 
 4         for (;;) {
 5             int c = ctl.get();
 6             int rs = runStateOf(c);
 7 
 8             // Check if queue empty only if necessary.
 9             if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
10                 decrementWorkerCount();
11                 return null;
12             }
13 
14             int wc = workerCountOf(c);
15 
16             // Are workers subject to culling?
17         // timed變量用於判斷是否須要進行超時控制。
18         // allowCoreThreadTimeOut默認是false,也就是核心線程不容許進行超時;
19         // wc > corePoolSize,表示當前線程池中的線程數量大於核心線程數量;
20         // 對於超過核心線程數量的這些線程,須要進行超時控制    
21             boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
22 
23             if ((wc > maximumPoolSize || (timed && timedOut))
24                 && (wc > 1 || workQueue.isEmpty())) {
25                 if (compareAndDecrementWorkerCount(c))
26                     return null;
27                 continue;
28             }
29 
30             try {
31             /*
32              * 根據timed來判斷,若是爲true,則經過阻塞隊列的poll方法進行超時控制,若是在keepAliveTime時間內沒有獲取到任務,則返回null;
33              * 不然經過take方法,若是這時隊列爲空,則take方法會阻塞直到隊列不爲空。
34              *
35              */
36                 Runnable r = timed ?
37                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
38                     workQueue.take();
39                 if (r != null)
40                     return r;
41                 timedOut = true;
42             } catch (InterruptedException retry) {
43                 timedOut = false;
44             }
45         }
46     }

5 任務的提交

  • submit任務,等待線程池execute
  • 執行FutureTask類的get方法時,會把主線程封裝成WaitNode節點並保存在waiters鏈表中, 並阻塞等待運行結果;
  • FutureTask任務執行完成後,經過UNSAFE設置waiters相應的waitNode爲null,並經過LockSupport類unpark方法喚醒主線程。
 1 public class Test{
 2 
 3     public static void main(String[] args) {
 4 
 5         ExecutorService es = Executors.newCachedThreadPool();
 6         Future<String> future = es.submit(new Callable<String>() {
 7             @Override
 8             public String call() throws Exception {
 9                 try {
10                     TimeUnit.SECONDS.sleep(2);
11                 } catch (InterruptedException e) {
12                     e.printStackTrace();
13                 }
14                 return "future result";
15             }
16         });
17         try {
18             String result = future.get();
19             System.out.println(result);
20         } catch (Exception e) {
21             e.printStackTrace();
22         }
23     }
24 }

在實際業務場景中,Future和Callable基本是成對出現的,Callable負責產生結果,Future負責獲取結果。

  • Callable接口相似於Runnable,只是Runnable沒有返回值。
  • Callable任務除了返回正常結果以外,若是發生異常,該異常也會被返回,即Future能夠拿到異步執行任務各類結果;
  • Future.get方法會致使主線程阻塞,直到Callable任務執行完成;

 5.1 submit方法

AbstractExecutorService.submit()實現了ExecutorService.submit(),能夠得到執行完的返回值。而ThreadPoolExecutor是AbstractExecutorService的子類,因此submit方法也是ThreadPoolExecutor的方法。

 1     public Future<?> submit(Runnable task) {
 2         if (task == null) throw new NullPointerException();
 3         RunnableFuture<Void> ftask = newTaskFor(task, null);
 4         execute(ftask);
 5         return ftask;
 6     }
 7     public <T> Future<T> submit(Runnable task, T result) {
 8         if (task == null) throw new NullPointerException();
 9         RunnableFuture<T> ftask = newTaskFor(task, result);
10         execute(ftask);
11         return ftask;
12     }
13     public <T> Future<T> submit(Callable<T> task) {
14         if (task == null) throw new NullPointerException();
15         RunnableFuture<T> ftask = newTaskFor(task);
16         execute(ftask);
17         return ftask;
18     }

經過submit方法提交的Callable或者Runnable任務會被封裝成了一個FutureTask對象。經過Executor.execute方法提交FutureTask到線程池中等待被執行,最終執行的是FutureTask的run方法。

5.2 FutureTask對象

類圖

內部狀態

    /**
     *...
     * Possible state transitions:
     * NEW -> COMPLETING -> NORMAL
     * NEW -> COMPLETING -> EXCEPTIONAL
     * NEW -> CANCELLED
     * NEW -> INTERRUPTING -> INTERRUPTED
     */
    private volatile int state;
    private static final int NEW          = 0;
    private static final int COMPLETING   = 1;
    private static final int NORMAL       = 2;
    private static final int EXCEPTIONAL  = 3;
    private static final int CANCELLED    = 4;
    private static final int INTERRUPTING = 5;
    private static final int INTERRUPTED  = 6;

內部狀態的修改經過sun.misc.Unsafe修改。

get方法

1     public V get() throws InterruptedException, ExecutionException {
2         int s = state;
3         if (s <= COMPLETING)
4             s = awaitDone(false, 0L);
5         return report(s);
6     }

內部經過awaitDone方法對主線程進行阻塞,具體實現以下:

 1     /**
 2      * Awaits completion or aborts on interrupt or timeout.
 3      *
 4      * @param timed true if use timed waits
 5      * @param nanos time to wait, if timed
 6      * @return state upon completion
 7      */
 8     private int awaitDone(boolean timed, long nanos)
 9         throws InterruptedException {
10         final long deadline = timed ? System.nanoTime() + nanos : 0L;
11         WaitNode q = null;
12         boolean queued = false;
13         for (;;) {
14             if (Thread.interrupted()) {
15                 removeWaiter(q);
16                 throw new InterruptedException();
17             }
18 
19             int s = state;
20             if (s > COMPLETING) {
21                 if (q != null)
22                     q.thread = null;
23                 return s;
24             }
25             else if (s == COMPLETING) // cannot time out yet
26                 Thread.yield();
27             else if (q == null)
28                 q = new WaitNode();
29             else if (!queued)
30                 queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
31                                                      q.next = waiters, q);
32             else if (timed) {
33                 nanos = deadline - System.nanoTime();
34                 if (nanos <= 0L) {
35                     removeWaiter(q);
36                     return state;
37                 }
38                 LockSupport.parkNanos(this, nanos);
39             }
40             else
41                 LockSupport.park(this);
42         }
43     }
  1. 若是主線程被中斷,則拋出中斷異常;
  2. 判斷FutureTask當前的state,若是大於COMPLETING,說明任務已經執行完成,則直接返回;
  3. 若是當前state等於COMPLETING,說明任務已經執行完,這時主線程只需經過yield方法讓出cpu資源,等待state變成NORMAL;
  4. 經過WaitNode類封裝當前線程,並經過UNSAFE添加到waiters鏈表;
  5. 最終經過LockSupport的park或parkNanos掛起線程。

run方法

 1     public void run() {
 2         if (state != NEW ||
 3             !UNSAFE.compareAndSwapObject(this, runnerOffset,
 4                                          null, Thread.currentThread()))
 5             return;
 6         try {
 7             Callable<V> c = callable;
 8             if (c != null && state == NEW) {
 9                 V result;
10                 boolean ran;
11                 try {
12                     result = c.call();
13                     ran = true;
14                 } catch (Throwable ex) {
15                     result = null;
16                     ran = false;
17                     setException(ex);
18                 }
19                 if (ran)
20                     set(result);
21             }
22         } finally {
23             // runner must be non-null until state is settled to
24             // prevent concurrent calls to run()
25             runner = null;
26             // state must be re-read after nulling runner to prevent
27             // leaked interrupts
28             int s = state;
29             if (s >= INTERRUPTING)
30                 handlePossibleCancellationInterrupt(s);
31         }
32     }

FutureTask.run方法是在線程池中被執行的,而非主線程

  1. 經過執行Callable任務的call方法;
  2. 若是call執行成功,則經過set方法保存結果;
  3. 若是call執行有異常,則經過setException保存異常。

6 Executors類

 Exectors工廠類提供了線程池的初始化接口,主要有以下幾種:

newFixedThreadPool

1     public static ExecutorService newFixedThreadPool(int nThreads) {
2         return new ThreadPoolExecutor(nThreads, nThreads,
3                                       0L, TimeUnit.MILLISECONDS,
4                                       new LinkedBlockingQueue<Runnable>());
5     }

建立一個固定大小、任務隊列容量無界(Integer.MAX_VALUE)的線程池,其中corePoolSize =maximumPoolSize=nThreads,阻塞隊列爲LinkedBlockingQuene。

注意點:

  1. 線程池的線程數量達corePoolSize後,即便線程池沒有可執行任務時,也不會釋放線程;
  2. 線程池裏的線程數量不超過corePoolSize,這致使了maximumPoolSizekeepAliveTime將會是個無用參數 ;
  3. 因爲使用了無界隊列, 因此FixedThreadPool永遠不會拒絕, 即飽和策略失效。

newSingleThreadExecutor

1     public static ExecutorService newSingleThreadExecutor() {
2         return new FinalizableDelegatedExecutorService
3             (new ThreadPoolExecutor(1, 1,
4                                     0L, TimeUnit.MILLISECONDS,
5                                     new LinkedBlockingQueue<Runnable>()));
6     }

只有一個線程來執行無界任務隊列的單一線程池。若是該線程異常結束,會從新建立一個新的線程繼續執行任務,惟一的線程能夠保證所提交任務的順序執行。因爲使用了無界隊列, 因此SingleThreadPool永遠不會拒絕,即飽和策略失效。與newFixedThreadPool(1)的區別在於單一線程池的大小不能再改變。

newCachedThreadPool

1     public static ExecutorService newCachedThreadPool() {
2         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
3                                       60L, TimeUnit.SECONDS,
4                                       new SynchronousQueue<Runnable>());
5     }

建立一個大小無界的緩衝線程池。任務隊列是一個同步隊列。緩衝線程池適用於執行耗時較小的異步任務。池的核心線程數=0 最大線程數=Integer.MAX_VLUE。與前兩種稍微不一樣的是:

  1. 任務加入到池中,若是池中有空閒線程,則用空閒線程執行,如無則建立新線程執行。
  2. 池中的線程空閒超過60秒,將被銷燬釋放。
  3. 池中的線程數隨任務的多少變化。

newScheduledThreadPool

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

能定時執行任務的線程,池的核心線程數由參數指定。和前面3個線程池基於ThreadPoolExecutor類實現不一樣的是,它基於ScheduledThreadPoolExecutor實現。

7 線程池的監控

 可使用ThreadPoolExecutor如下方法:

  • getTaskCount:線程池已經執行的和未執行的任務總數;
  • getCompletedTaskCount:線程池已完成的任務數量,該值小於等於taskCount;
  • getLargestPoolSize:線程池曾經建立過的最大線程數量。經過這個數據能夠知道線程池是否滿過,也就是達到了maximumPoolSize;
  • getPoolSize:線程池當前的線程數量;
  • getActiveCount:當前線程池中正在執行任務的線程數量。
相關文章
相關標籤/搜索