CompletableFuture 是Java 8 新增長的Api,該類實現,Future和CompletionStage兩個接口,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。java
public T get()編程
該方法爲阻塞方法,會等待計算結果完成app
public T get(long timeout,TimeUnit unit)dom
有時間限制的阻塞方法異步
public T getNow(T valueIfAbsent)函數式編程
當即獲取方法結果,若是沒有計算結束則返回傳的值異步編程
public T join()函數
和 get() 方法相似也是主動阻塞線程,等待計算結果。和get() 方法有細微的差異spa
public class ThreadUtil {
public static void sleep(long ms) {
try {
Thread.sleep(ms);
} catch (InterruptedException e) {
e.printStackTrace();
throw new RuntimeException(e.getMessage());
}
}
}
複製代碼
public static void main(String[] args) {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(200);
return 10 / 0;
});
// System.out.println(future.join());
// System.out.println(future.get());
System.out.println(future.getNow(10));
}
複製代碼
public boolean complete(T value)線程
當即完成計算,並把結果設置爲傳的值,返回是否設置成功
若是 CompletableFuture 沒有關聯任何的Callback、異步任務等,若是調用get方法,那會一直阻塞下去,可使用complete方法主動完成計算
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = new CompletableFuture<>();
// future.get();
future.complete(10);
}
複製代碼
建立一個異步任務
public static <U> CompletableFuture<U> completedFuture(U value)
建立一個有初始值的CompletableFuture
public static CompletableFuture runAsync(Runnable runnable)
public static CompletableFuture runAsync(Runnable runnable, Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)
以上四個方法中,以 Async 結尾而且沒有 Executor 參數的,會默認使用 ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。 以run開頭的,由於以 Runable 類型爲參數因此沒有返回值。示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
System.out.println(future1.get());
System.out.println(future2.get());
}
複製代碼
結果:
runAsync
null
supplyAsync
複製代碼
當CompletableFuture的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action。主要是下面的方法:
參數類型爲 BiConsumer<? super T, ? super Throwable> 會獲取上一步計算的計算結果和異常信息。以Async結尾的方法可能會使用其它的線程去執行,若是使用相同的線程池,也可能會被同一個線程選中執行,如下皆相同。
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 20;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
});
System.out.println(future.get());
}
複製代碼
public CompletableFuture exceptionally(Function<Throwable,? extends T> fn)
該方法是對異常狀況的處理,當函數異常時應該的返回值
public static void main(String[] args) throws Exception {
CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(100);
return 10 / 0;
}).whenCompleteAsync((v, e) -> {
System.out.println(v);
System.out.println(e);
}).exceptionally((e) -> {
System.out.println(e.getMessage());
return 30;
});
System.out.println(future.get());
}
複製代碼
handle 方法和whenComplete方法相似,只不過接收的是一個 BiFunction<? super T,Throwable,? extends U> fn 類型的參數,所以有 whenComplete 方法和 轉換的功能 (thenApply)
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 10 / 0)
.handle((t, e) -> {
System.out.println(e.getMessage());
return 10;
});
System.out.println(future.get());
}
複製代碼
CompletableFuture 因爲有回調,能夠沒必要等待一個計算完成而阻塞着調用縣城,能夠在一個結果計算完成以後緊接着執行某個Action。咱們能夠將這些操做串聯起來。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
System.out.println(a);//1
return a * 10;
}).thenApply((a) -> {
System.out.println(a);//10
return a + 10;
}).thenApply((a) -> {
System.out.println(a);//20
return a - 5;
});
System.out.println(future.get());//15
}
複製代碼
這些方法不是立刻執行的,也不會阻塞,而是前一個執行完成後繼續執行下一個。
和 handle 方法的區別是,handle 會處理正常計算值和異常,不會拋出異常。而 thenApply 只會處理正常計算值,有異常則拋出。
單純的去消費結果而不會返回新的值,因些計算結果爲 Void;
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Void> future = CompletableFuture
.supplyAsync(() -> 1)
.thenAccept(System.out::println) //消費 上一級返回值 1
.thenAcceptAsync(System.out::println); //上一級沒有返回值 輸出null
System.out.println(future.get()); //消費函數沒有返回值 輸出null
}
複製代碼
和 thenAccept 相比,參數類型多了一個 CompletionStage<? extends U> other,以上方法會接收上一個CompletionStage返回值,和當前的一個。
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture
.supplyAsync(() -> 1)
.thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
System.out.println(a);
System.out.println(b);
}).get();
}
複製代碼
public CompletableFuture runAfterBoth(CompletionStage<?> other, Runnable action)
runAfterBoth 和以上方法不一樣,傳一個 Runnable 類型的參數,不接收上一級的返回值
更完全的:
以上是完全的純消費,徹底忽略計算結果
以上接收類型爲 Function<? super T,? extends CompletionStage<U>> fn ,fn 接收上一級返回的結果,並返回一個新的 CompletableFuture
public static void main(String[] args) throws ExecutionException, InterruptedException {
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> 1)
.thenApply((a) -> {
ThreadUtil.sleep(1000);
return a + 10;
})
.thenCompose((s) -> {
System.out.println(s); //11
return CompletableFuture.supplyAsync(() -> s * 5);
});
System.out.println(future.get());//55
}
複製代碼
兩個CompletionStage是並行執行的,它們之間並無前後依賴順序,other並不會等待先前的CompletableFuture執行完畢後再執行。
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture<Integer> future = CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("supplyAsync");
return 2;
}).thenApply((a) -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenApply");
return a * 3;
})
.thenCombine(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println("thenCombineAsync");
return 10;
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a + b;
});
System.out.println(future.get());
}
複製代碼
thenCombine 和 supplyAsync 不必定哪一個先哪一個後,是並行執行的。
先看示例:
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture
.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "A";
})
.acceptEither(CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return "B";
}), System.out::println)
.get();
}
複製代碼
以上代碼有時輸出A,有時輸出B,哪一個Future先執行完就會根據它的結果計算。
acceptEither方法是當任意一個 CompletionStage 完成的時候,action 這個消費者就會被執行。這個方法返回 CompletableFuture
applyToEither 方法是當任意一個 CompletionStage 完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。
acceptEither 沒有返回值,applyToEither 有返回值
這個方法的意思是把有方法都執行完才往下執行,沒有返回值
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
CompletableFuture.allOf(
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(1);
}),
CompletableFuture.runAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
System.out.println(2);
}))
.get();
}
複製代碼
有時輸出1 2 有時輸出 2 1
任務一個方法執行完都往下執行,返回一個Object類型的值
public static void main(String[] args) throws ExecutionException, InterruptedException {
Random random = new Random();
Object obj = CompletableFuture.anyOf(
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 1;
}),
CompletableFuture.supplyAsync(() -> {
ThreadUtil.sleep(random.nextInt(1000));
return 2;
})).get();
System.out.println(obj);
}
複製代碼
輸出結果有時爲1 有時間爲 2