JDK源碼分析之concurrent包(三) -- Future方式的實現

  上一篇咱們基於JDK的源碼對線程池ThreadPoolExecutor的實現作了分析,本篇來對Executor框架中另外一種典型用法Future方式作源碼解讀。咱們知道Future方式實現了帶有返回值的程序的異步調用,關於異步調用的場景你們能夠自行腦補Ajax的應用(獲取返回結果的方式不一樣,Future是主動詢問獲取,Ajax是回調函數),這裏不作過多說明。框架


 

在進入源碼前,首先來看下Future方式相關的API:異步

  • 接口Callable:有返回結果而且可能拋出異常的任務;
  • 接口Future:表示異步執行的結果;
  • 類FutureTask:實現Future、Runnable等接口,是一個異步執行的任務。能夠直接執行,或包裝成Callable執行;
  • 接口CompletionService:將生產新的異步任務與使用已完成任務的結果分離開來的服務,用來執行Callable或Runnable,並異步獲取執行結果;
  • 類ExecutorCompletionService:實現CompletionService接口,使用構造時傳入的Executor來執行Callable或Runnable,

接下來經過一個常規的使用實例來展現這些API之間的關係:函數

 1 ExecutorService executor = Executors.newFixedThreadPool(3);
 2 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
 3 Future<String> future = completionService.submit(new Callable<String>() {
 4 
 5     public String call() throws Exception {
 6         // do something...
 7         return "success";
 8     }
 9 });
10 
11 // 其它的程序邏輯。。。
12 
13 // 異步的獲取執行結果
14 String result = future.get();

常規調用主要是經過CompletionService.submit()方法,那咱們就從這個方法開始進入JDK源碼,如下是ExecutorCompletionService類的源碼:工具

 1 public Future<V> submit(Callable<V> task) {
 2     if (task == null) throw new NullPointerException();
 3     RunnableFuture<V> f = newTaskFor(task);
 4     executor.execute(new QueueingFuture(f));
 5     return f;
 6 }
 7 
 8 public Future<V> submit(Runnable task, V result) {
 9     if (task == null) throw new NullPointerException();
10     RunnableFuture<V> f = newTaskFor(task, result);
11     executor.execute(new QueueingFuture(f));
12     return f;
13 }

sunmit()方法有兩個重載,這裏咱們只對參數爲Callable的方法解讀,由於另外一個也是間接的封裝成了Callable最後調用的。ui

上述代碼的第3行能夠看到,Callable轉換成了RunnableFuture來交給executor執行,下面來看newTaskFor(task)方法的源碼:this

1 private RunnableFuture<V> newTaskFor(Callable<V> task) {
2     if (aes == null)
3         return new FutureTask<V>(task);
4     else
5         return aes.newTaskFor(task);
6 }

代碼第3行第5行的結果是同樣的,繼續來看這個FutureTask如何構造的:spa

1 public FutureTask(Callable<V> callable) {
2     if (callable == null)
3         throw new NullPointerException();
4     sync = new Sync(callable);
5 }

仍是沒什麼用,而後咱們詳細的來看下Sync的源碼:線程

 1 /**
 2  * Synchronization control for FutureTask. Note that this must be
 3  * a non-static inner class in order to invoke the protected
 4  * <tt>done</tt> method. For clarity, all inner class support
 5  * methods are same as outer, prefixed with "inner".
 6  *
 7  * Uses AQS sync state to represent run status
 8  */
 9 private final class Sync extends AbstractQueuedSynchronizer {
10 
11     /** The underlying callable */
12     private final Callable<V> callable;
13     /** The result to return from get() */
14     private V result;
15     /** The exception to throw from get() */
16     private Throwable exception;
17 
18     Sync(Callable<V> callable) {
19         this.callable = callable;
20     }
21 
22     V innerGet() throws InterruptedException, ExecutionException {
23         acquireSharedInterruptibly(0);
24         if (getState() == CANCELLED)
25             throw new CancellationException();
26         if (exception != null)
27             throw new ExecutionException(exception);
28         return result;
29     }
30 
31     void innerSet(V v) {
32       for (;;) {
33         int s = getState();
34         if (s == RAN)
35           return;
36         if (s == CANCELLED) {
37           // aggressively release to set runner to null,
38           // in case we are racing with a cancel request
39           // that will try to interrupt runner
40           releaseShared(0);
41           return;
42         }
43         if (compareAndSetState(s, RAN)) {
44           result = v;
45           releaseShared(0);
46           done();
47           return;
48         }
49       }
50     }
51 
52     void innerSetException(Throwable t) {
53       for (;;) {
54         int s = getState();
55         if (s == RAN)
56           return;
57         if (s == CANCELLED) {
58             // aggressively release to set runner to null,
59             // in case we are racing with a cancel request
60             // that will try to interrupt runner
61             releaseShared(0);
62             return;
63         }
64         if (compareAndSetState(s, RAN)) {
65           exception = t;
66           result = null;
67           releaseShared(0);
68           done();
69           return;
70         }
71       }
72     }
73 
74     void innerRun() {
75         if (!compareAndSetState(0, RUNNING))
76             return;
77         try {
78             runner = Thread.currentThread();
79             if (getState() == RUNNING) // recheck after setting thread
80                 innerSet(callable.call());
81             else
82                 releaseShared(0); // cancel
83         } catch (Throwable ex) {
84             innerSetException(ex);
85         }
86     }
87     
88     // others codes
89 }

上述代碼列出了幾個重要的方法,能夠大概的看出,Future方式的玄機,基本都在這個內部類裏了。下面就對這個內部類中的幾個方法少作解釋:code

首先,類的註釋主要說明了:內部類必須是非靜態的,是爲了調用外部類的done()方法(這個咱們之後再說)。還有內部類的方法都是以「前綴inner+外部類方法名」來命名的。blog

其次,經過查看外部類的源碼可得知:外部類的全部方法都是經過內部類中同名的inner方法來調用的(源碼很簡單這裏沒有列出)。

而後,咱們來看這個類中的其中3個成員變量及其註釋,就能夠大概猜到:callable是傳入的執行過程,result用來存儲callable的返回值,exception存儲callable拋出的異常(若是有)。

最後,咱們來分別看這個類中的幾個關鍵方法:

上述代碼第74行的innerRun()方法:注意外部類是FutureTask,實現了Runnable接口,事實上就是最開始所說的submit()方法中,最終要執行的Runnable任務,此時執行的實際上是內部類的innerRun(),經過代碼的第80行能夠看出,是調用了callable.call()並把返回值經過innerSet(V v)賦值給了成員變量result。若是callable.call()有異常,則經過innerSetException(Throwable t)賦值給成員變量exception。

經過innserGet()方法差很少就知道了異步獲取執行結果的原理了,第23行的acquireSharedInterruptibly(0)方法的意義在於:要等Runnable任務執行完或被中斷才能執行後面的代碼。(注:這塊實現雖然被一筆帶過,但其實邏輯仍是有點複雜,其實現主要是用死循環檢查執行Callable線程的狀態,相似自旋鎖的概念

最後,在回過頭來看CompletionService.submit()方法:

1 public Future<V> submit(Callable<V> task) {
2     if (task == null) throw new NullPointerException();
3     RunnableFuture<V> f = newTaskFor(task);
4     executor.execute(new QueueingFuture(f));
5     return f;
6 }

代碼的第4行並不是執行的是咱們上面說的FutureTask,而是將這個FutureTask由封裝成了QueueingFuture才交給executor執行,當咱們看了QueueingFuture的源碼就會了解到

 1 /**
 2  * FutureTask extension to enqueue upon completion
 3  */
 4 private class QueueingFuture extends FutureTask<Void> {
 5     QueueingFuture(RunnableFuture<V> task) {
 6         super(task, null);
 7         this.task = task;
 8     }
 9     protected void done() { completionQueue.add(task); }
10     private final Future<V> task;
11 }

這個封裝的意義無非是想在callable.call()執行完後調用第9行的completionQueue.add(task),done()方法是否是很眼熟?

註釋其實已經說明了:是爲了讓完成的任務入列到completionQueue中,以實現本文最開始羅列出來的,CompletionService接口的「將新的異步任務與完成的任務分離開來」的特性。

總結

本文經過ExecutorCompletionService類與FutureTask類及其內部類中部分關鍵處源碼的解讀,介紹了Java5中Future方式的原理。其實能夠歸納爲一句話:

  • 在某線程中執行Callable時,將執行結果或拋出的異常存放在臨時變量中,其它線程在Callable執行完或中斷前,阻塞的獲取執行結果。

關於Executor框架的源碼就解讀到這,下篇文章開始一些工具類的源碼解析。

相關文章
相關標籤/搜索