轉自:http://www.importnew.com/25286.htmlhtml
在Java中通常經過繼承Thread類或者實現Runnable接口這兩種方式來建立多線程,可是這兩種方式都有個缺陷,就是不能在執行完成後獲取執行的結果,所以Java 1.5以後提供了Callable和Future接口,經過它們就能夠在任務執行完畢以後獲得任務的執行結果。本文會簡要的介紹使用方法,而後會從源代碼角度分析下具體的實現原理。
本文以Java 1.7的代碼進行分析。java
Callable接口多線程
對於須要執行的任務須要實現Callable接口,Callable接口定義以下:異步
public interface Callable<V> { /** * Computes a result, or throws an exception if unable to do so. * * @return computed result * @throws Exception if unable to compute a result */ V call() throws Exception; }
能夠看到Callable是個泛型接口,泛型V就是要call()方法返回的類型。Callable接口和Runnable接口很像,均可以被另一個線程執行,可是正如前面所說的,Runnable不會返回數據也不能拋出異常。ide
Future接口函數
Future接口表明異步計算的結果,經過Future接口提供的方法能夠查看異步計算是否執行完成,或者等待執行結果並獲取執行結果,同時還能夠取消執行。Future接口的定義以下:this
public interface Future<V> { boolean cancel(boolean mayInterruptIfRunning); boolean isCancelled(); boolean isDone(); V get() throws InterruptedException, ExecutionException; V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }
FutureTaskspa
Future只是一個接口,不能直接用來建立對象,FutureTask是Future的實現類,
FutureTask的繼承圖以下:線程
能夠看到,FutureTask實現了RunnableFuture接口,則RunnableFuture接口繼承了Runnable接口和Future接口,因此FutureTask既能當作一個Runnable直接被Thread執行,也能做爲Future用來獲得Callable的計算結果。指針
使用
FutureTask通常配合ExecutorService來使用,也能夠直接經過Thread來使用。
package com.beautyboss.slogen.callback; import java.util.concurrent.*; /** * Author : Slogen * AddTime : 17/6/4 * Email : huangjian13@meituan.com */ public class CallDemo { public static void main(String[] args) throws ExecutionException, InterruptedException { /** * 第一種方式:Future + ExecutorService * Task task = new Task(); * ExecutorService service = Executors.newCachedThreadPool(); * Future<Integer> future = service.submit(task1); * service.shutdown(); */ /** * 第二種方式: FutureTask + ExecutorService * ExecutorService executor = Executors.newCachedThreadPool(); * Task task = new Task(); * FutureTask<Integer> futureTask = new FutureTask<Integer>(task); * executor.submit(futureTask); * executor.shutdown(); */ /** * 第三種方式:FutureTask + Thread */ // 2. 新建FutureTask,須要一個實現了Callable接口的類的實例做爲構造函數參數 FutureTask<Integer> futureTask = new FutureTask<Integer>(new Task()); // 3. 新建Thread對象並啓動 Thread thread = new Thread(futureTask); thread.setName("Task thread"); thread.start(); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); // 4. 調用isDone()判斷任務是否結束 if(!futureTask.isDone()) { System.out.println("Task is not done"); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } } int result = 0; try { // 5. 調用get()方法獲取任務結果,若是任務沒有執行完成則阻塞等待 result = futureTask.get(); } catch (Exception e) { e.printStackTrace(); } System.out.println("result is " + result); } // 1. 繼承Callable接口,實現call()方法,泛型參數爲要返回的類型 static class Task implements Callable<Integer> { @Override public Integer call() throws Exception { System.out.println("Thread [" + Thread.currentThread().getName() + "] is running"); int result = 0; for(int i = 0; i < 100;++i) { result += i; } Thread.sleep(3000); return result; } } }
構造函數
先從FutureTask的構造函數看起,FutureTask有兩個構造函數,其中一個以下:
public FutureTask(Callable<V> callable) { if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable }
這個構造函數會把傳入的Callable變量保存在this.callable字段中,該字段定義爲private Callable<V> callable;用來保存底層的調用,在被執行完成之後會指向null,接着會初始化state字段爲NEW。state字段用來保存FutureTask內部的任務執行狀態,一共有7中狀態,每種狀態及其對應的值以下:
private volatile int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private static final int EXCEPTIONAL = 3; private static final int CANCELLED = 4; private static final int INTERRUPTING = 5; private static final int INTERRUPTED = 6;
其中須要注意的是state是volatile類型的,也就是說只要有任何一個線程修改了這個變量,那麼其餘全部的線程都會知道最新的值。
爲了後面更好的分析FutureTask的實現,這裏有必要解釋下各個狀態。
有一點須要注意的是,全部值大於COMPLETING的狀態都表示任務已經執行完成(任務正常執行完成,任務執行異常或者任務被取消)。
各個狀態之間的可能轉換關係以下圖所示:
另一個構造函數以下,
public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable }
這個構造函數會把傳入的Runnable封裝成一個Callable對象保存在callable字段中,同時若是任務執行成功的話就會返回傳入的result。這種狀況下若是不須要返回值的話能夠傳入一個null。
順帶看下Executors.callable()這個方法,這個方法的功能是把Runnable轉換成Callable,代碼以下:
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); }
能夠看到這裏採用的是適配器模式,調用RunnableAdapter<T>(task, result)方法來適配,實現以下:
static final class RunnableAdapter<T> implements Callable<T> { final Runnable task; final T result; RunnableAdapter(Runnable task, T result) { this.task = task; this.result = result; } public T call() { task.run(); return result; } }
這個適配器很簡單,就是簡單的實現了Callable接口,在call()實現中調用Runnable.run()方法,而後把傳入的result做爲任務的結果返回。
在new了一個FutureTask對象以後,接下來就是在另外一個線程中執行這個Task,不管是經過直接new一個Thread仍是經過線程池,執行的都是run()方法,接下來就看看run()方法的實現。
run()
run()方法實現以下:
public void run() { // 1. 狀態若是不是NEW,說明任務或者已經執行過,或者已經被取消,直接返回 // 2. 狀態若是是NEW,則嘗試把當前執行線程保存在runner字段中 // 若是賦值失敗則直接返回 if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { // 3. 執行任務 result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; // 4. 任務異常 setException(ex); } if (ran) // 4. 任務正常執行完畢 set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; // 5. 若是任務被中斷,執行中斷處理 if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
run()方法首先會
若是任務執行發生異常,則調用setException()方法保存異常信息。setException()方法以下:
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
在setException()方法中
5. 若是任務成功執行則調用set()方法設置執行結果,該方法實現以下:
protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
這個方法跟上面分析的setException()差很少,
發起任務線程跟執行任務線程一般狀況下都不會是同一個線程,在任務執行線程執行任務的時候,任務發起線程能夠查看任務執行狀態、獲取任務執行結果、取消任務等等操做,接下來分析下這些操做。
get()
任務發起線程能夠調用get()方法來獲取任務執行結果,若是此時任務已經執行完畢則會直接返回任務結果,若是任務還沒執行完畢,則調用方會阻塞直到任務執行結束返回結果爲止。get()方法實現以下:
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
get()方法實現比較簡單,會
awaitDone()
當調用get()獲取任務結果可是任務還沒執行完成的時候,調用線程會調用awaitDone()方法進行阻塞等待,該方法定義以下:
private int awaitDone(boolean timed, long nanos) throws InterruptedException { // 計算等待截止時間 final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { // 1. 判斷阻塞線程是否被中斷,若是被中斷則在等待隊 // 列中刪除該節點並拋出InterruptedException異常 if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } // 2. 獲取當前狀態,若是狀態大於COMPLETING // 說明任務已經結束(要麼正常結束,要麼異常結束,要麼被取消) // 則把thread顯示置空,並返回結果 int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } // 3. 若是狀態處於中間狀態COMPLETING // 表示任務已經結束可是任務執行線程還沒來得及給outcome賦值 // 這個時候讓出執行權讓其餘線程優先執行 else if (s == COMPLETING) // cannot time out yet Thread.yield(); // 4. 若是等待節點爲空,則構造一個等待節點 else if (q == null) q = new WaitNode(); // 5. 若是尚未入隊列,則把當前節點加入waiters首節點並替換原來waiters else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 若是須要等待特定時間,則先計算要等待的時間 // 若是已經超時,則刪除對應節點並返回對應的狀態 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } // 6. 阻塞等待特定時間 LockSupport.parkNanos(this, nanos); } else // 6. 阻塞等待直到被其餘線程喚醒 LockSupport.park(this); } }
awaitDone()中有個死循環,每一次循環都會
假設當前state=NEW且waiters爲NULL,也就是說尚未任何一個線程調用get()獲取執行結果,這個時候有兩個線程threadA和threadB前後調用get()來獲取執行結果。再假設這兩個線程在加入阻塞隊列進行阻塞等待以前任務都沒有執行完成且threadA和threadB都沒有被中斷的狀況下(由於若是threadA和threadB在進行阻塞等待結果以前任務就執行完成或線程自己被中斷的話,awaitDone()就執行結束返回了),執行過程是這樣的,以threadA爲例:
在threadA和threadB都阻塞等待以後的waiters結果如圖
cancel()方法會作下面幾件事:
1 .判斷任務當前執行狀態,若是任務狀態不爲NEW,則說明任務或者已經執行完成,或者執行異常,不能被取消,直接返回false表示執行失敗。
2. 判斷須要中斷任務執行線程,則
3. 若是不須要中斷任務執行線程,直接把任務狀態從NEW轉化爲CANCELLED。若是轉化失敗則返回false表示取消失敗。這個轉換過程對應上圖中的四。
4. 調用finishCompletion()。
finishCompletion()
根據前面的分析,不論是任務執行異常仍是任務正常執行完畢,或者取消任務,最後都會調用finishCompletion()方法,該方法實現以下:
private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
這個方法的實現比較簡單,依次遍歷waiters鏈表,喚醒節點中的線程,而後把callable置空。
被喚醒的線程會各自從awaitDone()方法中的LockSupport.park*()阻塞中返回,而後會進行新一輪的循環。在新一輪的循環中會返回執行結果(或者更確切的說是返回任務的狀態)。
report()
report()方法用在get()方法中,做用是把不一樣的任務狀態映射成任務執行結果。實現以下:
private V report(int s) throws ExecutionException { Object x = outcome; // 1. 任務正常執行完成,返回任務執行結果 if (s == NORMAL) return (V)x; // 2. 任務被取消,拋出CancellationException異常 if (s >= CANCELLED) throw new CancellationException(); // 3. 其餘狀態,拋出執行異常ExecutionException throw new ExecutionException((Throwable)x); }
映射關係以下圖所示:
若是任務處於NEW、COMPLETING和INTERRUPTING這三種狀態的時候是執行不到report()方法的,因此不必對這三種狀態進行轉換。
get(long,TimeUnit)
帶超時等待的獲取任務結果,實現以下:
public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { if (unit == null) throw new NullPointerException(); int s = state; if (s <= COMPLETING && // 若是awaitDone()超時返回以後任務還沒結束,則拋出異常 (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING) throw new TimeoutException(); return report(s); }
跟get()不一樣點在於get(long,TimeUnit)會在awaitDone()超時返回以後拋出TimeoutException異常。
isCancelled()和isDone()
這兩個方法分別用來判斷任務是否被取消和任務是否執行完成,實現都比較簡單,代碼以下:
public boolean isCancelled() { return state >= CANCELLED; } public boolean isDone() { return state != NEW; }
根據前面的分析,這兩個方法就很容易理解不用多作解釋了,O(∩_∩)O。
總結下,其實FutureTask的實現仍是比較簡單的,當用戶實現Callable()接口定義好任務以後,把任務交給其餘線程進行執行。FutureTask內部維護一個任務狀態,任何操做都是圍繞着這個狀態進行,並隨時更新任務狀態。任務發起者調用get*()獲取執行結果的時候,若是任務尚未執行完畢,則會把本身放入阻塞隊列中而後進行阻塞等待。當任務執行完成以後,任務執行線程會依次喚醒阻塞等待的線程。調用cancel()取消任務的時候也只是簡單的修改任務狀態,若是須要中斷任務執行線程的話則調用Thread.interrupt()中斷任務執行線程。
有個值得關注的問題就是當任務還在執行的時候用戶調用cancel(true)方法可否真正讓任務中止執行呢?
在前面的分析中咱們直到,當調用cancel(true)方法的時候,實際執行仍是Thread.interrupt()方法,而interrupt()方法只是設置中斷標誌位,若是被中斷的線程處於sleep()、wait()或者join()邏輯中則會拋出InterruptedException異常。
所以結論是:cancel(true)並不必定可以中止正在執行的異步任務。