Future模式之CompletableFuture

CompletableFuture 是Java 8 新增長的Api,該類實現,Future和CompletionStage兩個接口,提供了很是強大的Future的擴展功能,能夠幫助咱們簡化異步編程的複雜性,提供了函數式編程的能力,能夠經過回調的方式處理計算結果,而且提供了轉換和組合CompletableFuture的方法。java

1、主動完成計算

  • public T get()編程

    該方法爲阻塞方法,會等待計算結果完成app

  • public T get(long timeout,TimeUnit unit)dom

    有時間限制的阻塞方法異步

  • public T getNow(T valueIfAbsent)函數式編程

    當即獲取方法結果,若是沒有計算結束則返回傳的值異步編程

  • public T join()函數

    和 get() 方法相似也是主動阻塞線程,等待計算結果。和get() 方法有細微的差異線程

public class ThreadUtil {
    public static void sleep(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            e.printStackTrace();
            throw new RuntimeException(e.getMessage());
        }
    }
}
public static void main(String[] args) {

     CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.sleep(200);
            return 10 / 0;
    });
//       System.out.println(future.join());
//       System.out.println(future.get());
    System.out.println(future.getNow(10));
}
  • public boolean complete(T value)code

    當即完成計算,並把結果設置爲傳的值,返回是否設置成功

    若是 CompletableFuture 沒有關聯任何的Callback、異步任務等,若是調用get方法,那會一直阻塞下去,可使用complete方法主動完成計算

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = new CompletableFuture<>();
//        future.get();
    future.complete(10);
}
  • public boolean completeExceptionally(Throwable ex) 當即完成計算,並拋出異常

2、執行異步任務

建立一個異步任務

  • public static <U> CompletableFuture<U> completedFuture(U value)

    建立一個有初始值的CompletableFuture

  • public static CompletableFuture<Void> runAsync(Runnable runnable)

  • public static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor)

  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)

  • public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor)

    以上四個方法中,以 Async 結尾而且沒有 Executor 參數的,會默認使用 ForkJoinPool.commonPool() 做爲它的線程池執行異步代碼。 以run開頭的,由於以 Runable 類型爲參數因此沒有返回值。示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> System.out.println("runAsync"));
        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> "supplyAsync");
        
        System.out.println(future1.get());
        System.out.println(future2.get());
    }

結果:

runAsync
null
supplyAsync

3、計算完成時對結果的處理 whenComplete/exceptionally/handle

當CompletableFuture的計算結果完成,或者拋出異常的時候,咱們能夠執行特定的Action。主要是下面的方法:

  • public CompletableFuture<T> whenComplete(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action)
  • public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T,? super Throwable> action, Executor executor)

參數類型爲 BiConsumer<? super T, ? super Throwable> 會獲取上一步計算的計算結果和異常信息。以Async結尾的方法可能會使用其它的線程去執行,若是使用相同的線程池,也可能會被同一個線程選中執行,如下皆相同。

public static void main(String[] args) throws Exception {
    CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
        ThreadUtil.sleep(100);
        return 20;
    }).whenCompleteAsync((v, e) -> {
        System.out.println(v);
        System.out.println(e);
    });
    System.out.println(future.get());
}
  • public CompletableFuture<T> exceptionally(Function<Throwable,? extends T> fn)

    該方法是對異常狀況的處理,當函數異常時應該的返回值

public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future = CompletableFuture.supplyAsync(() -> {
            ThreadUtil.sleep(100);
            return 10 / 0;
        }).whenCompleteAsync((v, e) -> {
            System.out.println(v);
            System.out.println(e);
        }).exceptionally((e) -> {
            System.out.println(e.getMessage());
            return 30;
        });
        System.out.println(future.get());
    }

  • public <U>CompletableFuture<U> handle(BiFunction<? super T,Throwable,? extends U> fn)
  • public <U>CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn)
  • public <U> CompletableFuture<U> handleAsync(BiFunction<? super T,Throwable,? extends U> fn, Executor executor)

handle 方法和whenComplete方法相似,只不過接收的是一個 BiFunction<? super T,Throwable,? extends U> fn 類型的參數,所以有 whenComplete 方法和 轉換的功能 (thenApply)

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 10 / 0)
                .handle((t, e) -> {
                    System.out.println(e.getMessage());
                    return 10;
                });

        System.out.println(future.get());
    }

4、結果處理轉換 thenApply

CompletableFuture 因爲有回調,能夠沒必要等待一個計算完成而阻塞着調用縣城,能夠在一個結果計算完成以後緊接着執行某個Action。咱們能夠將這些操做串聯起來。

  • public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
  • public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply((a) -> {
                    System.out.println(a);//1
                    return a * 10;
                }).thenApply((a) -> {
                    System.out.println(a);//10
                    return a + 10;
                }).thenApply((a) -> {
                    System.out.println(a);//20
                    return a - 5;
                });
        System.out.println(future.get());//15
    }

這些方法不是立刻執行的,也不會阻塞,而是前一個執行完成後繼續執行下一個。

和 handle 方法的區別是,handle 會處理正常計算值和異常,不會拋出異常。而 thenApply 只會處理正常計算值,有異常則拋出。

5、純消費 thenAccept/thenAcceptBoth/thenRun

單純的去消費結果而不會返回新的值,因些計算結果爲 Void;

  • public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
  • public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor)
public static void main(String[] args) throws ExecutionException, InterruptedException {
        
        CompletableFuture<Void> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenAccept(System.out::println) //消費 上一級返回值 1
                .thenAcceptAsync(System.out::println); //上一級沒有返回值 輸出null
                
        System.out.println(future.get()); //消費函數沒有返回值 輸出null
    }

  • public <U> CompletableFuture<Void> thenAcceptBoth(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
  • public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action)
  • public <U> CompletableFuture<Void> thenAcceptBothAsync(CompletionStage<? extends U> other, BiConsumer<? super T,? super U> action, Executor executor)

和 thenAccept 相比,參數類型多了一個 CompletionStage<? extends U> other,以上方法會接收上一個CompletionStage返回值,和當前的一個。

public static void main(String[] args) throws ExecutionException, InterruptedException {
        CompletableFuture
                .supplyAsync(() -> 1)
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 2), (a, b) -> {
                    System.out.println(a);
                    System.out.println(b);
                }).get();
    }
  • public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other, Runnable action)

    runAfterBoth 和以上方法不一樣,傳一個 Runnable 類型的參數,不接收上一級的返回值


更完全的:

  • public CompletableFuture<Void> thenRun(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action)
  • public CompletableFuture<Void> thenRunAsync(Runnable action, Executor executor)

以上是完全的純消費,徹底忽略計算結果

6、組合 thenCompose/thenCombine

  • public <U> CompletableFuture<U> thenCompose(Function<? super T,? extends CompletionStage<U>> fn)
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn)
  • public <U> CompletableFuture<U> thenComposeAsync(Function<? super T,? extends CompletionStage<U>> fn, Executor executor)

以上接收類型爲 Function<? super T,? extends CompletionStage<U>> fn ,fn 接收上一級返回的結果,並返回一個新的 CompletableFuture

public static void main(String[] args) throws ExecutionException, InterruptedException {

        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> 1)
                .thenApply((a) -> {
                    ThreadUtil.sleep(1000);
                    return a + 10;
                })
                .thenCompose((s) -> {
                    System.out.println(s); //11
                    return CompletableFuture.supplyAsync(() -> s * 5);
                });

        System.out.println(future.get());//55
    }

  • public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
  • public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other, BiFunction<? super T,? super U,? extends V> fn, Executor executor)

兩個CompletionStage是並行執行的,它們之間並無前後依賴順序,other並不會等待先前的CompletableFuture執行完畢後再執行。

public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture<Integer> future = CompletableFuture
                .supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("supplyAsync");
                    return 2;
                }).thenApply((a) -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("thenApply");
                    return a * 3;
                })
                .thenCombine(CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println("thenCombineAsync");
                    return 10;
                }), (a, b) -> {
                    System.out.println(a);
                    System.out.println(b);
                    return a + b;
                });

        System.out.println(future.get());
    }

thenCombine 和 supplyAsync 不必定哪一個先哪一個後,是並行執行的。

7、acceptEither / applyToEither

  • public CompletableFuture<Void> acceptEither(CompletionStage<? extends T> other, Consumer<? super T> action)
  • public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action)
  • public CompletableFuture<Void> acceptEitherAsync(CompletionStage<? extends T> other, Consumer<? super T> action, Executor executor)

先看示例:

public static void main(String[] args) throws ExecutionException, InterruptedException {
        Random random = new Random();
        CompletableFuture
                .supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return "A";
                })
                .acceptEither(CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return "B";
                }), System.out::println)
                .get();
    }

以上代碼有時輸出A,有時輸出B,哪一個Future先執行完就會根據它的結果計算。

acceptEither方法是當任意一個 CompletionStage 完成的時候,action 這個消費者就會被執行。這個方法返回 CompletableFuture<Void>


  • public <U> CompletableFuture<U> applyToEither(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn)
  • public <U> CompletableFuture<U> applyToEitherAsync(CompletionStage<? extends T> other, Function<? super T,U> fn, Executor executor)

applyToEither 方法是當任意一個 CompletionStage 完成的時候,fn會被執行,它的返回值會看成新的CompletableFuture<U>的計算結果。

acceptEither 沒有返回值,applyToEither 有返回值

8、allOf / anyOf

  • public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)

這個方法的意思是把有方法都執行完才往下執行,沒有返回值

public static void main(String[] args) throws ExecutionException, InterruptedException {

        Random random = new Random();
        CompletableFuture.allOf(
                CompletableFuture.runAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println(1);
                }),
                CompletableFuture.runAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    System.out.println(2);
                }))
                .get();

    }

有時輸出1 2 有時輸出 2 1


  • public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs)

任務一個方法執行完都往下執行,返回一個Object類型的值

public static void main(String[] args) throws ExecutionException, InterruptedException {
        Random random = new Random();

        Object obj = CompletableFuture.anyOf(
                CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return 1;
                }),
                CompletableFuture.supplyAsync(() -> {
                    ThreadUtil.sleep(random.nextInt(1000));
                    return 2;
                })).get();

        System.out.println(obj);
    }

輸出結果有時爲1 有時間爲 2

相關文章
相關標籤/搜索