java8的CompletableFuture使用實例

thenApply(等待並轉化future)

@Test
    public void testThen() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "zero";
        }, executor);

        CompletableFuture<Integer> f2 = f1.thenApply(new Function<String, Integer>() {

            @Override
            public Integer apply(String t) {
                System.out.println(2);
                return Integer.valueOf(t.length());
            }
        });

        CompletableFuture<Double> f3 = f2.thenApply(r -> r * 2.0);
        System.out.println(f3.get());
    }

thenAccept與thenRun(監聽future完成)

/**
     * future完成處理,可獲取結果
     */
    @Test
    public void testThenAccept(){
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "zero";
        }, executor);
        f1.thenAccept(e -> {
            System.out.println("get result:"+e);
        });
    }

    /**
     * future完成處理
     */
    @Test
    public void testThenRun(){
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "zero";
        }, executor);
        f1.thenRun(new Runnable() {
            @Override
            public void run() {
                System.out.println("finished");
            }
        });
    }

thenCompose(flatMap future)

/**
     * compose至關於flatMap,避免CompletableFuture<CompletableFuture<String>>這種
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testThenCompose() throws ExecutionException, InterruptedException {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            return "zero";
        }, executor);
        CompletableFuture<CompletableFuture<String>> f4 = f1.thenApply(CompletableFutureTest::calculate);
        System.out.println("f4.get:"+f4.get().get());

        CompletableFuture<String> f5 = f1.thenCompose(CompletableFutureTest::calculate);
        System.out.println("f5.get:"+f5.get());

        System.out.println(f1.get());
    }

    public static CompletableFuture<String> calculate(String input) {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            System.out.println(input);
            return input + "---" + input.length();
        }, executor);
        return future;
    }

thenCombine與thenAcceptBoth

  • thenCombine(組合兩個future,有返回值)html

/**
     * thenCombine用於組合兩個併發的任務,產生新的future有返回值
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testThenCombine() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
                Thread.sleep(1000);
                System.out.println("f1 finish sleep at:"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "zero";
        }, executor);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
                Thread.sleep(3000);
                System.out.println("f2 finish sleep at:"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "hello";
        }, executor);

        CompletableFuture<String> reslutFuture =
                f1.thenCombine(f2, new BiFunction<String, String, String>() {

                    @Override
                    public String apply(String t, String u) {
                        System.out.println("f3 start to combine at:"+System.currentTimeMillis());
                        return t.concat(u);
                    }
                });

        System.out.println(reslutFuture.get());//zerohello
        System.out.println("finish combine at:"+System.currentTimeMillis());
    }
  • thenAcceptBoth(組合兩個future,沒有返回值)java

/**
     * thenAcceptBoth用於組合兩個併發的任務,產生新的future沒有返回值
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testThenAcceptBoth() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(1);
                System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "zero";
        }, executor);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(3);
                System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "hello";
        }, executor);

        CompletableFuture<Void> reslutFuture = f1.thenAcceptBoth(f2, new BiConsumer<String, String>() {
            @Override
            public void accept(String t, String u) {
                System.out.println("f3 start to accept at:"+System.currentTimeMillis());
                System.out.println(t + " over");
                System.out.println(u + " over");
            }
        });

        System.out.println(reslutFuture.get());
        System.out.println("finish accept at:"+System.currentTimeMillis());

    }

applyToEither與acceptEither

  • applyToEither(取2個future中最早返回的,有返回值)編程

/**
     * 當任意一個CompletionStage 完成的時候,fn 會被執行,它的返回值會當作新的CompletableFuture<U>的計算結果
     * @throws ExecutionException
     * @throws InterruptedException
     */
    @Test
    public void testApplyToEither() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(5);
                System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "fromF1";
        }, executor);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(2);
                System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "fromF2";
        }, executor);

        CompletableFuture<String> reslutFuture = f1.applyToEither(f2,i -> i.toString());
        System.out.println(reslutFuture.get()); //should not be null , wait for complete
    }
  • acceptEither(取2個future中最早返回的,無返回值)併發

/**
     * 取其中返回最快的一個
     * 當任意一個CompletionStage 完成的時候,action 這個消費者就會被執行。這個方法返回 CompletableFuture<Void>
     */
    @Test
    public void testAcceptEither() throws ExecutionException, InterruptedException {
        CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f1 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(3);
                System.out.println("f1 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "zero";
        }, executor);
        CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f2 start to sleep at:"+System.currentTimeMillis());
                TimeUnit.SECONDS.sleep(5);
                System.out.println("f2 stop sleep at:"+System.currentTimeMillis());
            } catch (Exception e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
            return "hello";
        }, executor);

        CompletableFuture<Void> reslutFuture = f1.acceptEither(f2,r -> {
            System.out.println("quicker result:"+r);
        });
        reslutFuture.get(); //should be null , wait for complete

    }

allOf與anyOf

  • allOf(等待全部future返回)app

/**
     * 等待多個future返回
     */
    @Test
    public void testAllOf() throws InterruptedException {
        List<CompletableFuture<String>> futures = IntStream.range(1,10)
                .mapToObj(i ->
                        longCost(i)).collect(Collectors.toList());
        final CompletableFuture<Void> allCompleted = CompletableFuture.allOf(futures.toArray(new CompletableFuture[]{}));
        allCompleted.thenRun(() -> {
            futures.stream().forEach(future -> {
                try {
                    System.out.println("get future at:"+System.currentTimeMillis()+", result:"+future.get());
                } catch (InterruptedException | ExecutionException e) {
                    e.printStackTrace();
                }
            });

        });
        Thread.sleep(100000); //wait
    }
  • anyOf(取多個future當中最快的一個返回)ide

/**
     * 等待多個future當中最快的一個返回
     * @throws InterruptedException
     */
    @Test
    public void testAnyOf() throws InterruptedException {
        List<CompletableFuture<String>> futures = IntStream.range(1,10)
                .mapToObj(i ->
                        longCost(i)).collect(Collectors.toList());
        final CompletableFuture<Object> firstCompleted = CompletableFuture.anyOf(futures.toArray(new CompletableFuture[]{}));
        firstCompleted.thenAccept((Object result) -> {
            System.out.println("get at:"+System.currentTimeMillis()+",first result:"+result);
        });
    }

    private CompletableFuture<String> longCost(long i){
        return CompletableFuture.supplyAsync(() -> {
            try {
                System.out.println("f"+i+" start to sleep at:"+System.currentTimeMillis());
                Thread.sleep(3000);
                System.out.println("f"+i+" stop sleep at:"+System.currentTimeMillis());
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return String.valueOf(i);
        },executor);
    }

doc

相關文章
相關標籤/搜索