java8中加入了這個CompletableFuture對象,聽說用起來會比較爽,Stream和CompletableFuture的設計都遵循了相似的模式:它們都使用了Lambda表達式以及流水線的思想。看oracle給出的說明,對於下面這樣的查詢數據庫的操做java
可是假設查詢的數據庫操做比較慢,咱們想另起一個線程去執行,等執行完成的時候,打印出來結果,之前咱們可使用Callable,用ExcutorService執行。git
可是這裏有個問題是,當調用future的get()方法獲取結果,可是會阻塞當前調用的線程;它很難直接表述多個Future 結果之間的依賴性,實際開發中,咱們常常須要達成這種目的,原來的Future接口的方法並很少。CompletionStage就應運而生了,而後咱們重寫上面的執行方法:github
用CompletableFuture(實現了CompletionStage接口)的supplyAsync()方法取代了ExecutorService的submit(),並且能夠把Executor做爲第二個參數,提供執行的線程池的選擇,沒有阻塞當前線程。固然CompletableFuture也實現了Future接口,因此也一樣可使用get進行阻塞獲取值。task能夠是正在運行,或者已經完成返回結果,或者異常。數據庫
建立一個已完成的CompletableFuture,看起來很奇怪,爲何這麼寫,聽說對於用於測試環境是頗有用的,暴露不太寫單元測試了。數組
從task建立一個CompletableFuture,這裏有兩種方式從一個Runnable或者supplier,固然還有第二參數是線程池的方法,runAsync()易於理解,注意它須要Runnable,所以它返回CompletableFuture<Void>做爲Runnable不返回任何值。若是你須要處理異步操做並返回結果,使用Supplier<U>oracle
若是沒有提供ExecutorService,會使用fork/join pool框架執行,這種方式也用於streams的並行操做,後續咱們寫的demo也是相似。關鍵的入參只有一個Function,它是函數式接口,因此使用Lambda表示起來會更加優雅。task能夠是Function,Consumer或者Runnable。app
public static void useWithThread() throws Exception{ CompletableFuture<Integer> completableFuture = new CompletableFuture(); new Thread(new Runnable() { @Override public void run() { //模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //告訴completableFuture任務已經完成 System.out.println(Thread.currentThread()); completableFuture.complete(121); } }).start(); System.out.println(Thread.currentThread()); //獲取任務結果,若是沒有完成會一直阻塞等待 //若是發生異常 會被限制在執行任務的線程的範圍內,最終會殺死該線程,而這會致使等待 get 方法返回結果的線程永久地被阻塞。 //客戶端可使用重載版本的 get 方法,它使用一個超時參數來避免發生這樣的狀況 Integer result=completableFuture.get(); //使用這種方法至少能防止程序永久地等待下去,超時發生時,程序會獲得通知發生了 Timeout-Exception 。 // 不過,也由於如此,你不能指定執行任務的線程內到底發生了什麼問題。 // Integer result=completableFuture.get(100, TimeUnit.SECONDS); System.out.println("計算結果:"+result); } public static void useWithThreadCompleteException() throws Exception{ CompletableFuture<String> completableFuture=new CompletableFuture(); new Thread(new Runnable() { @Override public void run() { try { //模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } throw new RuntimeException("拋異常了"); }catch (Exception e) { //告訴completableFuture任務發生異常了 completableFuture.completeExceptionally(e); } } }).start(); String result=completableFuture.get(5, TimeUnit.SECONDS); System.out.println("計算結果:"+result); } public static void useExceptionally() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).exceptionally(e -> { System.out.println(e.toString()); return "hello world"; }).join(); System.out.println(result); } /** *方法接受一個生產者(Supplier)做爲參數,返回一個 CompletableFuture 對象。生產者方法會交由 ForkJoinPool池中的某個執行線程( Executor )運行, 可是你也可使用 supplyAsync 方法的重載版本,傳遞第二個參數指定線程池執行器執行生產者方法。 * @throws Exception */ public static void usesupplyAsync() throws Exception { //supplyAsync內部使用ForkJoinPool線程池執行任務 CompletableFuture<String> completableFuture=CompletableFuture.supplyAsync(()->{ //模擬執行耗時任務 System.out.println("task doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return "hello xin"; }); System.out.println("計算結果:"+completableFuture.get(5, TimeUnit.SECONDS)); } public static void useAnyofAndAllof() throws Exception { CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{ //模擬執行耗時任務 System.out.println("task1 doing..."); try { Thread.sleep(10000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 System.out.println(Thread.currentThread()); return "hello"; }); CompletableFuture<String> completableFuture2=CompletableFuture.supplyAsync(()->{ //模擬執行耗時任務 System.out.println("task2 doing..."); try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 System.out.println(Thread.currentThread()); return "world"; }); /** * 該方法接收一個 CompletableFuture 對象構成的數組,返回由第一個執行完畢的 * CompletableFuture 對象的返回值構成的 CompletableFuture<Object> */ CompletableFuture<Object> anyResult=CompletableFuture.anyOf(completableFuture1,completableFuture2); System.out.println("第一個完成的任務結果:"+anyResult.get()); //allOf 工廠方法接收一個由CompletableFuture 構成的數組,數組中的全部 // Completable-Future 對象執行完成以後,它返回一個 CompletableFuture<Void> 對象。 // 這意味着,若是你須要等待多個 CompletableFuture 對象執行完畢,對 allOf 方法返回的 //CompletableFuture 執行 join 操做能夠等待CompletableFuture執行完成 CompletableFuture<Void> allResult=CompletableFuture.allOf(completableFuture1,completableFuture2); //阻塞等待全部任務執行完成 allResult.join(); System.out.println("全部任務執行完成"); } /** * //等第一個任務完成後,將任務結果傳給參數result,執行後面的任務並返回一個表明任務的completableFuture * @throws Exception */ public static void useThenCompose() throws Exception { CompletableFuture<String> completableFuture1=CompletableFuture.supplyAsync(()->{ //模擬執行耗時任務 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return "hello"; }).thenCompose(result->CompletableFuture.supplyAsync(()->{ //模擬執行耗時任務 System.out.println("task2 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return result+" world"; })); System.out.println(completableFuture1.get(10,TimeUnit.SECONDS)); } /** * public <U,V> CompletionStage<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn); public <U,V> CompletionStage<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn,Executor executor); * 將第一個任務與第二個任務組合一塊兒執行,都執行完成後,將兩個任務的結果合併 * @throws Exception */ public static void useThenCombine() throws Exception { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { //模擬執行耗時任務 System.out.println("task1 doing..."); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return 100; }).thenCombine( //第二個任務 CompletableFuture.supplyAsync(() -> { //模擬執行耗時任務 System.out.println("task2 doing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return 200; }), //合併函數 (result1, result2) -> result1 + result2); System.out.println(completableFuture1.get(10,TimeUnit.SECONDS)); } public static void usethenAccept() throws Exception { CompletableFuture<Integer> completableFuture1 = CompletableFuture.supplyAsync(() -> { //模擬執行耗時任務 System.out.println("task1 doing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return 100; }); //註冊完成事件 completableFuture1.thenAccept(result->System.out.println("task1 done,result:"+result)); CompletableFuture<Integer> completableFuture2= //第二個任務 CompletableFuture.supplyAsync(() -> { //模擬執行耗時任務 System.out.println("task2 doing..."); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } //返回結果 return 2000; }); //註冊完成事件 completableFuture2.thenAccept(result->System.out.println("task2 done,result:"+result)); //將第一個任務與第二個任務組合一塊兒執行,都執行完成後,將兩個任務的結果合併 CompletableFuture<Integer> completableFuture3 = completableFuture1.thenCombine(completableFuture2, //合併函數 (result1, result2) -> { try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return result1 + result2; }); System.out.println(completableFuture3.get()); } /** * public <U> CompletionStage<U> thenApply(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn); public <U> CompletionStage<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor); */ public static void useThenApply() { String result = CompletableFuture.supplyAsync(() -> { System.out.println(Thread.currentThread()); return "hello";} ).thenApplyAsync(s -> { System.out.println(Thread.currentThread());return s + " world";}).join(); System.out.println(result); } /** *public CompletionStage<Void> thenAccept(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action); public CompletionStage<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor); */ public static void useThenAccept(){ CompletableFuture.supplyAsync(() -> "hello").thenAccept(s -> System.out.println(s+" world")); } /** *public CompletionStage<Void> thenRun(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action); public CompletionStage<Void> thenRunAsync(Runnable action,Executor executor); */ public static void useThenRun() throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println(Thread.currentThread()); return "hello"; }).thenRun(() -> { System.out.println(" world"); System.out.println(Thread.currentThread()); }).get(); } /* public <U> CompletionStage<Void> thenAcceptBoth(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public <U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action); public public<U> CompletionStage<Void> thenAcceptBothAsync(CompletionStage<? extends U> other,BiConsumer<? super T, ? super U> action, Executor executor); */ public static void useThenAcceptBoth() throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "world"; }), (s1, s2) -> System.out.println(s1 + " " + s2)).get(); } /* public CompletionStage<Void> runAfterBoth(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterBothAsync(CompletionStage<?> other,Runnable action,Executor executor); */ public static void useRunAfterBoth() throws ExecutionException, InterruptedException { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterBothAsync(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")).get(); } /* public <U> CompletionStage<U> applyToEither(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn); public <U> CompletionStage<U> applyToEitherAsync(CompletionStage<? extends T> other,Function<? super T, U> fn,Executor executor); */ public static void useApplyToEither() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).applyToEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), s -> s).join(); System.out.println(result); } /* public CompletionStage<Void> acceptEither(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action); public CompletionStage<Void> acceptEitherAsync(CompletionStage<? extends T> other,Consumer<? super T> action,Executor executor); */ public static void useAcceptEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).acceptEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "hello world"; }), System.out::println); while (true){} } /* public CompletionStage<Void> runAfterEither(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action); public CompletionStage<Void> runAfterEitherAsync(CompletionStage<?> other,Runnable action,Executor executor); */ public static void useRunAfterEither() { CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } return "s1"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2000); } catch (InterruptedException e) { e.printStackTrace(); } return "s2"; }), () -> System.out.println("hello world")); while (true) { } } /** * public CompletionStage<T> whenComplete(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action); public CompletionStage<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action,Executor executor); 當運行完成時,對結果的記錄。這裏的完成時有兩種狀況,一種是正常執行,返回值。 另一種是遇到異常拋出形成程序的中斷。這裏爲何要說成記錄, 由於這幾個方法都會返回CompletableFuture,當Action執行完畢後它的結果返回原始的CompletableFuture的計算結果或者返回異常。 因此不會對結果產生任何的做用 */ public static void useWhenComplete() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).whenComplete((s, t) -> { System.out.println(s); System.out.println(t.getMessage()); }).exceptionally(e -> { System.out.println(e.getMessage()); return "hello world"; }).join(); System.out.println(result); } /* public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn); public <U> CompletionStage<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn,Executor executor); */ public static void handle() { String result = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } //出現異常 if (1 == 1) { throw new RuntimeException("測試一下異常狀況"); } return "s1"; }).handle((s, t) -> { if (t != null) { return "hello world"; } return s; }).join(); System.out.println(result); }
寫的代碼也在https://github.com/woshiyexinjie/java-godliness框架
參考內容:異步
http://www.jianshu.com/p/6f3ee90ab7d3ide
http://www.jianshu.com/p/4897ccdcb278