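At work I recently needed asynchronous programming to implement concurrent asynchronous queries across multiple datasets. I had used CompletableFuture before, but never dug into how it works or wrote up what I learned, so today I am taking the time to study it properly.
This article is organized as: pain point -> solution -> use cases -> flow example -> summary.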
Before Java 8, asynchronous code looked roughly like this:
class App {
    ExecutorService executor = ...
    ArchiveSearcher searcher = ...
    void showSearch(final String target) throws InterruptedException {
        Future<String> future = executor.submit(new Callable<String>() {
            public String call() {
                return searcher.search(target);
            }
        });
        displayOtherThings(); // do other things while searching
        try {
            displayText(future.get()); // use future
        } catch (ExecutionException ex) {
            cleanup();
            return;
        }
    }
}
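future.get() blocks until the computation completes and then returns the result.
So if we need to run a follow-up step once the computation finishes, there are only two options: block on get(), or poll isDone() until it returns true. Either way a thread is tied up, and polling in particular wastes a lot of computing resources. To address this pain point, Java 8 added the CompletableFuture class which, as the name suggests, is a Future that "can be completed".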
CompletableFuture<T> implements both Future<T> and CompletionStage<T>, so the familiar Future methods remain available while all of the CompletionStage composition features can be used as well.
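As a minimal sketch of what implementing both interfaces means in practice, the same object can be chained as a CompletionStage and then read with the blocking Future.get(). The class name FutureAndStageDemo and the values are made up for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

class FutureAndStageDemo {
    static String demo() {
        CompletableFuture<String> cf = CompletableFuture.supplyAsync(() -> "hello");

        // Viewed as a CompletionStage: attach a follow-up step without blocking.
        CompletionStage<String> stage = cf.thenApply(s -> s.toUpperCase());

        // Viewed as a Future: the classic blocking get() is still available.
        Future<String> future = stage.toCompletableFuture();
        try {
            return future.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints HELLO
    }
}
```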
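The sections below start from common use cases and look at the best-fitting method for each one. Some scenarios have more than one workable option; in that case the choice comes down to weighing extensibility, maintainability, performance, and readability.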
To run a task asynchronously when no result is needed, pass a Runnable object to runAsync:
public static CompletableFuture<Void> runAsync(Runnable runnable) {
    return asyncRunStage(asyncPool, runnable);
}

static CompletableFuture<Void> asyncRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    e.execute(new AsyncRun(d, f));
    return d;
}
runAsync returns a CompletableFuture<Void>, that is, a CompletableFuture that carries no result value.
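A small usage sketch of runAsync, assuming the default pool; the class name RunAsyncDemo and the counter are hypothetical and only make the side effect observable.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class RunAsyncDemo {
    static int demo() {
        AtomicInteger counter = new AtomicInteger();
        // The Runnable runs on the default async pool and produces no value.
        CompletableFuture<Void> cf = CompletableFuture.runAsync(() -> counter.incrementAndGet());
        cf.join(); // wait for completion; join() yields null for a Void future
        return counter.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 1
    }
}
```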
To run a task asynchronously and obtain its result, pass a Supplier<U> object to supplyAsync:
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
    return asyncSupplyStage(asyncPool, supplier);
}

static <U> CompletableFuture<U> asyncSupplyStage(Executor e, Supplier<U> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<U> d = new CompletableFuture<U>();
    e.execute(new AsyncSupply<U>(d, f));
    return d;
}
supplyAsync returns a CompletableFuture<U>, which will hold a computed result of the type given by the type parameter U.
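For comparison with runAsync, here is a minimal supplyAsync sketch; SupplyAsyncDemo is a hypothetical name and the computation is a stand-in for real work.

```java
import java.util.concurrent.CompletableFuture;

class SupplyAsyncDemo {
    static int demo() {
        // The Supplier's return value becomes the result of the future.
        CompletableFuture<Integer> cf = CompletableFuture.supplyAsync(() -> 6 * 7);
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 42
    }
}
```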
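To trigger a step after the previous CompletableFuture completes, without receiving its result and without returning one, use a Runnable object and call thenRun:
public CompletableFuture<Void> thenRun(Runnable action) {
    return uniRunStage(null, action);
}

public CompletableFuture<Void> thenRunAsync(Runnable action) {
    return uniRunStage(asyncPool, action);
}

private CompletableFuture<Void> uniRunStage(Executor e, Runnable f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniRun(this, f, null)) {
        UniRun<T> c = new UniRun<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
thenRun returns a CompletableFuture with no result value. The difference between thenRun and thenRunAsync is where the action runs: thenRun runs it on the thread that completes the previous CompletableFuture (or on the calling thread if that stage has already completed), whereas thenRunAsync submits it to an executor, which is asyncPool by default or an Executor you pass in.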
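The threading difference can be observed with a small sketch. It assumes the previous stage is already complete (so thenRun executes in the caller) and uses an explicit single-thread executor named "demo-pool" so the Async case is easy to recognize; both the class name and the thread name are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThenRunThreadDemo {
    // Returns {thread used by thenRun, thread used by thenRunAsync}.
    static String[] demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "demo-pool"));
        try {
            CompletableFuture<String> done = CompletableFuture.completedFuture("ready");
            String[] threads = new String[2];
            // The previous stage is already complete, so the action runs in the caller.
            done.thenRun(() -> threads[0] = Thread.currentThread().getName()).join();
            // The Async variant hands the action to the given executor.
            done.thenRunAsync(() -> threads[1] = Thread.currentThread().getName(), pool).join();
            return threads;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] threads = demo();
        System.out.println(threads[0] + " / " + threads[1]);
    }
}
```

When the previous stage is still running, thenRun would instead execute on whichever thread completes that stage, which is why the non-Async variants are best kept for short, non-blocking actions.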
To trigger a step after the previous CompletableFuture completes and receive its result, but without returning one, use a Consumer<? super T> object and call thenAccept:
public CompletableFuture<Void> thenAccept(Consumer<? super T> action) {
    return uniAcceptStage(null, action);
}

public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action) {
    return uniAcceptStage(asyncPool, action);
}

private CompletableFuture<Void> uniAcceptStage(Executor e, Consumer<? super T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.uniAccept(this, f, null)) {
        UniAccept<T> c = new UniAccept<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
thenAccept returns a CompletableFuture with no result value. thenAccept and thenAcceptAsync differ in the same way as thenRun and thenRunAsync.
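A minimal thenAccept sketch: the consumer receives the upstream result and the returned future is Void. ThenAcceptDemo and the list used to capture the value are illustrative assumptions.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class ThenAcceptDemo {
    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        CompletableFuture<Void> cf = CompletableFuture
                .supplyAsync(() -> "result")
                .thenAccept(value -> seen.add(value)); // consumes the value, returns nothing
        cf.join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [result]
    }
}
```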
To trigger a step after the previous CompletableFuture completes, receive its result, and return a new value of type U, use a Function<? super T,? extends U> object and call thenApply:
public <U> CompletableFuture<U> thenApply(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(null, fn);
}

public <U> CompletableFuture<U> thenApplyAsync(
    Function<? super T,? extends U> fn) {
    return uniApplyStage(asyncPool, fn);
}

private <V> CompletableFuture<V> uniApplyStage(
    Executor e, Function<? super T,? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.uniApply(this, f, null)) {
        UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
thenApply returns a CompletableFuture holding a value of type U. thenApply and thenApplyAsync differ in the same way as thenRun and thenRunAsync.
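A short thenApply sketch showing two transformations chained one after the other; ThenApplyDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class ThenApplyDemo {
    static int demo() {
        CompletableFuture<Integer> cf = CompletableFuture
                .supplyAsync(() -> "42")
                .thenApply(s -> Integer.parseInt(s)) // String -> Integer
                .thenApply(n -> n + 1);              // Integer -> Integer
        return cf.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 43
    }
}
```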
If the follow-up step should take the previous result and itself produce the result of another CompletableFuture, use a Function<? super T, ? extends CompletionStage<U>> object and call thenCompose:
public <U> CompletableFuture<U> thenCompose(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(null, fn);
}

public <U> CompletableFuture<U> thenComposeAsync(
    Function<? super T, ? extends CompletionStage<U>> fn) {
    return uniComposeStage(asyncPool, fn);
}

private <V> CompletableFuture<V> uniComposeStage(
    Executor e, Function<? super T, ? extends CompletionStage<V>> f) {
    ...
    Object r; Throwable x;
    if (e == null && (r = result) != null) {
        // try to return function result directly
        if (r instanceof AltResult) {
            if ((x = ((AltResult)r).ex) != null) {
                return new CompletableFuture<V>(encodeThrowable(x, r));
            }
            r = null;
        }
        try {
            @SuppressWarnings("unchecked") T t = (T) r;
            CompletableFuture<V> g = f.apply(t).toCompletableFuture();
            Object s = g.result;
            if (s != null)
                return new CompletableFuture<V>(encodeRelay(s));
            CompletableFuture<V> d = new CompletableFuture<V>();
            UniRelay<V> copy = new UniRelay<V>(d, g);
            g.push(copy);
            copy.tryFire(SYNC);
            return d;
        } catch (Throwable ex) {
            return new CompletableFuture<V>(encodeThrowable(ex));
        }
    }
    CompletableFuture<V> d = new CompletableFuture<V>();
    UniCompose<T,V> c = new UniCompose<T,V>(e, d, this, f);
    push(c);
    c.tryFire(SYNC);
    return d;
}
thenCompose returns a CompletableFuture<U> that completes with the result of the CompletableFuture produced by the function, so the result is flattened rather than nested as CompletableFuture<CompletableFuture<U>>. thenCompose and thenComposeAsync differ in the same way as thenRun and thenRunAsync.
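To make the flattening concrete, here is a sketch with two dependent asynchronous steps. The helpers findUserId and loadProfile are hypothetical stand-ins for real lookups.

```java
import java.util.concurrent.CompletableFuture;

class ThenComposeDemo {
    // Hypothetical async lookup: derives an id from the name.
    static CompletableFuture<Integer> findUserId(String name) {
        return CompletableFuture.supplyAsync(() -> name.length());
    }

    // Hypothetical async lookup that depends on the first result.
    static CompletableFuture<String> loadProfile(int userId) {
        return CompletableFuture.supplyAsync(() -> "profile-" + userId);
    }

    static String demo() {
        // thenApply here would yield CompletableFuture<CompletableFuture<String>>;
        // thenCompose flattens it to CompletableFuture<String>.
        CompletableFuture<String> flat = findUserId("alice").thenCompose(id -> loadProfile(id));
        return flat.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints profile-5
    }
}
```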
To trigger a step after both the current CompletableFuture and a given CompletableFuture have completed, without receiving their results T and U and without returning a result, use a Runnable and call runAfterBoth:
public CompletableFuture<Void> runAfterBoth(CompletionStage<?> other,
                                            Runnable action) {
    return biRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterBothAsync(CompletionStage<?> other,
                                                 Runnable action) {
    return biRunStage(asyncPool, other, action);
}

private CompletableFuture<Void> biRunStage(Executor e, CompletionStage<?> o,
                                           Runnable f) {
    CompletableFuture<?> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.biRun(this, b, f, null)) {
        BiRun<T,?> c = new BiRun<>(e, d, this, b, f);
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
runAfterBoth returns a CompletableFuture with no result value. runAfterBoth and runAfterBothAsync differ in the same way as thenRun and thenRunAsync.
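A brief runAfterBoth sketch: the action takes no arguments and fires only once both inputs are done. RunAfterBothDemo and the flag are illustrative.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterBothDemo {
    static boolean demo() {
        AtomicBoolean fired = new AtomicBoolean();
        CompletableFuture<String> first = CompletableFuture.supplyAsync(() -> "a");
        CompletableFuture<Integer> second = CompletableFuture.supplyAsync(() -> 1);
        // Neither result is passed to the action; it only signals "both finished".
        first.runAfterBoth(second, () -> fired.set(true)).join();
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```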
To trigger a step after both the current CompletableFuture and a given CompletableFuture have completed, receiving their results T and U but returning no result, use a BiConsumer<? super T, ? super U> and call thenAcceptBoth:
public <U> CompletableFuture<Void> thenAcceptBoth(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(null, other, action);
}

public <U> CompletableFuture<Void> thenAcceptBothAsync(
    CompletionStage<? extends U> other,
    BiConsumer<? super T, ? super U> action) {
    return biAcceptStage(asyncPool, other, action);
}

private <U> CompletableFuture<Void> biAcceptStage(
    Executor e, CompletionStage<U> o,
    BiConsumer<? super T,? super U> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.biAccept(this, b, f, null)) {
        BiAccept<T,U> c = new BiAccept<T,U>(e, d, this, b, f);
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
thenAcceptBoth returns a CompletableFuture with no result value. thenAcceptBoth and thenAcceptBothAsync differ in the same way as thenRun and thenRunAsync.
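A thenAcceptBoth sketch in which both results are consumed together; the class name and the sample values are assumptions.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class ThenAcceptBothDemo {
    static String demo() {
        AtomicReference<String> out = new AtomicReference<>();
        CompletableFuture<String> name = CompletableFuture.supplyAsync(() -> "orders");
        CompletableFuture<Integer> count = CompletableFuture.supplyAsync(() -> 3);
        // The BiConsumer receives both results; the returned future is Void.
        name.thenAcceptBoth(count, (n, c) -> out.set(n + "=" + c)).join();
        return out.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints orders=3
    }
}
```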
To trigger a step after both the current CompletableFuture and a given CompletableFuture have completed, receiving their results T and U and returning a value of type V, use a BiFunction<? super T,? super U,? extends V> and call thenCombine:
public <U,V> CompletableFuture<V> thenCombine(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(null, other, fn);
}

public <U,V> CompletableFuture<V> thenCombineAsync(
    CompletionStage<? extends U> other,
    BiFunction<? super T,? super U,? extends V> fn) {
    return biApplyStage(asyncPool, other, fn);
}

private <U,V> CompletableFuture<V> biApplyStage(
    Executor e, CompletionStage<U> o,
    BiFunction<? super T,? super U,? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.biApply(this, b, f, null)) {
        BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);
        bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
thenCombine returns a CompletableFuture holding a value of type V. thenCombine and thenCombineAsync differ in the same way as thenRun and thenRunAsync.
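A thenCombine sketch: two independent futures run concurrently and their results are merged into one value. ThenCombineDemo and the price/quantity values are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ThenCombineDemo {
    static int demo() {
        CompletableFuture<Integer> price = CompletableFuture.supplyAsync(() -> 20);
        CompletableFuture<Integer> quantity = CompletableFuture.supplyAsync(() -> 3);
        // The BiFunction combines both results into a new value.
        CompletableFuture<Integer> total = price.thenCombine(quantity, (p, q) -> p * q);
        return total.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 60
    }
}
```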
To trigger a step as soon as either the current CompletableFuture or a given CompletableFuture completes, without receiving a result from either and without returning one, use a Runnable and call runAfterEither:
public CompletableFuture<Void> runAfterEither(CompletionStage<?> other,
                                              Runnable action) {
    return orRunStage(null, other, action);
}

public CompletableFuture<Void> runAfterEitherAsync(CompletionStage<?> other,
                                                   Runnable action) {
    return orRunStage(asyncPool, other, action);
}

private CompletableFuture<Void> orRunStage(Executor e, CompletionStage<?> o,
                                           Runnable f) {
    CompletableFuture<?> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.orRun(this, b, f, null)) {
        OrRun<T,?> c = new OrRun<>(e, d, this, b, f);
        orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
runAfterEither returns a CompletableFuture with no result value. runAfterEither and runAfterEitherAsync differ in the same way as thenRun and thenRunAsync.
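A runAfterEither sketch. To keep the outcome deterministic, one input is a future that this demo never completes, so the action can only be triggered by the other one; the names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

class RunAfterEitherDemo {
    static boolean demo() {
        AtomicBoolean fired = new AtomicBoolean();
        CompletableFuture<String> never = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // Fires as soon as one of the two completes; here that can only be 'fast'.
        never.runAfterEither(fast, () -> fired.set(true)).join();
        return fired.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```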
To trigger a step as soon as either the current CompletableFuture or a given CompletableFuture completes, receiving the result of whichever finishes first (both must produce a value compatible with T) but returning no result, use a Consumer<? super T> and call acceptEither:
public CompletableFuture<Void> acceptEither(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(null, other, action);
}

public CompletableFuture<Void> acceptEitherAsync(
    CompletionStage<? extends T> other, Consumer<? super T> action) {
    return orAcceptStage(asyncPool, other, action);
}

private <U extends T> CompletableFuture<Void> orAcceptStage(
    Executor e, CompletionStage<U> o, Consumer<? super T> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    if (e != null || !d.orAccept(this, b, f, null)) {
        OrAccept<T,U> c = new OrAccept<T,U>(e, d, this, b, f);
        orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
acceptEither returns a CompletableFuture with no result value. acceptEither and acceptEitherAsync differ in the same way as thenRun and thenRunAsync.
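An acceptEither sketch showing that the consumer receives a single value, the one from whichever input completes first. One input is left pending on purpose so the result is predictable; the names and the "from-replica" value are made up.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

class AcceptEitherDemo {
    static String demo() {
        AtomicReference<String> winner = new AtomicReference<>();
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> replica = CompletableFuture.completedFuture("from-replica");
        // Only one value reaches the consumer: the one from the first input to complete.
        pending.acceptEither(replica, value -> winner.set(value)).join();
        return winner.get();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints from-replica
    }
}
```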
To trigger a step as soon as either the current CompletableFuture or a given CompletableFuture completes, receiving the result of whichever finishes first (both must produce a value compatible with T) and returning a value of type U, use a Function<? super T, U> and call applyToEither:
public <U> CompletableFuture<U> applyToEither(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(null, other, fn);
}

public <U> CompletableFuture<U> applyToEitherAsync(
    CompletionStage<? extends T> other, Function<? super T, U> fn) {
    return orApplyStage(asyncPool, other, fn);
}

private <U extends T,V> CompletableFuture<V> orApplyStage(
    Executor e, CompletionStage<U> o,
    Function<? super T, ? extends V> f) {
    CompletableFuture<U> b;
    if (f == null || (b = o.toCompletableFuture()) == null)
        throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.orApply(this, b, f, null)) {
        OrApply<T,U,V> c = new OrApply<T,U,V>(e, d, this, b, f);
        orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
applyToEither returns a CompletableFuture holding the function's return value (type U in the public signature, V in the internal orApplyStage). applyToEither and applyToEitherAsync differ in the same way as thenRun and thenRunAsync.
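An applyToEither sketch in the same style: one input stays pending, so the function is applied to the other input's value. ApplyToEitherDemo is a hypothetical name.

```java
import java.util.concurrent.CompletableFuture;

class ApplyToEitherDemo {
    static int demo() {
        CompletableFuture<String> slow = new CompletableFuture<>(); // never completed here
        CompletableFuture<String> fast = CompletableFuture.supplyAsync(() -> "fast");
        // The function is applied to the first available result and its value is returned.
        CompletableFuture<Integer> length = slow.applyToEither(fast, s -> s.length());
        return length.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 4
    }
}
```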
To trigger a step only after several CompletableFutures have all completed, pass every CompletableFuture that should be waited for into allOf:
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs) {
    return andTree(cfs, 0, cfs.length - 1);
}

/** Recursively constructs a tree of completions. */
static CompletableFuture<Void> andTree(CompletableFuture<?>[] cfs,
                                       int lo, int hi) {
    CompletableFuture<Void> d = new CompletableFuture<Void>();
    CompletableFuture<?> a, b;
    int mid = (lo + hi) >>> 1;
    if ((a = (lo == mid ? cfs[lo] :
              andTree(cfs, lo, mid))) == null ||
        (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
              andTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    if (!d.biRelay(a, b)) {
        BiRelay<?,?> c = new BiRelay<>(d, a, b);
        a.bipush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
allOf returns a CompletableFuture<Void> with no result value, which makes it convenient to attach follow-up steps; the individual results still have to be read from the original futures.
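Because allOf yields no value of its own, a common pattern is to join each input once the combined future is done. The following is one possible sketch of that pattern, not the only way to collect results; AllOfDemo and the job names are assumptions.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

class AllOfDemo {
    static List<String> demo() {
        List<CompletableFuture<String>> futures = new ArrayList<>();
        for (int i = 1; i <= 3; i++) {
            int id = i;
            futures.add(CompletableFuture.supplyAsync(() -> "Job" + id));
        }
        CompletableFuture<Void> all =
                CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]));
        // Once 'all' is done every input is done, so join() below does not block.
        CompletableFuture<List<String>> collected = all.thenApply(ignored ->
                futures.stream().map(CompletableFuture::join).collect(Collectors.toList()));
        return collected.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [Job1, Job2, Job3]
    }
}
```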
To trigger a step as soon as any one of several CompletableFutures completes, pass every CompletableFuture that should be waited for into anyOf:
public static CompletableFuture<Object> anyOf(CompletableFuture<?>... cfs) {
    return orTree(cfs, 0, cfs.length - 1);
}

/** Recursively constructs a tree of completions. */
static CompletableFuture<Object> orTree(CompletableFuture<?>[] cfs,
                                        int lo, int hi) {
    CompletableFuture<Object> d = new CompletableFuture<Object>();
    CompletableFuture<?> a, b;
    int mid = (lo + hi) >>> 1;
    if ((a = (lo == mid ? cfs[lo] :
              orTree(cfs, lo, mid))) == null ||
        (b = (lo == hi ? a : (hi == mid+1) ? cfs[hi] :
              orTree(cfs, mid+1, hi))) == null)
        throw new NullPointerException();
    if (!d.orRelay(a, b)) {
        OrRelay<?,?> c = new OrRelay<>(d, a, b);
        a.orpush(b, c);
        c.tryFire(SYNC);
    }
    return d;
}
anyOf returns a CompletableFuture<Object> that completes with the result of whichever input CompletableFuture finishes first, which makes it convenient to attach follow-up steps.
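An anyOf sketch: since the inputs may have different types, the result is typed as Object and usually needs a cast or an instanceof check. One input is left pending so the winner is known; the names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class AnyOfDemo {
    static Object demo() {
        CompletableFuture<String> pending = new CompletableFuture<>(); // never completed here
        CompletableFuture<Integer> ready = CompletableFuture.completedFuture(7);
        CompletableFuture<Object> first = CompletableFuture.anyOf(pending, ready);
        return first.join(); // the Integer 7, typed as Object
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints 7
    }
}
```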
To run a callback when the previous CompletableFuture completes, whether normally or with an exception, and inspect both the result and the exception, use a BiConsumer<? super T, ? super Throwable> object and call whenComplete:
public CompletableFuture<T> whenComplete(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(null, action);
}

public CompletableFuture<T> whenCompleteAsync(
    BiConsumer<? super T, ? super Throwable> action) {
    return uniWhenCompleteStage(asyncPool, action);
}

private CompletableFuture<T> uniWhenCompleteStage(
    Executor e, BiConsumer<? super T, ? super Throwable> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<T> d = new CompletableFuture<T>();
    if (e != null || !d.uniWhenComplete(this, f, null)) {
        UniWhenComplete<T> c = new UniWhenComplete<T>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
whenComplete returns a CompletableFuture<T> that carries the same result or exception as the previous stage; the callback itself returns nothing and does not change the outcome. whenComplete and whenCompleteAsync differ in the same way as thenRun and thenRunAsync.
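A whenComplete sketch showing that the callback observes the outcome while the original value passes through unchanged. WhenCompleteDemo and the log list are illustrative.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;

class WhenCompleteDemo {
    static String demo() {
        List<String> log = new CopyOnWriteArrayList<>();
        CompletableFuture<String> cf = CompletableFuture
                .supplyAsync(() -> "value")
                // Exactly one of (value, error) is meaningful; the result is not altered.
                .whenComplete((value, error) -> log.add(error == null ? "ok:" + value : "failed"));
        String result = cf.join(); // still "value"
        return result + "|" + log.get(0);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints value|ok:value
    }
}
```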
To handle both the result and any exception of the previous CompletableFuture and return a new value of type U, use a BiFunction<? super T, Throwable, ? extends U> object and call handle:
public <U> CompletableFuture<U> handle(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(null, fn);
}

public <U> CompletableFuture<U> handleAsync(
    BiFunction<? super T, Throwable, ? extends U> fn) {
    return uniHandleStage(asyncPool, fn);
}

private <V> CompletableFuture<V> uniHandleStage(
    Executor e, BiFunction<? super T, Throwable, ? extends V> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<V> d = new CompletableFuture<V>();
    if (e != null || !d.uniHandle(this, f, null)) {
        UniHandle<T,V> c = new UniHandle<T,V>(e, d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
handle returns a CompletableFuture holding a value of type U. handle and handleAsync differ in the same way as thenRun and thenRunAsync.
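A handle sketch in which a failing stage is converted into a normal value. HandleDemo, the exception, and the "fallback" value are assumptions for illustration.

```java
import java.util.concurrent.CompletableFuture;

class HandleDemo {
    static String demo() {
        CompletableFuture<String> failing = CompletableFuture.<String>supplyAsync(() -> {
            throw new IllegalStateException("boom");
        });
        // handle sees either the value or the error and always produces a new result.
        CompletableFuture<String> handled =
                failing.handle((value, error) -> error == null ? value : "fallback");
        return handled.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints fallback
    }
}
```

Unlike whenComplete, the returned future here completes normally even though the upstream stage failed, because the function's return value replaces the outcome.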
To handle only the exceptional case and supply a replacement value of type T, use a Function<Throwable, ? extends T> object and call exceptionally:
public CompletableFuture<T> exceptionally(
    Function<Throwable, ? extends T> fn) {
    return uniExceptionallyStage(fn);
}

private CompletableFuture<T> uniExceptionallyStage(
    Function<Throwable, ? extends T> f) {
    if (f == null) throw new NullPointerException();
    CompletableFuture<T> d = new CompletableFuture<T>();
    if (!d.uniExceptionally(this, f, null)) {
        UniExceptionally<T> c = new UniExceptionally<T>(d, this, f);
        push(c);
        c.tryFire(SYNC);
    }
    return d;
}
exceptionally returns a CompletableFuture holding a value of type T.
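An exceptionally sketch: the function runs only when the previous stage failed, and its return value becomes the result. ExceptionallyDemo and the -1 default are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

class ExceptionallyDemo {
    static int demo() {
        CompletableFuture<Integer> recovered = CompletableFuture.<Integer>supplyAsync(() -> {
            throw new IllegalArgumentException("bad input");
        }).exceptionally(error -> -1); // invoked only on failure
        return recovered.join();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints -1
    }
}
```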
public static void main(String[] args) {
    System.out.println(testCF());
}

// Lists is Guava; DaportalException and StringUtil.MAPPER (apparently a Jackson
// ObjectMapper) come from the author's project.
private static String testCF() {
    // ConcurrentHashMap rather than HashMap: the callbacks below write from several pool threads.
    Map<String, Object> results = new ConcurrentHashMap<>();
    List<CompletableFuture<Void>> completableFutures = Lists.newArrayList();
    System.out.println(Thread.currentThread() + "testCF start ...... ");
    for (int i = 1; i <= 5; i++) {
        int finalI = i;
        CompletableFuture<Void> completableFuture = CompletableFuture
            .supplyAsync(
                () -> {
                    System.out.println(Thread.currentThread() + "supplyAsync Job" + finalI);
                    try {
                        Thread.sleep(finalI * 2000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    // if (true) {
                    //     throw new RuntimeException("test exception " + finalI);
                    // }
                    return "Job" + finalI;
                })
            .thenAcceptAsync(
                (queryResult) -> {
                    System.out.println(Thread.currentThread() + "thenAcceptAsync " + queryResult);
                    results.put(String.valueOf(finalI), queryResult);
                })
            .whenCompleteAsync(
                (aVoid, throwable) -> {
                    if (Objects.nonNull(throwable)) {
                        System.err.println(Thread.currentThread() + "whenCompleteAsync Job" + throwable.getMessage());
                    }
                    System.out.println(Thread.currentThread() + "whenCompleteAsync Job" + finalI);
                })
            .exceptionally(
                throwable -> {
                    System.err.println("exceptionally " + throwable.getMessage());
                    throw new DaportalException(throwable.getMessage());
                });
        completableFutures.add(completableFuture);
    }
    System.out.println(Thread.currentThread() + "for loop completed ...... ");
    CompletableFuture[] voidCompletableFuture = {};
    // Wait for all five chains, then serialize the collected results to JSON.
    return CompletableFuture
        .allOf(completableFutures.toArray(voidCompletableFuture))
        .handle(
            (aVoid, throwable) -> {
                System.out.println(Thread.currentThread() + "handle ...... ");
                if (Objects.nonNull(throwable)) {
                    System.err.println("handle " + throwable.getMessage());
                    throw new DaportalException(throwable.getMessage());
                }
                try {
                    return StringUtil.MAPPER.writeValueAsString(results);
                } catch (JsonProcessingException e) {
                    throw new DaportalException(e.getMessage());
                }
            })
        .join();
}
Output:
Thread[main,5,main]testCF start ...... 
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]supplyAsync Job3
Thread[main,5,main]for loop completed ...... 
Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job1
Thread[ForkJoinPool.commonPool-worker-1,5,main]supplyAsync Job4
Thread[ForkJoinPool.commonPool-worker-2,5,main]whenCompleteAsync Job1
Thread[ForkJoinPool.commonPool-worker-2,5,main]supplyAsync Job5
Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]thenAcceptAsync Job3
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job2
Thread[ForkJoinPool.commonPool-worker-3,5,main]whenCompleteAsync Job3
Thread[ForkJoinPool.commonPool-worker-1,5,main]thenAcceptAsync Job4
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job4
Thread[ForkJoinPool.commonPool-worker-2,5,main]thenAcceptAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]whenCompleteAsync Job5
Thread[ForkJoinPool.commonPool-worker-1,5,main]handle ...... 
{"1":"Job1","2":"Job2","3":"Job3","4":"Job4","5":"Job5"}
Process finished with exit code 0
In the original output, the more blank lines between two entries, the longer the time gap between them.
[Figure: what each asynchronous thread was doing over the course of the whole flow]
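The CompletableFuture methods covered above each return a CompletableFuture, which is what makes chained calls possible.
Methods with the Async suffix are pushed onto the stack again and wait to be scheduled, and a CAS (optimistic locking) is used to improve concurrent performance:
/** Returns true if successfully pushed c onto stack. */
final boolean tryPushStack(Completion c) {
    Completion h = stack;
    lazySetNext(c, h);
    return UNSAFE.compareAndSwapObject(this, STACK, h, c); // CAS (optimistic lock)
}
Every method with the Async suffix also has an overload that takes an extra Executor parameter for supplying an external thread pool. In the flow example above the default commonPool used three worker threads; its parallelism defaults to one less than the number of available processors, so that figure depends on the machine (three would correspond to four available processors):
private static final Executor asyncPool = useCommonPool ?
    ForkJoinPool.commonPool() : new ThreadPerTaskExecutor();
......
static Executor screenExecutor(Executor e) {
    if (!useCommonPool && e == ForkJoinPool.commonPool())
        return asyncPool;
    if (e == null) throw new NullPointerException();
    return e;
}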
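As a sketch of the Executor overloads, the example below runs a task on an explicitly supplied pool instead of the common pool. The pool size, the thread name "query-pool", and the class name CustomExecutorDemo are assumptions chosen for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;

class CustomExecutorDemo {
    static String demo() {
        // Hypothetical dedicated pool; every thread it creates is named "query-pool".
        ExecutorService pool = Executors.newFixedThreadPool(2, r -> new Thread(r, "query-pool"));
        try {
            // The two-argument overload runs the supplier on the given executor.
            return CompletableFuture
                    .supplyAsync(() -> Thread.currentThread().getName(), pool)
                    .join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints query-pool
        // Machine dependent: the parallelism of the shared common pool.
        System.out.println(ForkJoinPool.commonPool().getParallelism());
    }
}
```

A dedicated pool is worth considering when the tasks block (for example on database queries), since blocking work on the shared common pool can starve unrelated tasks that also use it.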