JDK future框架,提供了一種異步編程模式,基於線程池的。將任務runnable/callable提交到線程池executor,返回一個Future對象。經過future.get()獲取執行結果,這裏提交到線程池,後面的操做不會阻塞。future.get()獲取結果會阻塞,其實也是用多線線程執行任務。java
future.get()這裏會阻塞,google的guava提供了一個calllback解決辦法,這也是我準備看的編程
下面是一個future的demo框架
import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; /** * @Author: <guanxianseng@163.com> * @Description: jdk future demo * @Date: Created in : 2018/11/30 6:01 PM **/ public class TestJdkFuture { public static void main(String[] args) throws ExecutionException, InterruptedException { testJdkFuture(); } public static void testJdkFuture() throws ExecutionException, InterruptedException { Callable<Integer> callable = () ->{ System.out.println("callable do some compute"); return 1; } ; Runnable runnable = () -> { System.out.println("runable do some compute"); }; ExecutorService executorService = Executors.newCachedThreadPool(); Future<Integer> future = executorService.submit(callable); Future runableFuture = executorService.submit(runnable); Object runableRes = runableFuture.get(); int res = future.get(); executorService.shutdown(); System.out.println("callable res: " + res); System.out.println("runnable res: " + runableRes); } }
demo裏面提交了一個Callable和一個Runnable到線程池,經過future獲取計算結果less
executorService.submit(callable)源碼異步
public <T> Future<T> submit(Callable<T> task) { if (task == null) throw new NullPointerException(); RunnableFuture<T> ftask = newTaskFor(task); execute(ftask); return ftask; }
這裏主要是封裝了一個RunnableFuture和提交任務到線程池異步編程
咱們看下RunnableFuture裏面的run方法,由於線程池執行任務,是執行run方法。看FutureTask裏面的run方法this
public void run() { if (state != NEW || !UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())) return; try { Callable<V> c = callable; if (c != null && state == NEW) { V result; boolean ran; try { result = c.call(); ran = true; } catch (Throwable ex) { result = null; ran = false; setException(ex); } if (ran) set(result); } } finally { // runner must be non-null until state is settled to // prevent concurrent calls to run() runner = null; // state must be re-read after nulling runner to prevent // leaked interrupts int s = state; if (s >= INTERRUPTING) handlePossibleCancellationInterrupt(s); } }
首先,設置runner爲線程池中的當前線程,後面執行call()方法,計算出結果,set(result)。跟進set(result)google
/** * Sets the result of this future to the given value unless * this future has already been set or has been cancelled. * * <p>This method is invoked internally by the {@link #run} method * upon successful completion of the computation. * * @param v the value */ protected void set(V v) { if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { outcome = v; UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state finishCompletion(); } }
這裏用了一個cas設置FutureTask的state字段爲COMPLETING,完成中的一個狀態。接着設置outcom爲計算結果,咱們跟進finishCompletion()spa
/** * Removes and signals all waiting threads, invokes done(), and * nulls out callable. */ private void finishCompletion() { // assert state > COMPLETING; for (WaitNode q; (q = waiters) != null;) { if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) { for (;;) { Thread t = q.thread; if (t != null) { q.thread = null; LockSupport.unpark(t); } WaitNode next = q.next; if (next == null) break; q.next = null; // unlink to help gc q = next; } break; } } done(); callable = null; // to reduce footprint }
WaitNode主要是一個阻塞線程鏈表,即調用future.get()方法的線程鏈表。這裏主要做用是注意喚醒這些線程,經過LockSupport.unpark(t)喚醒。這裏用阻塞線程鏈表,主要是考慮到可能有多個線程會調用future.get()阻塞線程
ok,到這裏執行任務,把計算結果放到future中,並喚醒阻塞線程已經理清楚了
咱們再來看下,future.get()是如何實現阻塞,和獲取到計算結果的
進入future.get()
/** * Waits if necessary for the computation to complete, and then * retrieves its result. * * @return the computed result * @throws CancellationException if the computation was cancelled * @throws ExecutionException if the computation threw an * exception * @throws InterruptedException if the current thread was interrupted * while waiting */ V get() throws InterruptedException, ExecutionException;
Future.java是一個接口,這裏看註釋能夠看出,會阻塞等待結果計算完成。咱們看下一個實現FutureTask.java的get()方法
/** * @throws CancellationException {@inheritDoc} */ public V get() throws InterruptedException, ExecutionException { int s = state; if (s <= COMPLETING) s = awaitDone(false, 0L); return report(s); }
awaitDone(false, 0l)主要是阻塞線程,進入方法
/** * Awaits completion or aborts on interrupt or timeout. * * @param timed true if use timed waits * @param nanos time to wait, if timed * @return state upon completion */ private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet Thread.yield(); else if (q == null) q = new WaitNode(); else if (!queued) queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else LockSupport.park(this); } }
這裏,主要是阻塞線程,把當前線程放到阻塞線程鏈表中,經過LockSupport.park(this)阻塞當前線程,等待線程池裏面的線程喚醒。喚醒以後,回到get()方法,看report()方法
/** * Returns result or throws exception for completed task. * * @param s completed state value */ @SuppressWarnings("unchecked") private V report(int s) throws ExecutionException { Object x = outcome; if (s == NORMAL) return (V)x; if (s >= CANCELLED) throw new CancellationException(); throw new ExecutionException((Throwable)x); }
這裏主要是返回計算結果給阻塞線程
到這裏,基本理清楚了,future的阻塞實現,以及獲取計算結果的步驟。
future框架 = 線程池 + futrue