前言
對於Node開發者來講,非阻塞異步編程是他們引覺得傲的地方。而在JDK8中,也引入了非阻塞異步編程的概念。所謂非阻塞異步編程,就是一種不須要等待返回結果的多線程的回調方法的封裝。使用非阻塞異步編程,能夠很大程度上解決高併發場景的各類問題,提升程序的運行效率。html
爲何要使用非阻塞異步編程
在jdk8以前,咱們使用java的多線程編程,通常是經過Runnable中的run方法進行的。這種方法有個明顯的缺點:沒有返回值。這時候,你們會使用Callable+Future的方式去實現,代碼以下。java
public static void main(String[] args) throws Exception { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<String> stringFuture = executor.submit(new Callable<String>() { @Override public String call() throws Exception { Thread.sleep(2000); return "async thread"; } }); Thread.sleep(1000); System.out.println("main thread"); System.out.println(stringFuture.get()); }
這無疑是對高併發訪問的一種緩衝方法。這種方式有一個致命的缺點就是阻塞式調用,當調用了get方法以後,會有大量的時間耗費在等待返回值之中。編程
無論怎麼看,這種作法貌似都不太穩當,至少在代碼美觀性上就看起來很蛋疼。並且某些場景沒法使用,好比:性能優化
- 多個異步線程執行時間可能不一致,咱們的主線程不能一直等着。
- 兩個異步任務之間相互獨立,可是第二個依賴第一個的執行結果
在這種場景下,CompletableFuture的優點就展示出來了 。同時,CompletableFuture的封裝中使用了函數式編程,這讓咱們的代碼顯得更加簡潔、優雅。多線程
不瞭解函數式編程的朋友,能夠參考我以前的博客。JDK8新特性併發
CompletableFuture使用詳解
runAsync和supplyAsync方法
CompletableFuture提供了四個靜態方法來建立一個異步操做。app
public static CompletableFuture<Void> runAsync(Runnable runnable) public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
沒有指定Executor的方法會使用ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。若是指定線程池,則使用指定的線程池運行。如下全部的方法都類同。dom
- runAsync方法不支持返回值。
- supplyAsync能夠支持返回值。
代碼示例異步
/** * 無返回值 * * @throws Exception */ @Test public void testRunAsync() throws Exception { CompletableFuture<Void> future = CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (Exception ignored) { } System.out.println("run end ..."); }); future.get(); } /** * 有返回值 * * @throws Exception */ @Test public void testSupplyAsync() throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { System.out.println("run end..."); return System.currentTimeMillis(); }); Long time = future.get(); System.out.println(time); }
計算結果完成時的回調方法
當CompletableFuture的計算結果完成,或者拋出異常的時候,能夠執行特定的操做。async
public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)
這裏須要說的一點是,whenComplete和whenCompleteAsync的區別。
- whenComplete:使用執行當前任務的線程繼續執行whenComplete的任務。
- whenCompleteAsync:使用新的線程執行任務。
- exceptionally:執行出現異常時,走這個方法。
代碼示例
/** * 當CompletableFuture的計算結果完成,或者拋出異常的時候,能夠執行特定的Action。 * whenComplete:是執行當前任務的線程執行繼續執行 whenComplete 的任務。 * whenCompleteAsync:是執行把 whenCompleteAsync 這個任務繼續提交給線程池來進行執行。 * exceptionally:執行出現異常時,走這個方法 * * @throws Exception */ @Test public void testWhenComplete() throws Exception { CompletableFuture.runAsync(() -> { try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("運行結束"); }).whenComplete((t, action) -> { System.out.println("執行完成"); }).exceptionally(t -> { System.out.println("出現異常:" + t.getMessage()); return null; }); TimeUnit.SECONDS.sleep(2); }
thenApply
當一個線程依賴另外一個線程時,可使用thenApply方法把這兩個線程串行化,第二個任務依賴第一個任務的返回值。
代碼示例
/** * 當一個線程依賴另外一個線程時,可使用 thenApply 方法來把這兩個線程串行化。 * 第二個任務依賴第一個任務的結果 * * @throws Exception */ @Test public void testThenApply() throws Exception { CompletableFuture<Long> future = CompletableFuture.supplyAsync(() -> { long result = new Random().nextInt(); System.out.println("result:" + result); return result; }).thenApply(t -> { long result = t * 5; System.out.println("result2:" + result); return result; }); Long result = future.get(); System.out.println(result); }
handle
handle是執行任務完成時對結果的處理。與thenApply方法處理方式基本一致,
不一樣的是,handle是在任務完成後執行,無論這個任務是否出現了異常,而thenApply只能夠執行正常的任務,任務出現了異常則不執行。
代碼示例
/** * handle 是執行任務完成時對結果的處理。 * handle 方法和 thenApply 方法處理方式基本同樣。 * 不一樣的是 handle 是在任務完成後再執行,還能夠處理異常的任務。 * thenApply 只能夠執行正常的任務,任務出現異常則不執行 thenApply 方法。 * * @throws Exception */ @Test public void testHandle() throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int i = 10 / 0; return i; }).handle((p, t) -> { int result = -1; if (t == null) { result = p * 2; } else { System.out.println(t.getMessage()); } return result; }); System.out.println(future.get()); }
thenAccept
thenAccept用於接收任務的處理結果,並消費處理,無返回結果。
代碼示例
/** * 接收任務的處理結果,並消費處理,無返回結果。 * * @throws Exception */ @Test public void testThenAccept() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return new Random().nextInt(); }).thenAccept(num -> { System.out.println(num); }); System.out.println(future.get()); }
thenRun
上個任務執行完以後再執行thenRun的任務,兩者只存在前後執行順序的關係,後者並不依賴前者的計算結果,同時,沒有返回值。
代碼示例
/** * 該方法同 thenAccept 方法相似。不一樣的是上個任務處理完成後,並不會把計算的結果傳給 thenRun 方法。 * 只是處理玩任務後,執行 thenRun 的後續操做。 * * @throws Exception */ @Test public void testThenRun() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return new Random().nextInt(); }).thenRun(() -> { System.out.println("進入了thenRun"); }); System.out.println(future.get()); }
thenCombine
thenCombine會把兩個CompletableFuture的任務都執行完成後,把兩個任務的返回值一塊交給thenCombine處理(有返回值)。
代碼示例
/** * thenCombine 會把 兩個 CompletableFuture 的任務都執行完成後 * 把兩個任務的結果一塊交給 thenCombine 來處理。 * * @throws Exception */ @Test public void testThenCombine() throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { return "hello"; }).thenCombine(CompletableFuture.supplyAsync(() -> { return "world"; }), (t1, t2) -> { return t1 + " " + t2; }); System.out.println(future.get()); }
thenAcceptBoth
當兩個CompletableFuture都執行完成後,把結果一塊交給thenAcceptBoth處理(無返回值)
代碼示例
/** * 當兩個 CompletableFuture 都執行完成後 * 把結果一塊交給thenAcceptBoth來進行消耗 * * @throws Exception */ @Test public void testThenAcceptBoth() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "hello"; }).thenAcceptBoth(CompletableFuture.supplyAsync(() -> { return "world"; }), (t1, t2) -> { System.out.println(t1 + " " + t2); }); System.out.println(future.get()); }
applyToEither
兩個CompletableFuture,誰執行返回的結果快,就用哪一個的結果進行下一步操做(有返回值)。
代碼示例
/** * 兩個CompletableFuture,誰執行返回的結果快,我就用那個CompletionStage的結果進行下一步的轉化操做 * * @throws Exception */ @Test public void testApplyToEither() throws Exception { CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { try { Thread.sleep(2); } catch (InterruptedException e) { e.printStackTrace(); } return "hello"; }).applyToEither(CompletableFuture.supplyAsync(() -> { return "world"; }), (t) -> { return t; }); System.out.println(future.get()); }
acceptEither
兩個CompletableFuture,誰執行返回的結果快,就用哪一個的結果進行下一步操做(無返回值)。
代碼示例
/** * 兩個CompletableFuture,誰執行返回的結果快,我就用那個CompletionStage的結果進行下一步的消耗操做。 * * @throws Exception */ @Test public void testAcceptEither() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "hello"; }).acceptEither(CompletableFuture.supplyAsync(() -> { return "world"; }), t1 -> { System.out.println(t1); }); System.out.println(future.get()); }
runAfterEither
兩個CompletableFuture,任何一個完成了都會執行下一步操做
代碼示例
/** * 兩個CompletableFuture,任何一個完成了都會執行下一步的操做 * * @throws Exception */ @Test public void testRunAfterEither() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "hello"; }).runAfterEither(CompletableFuture.supplyAsync(() -> { return "world"; }), () -> { System.out.println("執行完了"); }); System.out.println(future.get()); }
runAfterBoth
兩個CompletableFuture,都完成了纔會執行下一步操做。
代碼示例
/** * 兩個CompletableFuture,都完成了計算纔會執行下一步的操做 * * @throws Exception */ @Test public void testRunAfterBoth() throws Exception { CompletableFuture<Void> future = CompletableFuture.supplyAsync(() -> { return "hello"; }).runAfterBoth(CompletableFuture.supplyAsync(() -> { return "world"; }), () -> { System.out.println("執行完了"); }); System.out.println(future.get()); }
thenCompose
thenCompose方法容許對兩個CompletableFuture進行流水線操做,當第一個操做完成時,將其結果做爲參數傳遞給第二個操做。
代碼示例
/** * thenCompose 方法容許你對兩個 CompletableFuture 進行流水線操做, * 第一個操做完成時,將其結果做爲參數傳遞給第二個操做。 * @throws Exception */ @Test public void testThenCompose() throws Exception { CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> { int t = new Random().nextInt(); System.out.println(t); return t; }).thenCompose(param -> { return CompletableFuture.supplyAsync(() -> { int t = param * 2; System.out.println("t2=" + t); return t; }); }); System.out.println(future.get()); }
結語
CompletableFuture是jdk8中新增的一個特性,特色是非阻塞異步編程。合理的使用非阻塞異步編程,好比將兩步關聯不大的操做並行處理,能夠優化代碼的執行效率。同時,在高併發場景下,CompletableFuture也能夠進行有效的性能優化。
若是你以爲該博客對你有幫助,不放動動手指加一下交流羣。