Java5中,Future以及相關使用方法提供了異步執行任務的能力,但對於結果的獲取卻不是很方便,只能經過get()方法阻塞住調用線程直至計算完成返回結果或者isDone()方法輪詢的方式獲得任務結果,也能夠用cancel方法來中止任務的執行,阻塞的方式與咱們理解的異步編程實際上是相違背的,而輪詢又會耗無謂的CPU資源,並且還不能及時獲得計算結果,爲何不能用觀察者設計模式當計算結果完成及時通知監聽者呢?java
不少語言像Node.js,採用回調的方式實現異步編程。Java的一些框架像Netty,本身擴展Java的Future接口,提供了addListener等多個擴展方法:編程
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); future.addListener((channelFuture) -> { if (channelFuture.isSuccess()) { // SUCCESS } else { // FAILURE } });
guava裏面也提供了通用的擴展Future: ListenableFuture\SettableFuture以及輔助類Futures等,方便異步編程。bootstrap
做爲正統Java類庫,是否是應該加點什麼特性,能夠增強一下自身庫的功能?設計模式
Java8裏面新增長了一個包含50個方法左右的類:CompletableFuture。app
CompletableFuture類實現了CompletionStage和Future接口。提供了很是強大的Future的擴展功能,能夠幫助簡化異步編程的複雜性,提供了函數式編程能力,能夠經過回調的方式計算處理結果,而且提供了轉換和組織CompletableFuture的方法。框架
CompletableFuture 類實現了CompletionStage和Future接口,因此仍是能夠像之前同樣經過阻塞或輪詢的方式得到結果。儘管這種方式不推薦使用。異步
public T get() public T get(long timeout, TimeUnit unit) public T getNow(T valueIfAbsent) public T join()
其中的getNow有點特殊,若是結果已經計算完則返回結果或拋異常,不然返回給定的valueIfAbsent的值。 join返回計算的結果或拋出一個uncheckd異常。函數式編程
CompletionStage是一個接口,從命名上看得知是一個完成的階段,它裏面的方法也標明是在某個運行階段獲得告終果以後要作的事情。異步編程
public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor);
以Async結尾的方法都是能夠異步執行的,若是指定了線程池,會在指定的線程池中執行,若是沒有指定,默認會在ForkJoinPool.commonPool()中執行,下文中將會有好多相似的,都不詳細解釋了。關鍵的入參只有一個Function,它是函數式接口,因此使用Lambda表示起來會更加優雅。它的入參是上一個階段計算後的結果,返回值是通過轉化後結果。函數
例如:
@Test public void thenApply() { String result = CompletableFuture.supplyAsync(() -> "hello").thenApply(s -> s + " world").join(); System.out.println(result); // hello world }
public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor);
thenAccept是針對結果進行消耗,由於他的入參是Consumer,有入參無返回值。
例如:
@Test public void thenAccept(){ CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s + " world")); }
public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor);
thenRun它的入參是一個Runnable的實例,表示當獲得上一步的結果時的操做。
例如:
@Test public void thenRun(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenRun(() -> System.out.println("hello world")); // hello world while (true){} }
public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor);
它須要原來的處理返回值,而且other表明的CompletionStage也要返回值以後,利用這兩個返回值,進行轉換後返回指定類型的值。 例如:
@Test public void thenCombine() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenCombine(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> s1 + " " + s2).join(); System.out.println(result); // hello world }
public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor);
它須要原來的處理返回值,而且other表明的CompletionStage也要返回值以後,利用這兩個返回值,進行消耗。
例如:
@Test public void thenAcceptBoth() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> System.out.println(s1 + " " + s2)); // hello world while (true){} }
public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor);
不關心這兩個CompletionStage的結果,只關心這兩個CompletionStage執行完畢,以後在進行操做(Runnable)。
例如:
@Test public void runAfterBoth(){ CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true){} }
public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor);
咱們現實開發場景中,總會碰到有兩種渠道完成同一個事情,因此就能夠調用這個方法,找一個最快的結果進行處理。
例如:
@Test public void applyToEither() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), s -> s).join(); System.out.println(result); // hello world }
public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor);
例如:
@Test public void acceptEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).acceptEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), System.out::println); // hello world while (true){} }
public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor);
例如:
@Test public void runAfterEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); // hello world while (true) { } }
public CompletionStage<T> exceptionally(Function<Throwable, ? extends T> fn);
例如:
@Test public void exceptionally() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).exceptionally(e -> { System.out.println(e.getMessage()); // java.lang.RuntimeException: 測試一下異常狀況 return "hello world"; }).join(); System.out.println(result); // hello world }
public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor);
例如:
@Test public void whenComplete() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).whenComplete((s, t) -> { System.out.println(s); System.out.println(t.getMessage()); }).exceptionally(e -> { System.out.println(e.getMessage()); return "hello world"; }).join(); System.out.println(result); }
結果:
null java.lang.RuntimeException: 測試一下異常狀況 java.lang.RuntimeException: 測試一下異常狀況 hello world
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor);
例如: 出現異常時
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //出現異常 if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // hello world }
未出現異常時
@Test public void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); // s1 }
上面就是CompletionStage接口中方法的使用實例,CompletableFuture一樣也一樣實現了Future,因此也一樣可使用get進行阻塞獲取值,總的來講,CompletableFuture使用起來仍是比較爽的,看起來也比較優雅一點。