一、CompletableFuture:java
基本的方式建立CompletableFuture<T>對象api
package future; import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; public class CompletableAction { private static Random random = new Random(); public static void main(String[] args) { CompletableFuture<Double> completableFuture = new CompletableFuture<>(); new Thread(() -> { Double i = get(); completableFuture.complete(i); }).start(); // 使用get方法,會進入阻塞,所以必須等到輸出隨機數,再輸出 123123 /*try { Optional.ofNullable(completableFuture.get()).ifPresent(System.out::println); System.out.println("123123"); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); }*/ // whenComplete() 方法不會進入阻塞,所以先輸出 123123,再輸出隨機數 completableFuture.whenComplete((v, t) -> { Optional.ofNullable(v).ifPresent(System.out::println); Optional.ofNullable(t).ifPresent(Throwable::printStackTrace); }); System.out.println("123123"); } public static Double get() { try { Thread.sleep(3000L); return (double) random.nextInt(10000); } catch (InterruptedException e) { e.printStackTrace(); return null; } } }
輸出的結果:app
經過CompletableFuture.supplyAsync(Supplier<T> supplier) 或者 CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor) 方法進行建立CompletableFuture對象實例dom
package future; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; public class CompletableAction2 { public static void main(String[] args) { AtomicBoolean atomicBoolean = new AtomicBoolean(false); ExecutorService executorService = Executors.newFixedThreadPool(2); ExecutorService executorService2 = Executors.newFixedThreadPool(2, (runnable) -> { Thread thread = new Thread(runnable); return thread; }); // supplyAsync(Supplier<T> supplier, Executor executor) 方法, // 採用了 executorService.shutdown()方法 和 AtomicBoolean原子操做判斷是否執行完,而後退出線程 CompletableFuture.supplyAsync(CompletableAction::get, executorService2) .whenComplete((v, t) -> { Optional.ofNullable(v).ifPresent(System.out::println); Optional.ofNullable(t).ifPresent(Throwable::printStackTrace); atomicBoolean.set(true); }); System.out.println(123123); while (!atomicBoolean.get()) { try { Thread.sleep(100); } catch (InterruptedException e) { e.printStackTrace(); } } executorService2.shutdown(); //================================================================ // supplyAsync() 方法是一個守護線程, // 當main線程執行完,直接關閉線程,所以supplyAsync()方法的內容來不及打印輸出 /*CompletableFuture.supplyAsync(CompletableAction::get) .whenComplete((v, t) -> { Optional.ofNullable(v).ifPresent(System.out::println); Optional.ofNullable(t).ifPresent(Throwable::printStackTrace); }); System.out.println(123123);*/ // 讓main線程運行停在這裏,沒與關閉 /*try { Thread.currentThread().join(); } catch (InterruptedException e) { e.printStackTrace(); }*/ } }
CompletableFuture<T>的 thenApply() 方法和 join() 方法使用atom
package future; import java.util.Arrays; import java.util.List; import java.util.Optional; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; public class CompletableAction3 { public static void main(String[] args) { List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5); AtomicBoolean atomicBoolean = new AtomicBoolean(false); ExecutorService executorService = Executors.newFixedThreadPool(2, (r) -> { Thread thread = new Thread(r); return thread; }); /*CompletableFuture.supplyAsync(CompletableAction::get, executorService).whenComplete((v, t) -> { atomicBoolean.set(true); Optional.ofNullable(v).ifPresent(System.out::println); Optional.ofNullable(t).ifPresent(Throwable::printStackTrace); }); while (!atomicBoolean.get()) { executorService.shutdown(); }*/ // 原始方式使用 Stream<CompletableFuture<Double>> completableFutureStream = integers.stream().map(i -> { return CompletableFuture.supplyAsync(() -> CompletableAction3.getProductById(i), executorService); }); Stream<Double> doubleStream = completableFutureStream.map(d -> { Double multiply = null; try { multiply = CompletableAction3.multiply(d.get()); } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return multiply; }); List<Double> doubleList = doubleStream.collect(toList()); Optional.of(doubleList).ifPresent(System.out::println); // CompletableFuture的 join() 和 thenApply() 方法使用 Stream<CompletableFuture<Double>> completableFutureStream2 = integers.stream().map(i -> { return CompletableFuture.supplyAsync(() -> CompletableAction3.getProductById(i), executorService); }); Stream<CompletableFuture<Double>> completableFutureStream1 = completableFutureStream.map(future -> { return future.thenApply(CompletableAction3::multiply); }); Stream<Double> doubleStream2 = completableFutureStream1.map(CompletableFuture::join); List<Double> doubleList2 = doubleStream.collect(toList()); Optional.of(doubleList2).ifPresent(System.out::println); } // 模擬取到數據以後進行相關業務操做,這裏模擬取到的數據 * 10 public static Double multiply(Double d) { try { Thread.sleep(3000); return d * 10d; } catch (InterruptedException e) { e.printStackTrace(); return null; } } // 模擬根據 ID 查詢商品 public static Double getProductById(int i) { Double aDouble = CompletableAction.get(); System.out.println(aDouble); return aDouble; } }
經常使用api:spa
package future; import java.util.Arrays; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.stream.Stream; import static java.util.stream.Collectors.toList; public class CompletableAction4 { public static void main(String[] args) throws InterruptedException { ExecutorService executorService = Executors.newFixedThreadPool(2, (r) -> { Thread thread = new Thread(r); return thread; }); // thenApply()、supplyAsync()、whenComplete() 方法 CompletableFuture.supplyAsync(() -> 1d) .thenApply(CompletableAction3::multiply) .whenComplete((v, t) -> { System.out.println(v); }); // thenRun() 方法 CompletableFuture.supplyAsync(() -> 1) .thenRun(() -> { System.out.println("========》 use the thenRun() method"); }); // thenAccept() 方法 CompletableFuture.supplyAsync(() -> 1) .thenAccept(r -> { System.out.println("========》 use the thenAccept() method: " + r); }); // thenCompose() 方法 CompletableFuture.supplyAsync(() -> 1) .thenApply(r -> r * 10d) .thenCompose(r -> CompletableFuture.supplyAsync(() -> r)) .thenAccept(r -> { System.out.println("========》 use the thenCompose() method: " + r); }); // thenCombine() 方法, 有返回值 CompletableFuture.supplyAsync(() -> 1) .thenCombine(CompletableFuture.supplyAsync(() -> 2d), (r1, r2) -> { System.out.println("========》 use the thenCombine() method: " + (r1 + r2)); return r1 + r2; }); // thenAcceptBoth() 方法,無返回值。和 then Combine() 方法相似, CompletableFuture.supplyAsync(()-> 1) .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 3d), (r1, r2) -> { System.out.println("========》 use the thenAcceptBoth() method: " + (r1 + r2)); }); // runAfterBoth() 方法,直接執行完。只有兩個 CompletableFuture 對象實例化完成才返回 CompletableFuture.supplyAsync(() -> { // try { // Thread.sleep(2000); // System.out.println("sleep 2s"); // } catch (InterruptedException e) { // e.printStackTrace(); // } return 1; }).runAfterBoth(CompletableFuture.supplyAsync(() -> { // try { // Thread.sleep(3000); // System.out.println("sleep 3s"); // } catch (InterruptedException e) { // e.printStackTrace(); // } return 2; }), () -> System.out.println("========》 use the runAfterBoth() method")); // runAfterEither() 方法,直接執行完。只要有一個 CompletableFuture 對象實例化完成就直接返回 CompletableFuture.supplyAsync(() -> { // try { // Thread.sleep(2000); // System.out.println("sleep 2s"); // } catch (InterruptedException e) { // e.printStackTrace(); // } return 1; }).runAfterEither(CompletableFuture.supplyAsync(() -> { // try { // Thread.sleep(3000); // System.out.println("sleep 3s"); // } catch (InterruptedException e) { // e.printStackTrace(); // } return 2; }), () -> System.out.println("========》 use the runAfterEither() method")); // acceptEither() 方法 CompletableFuture.supplyAsync(() -> 1) .acceptEither(CompletableFuture.supplyAsync(() -> 2), (r1) -> { System.out.println("========》 use the acceptEither() method:" + r1); }); // acceptEither() 方法 CompletableFuture.supplyAsync(() -> 1) .applyToEither(CompletableFuture.supplyAsync(() -> 2), (r1) -> r1 * 10d) .whenComplete((v, t) -> { System.out.println("========》 use the applyToEither() method:" + v); }); // allOf() 方法 必須等全部的實例化完,才返回。前提條件是線程沒結束 List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5); Stream<CompletableFuture<Integer>> completableFutureStream = integers.stream().map(i -> CompletableFuture.supplyAsync(() -> { System.out.println(i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return i; })); List<CompletableFuture<Integer>> completableFutureList = completableFutureStream.collect(toList()); CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]); CompletableFuture.allOf(completableFutures) .thenApply(r -> 10d) .whenComplete((v, t) -> { System.out.println("========》 use the allOf() method:" + v); t.printStackTrace(); }); // anyOf() 方法, 只要有一個實例化完直接返回,前提條件是線程沒結束 List<Integer> integers2 = Arrays.asList(1, 2, 3, 4, 5); Stream<CompletableFuture<Integer>> completableFutureStream2 = integers2.stream().map(i -> CompletableFuture.supplyAsync(() -> { System.out.println(i); try { Thread.sleep(1000); } catch (InterruptedException e) { e.printStackTrace(); } return i; })); List<CompletableFuture<Integer>> completableFutureList2 = completableFutureStream2.collect(toList()); CompletableFuture[] completableFutures2 = completableFutureList.toArray(new CompletableFuture[completableFutureList2.size()]); CompletableFuture.anyOf(completableFutures2) .thenApply(r -> 10d) .whenComplete((v, t) -> { System.out.println("========》 use the anyOf() method:" + v); }); Thread.currentThread().join(); } }