Java8新特性 - CompletableFuture

一、CompletableFuture:

    以基本方式建立 CompletableFuture<T> 對象

package future;

import java.util.Optional;
import java.util.Random;
import java.util.concurrent.CompletableFuture;

/**
 * Demonstrates completing a {@link CompletableFuture} by hand from a worker thread
 * and consuming the result without blocking the calling thread.
 */
public class CompletableAction {

    private static final Random random = new Random();

    /**
     * Starts a worker thread that completes the future with the value of {@link #get()},
     * registers a non-blocking callback, and then prints {@code 123123}.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        CompletableFuture<Double> completableFuture = new CompletableFuture<>();

        new Thread(() -> {
            try {
                completableFuture.complete(get());
            } catch (RuntimeException e) {
                // Propagate the failure so the callback below still fires.
                completableFuture.completeExceptionally(e);
            }
        }).start();

        // Blocking alternative: calling completableFuture.get() here would wait for the
        // worker, so the random number would be printed first and "123123" afterwards.
        // (That variant also needs java.util.concurrent.ExecutionException to be imported.)

        // whenComplete() does not block, so "123123" is printed first and the random
        // number is printed once the worker thread completes the future.
        completableFuture.whenComplete((v, t) -> {
            Optional.ofNullable(v).ifPresent(System.out::println);
            Optional.ofNullable(t).ifPresent(Throwable::printStackTrace);
        });
        System.out.println("123123");
    }

    /**
     * Simulates a slow lookup: sleeps for three seconds, then returns a random
     * whole number in the range [0, 10000) as a {@code Double}.
     *
     * @return the random value, or {@code null} if the thread was interrupted while
     *         sleeping; in that case the thread's interrupt flag is set again so that
     *         callers can still observe the interruption
     */
    public static Double get() {
        try {
            Thread.sleep(3000L);
            return (double) random.nextInt(10000);
        } catch (InterruptedException e) {
            // Thread.sleep clears the flag when it throws; restore it for the caller.
            Thread.currentThread().interrupt();
            e.printStackTrace();
            return null;
        }
    }
}

輸出的結果:(先輸出 123123,約 3 秒後再輸出一個隨機數)

 

透過 CompletableFuture.supplyAsync(Supplier<T> supplier) 或者 CompletableFuture.supplyAsync(Supplier<T> supplier, Executor executor) 方法建立 CompletableFuture 對象實例

package future;

import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Demonstrates {@code CompletableFuture.supplyAsync(Supplier, Executor)} with a
 * caller-owned executor that is shut down once the asynchronous work has finished.
 */
public class CompletableAction2 {

    /**
     * Submits {@code CompletableAction.get()} to a dedicated pool, prints {@code 123123}
     * immediately, waits until the completion callback has run, and then shuts the pool
     * down so that its non-daemon threads do not keep the JVM alive.
     *
     * @param args unused
     */
    public static void main(String[] args) {

        AtomicBoolean finished = new AtomicBoolean(false);

        // Threads created by this factory are ordinary (non-daemon) threads, so the
        // pool must be shut down explicitly for the program to exit.
        ExecutorService executor = Executors.newFixedThreadPool(2, Thread::new);

        try {
            CompletableFuture.supplyAsync(CompletableAction::get, executor)
                    .whenComplete((v, t) -> {
                        Optional.ofNullable(v).ifPresent(System.out::println);
                        Optional.ofNullable(t).ifPresent(Throwable::printStackTrace);
                        finished.set(true);
                    });
            System.out.println(123123);

            // Poll the flag set by the callback to learn when the task is done.
            while (!finished.get()) {
                try {
                    Thread.sleep(100);
                } catch (InterruptedException e) {
                    // Restore the flag and stop waiting; looping on would just rethrow.
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        } finally {
            executor.shutdown();
        }

        // Variant without an explicit executor: supplyAsync(Supplier) runs on the
        // common ForkJoinPool, whose worker threads are daemon threads. If main returns
        // right after registering the callback, the JVM exits before the result can be
        // printed. Parking main with Thread.currentThread().join() keeps the JVM alive,
        // but main then never terminates.
    }
}

 

    CompletableFuture<T> 的 thenApply() 方法和 join() 方法的使用

package future;

import java.util.Arrays;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

public class CompletableAction3 {

    public static void main(String[] args) {

        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);

        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
        ExecutorService executorService = Executors.newFixedThreadPool(2, (r) -> {
            Thread thread = new Thread(r);
            return thread;
        });

        /*CompletableFuture.supplyAsync(CompletableAction::get, executorService).whenComplete((v, t) -> {
            atomicBoolean.set(true);
            Optional.ofNullable(v).ifPresent(System.out::println);
            Optional.ofNullable(t).ifPresent(Throwable::printStackTrace);
        });

        while (!atomicBoolean.get()) {
            executorService.shutdown();
        }*/

        // 原始方式使用
        Stream<CompletableFuture<Double>> completableFutureStream = integers.stream().map(i -> {
            return CompletableFuture.supplyAsync(() -> CompletableAction3.getProductById(i), executorService);
        });

        Stream<Double> doubleStream = completableFutureStream.map(d -> {
            Double multiply = null;
            try {
                multiply = CompletableAction3.multiply(d.get());
            } catch (InterruptedException | ExecutionException e) {
                e.printStackTrace();
            }
            return multiply;
        });

        List<Double> doubleList = doubleStream.collect(toList());
        Optional.of(doubleList).ifPresent(System.out::println);


        // CompletableFuture join()  thenApply() 方法使用
        Stream<CompletableFuture<Double>> completableFutureStream2 = integers.stream().map(i -> {
            return CompletableFuture.supplyAsync(() -> CompletableAction3.getProductById(i), executorService);
        });

        Stream<CompletableFuture<Double>> completableFutureStream1 = completableFutureStream.map(future -> {
            return future.thenApply(CompletableAction3::multiply);
        });
        Stream<Double> doubleStream2 = completableFutureStream1.map(CompletableFuture::join);

        List<Double> doubleList2 = doubleStream.collect(toList());
        Optional.of(doubleList2).ifPresent(System.out::println);
    }

    // 模擬取到數據以後進行相關業務操做,這裏模擬取到的數據 * 10
    public static Double multiply(Double d) {
        try {
            Thread.sleep(3000);
            return d * 10d;
        } catch (InterruptedException e) {
            e.printStackTrace();
            return null;
        }
    }

    // 模擬根據 ID 查詢商品
    public static Double getProductById(int i) {
        Double aDouble = CompletableAction.get();
        System.out.println(aDouble);
        return aDouble;
    }
}

 

    常用 API:

    

package future;

import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Stream;

import static java.util.stream.Collectors.toList;

public class CompletableAction4 {

    public static void main(String[] args) throws InterruptedException {

        ExecutorService executorService = Executors.newFixedThreadPool(2, (r) -> {
           Thread thread = new Thread(r);
           return thread;
        });

        // thenApply()supplyAsync()whenComplete() 方法
        CompletableFuture.supplyAsync(() -> 1d)
                .thenApply(CompletableAction3::multiply)
                .whenComplete((v, t) -> {
                    System.out.println(v);
                });

        // thenRun() 方法
        CompletableFuture.supplyAsync(() -> 1)
                .thenRun(() -> {
                    System.out.println("======== use the thenRun() method");
                });

        // thenAccept() 方法
        CompletableFuture.supplyAsync(() -> 1)
                .thenAccept(r -> {
                    System.out.println("======== use the thenAccept() method: " + r);
                });

        // thenCompose() 方法
        CompletableFuture.supplyAsync(() -> 1)
                .thenApply(r -> r * 10d)
                .thenCompose(r -> CompletableFuture.supplyAsync(() -> r))
                .thenAccept(r -> {
                    System.out.println("======== use the thenCompose() method: " + r);
                });

        // thenCombine() 方法, 有返回值
        CompletableFuture.supplyAsync(() -> 1)
                .thenCombine(CompletableFuture.supplyAsync(() -> 2d), (r1, r2) -> {
                    System.out.println("======== use the thenCombine() method: " + (r1 + r2));
                    return r1 + r2;
                });

        // thenAcceptBoth() 方法,無返回值。和 then Combine() 方法相似,
        CompletableFuture.supplyAsync(()-> 1)
                .thenAcceptBoth(CompletableFuture.supplyAsync(() -> 3d), (r1, r2) -> {
                    System.out.println("======== use the thenAcceptBoth() method: " + (r1 + r2));
                });

        // runAfterBoth() 方法,直接執行完。只有兩個 CompletableFuture 對象實例化完成才返回
        CompletableFuture.supplyAsync(() -> {
//            try {
//                Thread.sleep(2000);
//                System.out.println("sleep 2s");
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            return 1;
        }).runAfterBoth(CompletableFuture.supplyAsync(() -> {
//            try {
//                Thread.sleep(3000);
//                System.out.println("sleep 3s");
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            return 2;
        }), () -> System.out.println("======== use the runAfterBoth() method"));

        // runAfterEither() 方法,直接執行完。只要有一個 CompletableFuture 對象實例化完成就直接返回
        CompletableFuture.supplyAsync(() -> {
//            try {
//                Thread.sleep(2000);
//                System.out.println("sleep 2s");
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            return 1;
        }).runAfterEither(CompletableFuture.supplyAsync(() -> {
//            try {
//                Thread.sleep(3000);
//                System.out.println("sleep 3s");
//            } catch (InterruptedException e) {
//                e.printStackTrace();
//            }
            return 2;
        }), () -> System.out.println("======== use the runAfterEither() method"));

        // acceptEither() 方法
        CompletableFuture.supplyAsync(() -> 1)
                .acceptEither(CompletableFuture.supplyAsync(() -> 2), (r1) -> {
                    System.out.println("======== use the acceptEither() method" + r1);
                });

        // acceptEither() 方法
        CompletableFuture.supplyAsync(() -> 1)
                .applyToEither(CompletableFuture.supplyAsync(() -> 2), (r1) -> r1 * 10d)
                .whenComplete((v, t) -> {
                    System.out.println("======== use the applyToEither() method" + v);
                });

        // allOf() 方法  必須等全部的實例化完,才返回。前提條件是線程沒結束
        List<Integer> integers = Arrays.asList(1, 2, 3, 4, 5);
        Stream<CompletableFuture<Integer>> completableFutureStream = integers.stream().map(i -> CompletableFuture.supplyAsync(() -> {
            System.out.println(i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i;
        }));
        List<CompletableFuture<Integer>> completableFutureList = completableFutureStream.collect(toList());
        CompletableFuture[] completableFutures = completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]);

        CompletableFuture.allOf(completableFutures)
                .thenApply(r -> 10d)
                .whenComplete((v, t) -> {
            System.out.println("======== use the allOf() method" + v);
            t.printStackTrace();
        });

        // anyOf() 方法, 只要有一個實例化完直接返回,前提條件是線程沒結束
        List<Integer> integers2 = Arrays.asList(1, 2, 3, 4, 5);
        Stream<CompletableFuture<Integer>> completableFutureStream2 = integers2.stream().map(i -> CompletableFuture.supplyAsync(() -> {
            System.out.println(i);
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return i;
        }));
        List<CompletableFuture<Integer>> completableFutureList2 = completableFutureStream2.collect(toList());
        CompletableFuture[] completableFutures2 = completableFutureList.toArray(new CompletableFuture[completableFutureList2.size()]);

        CompletableFuture.anyOf(completableFutures2)
                .thenApply(r -> 10d)
                .whenComplete((v, t) -> {
                    System.out.println("======== use the anyOf() method" + v);
                });

        Thread.currentThread().join();
    }
}

相關文章
相關標籤/搜索