Java多線程之線程池深入分析(下)

一、數據結構與線程構造方法


由於已經看到了ThreadPoolExecutor的源碼,因此很容易就看到了ThreadPoolExecutor線程池的數據結構。圖1描述了這種數據結構。


圖1 ThreadPoolExecutor 數據結構

其實,即使沒有上述圖形描述ThreadPoolExecutor的數據結構,我們根據線程池的要求也很能夠猜測出其數據結構出來。

  • 線程池需要支持多個線程併發執行,因此有一個線程集合Collection<Thread>來執行線程任務;
  • 涉及任務的異步執行,因此需要有一個集合來緩存任務隊列Collection<Runnable>;
  • 很顯然在多個線程之間協調多個任務,那麼就需要一個線程安全的任務集合,同時還需要支持阻塞、超時操作,那麼BlockingQueue是必不可少的;
  • 既然是線程池,出發點就是提高系統性能同時降低資源消耗,那麼線程池的大小就有限制,因此需要有一個核心線程池大小(線程個數)和一個最大線程池大小(線程個數),有一個計數用來描述當前線程池大小;
  • 如果是有限的線程池大小,那麼長時間不使用的線程資源就應該銷燬掉,這樣就需要一個線程空閒時間的計數來描述線程何時被銷燬;
  • 前面描述過線程池也是有生命週期的,因此需要有一個狀態來描述線程池當前的運行狀態;
  • 線程池的任務隊列如果有邊界,那麼就需要有一個任務拒絕策略來處理過多的任務,同時在線程池的銷燬階段也需要有一個任務拒絕策略來處理新加入的任務;
  • 上面種的線程池大小、線程空閒實際那、線程池運行狀態等等狀態改變都不是線程安全的,因此需要有一個全局的鎖(mainLock)來協調這些競爭資源;
  • 除了以上數據結構以外,ThreadPoolExecutor還有一些狀態用來描述線程池的運行計數,例如線程池運行的任務數、曾經達到的最大線程數,主要用於調試和性能分析。

 

對於ThreadPoolExecutor而言,一個線程就是一個Worker對象,它與一個線程綁定,當Worker執行完畢就是線程執行完畢,這個在後面詳細討論線程池中線程的運行方式。

既然是線程池,那麼就首先研究下線程的構造方法。

 

  1. public interface ThreadFactory {  
  2.     Thread newThread(Runnable r);  
  3. }  
  
  
  1. public interface ThreadFactory {
  2. Thread newThread(Runnable r);
  3. }

ThreadPoolExecutor使用一個線程工廠來構造線程。線程池都是提交一個任務Runnable,然後在某一個線程Thread中執行,ThreadFactory 負責如何創建一個新線程。

在J.U.C中有一個通用的線程工廠java.util.concurrent.Executors.DefaultThreadFactory,它的構造方式如下:

 

  1. static class DefaultThreadFactory implements ThreadFactory {  
  2.     static final AtomicInteger poolNumber = new AtomicInteger(1);  
  3.     final ThreadGroup group;  
  4.     final AtomicInteger threadNumber = new AtomicInteger(1);  
  5.     final String namePrefix;  
  6.     DefaultThreadFactory() {  
  7.         SecurityManager s = System.getSecurityManager();  
  8.         group = (s != null)? s.getThreadGroup() :  
  9.                              Thread.currentThread().getThreadGroup();  
  10.         namePrefix = "pool-" +  
  11.                       poolNumber.getAndIncrement() +  
  12.                      "-thread-";  
  13.     }  
  14.     public Thread newThread(Runnable r) {  
  15.         Thread t = new Thread(group, r,  
  16.                               namePrefix + threadNumber.getAndIncrement(),  
  17.                               0);  
  18.         if (t.isDaemon())  
  19.             t.setDaemon(false);  
  20.         if (t.getPriority() != Thread.NORM_PRIORITY)  
  21.             t.setPriority(Thread.NORM_PRIORITY);  
  22.         return t;  
  23.     }  
  24. }  
  
  
  1. static class DefaultThreadFactory implements ThreadFactory {
  2. static final AtomicInteger poolNumber = new AtomicInteger( 1);
  3. final ThreadGroup group;
  4. final AtomicInteger threadNumber = new AtomicInteger( 1);
  5. final String namePrefix;
  6. DefaultThreadFactory() {
  7. SecurityManager s = System.getSecurityManager();
  8. group = (s != null)? s.getThreadGroup() :
  9. Thread.currentThread().getThreadGroup();
  10. namePrefix = "pool-" +
  11. poolNumber.getAndIncrement() +
  12. "-thread-";
  13. }
  14. public Thread newThread(Runnable r) {
  15. Thread t = new Thread(group, r,
  16. namePrefix + threadNumber.getAndIncrement(),
  17. 0);
  18. if (t.isDaemon())
  19. t.setDaemon( false);
  20. if (t.getPriority() != Thread.NORM_PRIORITY)
  21. t.setPriority(Thread.NORM_PRIORITY);
  22. return t;
  23. }
  24. }

在這個線程工廠中,同一個線程池的所有線程屬於同一個線程組,也就是創建線程池的那個線程組,同時線程池的名稱都是「pool-<poolNum>-thread-<threadNum>」,其中poolNum是線程池的數量序號,threadNum是此線程池中的線程數量序號。這樣如果使用jstack的話很容易就看到了系統中線程池的數量和線程池中線程的數量。另外對於線程池中的所有線程默認都轉換爲非後臺線程,這樣主線程退出時不會直接退出JVM,而是等待線程池結束。還有一點就是默認將線程池中的所有線程都調爲同一個級別,這樣在操作系統角度來看所有系統都是公平的,不會導致競爭堆積。


二、線程池中線程生命週期


一個線程Worker被構造出來以後就開始處於運行狀態。以下是一個線程執行的簡版邏輯。

 

  1. private final class Worker implements Runnable {  
  2.     private final ReentrantLock runLock = new ReentrantLock();  
  3.     private Runnable firstTask;  
  4.     Thread thread;  
  5.     Worker(Runnable firstTask) {  
  6.         this.firstTask = firstTask;  
  7.     }  
  8.     private void runTask(Runnable task) {  
  9.         final ReentrantLock runLock = this.runLock;  
  10.         runLock.lock();  
  11.         try {  
  12.            task.run();  
  13.         } finally {  
  14.             runLock.unlock();  
  15.         }  
  16.     }  
  17.     public void run() {  
  18.         try {  
  19.             Runnable task = firstTask;  
  20.             firstTask = null;  
  21.             while (task != null || (task = getTask()) != null) {  
  22.                 runTask(task);  
  23.                 task = null;  
  24.             }  
  25.         } finally {  
  26.             workerDone(this);  
  27.         }  
  28.     }  
  29. }  
  
  
  1. private final class Worker implements Runnable {
  2. private final ReentrantLock runLock = new ReentrantLock();
  3. private Runnable firstTask;
  4. Thread thread;
  5. Worker(Runnable firstTask) {
  6. this.firstTask = firstTask;
  7. }
  8. private void runTask(Runnable task) {
  9. final ReentrantLock runLock = this.runLock;
  10. runLock.lock();
  11. try {
  12. task.run();
  13. } finally {
  14. runLock.unlock();
  15. }
  16. }
  17. public void run() {
  18. try {
  19. Runnable task = firstTask;
  20. firstTask = null;
  21. while (task != null || (task = getTask()) != null) {
  22. runTask(task);
  23. task = null;
  24. }
  25. } finally {
  26. workerDone( this);
  27. }
  28. }
  29. }


當提交一個任務時,如果需要創建一個線程(何時需要在下一節中探討)時,就調用線程工廠創建一個線程,同時將線程綁定到Worker工作隊列中。需要說明的是,Worker隊列構造的時候帶着一個任務Runnable,因此Worker創建時總是綁定着一個待執行任務。換句話說,創建線程的前提是有必要創建線程(任務數已經超出了線程或者強制創建新的線程,至於爲何強制創建新的線程後面章節會具體分析),不會無緣無故創建一堆空閒線程等着任務。這是節省資源的一種方式。

一旦線程池啓動線程後(調用線程run())方法,那麼線程工作隊列Worker就從第1個任務開始執行(這時候發現構造Worker時傳遞一個任務的好處了),一旦第1個任務執行完畢,就從線程池的任務隊列中取出下一個任務進行執行。循環如此,直到線程池被關閉或者任務拋出了一個RuntimeException。

由此可見,線程池的基本原理其實也很簡單,無非預先啓動一些線程,線程進入死循環狀態,每次從任務隊列中獲取一個任務進行執行,直到線程池被關閉。如果某個線程因爲執行某個任務發生異常而終止,那麼重新創建一個新的線程而已。如此反覆。

其實,線程池原理看起來簡單,但是複雜的是各種策略,例如何時該啓動一個線程,何時該終止、掛起、喚醒一個線程,任務隊列的阻塞與超時,線程池的生命週期以及任務拒絕策略等等。


三、線程池任務執行流程


我們從一個API開始接觸Executor是如何處理任務隊列的。

java.util.concurrent.Executor.execute(Runnable)

Executes the given task sometime in the future. The task may execute in a new thread or in an existing pooled thread. If the task cannot be submitted for execution, either because this executor has been shutdown or because its capacity has been reached, the task is handled by the current RejectedExecutionHandler.

線程池中所有任務執行都依賴於此接口。這段話有以下幾個意思:

  1. 任務可能在將來某個時刻被執行,有可能不是立即執行。爲什麼這裏有兩個「可能」?繼續往下面看。
  2. 任務可能在一個新的線程中執行或者線程池中存在的一個線程中執行。
  3. 任務無法被提交執行有以下兩個原因:線程池已經關閉或者線程池已經達到了容量限制。
  4. 所有失敗的任務都將被「當前」的任務拒絕策略RejectedExecutionHandler 處理。

回答上面兩個「可能「。任務可能被執行,那不可能的情況就是上面說的情況3;可能不是立即執行,是因爲任務可能還在隊列中排隊,因此還在等待分配線程執行。瞭解完了字面上的問題,我們再來看具體的實現。

  1. public void execute(Runnable command) {  
  2.     if (command == null)  
  3.         throw new NullPointerException();  
  4.     if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {  
  5.         if (runState == RUNNING && workQueue.offer(command)) {  
  6.             if (runState != RUNNING || poolSize == 0)  
  7.                 ensureQueuedTaskHandled(command);  
  8.         }  
  9.         else if (!addIfUnderMaximumPoolSize(command))  
  10.             reject(command); // is shutdown or saturated   
  11.     }  
  12. }  
  
  
  1. public void execute(Runnable command) {
  2. if (command == null)
  3. throw new NullPointerException();
  4. if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
  5. if (runState == RUNNING && workQueue.offer(command)) {
  6. if (runState != RUNNING || poolSize == 0)
  7. ensureQueuedTaskHandled(command);
  8. }
  9. else if (!addIfUnderMaximumPoolSize(command))
  10. reject(command); // is shutdown or saturated
  11. }
  12. }


這一段代碼看起來挺簡單的,其實這就是線程池最重要的一部分,如果能夠完全理解這一塊,線程池還是挺容易的。整個執行流程是這樣的:

  1. 如果任務command爲空,則拋出空指針異常,返回。否則進行2。
  2. 如果當前線程池大小 大於或等於 核心線程池大小,進行4。否則進行3。
  3. 創建一個新工作隊列(線程,參考上一節),成功直接返回,失敗進行4。
  4. 如果線程池正在運行並且任務加入線程池隊列成功,進行5,否則進行7。
  5. 如果線程池已經關閉或者線程池大小爲0,進行6,否則直接返回。
  6. 如果線程池已經關閉則執行拒絕策略返回,否則啓動一個新線程來進行執行任務,返回。
  7. 如果線程池大小 不大於 最大線程池數量,則啓動新線程來進行執行,否則進行拒絕策略,結束。

文字描述步驟不夠簡單?下面圖形詳細表述了此過程。


老實說這個圖比上面步驟更難以理解,那麼從何入手呢。

流程的入口很簡單,我們就是要執行一個任務(Runnable command),那麼它的結束點在哪或者有哪幾個?

根據左邊這個圖我們知道可能有以下幾種出口:

(1)圖中的P1、P7,我們根據這條路徑可以看到,僅僅是將任務加入任務隊列(offer(command))了;

(2)圖中的P3,這條路徑不將任務加入任務隊列,但是啓動了一個新工作線程(Worker)進行掃尾操作,用戶處理爲空的任務隊列;

(3)圖中的P4,這條路徑沒有將任務加入任務隊列,但是啓動了一個新工作線程(Worker),並且工作現場的第一個任務就是當前任務;

(4)圖中的P5、P6,這條路徑沒有將任務加入任務隊列,也沒有啓動工作線程,僅僅是拋給了任務拒絕策略。P2是任務加入了任務隊列卻因爲線程池已經關閉於是又從任務隊列中刪除,並且拋給了拒絕策略。

如果上面的解釋還不清楚,可以去研究下面兩段代碼:

  1. java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)  
  2. java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)  
  3. java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)  
  
  
  1. java.util.concurrent.ThreadPoolExecutor.addIfUnderCorePoolSize(Runnable)
  2. java.util.concurrent.ThreadPoolExecutor.addIfUnderMaximumPoolSize(Runnable)
  3. java.util.concurrent.ThreadPoolExecutor.ensureQueuedTaskHandled(Runnable)


那麼什麼時候一個任務被立即執行呢?

在線程池運行狀態下,如果線程池大小 小於 核心線程池大小或者線程池已滿(任務隊列已滿)並且線程池大小 小於 最大線程池大小(此時線程池大小 大於 核心線程池大小的),用程序描述爲:

  1. runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())  
  
  
runState == RUNNING && ( poolSize < corePoolSize || poolSize < maxnumPoolSize && workQueue.isFull())

上面的條件就是一個任務能夠被立即執行的條件。

有了execute的基礎,我們看看ExecutorService中的幾個submit方法的實現。

  1. public Future<?> submit(Runnable task) {  
  2.         if (task == nullthrow new NullPointerException();  
  3.         RunnableFuture<Object> ftask = newTaskFor(task, null);  
  4.         execute(ftask);  
  5.         return ftask;  
  6.     }  
  7.   
  8.     public <T> Future<T> submit(Runnable task, T result) {  
  9.         if (task == nullthrow new NullPointerException();  
  10.         RunnableFuture<T> ftask = newTaskFor(task, result);  
  11.         execute(ftask);  
  12.         return ftask;  
  13.     }  
  14.   
  15.     public <T> Future<T> submit(Callable<T> task) {  
  16.         if (task == nullthrow new NullPointerException();  
  17.         RunnableFuture<T> ftask = newTaskFor(task);  
  18.         execute(ftask);  
  19.         return ftask;  
  20.     }  
  
  
  1. public Future<?> submit(Runnable task) {
  2. if (task == null) throw new NullPointerException();
  3. RunnableFuture<Object> ftask = newTaskFor(task, null);
  4. execute(ftask);
  5. return ftask;
  6. }
  7. public <T> Future<T> submit(Runnable task, T result) {
  8. if (task == null) throw new NullPointerException();
  9. RunnableFuture<T> ftask = newTaskFor(task, result);
  10. execute(ftask);
  11. return ftask;
  12. }
  13. public <T> Future<T> submit(Callable<T> task) {
  14. if (task == null) throw new NullPointerException();
  15. RunnableFuture<T> ftask = newTaskFor(task);
  16. execute(ftask);
  17. return ftask;
  18. }


很簡單,不是麼?對於一個線程池來說複雜的地方也就在execute方法的執行流程。在下一節中我們來討論下如何獲取任務的執行結果,也就是Future類的使用和原理。


四、線程池任務執行結果


這一節來探討下線程池中任務執行的結果以及如何阻塞線程、取消任務等等。

  1. package info.imxylz.study.concurrency.future;  
  2.    
  3.  public class SleepForResultDemo implements Runnable {  
  4.    
  5.      static boolean result = false;  
  6.    
  7.      static void sleepWhile(long ms) {  
  8.          try {  
  9.              Thread.sleep(ms);  
  10.          } catch (Exception e) {}  
  11.      }  
  12.    
  13.      @Override  
  14.      public void run() {  
  15.          //do work   
  16.          System.out.println("Hello, sleep a while.");  
  17.          sleepWhile(2000L);  
  18.          result = true;  
  19.      }  
  20.    
  21.      public static void main(String[] args) {  
  22.          SleepForResultDemo demo = new SleepForResultDemo();  
  23.          Thread t = new Thread(demo);  
  24.          t.start();  
  25.          sleepWhile(3000L);  
  26.          System.out.println(result);  
  27.      }  
  28.    
  29.  }  
  
  
  1. package info.imxylz.study.concurrency.future;
  2. public class SleepForResultDemo implements Runnable {
  3. static boolean result = false;
  4. static void sleepWhile(long ms) {
  5. try {
  6. Thread.sleep(ms);
  7. } catch (Exception e) {}
  8. }
  9. @Override
  10. public void run() {
  11. //do work
  12. System.out.println( "Hello, sleep a while.");
  13. sleepWhile( 2000L);
  14. result = true;
  15. }
  16. public static void main(String[] args) {
  17. SleepForResultDemo demo = new SleepForResultDemo();
  18. Thread t = new Thread(demo);
  19. t.start();
  20. sleepWhile( 3000L);
  21. System.out.println(result);
  22. }
  23. }


在沒有線程池的時代裏面,使用Thread.sleep(long)去獲取線程執行完畢的場景很多。顯然這種方式很笨拙,他需要你事先知道任務可能的執行時間,並且還會阻塞主線程,不管任務有沒有執行完畢。

  1. package info.imxylz.study.concurrency.future;  
  2.    
  3.  public class SleepLoopForResultDemo implements Runnable {  
  4.    
  5.      boolean result = false;  
  6.    
  7.      volatile boolean finished = false;  
  8.    
  9.      static void sleepWhile(long ms) {  
  10.          try {  
  11.              Thread.sleep(ms);  
  12.          } catch (Exception e) {}  
  13.      }  
  14.    
  15.      @Override  
  16.      public void run() {  
  17.          //do work   
  18.          try {  
  19.              System.out.println("Hello, sleep a while.");  
  20.              sleepWhile(2000L);  
  21.              result = true;  
  22.          } finally {  
  23.              finished = true;  
  24.          }  
  25.      }  
  26.    
  27.      public static void main(String[] args) {  
  28.          SleepLoopForResultDemo demo = new SleepLoopForResultDemo();  
  29.          Thread t = new Thread(demo);  
  30.          t.start();  
  31.          while (!demo.finished) {  
  32.              sleepWhile(10L);  
  33.          }  
  34.          System.out.println(demo.result);  
  35.      }  
  36.    
  37. }  
  
  
  1. package info.imxylz.study.concurrency.future;
  2. public class SleepLoopForResultDemo implements Runnable {
  3. boolean result = false;
  4. volatile boolean finished = false;
  5. static void sleepWhile(long ms) {
  6. try {
  7. Thread.sleep(ms);
  8. } catch (Exception e) {}
  9. }
  10. @Override
  11. public void run() {
  12. //do work
  13. try {
  14. System.out.println( "Hello, sleep a while.");
  15. sleepWhile( 2000L);
  16. result = true;
  17. } finally {
  18. finished = true;
  19. }
  20. }
  21. public static void main(String[] args) {
  22. SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
  23. Thread t = new Thread(demo);
  24. t.start();
  25. while (!demo.finished) {
  26. sleepWhile( 10L);
  27. }
  28. System.out.println(demo.result);
  29. }
  30. }


使用volatile與while死循環的好處就是等待的時間可以稍微小一點,但是依然有CPU負載高並且阻塞主線程的問題。最簡單的降低CPU負載的方式就是使用Thread.join().

  1. SleepLoopForResultDemo demo = new SleepLoopForResultDemo();  
  2.         Thread t = new Thread(demo);  
  3.         t.start();  
  4.         t.join();  
  5.         System.out.println(demo.result);  
  
  
  1. SleepLoopForResultDemo demo = new SleepLoopForResultDemo();
  2. Thread t = new Thread(demo);
  3. t.start();
  4. t.join();
  5. System.out.println(demo.result);


顯然這也是一種不錯的方式,另外還有自己寫鎖使用wait/notify的方式。其實join()從本質上講就是利用while和wait來實現的。

上面的方式中都存在一個問題,那就是會阻塞主線程並且任務不能被取消。爲了解決這個問題,線程池中提供了一個Future接口。


在Future接口中提供了5個方法。

  • V get() throws InterruptedException, ExecutionException: 等待計算完成,然後獲取其結果。
  • V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException。最多等待爲使計算完成所給定的時間之後,獲取其結果(如果結果可用)。
  • boolean cancel(boolean mayInterruptIfRunning):試圖取消對此任務的執行。
  • boolean isCancelled():如果在任務正常完成前將其取消,則返回 true
  • boolean isDone():如果任務已完成,則返回 true。 可能由於正常終止、異常或取消而完成,在所有這些情況中,此方法都將返回true

API看起來容易,來研究下異常吧。get()請求獲取一個結果會阻塞當前進程,並且可能拋出以下三種異常:

  • InterruptedException:執行任務的線程被中斷則會拋出此異常,此時不能知道任務是否執行完畢,因此其結果是無用的,必須處理此異常。
  • ExecutionException:任務執行過程中(Runnable#run())方法可能拋出RuntimeException,如果提交的是一個java.util.concurrent.Callable<V>接口任務,那麼java.util.concurrent.Callable.call()方法有可能拋出任意異常。
  • CancellationException:實際上get()方法還可能拋出一個CancellationException的RuntimeException,也就是任務被取消了但是依然去獲取結果。

對於get(long timeout, TimeUnit unit)而言,除了get()方法的異常外,由於有超時機制,因此還可能得到一個TimeoutException。

boolean cancel(boolean mayInterruptIfRunning)方法比較複雜,各種情況比較多:

  1. 如果任務已經執行完畢,那麼返回false。
  2. 如果任務已經取消,那麼返回false。
  3. 循環直到設置任務爲取消狀態,對於未啓動的任務將永遠不再執行,對於正在運行的任務,將根據mayInterruptIfRunning是否中斷其運行,如果不中斷那麼任務將繼續運行直到結束。
  4. 此方法返回後任務要麼處於運行結束狀態,要麼處於取消狀態。isDone()將永遠返回true,如果cancel()方法返回true,isCancelled()始終返回true。

來看看Future接口的實現類java.util.concurrent.FutureTask<V>具體是如何操作的。

在FutureTask中使用了一個AQS數據結構來完成各種狀態以及加鎖、阻塞的實現。

在此AQS類java.util.concurrent.FutureTask.Sync中一個任務用4中狀態:


初始情況下任務狀態state=0,任務執行(innerRun)後狀態變爲運行狀態RUNNING(state=1),執行完畢後變成運行結束狀態RAN(state=2)。任務在初始狀態或者執行狀態被取消後就變爲狀態CANCELLED(state=4)。AQS最擅長無鎖情況下處理幾種簡單的狀態變更的。

  1. void innerRun() {  
  2.             if (!compareAndSetState(0, RUNNING))  
  3.                 return;  
  4.             try {  
  5.                 runner = Thread.currentThread();  
  6.                 if (getState() == RUNNING) // recheck after setting thread   
  7.                     innerSet(callable.call());  
  8.                 else  
  9.                     releaseShared(0); // cancel   
  10.             } catch (Throwable ex) {  
  11.                 innerSetException(ex);  
  12.             }  
  13.         }  
  
  
  1. void innerRun() {
  2. if (!compareAndSetState( 0, RUNNING))
  3. return;
  4. try {
  5. runner = Thread.currentThread();
  6. if (getState() == RUNNING) // recheck after setting thread
  7. innerSet(callable.call());
  8. else
  9. releaseShared( 0); // cancel
  10. } catch (Throwable ex) {
  11. innerSetException(ex);
  12. }
  13. }


執行一個任務有四步:設置運行狀態、設置當前線程(AQS需要)、執行任務(Runnable#run或者Callable#call)、設置執行結果。這裏也可以看到,一個任務只能執行一次,因爲執行完畢後它的狀態不在爲初始值0,要麼爲CANCELLED,要麼爲RAN。

取消一個任務(cancel)又是怎樣進行的呢?對比下前面取消任務的描述是不是很簡單,這裏無非利用AQS的狀態來改變任務的執行狀態,最終達到放棄未啓動或者正在執行的任務的目的。

  1. boolean innerCancel(boolean mayInterruptIfRunning) {  
  2.     for (;;) {  
  3.         int s = getState();  
  4.         if (ranOrCancelled(s))  
  5.             return false;  
  6.         if (compareAndSetState(s, CANCELLED))  
  7.             break;  
  8.     }  
  9.     if (mayInterruptIfRunning) {  
  10.         Thread r = runner;  
  11.         if (r != null)  
  12.             r.interrupt();  
  13.     }  
  14.     releaseShared(0);  
  15.     done();  
  16.     return true;  
  17. }  
  
  
  1. boolean innerCancel(boolean mayInterruptIfRunning) {
  2. for (;;) {
  3. int s = getState();
  4. if (ranOrCancelled(s))
  5. return false;
  6. if (compareAndSetState(s, CANCELLED))
  7. break;
  8. }
  9. if (mayInterruptIfRunning) {
  10. Thread r = runner;
  11. if (r != null)
  12. r.interrupt();
  13. }
  14. releaseShared( 0);
  15. done();
  16. return true;
  17. }


到目前爲止我們依然沒有說明到底是如何阻塞獲取一個結果的。下面四段代碼描述了這個過程。

  1. V innerGet() throws InterruptedException, ExecutionException {  
  2.          acquireSharedInterruptibly(0);  
  3.          if (getState() == CANCELLED)  
  4.              throw new CancellationException();  
  5.          if (exception != null)  
  6.              throw new ExecutionException(exception);  
  7.          return result;  
  8.      }  
  9.      //AQS#acquireSharedInterruptibly   
  10.      public final void acquireSharedInterruptibly(int arg) throws InterruptedException {  
  11.          if (Thread.interrupted())  
  12.              throw new InterruptedException();  
  13.          if (tryAcquireShared(arg) < 0)  
  14.              doAcquireSharedInterruptibly(arg); //park current Thread for result   
  15.      }  
  16.      protected int tryAcquireShared(int ignore) {  
  17.          return innerIsDone()? 1 : -1;  
  18.      }  
  19.    
  20.      boolean innerIsDone() {  
  21.          return ranOrCancelled(getState()) && runner == null;  
  22.      }  
  
  
  1. V innerGet() throws InterruptedException, ExecutionException {
  2. acquireSharedInterruptibly( 0);
  3. if (getState() == CANCELLED)
  4. throw new CancellationException();
  5. if (exception != null)
  6. throw new ExecutionException(exception);
  7. return result;
  8. }
  9. //AQS#acquireSharedInterruptibly
  10. public final void acquireSharedInterruptibly(int arg) throws InterruptedException {
  11. if (Thread.interrupted())
  12. throw new InterruptedException();
  13. if (tryAcquireShared(arg) < 0)
  14. doAcquireSharedInterruptibly(arg); //park current Thread for result
  15. }
  16. protected int tryAcquireShared(int ignore) {
  17. return innerIsDone()? 1 : - 1;
  18. }
  19. boolean innerIsDone() {
  20. return ranOrCancelled(getState()) && runner == null;
  21. }


當調用Future#get()的時候嘗試去獲取一個共享變量。這就涉及到AQS的使用方式了。這裏獲取一個共享變量的狀態是任務是否結束(innerIsDone()),也就是任務是否執行完畢或者被取消。如果不滿足條件,那麼在AQS中就會doAcquireSharedInterruptibly(arg)掛起當前線程,直到滿足條件。AQS前面講過,掛起線程使用的是LockSupport的park方式,因此性能消耗是很低的。

至於將Runnable接口轉換成Callable接口,java.util.concurrent.Executors.callable(Runnable, T)也提供了一個簡單實現。

  1. static final class RunnableAdapter<T> implements Callable<T> {  
  2.        final Runnable task;  
  3.        final T result;  
  4.        RunnableAdapter(Runnable  task, T result) {  
  5.            this.task = task;  
  6.            this.result = result;  
  7.        }  
  8.        public T call() {  
  9.            task.run();  
  10.            return result;  
  11.        }  
  12.    }  
  
  
  1. static final class RunnableAdapter<T> implements Callable<T> {
  2. final Runnable task;
  3. final T result;
  4. RunnableAdapter(Runnable task, T result) {
  5. this.task = task;
  6. this.result = result;
  7. }
  8. public T call() {
  9. task.run();
  10. return result;
  11. }
  12. }


五、延遲、週期性任務調度的實現


java.util.concurrent.ScheduledThreadPoolExecutor是默認的延遲、週期性任務調度的實現。

有了整個線程池的實現,再回頭來看延遲、週期性任務調度的實現應該就很簡單了,因爲所謂的延遲、週期性任務調度,無非添加一系列有序的任務隊列,然後按照執行順序的先後來處理整個任務隊列。如果是週期性任務,那麼在執行完畢的時候加入下一個時間點的任務即可。

由此可見,ScheduledThreadPoolExecutor和ThreadPoolExecutor的唯一區別在於任務是有序(按照執行時間順序)的,並且需要到達時間點(臨界點)才能執行,並不是任務隊列中有任務就需要執行的。也就是說唯一不同的就是任務隊列BlockingQueue<Runnable> workQueue不一樣。ScheduledThreadPoolExecutor的任務隊列是java.util.concurrent.ScheduledThreadPoolExecutor.DelayedWorkQueue,它是基於java.util.concurrent.DelayQueue<RunnableScheduledFuture>隊列的實現。

DelayQueue是基於有序隊列PriorityQueue實現的。PriorityQueue 也叫優先級隊列,按照自然順序對元素進行排序,類似於TreeMap/Collections.sort一樣。

同樣是有序隊列,DelayQueue和PriorityQueue區別在什麼地方?

由於DelayQueue在獲取元素時需要檢測元素是否「可用」,也就是任務是否達到「臨界點」(指定時間點),因此加入元素和移除元素會有一些額外的操作。

典型的,移除元素需要檢測元素是否達到「臨界點」,增加元素的時候如果有一個元素比「頭元素」更早達到臨界點,那麼就需要通知任務隊列。因此這需要一個條件變量final Condition available 。

移除元素(出隊列)的過程是這樣的:

  • 總是檢測隊列的頭元素(順序最小元素,也是最先達到臨界點的元素)
  • 檢測頭元素與當前時間的差,如果大於0,表示還未到底臨界點,因此等待響應時間(使用條件變量available)
  • 如果小於或者等於0,說明已經到底臨界點或者已經過了臨界點,那麼就移除頭元素,並且喚醒其它等待任務隊列的線程。
    1. public E take() throws InterruptedException {  
    2.        final ReentrantLock lock = this.lock;  
    3.        lock.lockInterruptibly();  
    4.        try {  
    5.            for (;;) {  
    6.                E first = q.peek();  
    7.                if (first == null) {  
    8.                    available.await();  
    9.                } else {  
    10.                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    11.                    if (delay > 0) {  
    12.                        long tl = available.awaitNanos(delay);  
    13.                    } else {  
    14.                        E x = q.poll();  
    15.                        assert x != null;  
    16.                        if (q.size() != 0)  
    17.                            available.signalAll(); // wake up other takers   
    18.                        return x;  
    19.   
    20.                    }  
    21.                }  
    22.            }  
    23.        } finally {  
    24.            lock.unlock();  
    25.        }  
    26.    }  
        
        
    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. lock.lockInterruptibly();
    4. try {
    5. for (;;) {
    6. E first = q.peek();
    7. if (first == null) {
    8. available.await();
    9. } else {
    10. long delay = first.getDelay(TimeUnit.NANOSECONDS);
    11. if (delay > 0) {
    12. long tl = available.awaitNanos(delay);
    13. } else {
    14. E x = q.poll();
    15. assert x != null;
    16. if (q.size() != 0)
    17. available.signalAll(); // wake up other takers
    18. return x;
    19. }
    20. }
    21. }
    22. } finally {
    23. lock.unlock();
    24. }
    25. }


同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。

  1. public E take() throws InterruptedException {  
  2.        final ReentrantLock lock = this.lock;  
  3.        lock.lockInterruptibly();  
  4.        try {  
  5.            for (;;) {  
  6.                E first = q.peek();  
  7.                if
try {
  • for (;;) {
  • E first = q.peek();
  • if (first == null) {
  • available.await();
  • } else {
  • long delay = first.getDelay(TimeUnit.NANOSECONDS);
  • if (delay > 0) {
  • long tl = available.awaitNanos(delay);
  • } else {
  • E x = q.poll();
  • assert x != null;
  • if (q.size() != 0)
  • available.signalAll(); // wake up other takers
  • return x;
  • }
  • }
  • }
  • } finally {
  • lock.unlock();
  • }
  • }


  • 同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。

    1. public E take() throws InterruptedException {  
    2.        final ReentrantLock lock = this.lock;  
    3.        lock.lockInterruptibly();  
    4.        try {  
    5.            for (;;) {  
    6.                E first = q.peek();  
    7.                if (first == null) {  
    8.                    available.await();  
    9.                } else {  
    10.                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    11.                    if (delay > 0) {  
    12.                        long tl = available.awaitNanos(delay);  
    13.                    } else {  
    14.                        E x = q.poll();  
    15.                        assert x != null;  
    16.  &nbsiv>
  • }


  • 同樣加入元素也會有相應的條件變量操作。當前僅當隊列爲空或者要加入的元素比隊列中的頭元素還小的時候才需要喚醒「等待線程」去檢測元素。因爲頭元素都沒有喚醒那麼比頭元素更延遲的元素就更加不會喚醒。

    1. public E take() throws InterruptedException {  
    2.        final ReentrantLock lock = this.lock;  
    3.        lock.lockInterruptibly();  
    4.        try {  
    5.            for (;;) {  
    6.                E first = q.peek();  
    7.                if (first == null) {  
    8.                    available.await();  
    9.                } else {  
    10.                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);  
    11.                    if (delay > 0) {  
    12.                        long tl = available.awaitNanos(delay);  
    13.                    } else {  
    14.                        E x = q.poll();  
    15.                        assert x != null;  
    16.                        if (q.size() != 0)  
    17.                            available.signalAll(); // wake up other takers   
    18.                        return x;  
    19.   
    20.                    }  
    21.                }  
    22.            }  
    23.        } finally {  
    24.            lock.unlock();  
    25.        }  
    26.    }  
      
      
    1. public E take() throws InterruptedException {
    2. final ReentrantLock lock = this.lock;
    3. lock.lockInterruptibly();
    4. try {
    5. for (;;) {
    6. E first = q.peek();
    7. if (first == null) {
    8. available.await();
    9.   
    10.        final ReentrantLock lock = this.lock;  
    11.        lock.lockInterruptibly();  
    12.        try {  
    13.            for (;;) {  
    14.                E first&
    相關文章
    相關標籤/搜索