1. 疑惑java
Future f = executor.submit(callable);異步
線程池執行任務的時候, 爲了高效,使用的都是異步執行方式;上面這種形式是異步執行的,可是使用 f.get()方法獲取結果時又是阻塞的, 這是怎麼實現的呢?ide
2. 分析this
從使用方式上咱們能夠推測出幾點:線程
進入線程池中的任務(callable), 應該被放到Future中執行了;code
Future執行任務是在新的線程中執行(這樣纔不會阻塞原來的線程);orm
get方法調用時,阻塞主線程, 當任務執行完成後, 才取消主線程的阻塞;get
3. 實現it
/** * future有幾種狀態:new(新建立), completing(正在執行), normal(執行完畢) */ public class MyFuture<T> implements Runnable { private int state; private static final int NEW = 0; private static final int COMPLETING = 1; private static final int NORMAL = 2; private Callable<T> callable; private Object result; private WaitNode waiters; public MyFuture(Callable<T> callable) { this.callable = callable; this.state = NEW; } public T get() throws InterruptedException, ExecutionException{ int s = state; //若不是‘完成’狀態, 則阻塞當前線程 if (s <= COMPLETING) { s = await(); } return report(s); } private int await() { WaitNode q = null; boolean queued = false; for(;;) { if (state > COMPLETING) { //若是是完成狀態, 則直接返回 return state; } else if (q == null) { q = new WaitNode(); } else if (!queued) { queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); } else { //阻塞當前線程 LockSupport.park(this); } } } private T report(int s) throws ExecutionException{ Object x = result; if (s == NORMAL) { return (T)x; } throw new ExecutionException((Throwable)x); } @Override public void run() { //此方法是有線程池中線程調用, 主線程阻塞, 不會影響這個方法的執行 try { if (callable != null && state == NEW) { //任務的執行 T v = callable.call(); setResult(v); } } catch (Exception e) { e.printStackTrace(); } } private void setResult(T result) { if (UNSAFE.compareAndSwapLong(this, stateOffset, state, COMPLETING)) { this.result = result; UNSAFE.compareAndSwapLong(this, stateOffset, state, NORMAL); finishComplete(); } } private void finishComplete() { for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; //取消阻塞 LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; q = next; } break; } } callable = null; } static final class WaitNode { Thread thread; WaitNode next; WaitNode() { this.thread = Thread.currentThread(); } } private static sun.misc.Unsafe UNSAFE; private static long stateOffset; private static long waitersOffset; static { try { Field f = sun.misc.Unsafe.class.getDeclaredField("theUnsafe"); f.setAccessible(true); UNSAFE = (Unsafe)f.get(null); Class myFutureClass = MyFuture.class; stateOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("state")); waitersOffset = UNSAFE.objectFieldOffset(myFutureClass.getDeclaredField("waiters")); } catch (Exception e) { e.printStackTrace(); } } }