上一篇文章,講述了Future模式的機制、缺點,CompletableFuture產生的由來、靜態工廠方法、complete()方法等等。java
本文將繼續整理CompletableFuture的特性。app
咱們能夠經過CompletableFuture來異步獲取一組數據,並對數據進行一些轉換,相似RxJava、Scala的map、flatMap操做。異步
方法名 | 描述 |
---|---|
thenApply(Function<? super T,? extends U> fn) | 接受一個Function<? super T,? extends U>參數用來轉換CompletableFuture |
thenApplyAsync(Function<? super T,? extends U> fn) | 接受一個Function<? super T,? extends U>參數用來轉換CompletableFuture,使用ForkJoinPool |
thenApplyAsync(Function<? super T,? extends U> fn, Executor executor) | 接受一個Function<? super T,? extends U>參數用來轉換CompletableFuture,使用指定的線程池 |
thenApply的功能至關於將CompletableFuture<T>轉換成CompletableFuture<U>。ide
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello");
future = future.thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s + " World";
}
}).thenApply(new Function<String, String>() {
@Override
public String apply(String s) {
return s.toUpperCase();
}
});
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
再用lambda表達式簡化一下函數
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s -> s + " World").thenApply(String::toUpperCase);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:post
HELLO WORLD複製代碼
下面的例子,展現了數據流的類型經歷了以下的轉換:String -> Integer -> Double。ui
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "10")
.thenApply(Integer::parseInt)
.thenApply(i->i*10.0);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:this
100.0複製代碼
方法名 | 描述 |
---|---|
thenCompose(Function<? super T, ? extends CompletionStage<U>> fn) | 在異步操做完成的時候對異步操做的結果進行一些操做,而且仍然返回CompletableFuture類型。 |
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn) | 在異步操做完成的時候對異步操做的結果進行一些操做,而且仍然返回CompletableFuture類型。使用ForkJoinPool。 |
thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor) | 在異步操做完成的時候對異步操做的結果進行一些操做,而且仍然返回CompletableFuture類型。使用指定的線程池。 |
thenCompose能夠用於組合多個CompletableFuture,將前一個結果做爲下一個計算的參數,它們之間存在着前後順序。lua
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " World"));
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:spa
Hello World複製代碼
下面的例子展現了屢次調用thenCompose()
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + "100"))
.thenCompose(s -> CompletableFuture.supplyAsync(() -> Double.parseDouble(s)));
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:
100100.0複製代碼
方法名 | 描述 |
---|---|
thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 當兩個CompletableFuture都正常完成後,執行提供的fn,用它來組合另一個CompletableFuture的結果。 |
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn) | 當兩個CompletableFuture都正常完成後,執行提供的fn,用它來組合另一個CompletableFuture的結果。使用ForkJoinPool。 |
thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor) | 當兩個CompletableFuture都正常完成後,執行提供的fn,用它來組合另一個CompletableFuture的結果。使用指定的線程池。 |
如今有CompletableFuture<T>、CompletableFuture<U>和一個函數(T,U)->V,thenCompose就是將CompletableFuture<T>和CompletableFuture<U>變爲CompletableFuture<V>。
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Double> future = future1.thenCombine(future2, (s, i) -> Double.parseDouble(s + i));
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:
100100.0複製代碼
使用thenCombine()以後future一、future2之間是並行執行的,最後再將結果彙總。這一點跟thenCompose()不一樣。
thenAcceptBoth跟thenCombine相似,可是返回CompletableFuture類型。
方法名 | 描述 |
---|---|
thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | 當兩個CompletableFuture都正常完成後,執行提供的action,用它來組合另一個CompletableFuture的結果。 |
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action) | 當兩個CompletableFuture都正常完成後,執行提供的action,用它來組合另一個CompletableFuture的結果。使用ForkJoinPool。 |
thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor) | 當兩個CompletableFuture都正常完成後,執行提供的action,用它來組合另一個CompletableFuture的結果。使用指定的線程池。 |
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> "100");
CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> 100);
CompletableFuture<Void> future = future1.thenAcceptBoth(future2, (s, i) -> System.out.println(Double.parseDouble(s + i)));
try {
future.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:
100100.0複製代碼
當CompletableFuture完成計算結果後,咱們可能須要對結果進行一些處理。
###3.5.1 執行特定的Action
方法名 | 描述 |
---|---|
whenComplete(BiConsumer<? super T,? super Throwable> action) | 當CompletableFuture完成計算結果時對結果進行處理,或者當CompletableFuture產生異常的時候對異常進行處理。 |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action) | 當CompletableFuture完成計算結果時對結果進行處理,或者當CompletableFuture產生異常的時候對異常進行處理。使用ForkJoinPool。 |
whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor) | 當CompletableFuture完成計算結果時對結果進行處理,或者當CompletableFuture產生異常的時候對異常進行處理。使用指定的線程池。 |
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.whenComplete((result, throwable) -> System.out.println(result));複製代碼
執行結果:
hello world
this is completablefuture demo複製代碼
###3.5.2 執行完Action能夠作轉換
方法名 | 描述 |
---|---|
handle(BiFunction<? super T, Throwable, ? extends U> fn) | 當CompletableFuture完成計算結果或者拋出異常的時候,執行提供的fn |
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn) | 當CompletableFuture完成計算結果或者拋出異常的時候,執行提供的fn,使用ForkJoinPool。 |
handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor) | 當CompletableFuture完成計算結果或者拋出異常的時候,執行提供的fn,使用指定的線程池。 |
CompletableFuture<Double> future = CompletableFuture.supplyAsync(() -> "100")
.thenApply(s->s+"100")
.handle((s, t) -> s != null ? Double.parseDouble(s) : 0);
try {
System.out.println(future.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}複製代碼
執行結果:
100100.0複製代碼
在這裏,handle()的參數是BiFunction,apply()方法返回R,至關於轉換的操做。
@FunctionalInterface
public interface BiFunction<T, U, R> {
/** * Applies this function to the given arguments. * * @param t the first function argument * @param u the second function argument * @return the function result */
R apply(T t, U u);
/** * Returns a composed function that first applies this function to * its input, and then applies the {@code after} function to the result. * If evaluation of either function throws an exception, it is relayed to * the caller of the composed function. * * @param <V> the type of output of the {@code after} function, and of the * composed function * @param after the function to apply after this function is applied * @return a composed function that first applies this function and then * applies the {@code after} function * @throws NullPointerException if after is null */
default <V> BiFunction<T, U, V> andThen(Function<? super R, ? extends V> after) {
Objects.requireNonNull(after);
return (T t, U u) -> after.apply(apply(t, u));
}
}複製代碼
而whenComplete()的參數是BiConsumer,accept()方法返回void。
@FunctionalInterface
public interface BiConsumer<T, U> {
/** * Performs this operation on the given arguments. * * @param t the first input argument * @param u the second input argument */
void accept(T t, U u);
/** * Returns a composed {@code BiConsumer} that performs, in sequence, this * operation followed by the {@code after} operation. If performing either * operation throws an exception, it is relayed to the caller of the * composed operation. If performing this operation throws an exception, * the {@code after} operation will not be performed. * * @param after the operation to perform after this operation * @return a composed {@code BiConsumer} that performs in sequence this * operation followed by the {@code after} operation * @throws NullPointerException if {@code after} is null */
default BiConsumer<T, U> andThen(BiConsumer<? super T, ? super U> after) {
Objects.requireNonNull(after);
return (l, r) -> {
accept(l, r);
after.accept(l, r);
};
}
}複製代碼
因此,handle()至關於whenComplete()+轉換。
###3.5.3 純消費(執行Action)
方法名 | 描述 |
---|---|
thenAccept(Consumer<? super T> action) | 當CompletableFuture完成計算結果,只對結果執行Action,而不返回新的計算值 |
thenAcceptAsync(Consumer<? super T> action) | 當CompletableFuture完成計算結果,只對結果執行Action,而不返回新的計算值,使用ForkJoinPool。 |
thenAcceptAsync(Consumer<? super T> action, Executor executor) | 當CompletableFuture完成計算結果,只對結果執行Action,而不返回新的計算值 |
thenAccept()是隻會對計算結果進行消費而不會返回任何結果的方法。
CompletableFuture.supplyAsync(() -> "Hello")
.thenApply(s->s+" World")
.thenApply(s->s+ "\nThis is CompletableFuture demo")
.thenApply(String::toLowerCase)
.thenAccept(System.out::print);複製代碼
執行結果:
hello world
this is completablefuture demo複製代碼