Java異步編程-CompletableFuture

引言

最近工做中須要用到異步編程實現一個多數據集併發異步查詢的功能,之前也嘗試使用過CompletableFuture,但沒有深刻原理探究,也沒有概括總結,今天正好有時間,集中學習下。編程

本文結構爲:痛點->解決方案->應用場景->流程示例-->概括總結併發

異步編程

在Java8之前,異步編程的實現大概是下邊這樣子的:app

class App {

        ExecutorService executor = ...
        ArchiveSearcher searcher = ...

        void showSearch(final String target) throws InterruptedException {
            Future<String> future = executor.submit(new Callable<String>() {
                public String call() {
                    return searcher.search(target);
                }
            });
            displayOtherThings(); // do other things while searching
            try {
                displayText(future.get()); // use future
            } catch (ExecutionException ex) {
                cleanup();
                return;
            }
        }
    }

future.get()方法會阻塞直到計算完成,而後返回結果。
這樣一來,若是咱們須要在計算完成後執行後續操做,則只有兩個選擇:異步

  1. 阻塞,等待執行完成。但這樣一來異步就變成同步。
  2. 輪詢,直到isDone()返回true。但這樣會極大消耗計算資源。

爲了解決這個痛點,Java8新增了CompletableFuture類,顧名思義這是一個"可完成"的Futureasync

CompletableFuture

CompletableFuture<T>實現了Future<T>, CompletionStage<T>,這樣保證了咱們能夠繼續使用Future的方法,同時複用了CompletionStage的各類功能。異步編程

下面就從一些常見的應用場景入手,逐個分析最適用的解決方案。固然有時對於部分場景,會有多個可選方案,這時候就須要從擴展性、可維護性、性能以及易讀性等方面綜合考慮,作出選擇。oop

建立CompletableFuture

  • 若是不但願取得計算結果,則可使用Runnable對象,並調用runAsync方法:
public static CompletableFuture<Void> runAsync(Runnable runnable) {
        return asyncRunStage(asyncPool, runnable);
    }
    
    static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        e.execute(new AsyncRun(d, f));
        return d;
    }

runAsync方法將返回一個CompletableFuture<Void>對象,CompletableFuture不包含任何類型的返回結果。性能

  • 若是但願取得計算結果,則可使用Supplier<U>對象,並調用supplyAsync方法:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
        return asyncSupplyStage(asyncPool, supplier);
    }
    
    static <U> CompletableFuture<U> asyncSupplyStage(Executor e,
                                                     Supplier<U> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<U> d = new CompletableFuture<U>();
        e.execute(new AsyncSupply<U>(d, f));
        return d;
    }

supplyAsync方法將返回一個CompletableFuture<U>對象,該CompletableFuture會包含一個由泛型U指定類型的計算結果。學習

單CompletableFuture後置處理

  • 若是不但願接收前置CompletableFuture結果,且不返回處理結果,那麼可使用Runnable對象,並調用thenRun方法:
public CompletableFuture<Void> thenRun(Runnable action) {
        return uniRunStage(null, action);
    }
    
    public CompletableFuture<Void> thenRunAsync(Runnable action) {
        return uniRunStage(asyncPool, action);
    }
    
    private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniRun(this, f, null)) {
            UniRun<T> c = new UniRun<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenRun方法將會返回一個不包含返回值的CompletableFuture
thenRun方法和thenRunAsync方法的區別是,thenRun會在前置CompletableFuture的線程內執行,而thenRunAsync會在一個新線程中執行。測試

  • 若是但願接收前置CompletableFuture結果,但不返回處理結果,則可使用Consumer<? super T>對象,並調用thenAccept方法:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
        return uniAcceptStage(null, action);
    }

    public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
        return uniAcceptStage(asyncPool, action);
    }
    private CompletableFuture<Void> uniAcceptStage(Executor e,
                                                   Consumer<? super T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.uniAccept(this, f, null)) {
            UniAccept<T> c = new UniAccept<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAccept方法將會返回一個不包含返回值的CompletableFuture
thenAccept方法和thenAcceptAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願接收前置CompletableFuture結果,並返回處理結果,則只能使用Function<? super T,? extends U>對象,並調用thenApply方法:
public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(null, fn);
    }

    public <U> CompletableFuture<U> thenApplyAsync(
        Function<? super T,? extends U> fn) {
        return uniApplyStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d =  new CompletableFuture<V>();
        if (e != null || !d.uniApply(this, f, null)) {
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenApply方法將會返回一個包含U類型返回值的CompletableFuture
thenApply方法和thenApplyAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願接收前置CompletableFuture結果,並返回包含CompletableFuture的處理結果,則可使用Function<? super T, ? extends CompletionStage<U>>對象,並調用thenCompose方法:
public <U> CompletableFuture<U> thenCompose(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(null, fn);
    }

    public <U> CompletableFuture<U> thenComposeAsync(
        Function<? super T, ? extends CompletionStage<U>> fn) {
        return uniComposeStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniComposeStage(
        Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
        ...
        Object r; Throwable x;
        if (e == null && (r = result) != null) {
            // try to return function result directly
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    return new CompletableFuture<V>(encodeThrowable(x, r));
                }
                r = null;
            }
            try {
                @SuppressWarnings("unchecked") T t = (T) r;
                CompletableFuture<V> g = f.apply(t).toCompletableFuture();
                Object s = g.result;
                if (s != null)
                    return new CompletableFuture<V>(encodeRelay(s));
                CompletableFuture<V> d = new CompletableFuture<V>();
                UniRelay<V> copy = new UniRelay<V>(d, g);
                g.push(copy);
                copy.tryFire(SYNC);
                return d;
            } catch (Throwable ex) {
                return new CompletableFuture<V>(encodeThrowable(ex));
            }
        }
        CompletableFuture<V> d = new CompletableFuture<V>();
        UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
        return d;
    }

thenCombine方法將會返回一個包含CompletableFuture類型返回值的CompletableFuture
thenCompose方法和thenComposeAsync方法的區別同thenRun方法和thenRunAsync方法。

雙CompletableFuture聯合處理

  • 若是但願當前置CompletableFuture和指定的CompletableFuture所有完成後觸發,但不但願接收前邊兩個CompletableFuture的輸出結果TU,且處理完成後不返回結果,則可使用Runnable,並調用runAfterBoth方法
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                                Runnable action) {
        return biRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                     Runnable action) {
        return biRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biRun(this, b, f, null)) {
            BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterBoth方法會返回一個不包含返回值的CompletableFuture
runAfterBoth方法和runAfterBothAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願當前置CompletableFuture和指定的CompletableFuture所有完成後觸發,並接收前邊兩個CompletableFuture的輸出結果TU,但處理完成後不返回結果,則可使用BiConsumer<? super T, ? super U>,並調用thenAcceptBoth方法:
public <U> CompletableFuture<Void> thenAcceptBoth(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(null, other, action);
    }

    public <U> CompletableFuture<Void> thenAcceptBothAsync(
        CompletionStage<? extends U> other,
        BiConsumer<? super T, ? super U> action) {
        return biAcceptStage(asyncPool, other, action);
    }

    private <U> CompletableFuture<Void> biAcceptStage(
        Executor e, CompletionStage<U> o,
        BiConsumer<? super T,? super U> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.biAccept(this, b, f, null)) {
            BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenAcceptBoth方法會返回一個不包含返回值的CompletableFuture
thenAcceptBoth方法和thenAcceptBothAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願當前置CompletableFuture和指定的CompletableFuture所有完成後觸發,並接收前邊兩個CompletableFuture的輸出結果TU,且處理完成後返回V做爲處理結果,則可使用BiFunction<? super T,? super U,? extends V>,並調用thenCombine方法:
public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(null, other, fn);
    }

    public <U,V> CompletableFuture<V> thenCombineAsync(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
        return biApplyStage(asyncPool, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
            bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

thenCombine方法會返回一個包含V類型返回值的CompletableFuture
thenCombine方法和thenCombineAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願當前置CompletableFuture和指定的CompletableFuture任意一個完成後觸發,但不但願接收前邊兩個CompletableFuture的輸出結果TU,且處理完成後不返回結果,則可使用Runnable,並調用runAfterEither方法
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                                  Runnable action) {
        return orRunStage(null, other, action);
    }

    public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                       Runnable action) {
        return orRunStage(asyncPool, other, action);
    }

    private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
                                               Runnable f) {
        CompletableFuture<?> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orRun(this, b, f, null)) {
            OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

runAfterEither方法將返回一個不包含返回值的CompletableFuture
runAfterEither方法和runAfterEitherAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願當前置CompletableFuture和指定的CompletableFuture任意一個完成時觸發,並接收前邊兩個CompletableFuture的輸出結果TU,但處理完成後不返回結果,則可使用Consumer<? super T>,並調用acceptEither方法:
public CompletableFuture<Void> acceptEither(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(null, other, action);
    }

    public CompletableFuture<Void> acceptEitherAsync(
        CompletionStage<? extends T> other, Consumer<? super T> action) {
        return orAcceptStage(asyncPool, other, action);
    }

    private <U extends T> CompletableFuture<Void> orAcceptStage(
        Executor e, CompletionStage<U> o, Consumer<? super T> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        if (e != null || !d.orAccept(this, b, f, null)) {
            OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

acceptEither方法將返回一個不包含返回值的CompletableFuture
acceptEither方法和acceptEitherAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願當前置CompletableFuture和指定的CompletableFuture任意一個完成後觸發,並接收前邊兩個CompletableFuture的輸出結果TU,處理完成後返回V做爲處理結果,則可使用Function<? super T, U>,並調用applyToEither方法:
public <U> CompletableFuture<U> applyToEither(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(null, other, fn);
    }

    public <U> CompletableFuture<U> applyToEitherAsync(
        CompletionStage<? extends T> other, Function<? super T, U> fn) {
        return orApplyStage(asyncPool, other, fn);
    }

    private <U extends T,V> CompletableFuture<V> orApplyStage(
        Executor e, CompletionStage<U> o,
        Function<? super T, ? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.orApply(this, b, f, null)) {
            OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
            orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

applyToEither方法將返回一個包含V類型返回值的CompletableFuture
applyToEither方法和applyToEitherAsync方法的區別同thenRun方法和thenRunAsync方法。

多CompletableFuture聯合處理

  • 若是但願若干CompletableFuture所有執行完成後再觸發,則應該將全部待等待的CompletableFuture任務所有傳入allOf方法:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
        return andTree(cfs, 0, cfs.length - 1);
    }
    
    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                           int lo, int hi) {
        CompletableFuture<Void> d = new CompletableFuture<Void>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  andTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  andTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.biRelay(a, b)) {
            BiRelay<?,?> c = new BiRelay<>(d, a, b);
            a.bipush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

allOf方法將會返回一個不包含返回值的CompletableFuture,方便後置操做。

  • 若是但願若干CompletableFuture中任意一個執行完成後就觸發,則應該將全部待等待的CompletableFuture任務所有傳入anyOf方法:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
        return orTree(cfs, 0, cfs.length - 1);
    }
    
    /** Recursively constructs a tree of completions. */
    static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                            int lo, int hi) {
        CompletableFuture<Object> d = new CompletableFuture<Object>();
        CompletableFuture<?> a, b;
        int mid = (lo + hi) >>> 1;
        if ((a = (lo == mid ? cfs[lo] :
                  orTree(cfs, lo, mid))) == null ||
            (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
                  orTree(cfs, mid+1, hi)))  == null)
            throw new NullPointerException();
        if (!d.orRelay(a, b)) {
            OrRelay<?,?> c = new OrRelay<>(d, a, b);
            a.orpush(b, c);
            c.tryFire(SYNC);
        }
        return d;
    }

anyOf方法將返回率先完成的CompletableFuture的返回結果,方便後置操做。

CompletableFuture完成處理

  • 若是但願接收前置CompletableFuture結果和可能發生的異常,但不打算返回處理結果,則可使用BiConsumer<? super T, ? super Throwable>對象,並調用whenComplete方法:
public CompletableFuture<T> whenComplete(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(null, action);
    }

    public CompletableFuture<T> whenCompleteAsync(
        BiConsumer<? super T, ? super Throwable> action) {
        return uniWhenCompleteStage(asyncPool, action);
    }
    
    private CompletableFuture<T> uniWhenCompleteStage(
        Executor e, BiConsumer<? super T, ? super Throwable> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (e != null || !d.uniWhenComplete(this, f, null)) {
            UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

whenComplete方法將會返回一個不包含返回值的CompletableFuture
whenComplete方法和whenCompleteAsync方法的區別同thenRun方法和thenRunAsync方法。

  • 若是但願接收前置CompletableFuture結果和可能發生的異常,並打算返回處理結果,則可使用BiFunction<? super T, Throwable, ? extends U>對象,並調用handle方法:
public <U> CompletableFuture<U> handle(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(null, fn);
    }

    public <U> CompletableFuture<U> handleAsync(
        BiFunction<? super T, Throwable, ? extends U> fn) {
        return uniHandleStage(asyncPool, fn);
    }
    
    private <V> CompletableFuture<V> uniHandleStage(
        Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
        if (e != null || !d.uniHandle(this, f, null)) {
            UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

handle方法將會返回一個包含U類型返回值的CompletableFuture
handle方法和handleAsync方法的區別同thenRun方法和thenRunAsync方法。

CompletableFuture異常處理

  • 若是但願處理前置CompletableFuture可能發生的異常,並打算返回處理結果,則可使用Function<Throwable, ? extends T>對象,並調用exceptionally方法:
public CompletableFuture<T> exceptionally(
        Function<Throwable, ? extends T> fn) {
        return uniExceptionallyStage(fn);
    }
    
    private CompletableFuture<T> uniExceptionallyStage(
        Function<Throwable, ? extends T> f) {
        if (f == null) throw new NullPointerException();
        CompletableFuture<T> d = new CompletableFuture<T>();
        if (!d.uniExceptionally(this, f, null)) {
            UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
            push(c);
            c.tryFire(SYNC);
        }
        return d;
    }

exceptionally方法將會返回一個包含T類型返回值的CompletableFuture

流程示例

public static void main(String[] args) {
        System.out.println(testCF());
    }



    private static String testCF() {
        Map<String, Object> results = new HashMap<>();
        List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();

        System.out.println(Thread.currentThread()
            + "testCF start ...... ");

        for (int i = 1; i <= 5; i++) {

            int finalI = i;
            CompletableFuture<Void> completableFuture = CompletableFuture
                .supplyAsync(
                    () -> {
                        System.out.println(Thread.currentThread()
                            + "supplyAsync Job" + finalI);
                        try {
                            Thread.sleep(finalI * 2000);
                        } catch (InterruptedException e) {
                            throw new RuntimeException(e);
                        }
//                        if (true) {
//                            throw new RuntimeException("測試異常" + finalI);
//                        }
                        return "Job" + finalI;
                    })
                .thenAcceptAsync(
                    (queryResult) -> {
                        System.out.println(Thread.currentThread()
                            + "thenAcceptAsync " + queryResult);
                        results.put(String.valueOf(finalI), queryResult);
                    })
                .whenCompleteAsync(
                    (aVoid, throwable) -> {
                        if (Objects.nonNull(throwable)) {
                            System.err.println(Thread.currentThread()
                                + "whenCompleteAsync Job" + throwable.getMessage());
                        }
                        System.out.println(Thread.currentThread()
                            + "whenCompleteAsync Job" + finalI);
                    })
                .exceptionally(
                    throwable -> {
                        System.err.println("exceptionally " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    });

            completableFutures.add(completableFuture);
        }

        System.out.println(Thread.currentThread()
            + "for loop completed  ...... ");

        CompletableFuture[] voidCompletableFuture = {};
        return CompletableFuture
            .allOf(completableFutures.toArray(voidCompletableFuture))
            .handle(
                (aVoid, throwable) -> {
                    System.out.println(Thread.currentThread()
                        + "handle ...... ");
                    if (Objects.nonNull(throwable)) {
                        System.err.println("handle " + throwable.getMessage());
                        throw new DaportalException(throwable.getMessage());
                    }
                    try {
                        return StringUtil.MAPPER.writeValueAsString(results);
                    } catch (JsonProcessingException e) {
                        throw new DaportalException(e.getMessage());
                    }
                })
            .join();

    }

執行輸出:

Thread[main,5,main]testCF start ...... 
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]supplyAsync Job3
Thread[main,5,main]for loop completed  ...... 



Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job1
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job4




Thread[ForkJoinPool.commonPool-worker-2,5,main]whenCompleteAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job5




Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job3
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job3








Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job4
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job4








Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]handle ...... 
{"1":"Job1","2":"Job2","3":"Job3","4":"Job4","5":"Job5"}


Process finished with exit code 0

換行越多表明時間間隔越長

下面是整個流程中各個異步線程的工做狀況:

CompletableFuture流程 (2).png

總結

以上全部CompletableFuture方法都將返回一個CompletableFuture,以此來支持鏈式編程。
全部帶async後綴的方法會從新入棧並等待調度,並使用CAS樂觀鎖提高併發性能:
/** Returns true if successfully pushed c onto stack. */
    final boolean tryPushStack(Completion c) {
        Completion h = stack;
        lazySetNext(c, h);
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);// CAS樂觀鎖
    }
全部帶async後綴的方法都會包含一個加Executor參數的重載方法,用於指定外部線程池,默認commonPool大小是3(這一點能夠從後續的「流程示例」得出):
private static final Executor asyncPool = useCommonPool ?
        ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
    
    ......
    
    static Executor screenExecutor(Executor e) {
        if (!useCommonPool && e == ForkJoinPool.commonPool())
            return asyncPool;
        if (e == null) throw new NullPointerException();
        return e;
    }
相關文章
相關標籤/搜索