本文將主要講解 J.U.C 中的 Future 框架,並分析結合源碼分析其內部結構邏輯;java
JDK 中的 Future 框架實際就是 Future 模式的實現,一般狀況下咱們會配合線程池使用,但也能夠單獨使用;下面咱們就單獨使用簡單舉例;算法
FutureTask<String> future = new FutureTask<>(() -> { log.info("異步任務執行..."); Thread.sleep(2000); log.info("過了好久好久..."); return "異步任務完成"; }); log.info("啓動異步任務..."); new Thread(future).start(); log.info("繼續其餘任務..."); Thread.sleep(1000); log.info("獲取異步任務結果:{}", future.get());
打印:編程
[15:38:03,231 INFO ] [main] - 啓動異步任務... [15:38:03,231 INFO ] [main] - 繼續其餘任務... [15:38:03,231 INFO ] [Thread-0] - 異步任務執行... [15:38:05,232 INFO ] [Thread-0] - 過了好久好久... [15:38:05,236 INFO ] [main] - 獲取異步任務結果:異步任務完成
如上面代碼所示,首先咱們將要執行的任務包裝成 Callable
,這裏若是不須要返回值也可使用 Runnable
;而後構建 FutureTask
由一個線程啓動,最後使用 Future.get()
獲取異步任務結果;併發
對於 Future 模式的流程圖以下:框架
對比上面的實例代碼,你們可能會發現有些不同,由於在 FutureTask 同時繼承了 Runnable 和 Future 接口,因此再提交任務後沒有返回Future,而是直接使用自身調用 get;下面咱們就對源碼進行實際分析;異步
public interface RunnableFuture<V> extends Runnable, Future<V> {} public class FutureTask<V> implements RunnableFuture<V> { private volatile int state; // 任務運行狀態 private Callable<V> callable; // 異步任務 private Object outcome; // 返回結果 private volatile Thread runner; // 異步任務執行線程 private volatile WaitNode waiters; // 等待異步結果的線程棧(經過Treiber stack算法實現) public FutureTask(Callable<V> callable) { // 須要返回值 if (callable == null) throw new NullPointerException(); this.callable = callable; this.state = NEW; // ensure visibility of callable } public FutureTask(Runnable runnable, V result) { this.callable = Executors.callable(runnable, result); this.state = NEW; // ensure visibility of callable } ... }
另外在代碼中還能夠看見有不少地方都是用了 CAS
來更新變量,而 JDK1.6 中甚至使用了 AQS
來實現;其緣由就是同一個 FutureTask
能夠多個線程同時提交,也能夠多個線程同時獲取; 因此代碼中有不少的狀態變量:源碼分析
// FutureTask.state 取值 private static final int NEW = 0; // 初始化到結果返回前 private static final int COMPLETING = 1; // 結果賦值 private static final int NORMAL = 2; // 執行完畢 private static final int EXCEPTIONAL = 3; // 執行異常 private static final int CANCELLED = 4; // 任務取消 private static final int INTERRUPTING = 5; // 設置中斷狀態 private static final int INTERRUPTED = 6; // 任務中斷
同時源碼的註釋中也詳細給出了可能出現的狀態轉換:this
注意這裏的 COMPLETING
狀態是一個很微妙的狀態,正由於有他的存在才能實現無鎖賦值;你們先留意這個狀態,而後在代碼中應該能體會到;另外這裏還有一個變量須要注意,WaitNode
;使用 Treiber stack 算法實現的無鎖棧;其原理說明能夠參考下面第三節;線程
public void run() { if (state != NEW || // 確保任務執行完成後,再也不重複執行 !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) // 確保只有一個線程執行 return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); // 設置異常結果 } if (ran) set(result); // 設置結果 } } finally { runner = null; int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); // 確保中斷狀態已經設置 } }
// 設置異步任務結果 protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設置一次 outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); // 喚醒等待線程 } }
protected void setException(Throwable t) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // 保證結果只能設置一次 outcome = t; UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state finishCompletion(); } }
public boolean cancel(boolean mayInterruptIfRunning) { if (!(state == NEW && // 只有在任務執行階段才能取消 UNSAFE.compareAndSwapInt(this, stateOffset, NEW, // 設置取消狀態 mayInterruptIfRunning ? INTERRUPTING : CANCELLED))) return false; try { // in case call to interrupt throws exception if (mayInterruptIfRunning) { try { Thread t = runner; if (t != null) t.interrupt(); } finally { // final state UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); } } } finally { finishCompletion(); } return true; }
注意 cancel(false)
也就是僅取消,並無打斷;異步任務會繼續執行,只是這裏首先設置了 FutureTask.state = CANCELLED
,因此最後在設置結果的時候會失敗,UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)
;code
public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); // 阻塞等待 return report(s); } private V report(int s) throws ExecutionException { // 根據最後的狀態返回結果 Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); // 移除等待節點 throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 任務已完成 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // 正在賦值,直接先出讓線程 Thread.yield(); else if (q == null) // 任務還未完成須要等待 q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); // 使用 Treiber stack 算法 else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
在《Java 併發編程實戰》中講了, 建立非阻塞算法的關鍵在於,找出如何將原子修改的範圍縮小到單個變量上,同時還要維護數據的一致性 。
@ThreadSafe public class ConcurrentStack <E> { AtomicReference<Node<E>> top = new AtomicReference<>(); private static class Node <E> { public final E item; public Node<E> next; public Node(E item) { this.item = item; } } public void push(E item) { Node<E> newHead = new Node<>(item); Node<E> oldHead; do { oldHead = top.get(); newHead.next = oldHead; } while (!top.compareAndSet(oldHead, newHead)); } public E pop() { Node<E> oldHead; Node<E> newHead; do { oldHead = top.get(); if (oldHead == null) return null; newHead = oldHead.next; } while (!top.compareAndSet(oldHead, newHead)); return oldHead.item; } }