CompletableFuture 是Java 8 新增長的Api,該類實現,Future和CompletionStage兩個接口,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。java
public T get()編程
該方法爲阻塞方法,會等待計算結果完成app
public T get(long timeout,TimeUnit unit)dom
有時間限制的阻塞方法異步
public T getNow(T valueIfAbsent)函數式編程
當即獲取方法結果,若是沒有計算結束則返回傳的值異步編程
public T join()函數
和 get() 方法相似也是主動阻塞線程,等待計算結果。和get() 方法有細微的差異線程
public class ThreadUtil { public static void sleep(long ms) { try { Thread.sleep(ms); } catch (InterruptedException e) { e.printStackTrace(); throw new RuntimeException(e.getMessage()); } } }
public static void main(String[] args) { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(200); return 10 / 0; }); // System.out.println(future.join()); // System.out.println(future.get()); System.out.println(future.getNow(10)); }
public boolean complete(T value)code
當即完成計算,並把結果設置爲傳的值,返回是否設置成功
若是 CompletableFuture 沒有關聯任何的Callback、異步任務等,若是調用get方法,那會一直阻塞下去,可使用complete方法主動完成計算
public static void main(String[] args) throws Exception { CompletableFuture<Integer> future = new CompletableFuture<>(); // future.get(); future.complete(10); }
建立一個異步任務
public static <U> CompletableFuture<U> completedFuture(U value)
建立一個有初始值的CompletableFuture
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以上四個方法中,以 Async 結尾而且沒有 Executor 參數的,會默認使用 ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。 以run開頭的,由於以 Runable 類型爲參數因此沒有返回值。示例:
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync")); CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync"); System.out.println(future1.get()); System.out.println(future2.get()); }
結果:
runAsync null supplyAsync
當CompletableFuture的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action。主要是下面的方法:
參數類型爲 BiConsumer<? super T, ? super Throwable> 會獲取上一步計算的計算結果和異常信息。以Async結尾的方法可能會使用其它的線程去執行,若是使用相同的線程池,也可能會被同一個線程選中執行,如下皆相同。
public static void main(String[] args) throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(100); return 20; }).whenCompleteAsync((v, e) -> { System.out.println(v); System.out.println(e); }); System.out.println(future.get()); }
public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
該方法是對異常狀況的處理,當函數異常時應該的返回值
public static void main(String[] args) throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(100); return 10 / 0; }).whenCompleteAsync((v, e) -> { System.out.println(v); System.out.println(e); }).exceptionally((e) -> { System.out.println(e.getMessage()); return 30; }); System.out.println(future.get()); }
handle 方法和whenComplete方法相似,只不過接收的是一個 BiFunction<? super T,Throwable,? extends U> fn 類型的參數,所以有 whenComplete 方法和 轉換的功能 (thenApply)
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> 10 / 0) .handle((t, e) -> { System.out.println(e.getMessage()); return 10; }); System.out.println(future.get()); }
CompletableFuture 因爲有回調,能夠沒必要等待一個計算完成而阻塞着調用縣城,能夠在一個結果計算完成以後緊接着執行某個Action。咱們能夠將這些操做串聯起來。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> 1) .thenApply((a) -> { System.out.println(a);//1 return a * 10; }).thenApply((a) -> { System.out.println(a);//10 return a + 10; }).thenApply((a) -> { System.out.println(a);//20 return a - 5; }); System.out.println(future.get());//15 }
這些方法不是立刻執行的,也不會阻塞,而是前一個執行完成後繼續執行下一個。
和 handle 方法的區別是,handle 會處理正常計算值和異常,不會拋出異常。而 thenApply 只會處理正常計算值,有異常則拋出。
單純的去消費結果而不會返回新的值,因些計算結果爲 Void;
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Void> future = CompletableFuture .supplyAsync(() -> 1) .thenAccept(System.out::println) //消費 上一級返回值 1 .thenAcceptAsync(System.out::println); //上一級沒有返回值 輸出null System.out.println(future.get()); //消費函數沒有返回值 輸出null }
和 thenAccept 相比,參數類型多了一個 CompletionStage<? extends U> other,以上方法會接收上一個CompletionStage返回值,和當前的一個。
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture .supplyAsync(() -> 1) .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> { System.out.println(a); System.out.println(b); }).get(); }
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth 和以上方法不一樣,傳一個 Runnable 類型的參數,不接收上一級的返回值
更完全的:
以上是完全的純消費,徹底忽略計算結果
以上接收類型爲 Function<? super T,? extends CompletionStage<U>> fn ,fn 接收上一級返回的結果,並返回一個新的 CompletableFuture
public static void main(String[] args) throws ExecutionException, InterruptedException { CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> 1) .thenApply((a) -> { ThreadUtil.sleep(1000); return a + 10; }) .thenCompose((s) -> { System.out.println(s); //11 return CompletableFuture.supplyAsync(() -> s * 5); }); System.out.println(future.get());//55 }
兩個CompletionStage是並行執行的,它們之間並無前後依賴順序,other並不會等待先前的CompletableFuture執行完畢後再執行。
public static void main(String[] args) throws ExecutionException, InterruptedException { Random random = new Random(); CompletableFuture<Integer> future = CompletableFuture .supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); System.out.println("supplyAsync"); return 2; }).thenApply((a) -> { ThreadUtil.sleep(random.nextInt(1000)); System.out.println("thenApply"); return a * 3; }) .thenCombine(CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); System.out.println("thenCombineAsync"); return 10; }), (a, b) -> { System.out.println(a); System.out.println(b); return a + b; }); System.out.println(future.get()); }
thenCombine 和 supplyAsync 不必定哪一個先哪一個後,是並行執行的。
先看示例:
public static void main(String[] args) throws ExecutionException, InterruptedException { Random random = new Random(); CompletableFuture .supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); return "A"; }) .acceptEither(CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); return "B"; }), System.out::println) .get(); }
以上代碼有時輸出A,有時輸出B,哪一個Future先執行完就會根據它的結果計算。
acceptEither方法是當任意一個 CompletionStage 完成的時候,action 這個消費者就會被執行。這個方法返回 CompletableFuture<Void>
applyToEither 方法是當任意一個 CompletionStage 完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。
acceptEither 沒有返回值,applyToEither 有返回值
這個方法的意思是把有方法都執行完才往下執行,沒有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException { Random random = new Random(); CompletableFuture.allOf( CompletableFuture.runAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); System.out.println(1); }), CompletableFuture.runAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); System.out.println(2); })) .get(); }
有時輸出1 2 有時輸出 2 1
任務一個方法執行完都往下執行,返回一個Object類型的值
public static void main(String[] args) throws ExecutionException, InterruptedException { Random random = new Random(); Object obj = CompletableFuture.anyOf( CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); return 1; }), CompletableFuture.supplyAsync(() -> { ThreadUtil.sleep(random.nextInt(1000)); return 2; })).get(); System.out.println(obj); }
輸出結果有時爲1 有時間爲 2