上一篇咱們基於JDK的源碼對線程池ThreadPoolExecutor的實現作了分析,本篇來對Executor框架中另外一種典型用法Future方式作源碼解讀。咱們知道Future方式實現了帶有返回值的程序的異步調用,關於異步調用的場景你們能夠自行腦補Ajax的應用(獲取返回結果的方式不一樣,Future是主動詢問獲取,Ajax是回調函數),這裏不作過多說明。框架
在進入源碼前,首先來看下Future方式相關的API:異步
接下來經過一個常規的使用實例來展現這些API之間的關係:函數
1 ExecutorService executor = Executors.newFixedThreadPool(3); 2 CompletionService<String> completionService = new ExecutorCompletionService<String>(executor); 3 Future<String> future = completionService.submit(new Callable<String>() { 4 5 public String call() throws Exception { 6 // do something... 7 return "success"; 8 } 9 }); 10 11 // 其它的程序邏輯。。。 12 13 // 異步的獲取執行結果 14 String result = future.get();
常規調用主要是經過CompletionService.submit()方法,那咱們就從這個方法開始進入JDK源碼,如下是ExecutorCompletionService類的源碼:工具
1 public Future<V> submit(Callable<V> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<V> f = newTaskFor(task); 4 executor.execute(new QueueingFuture(f)); 5 return f; 6 } 7 8 public Future<V> submit(Runnable task, V result) { 9 if (task == null) throw new NullPointerException(); 10 RunnableFuture<V> f = newTaskFor(task, result); 11 executor.execute(new QueueingFuture(f)); 12 return f; 13 }
sunmit()方法有兩個重載,這裏咱們只對參數爲Callable的方法解讀,由於另外一個也是間接的封裝成了Callable最後調用的。ui
上述代碼的第3行能夠看到,Callable轉換成了RunnableFuture來交給executor執行,下面來看newTaskFor(task)方法的源碼:this
1 private RunnableFuture<V> newTaskFor(Callable<V> task) { 2 if (aes == null) 3 return new FutureTask<V>(task); 4 else 5 return aes.newTaskFor(task); 6 }
代碼第3行和第5行的結果是同樣的,繼續來看這個FutureTask如何構造的:spa
1 public FutureTask(Callable<V> callable) { 2 if (callable == null) 3 throw new NullPointerException(); 4 sync = new Sync(callable); 5 }
仍是沒什麼用,而後咱們詳細的來看下Sync的源碼:線程
1 /** 2 * Synchronization control for FutureTask. Note that this must be 3 * a non-static inner class in order to invoke the protected 4 * <tt>done</tt> method. For clarity, all inner class support 5 * methods are same as outer, prefixed with "inner". 6 * 7 * Uses AQS sync state to represent run status 8 */ 9 private final class Sync extends AbstractQueuedSynchronizer { 10 11 /** The underlying callable */ 12 private final Callable<V> callable; 13 /** The result to return from get() */ 14 private V result; 15 /** The exception to throw from get() */ 16 private Throwable exception; 17 18 Sync(Callable<V> callable) { 19 this.callable = callable; 20 } 21 22 V innerGet() throws InterruptedException, ExecutionException { 23 acquireSharedInterruptibly(0); 24 if (getState() == CANCELLED) 25 throw new CancellationException(); 26 if (exception != null) 27 throw new ExecutionException(exception); 28 return result; 29 } 30 31 void innerSet(V v) { 32 for (;;) { 33 int s = getState(); 34 if (s == RAN) 35 return; 36 if (s == CANCELLED) { 37 // aggressively release to set runner to null, 38 // in case we are racing with a cancel request 39 // that will try to interrupt runner 40 releaseShared(0); 41 return; 42 } 43 if (compareAndSetState(s, RAN)) { 44 result = v; 45 releaseShared(0); 46 done(); 47 return; 48 } 49 } 50 } 51 52 void innerSetException(Throwable t) { 53 for (;;) { 54 int s = getState(); 55 if (s == RAN) 56 return; 57 if (s == CANCELLED) { 58 // aggressively release to set runner to null, 59 // in case we are racing with a cancel request 60 // that will try to interrupt runner 61 releaseShared(0); 62 return; 63 } 64 if (compareAndSetState(s, RAN)) { 65 exception = t; 66 result = null; 67 releaseShared(0); 68 done(); 69 return; 70 } 71 } 72 } 73 74 void innerRun() { 75 if (!compareAndSetState(0, RUNNING)) 76 return; 77 try { 78 runner = Thread.currentThread(); 79 if (getState() == RUNNING) // recheck after setting thread 80 innerSet(callable.call()); 81 else 82 releaseShared(0); // cancel 83 } catch (Throwable ex) { 84 innerSetException(ex); 85 } 86 } 87 88 // others codes 89 }
上述代碼列出了幾個重要的方法,能夠大概的看出,Future方式的玄機,基本都在這個內部類裏了。下面就對這個內部類中的幾個方法少作解釋:code
首先,類的註釋主要說明了:內部類必須是非靜態的,是爲了調用外部類的done()方法(這個咱們之後再說)。還有內部類的方法都是以「前綴inner+外部類方法名」來命名的。blog
其次,經過查看外部類的源碼可得知:外部類的全部方法都是經過內部類中同名的inner方法來調用的(源碼很簡單這裏沒有列出)。
而後,咱們來看這個類中的其中3個成員變量及其註釋,就能夠大概猜到:callable是傳入的執行過程,result用來存儲callable的返回值,exception存儲callable拋出的異常(若是有)。
最後,咱們來分別看這個類中的幾個關鍵方法:
上述代碼第74行的innerRun()方法:注意外部類是FutureTask,實現了Runnable接口,事實上就是最開始所說的submit()方法中,最終要執行的Runnable任務,此時執行的實際上是內部類的innerRun(),經過代碼的第80行能夠看出,是調用了callable.call()並把返回值經過innerSet(V v)賦值給了成員變量result。若是callable.call()有異常,則經過innerSetException(Throwable t)賦值給成員變量exception。
經過innserGet()方法差很少就知道了異步獲取執行結果的原理了,第23行的acquireSharedInterruptibly(0)方法的意義在於:要等Runnable任務執行完或被中斷才能執行後面的代碼。(注:這塊實現雖然被一筆帶過,但其實邏輯仍是有點複雜,其實現主要是用死循環檢查執行Callable線程的狀態,相似自旋鎖的概念)
最後,在回過頭來看CompletionService.submit()方法:
1 public Future<V> submit(Callable<V> task) { 2 if (task == null) throw new NullPointerException(); 3 RunnableFuture<V> f = newTaskFor(task); 4 executor.execute(new QueueingFuture(f)); 5 return f; 6 }
代碼的第4行並不是執行的是咱們上面說的FutureTask,而是將這個FutureTask由封裝成了QueueingFuture才交給executor執行,當咱們看了QueueingFuture的源碼就會了解到
1 /** 2 * FutureTask extension to enqueue upon completion 3 */ 4 private class QueueingFuture extends FutureTask<Void> { 5 QueueingFuture(RunnableFuture<V> task) { 6 super(task, null); 7 this.task = task; 8 } 9 protected void done() { completionQueue.add(task); } 10 private final Future<V> task; 11 }
這個封裝的意義無非是想在callable.call()執行完後調用第9行的completionQueue.add(task),done()方法是否是很眼熟?
註釋其實已經說明了:是爲了讓完成的任務入列到completionQueue中,以實現本文最開始羅列出來的,CompletionService接口的「將新的異步任務與完成的任務分離開來」的特性。
本文經過ExecutorCompletionService類與FutureTask類及其內部類中部分關鍵處源碼的解讀,介紹了Java5中Future方式的原理。其實能夠歸納爲一句話:
關於Executor框架的源碼就解讀到這,下篇文章開始一些工具類的源碼解析。