Which thread does a CompletableFuture run on?

The default thread pool

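When no executor is passed, the async methods use ForkJoinPool.commonPool() by default. A parallel stream runs on the same pool, so it is a quick way to see the pool's worker threads: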

IntStream.range(0, 15).parallel().forEach(i -> {
    System.out.println(Thread.currentThread());
});

Output

Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
Thread[ForkJoinPool.commonPool-worker-3,5,main]
Thread[ForkJoinPool.commonPool-worker-1,5,main]
Thread[main,5,main]
Thread[ForkJoinPool.commonPool-worker-2,5,main]
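
The stream output above shows the common-pool workers, but it does not involve CompletableFuture itself. As a minimal sketch (the names DefaultPoolDemo and asyncThreadName are illustrative and not from the original), the thread used by supplyAsync without an executor can be checked directly. According to the CompletableFuture Javadoc, the common pool is only used when its parallelism is at least two; otherwise a new thread is created for each task, so the sketch prints the parallelism as well.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;

// Hypothetical demo class; not part of the original tests.
class DefaultPoolDemo {

    // Runs a supplier with no executor argument and returns the name
    // of the thread that executed it.
    static String asyncThreadName() {
        return CompletableFuture
                .supplyAsync(() -> Thread.currentThread().getName())
                .join();
    }

    public static void main(String[] args) {
        System.out.println("common pool parallelism: "
                + ForkJoinPool.getCommonPoolParallelism());
        System.out.println("supplyAsync thread: " + asyncThreadName());
    }
}
```

On a machine with several cores the reported name should be a ForkJoinPool.commonPool-worker thread; with a parallelism of one it will be a plain new thread instead.
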
  • From the ForkJoinPool.commonPool() Javadoc:

This pool is statically constructed; its run state is unaffected by attempts to shutdown() or shutdownNow(). However this pool and any ongoing processing are automatically terminated upon program System.exit(int). Any program that relies on asynchronous task processing to complete before program termination should invoke commonPool().awaitQuiescence, before exit.
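
The advice in the quoted Javadoc can be sketched as follows. This is an illustration under assumptions: the class name AwaitQuiescenceDemo, the 200 ms task and the 5 second timeout are made up for the example, and it assumes the common pool has a parallelism of at least two, since otherwise runAsync does not use the common pool and awaitQuiescence has nothing to wait for.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class; not part of the original tests.
class AwaitQuiescenceDemo {

    // Starts a short async task with no executor, then waits for the
    // common pool to become quiescent. Returns whether the task had
    // finished by the time the wait returned.
    static boolean runAndWait() {
        AtomicBoolean done = new AtomicBoolean(false);
        CompletableFuture.runAsync(() -> {
            try {
                TimeUnit.MILLISECONDS.sleep(200);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            done.set(true);
        });

        // Wait up to 5 seconds (an arbitrary bound for this sketch)
        // instead of sleeping for a fixed time before exit.
        ForkJoinPool.commonPool().awaitQuiescence(5, TimeUnit.SECONDS);
        return done.get();
    }

    public static void main(String[] args) {
        System.out.println("task finished before exit: " + runAndWait());
    }
}
```

Compared with the fixed TimeUnit.SECONDS.sleep(...) calls used in the tests, this waits only as long as the pool is busy. When the future is at hand, calling join() on it is a more direct alternative.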

thenRun test example 1

Without an executor

@Test
public void testRunOnCommonPool() throws InterruptedException {

    CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("test1:1 - runAsync(runnable), job thread: " + Thread.currentThread());
        //Thread[ForkJoinPool.commonPool-worker-1,5,main]
    });

    System.out.println("test1:flag1");

    futurePrice.thenRun(() -> {
        System.out.println("test1:2 - thenRun(runnable)), action thread: " + Thread.currentThread());
        //Thread[ForkJoinPool.commonPool-worker-1,5,main]
    });

    System.out.println("test1:flag2");

    futurePrice.thenRunAsync(() -> {
        System.out.println("test1:3 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
        //Thread[ForkJoinPool.commonPool-worker-1,5,main]
    });

    TimeUnit.SECONDS.sleep(100);
}

Output

test1:flag1
test1:flag2
test1:1 - runAsync(runnable), job thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:2 - thenRun(runnable)), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
test1:3 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]

With an executor

@Test
public void testRunOnExecutors() throws InterruptedException {
    // executor is a field of the test class that the original does not show; the
    // pool-1-thread-N names below suggest a pool created through Executors.
    CompletionStage<Void> futurePrice = CompletableFuture.runAsync(() -> {
        try {
            TimeUnit.SECONDS.sleep(1);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("test2:1 - runAsync(runnable, executor), job thread: " + Thread.currentThread());
        //Thread[pool-1-thread-1,5,main]
    }, executor);

    System.out.println("test2:flag1");

    // No executor argument: the action goes to the default pool, not to executor.
    futurePrice.thenRunAsync(() -> {
        System.out.println("test2:2 - thenRunAsync(runnable), action thread: " + Thread.currentThread());
        //Thread[ForkJoinPool.commonPool-worker-1,5,main]
    });

    System.out.println("test2:flag2");

    // Non-async: here the action runs on the thread that completed the previous stage.
    futurePrice.thenRun(() -> {
        System.out.println("test2:3 - thenRun(runnable), action thread: " + Thread.currentThread());
        //Thread[pool-1-thread-1,5,main]
    });

    // Explicit executor: the action runs on one of executor's threads.
    futurePrice.thenRunAsync(() -> {
        System.out.println("test2:4 - thenRunAsync(runnable, executor), action thread: " + Thread.currentThread());
        //Thread[pool-1-thread-2,5,main]
    }, executor);

    TimeUnit.SECONDS.sleep(100);
}

Output

test2:flag1
test2:flag2
test2:1 - runAsync(runnable, executor), job thread: Thread[pool-1-thread-1,5,main]
test2:3 - thenRun(runnable), action thread: Thread[pool-1-thread-1,5,main]
test2:4 - thenRunAsync(runnable, executor), action thread: Thread[pool-1-thread-2,5,main]
test2:2 - thenRunAsync(runnable), action thread: Thread[ForkJoinPool.commonPool-worker-1,5,main]
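
The output above shows that thenRunAsync without an executor argument does not inherit the executor of the previous stage. The following sketch isolates that point; the class name AsyncExecutorDemo and the "demo-pool-" thread names are assumptions made for the example, chosen so the custom pool is easy to tell apart from the default one.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class; not part of the original tests.
class AsyncExecutorDemo {

    // Returns { thread of thenRunAsync(action),
    //           thread of thenRunAsync(action, executor) }.
    static String[] threadNames() {
        AtomicInteger counter = new AtomicInteger();
        // Custom pool whose threads are named "demo-pool-N".
        ExecutorService executor = Executors.newFixedThreadPool(2,
                r -> new Thread(r, "demo-pool-" + counter.incrementAndGet()));
        try {
            CompletableFuture<Void> first =
                    CompletableFuture.runAsync(() -> { }, executor);
            String[] names = new String[2];
            CompletableFuture<Void> withoutExecutor = first.thenRunAsync(
                    () -> names[0] = Thread.currentThread().getName());
            CompletableFuture<Void> withExecutor = first.thenRunAsync(
                    () -> names[1] = Thread.currentThread().getName(), executor);
            // Wait for both actions before reading the recorded names.
            CompletableFuture.allOf(withoutExecutor, withExecutor).join();
            return names;
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        String[] names = threadNames();
        System.out.println("thenRunAsync(action): " + names[0]);
        System.out.println("thenRunAsync(action, executor): " + names[1]);
    }
}
```

To keep a whole chain on one pool, the executor therefore has to be passed to every Async call in the chain.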

thenRun test example 2

Without sleep

@Test
public void testThenRun() throws InterruptedException {
    // The supplier returns almost immediately, so f1 is usually already
    // complete by the time thenRun is called on the test thread.
    CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
        System.out.println("f1 thread:" + Thread.currentThread().getName());
        return "zero";
    }, executor);
    f1.thenRun(new Runnable() {
        @Override
        public void run() {
            System.out.println("then run thread:" + Thread.currentThread().getName());
            System.out.println("finished");
        }
    });
    TimeUnit.SECONDS.sleep(10);
}
  • Output with an executor

f1 thread:pool-1-thread-1
then run thread:main
finished
  • Output without an executor

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:main
finished

With sleep

CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> {
    System.out.println("f1 thread:" + Thread.currentThread().getName());
    try {
        TimeUnit.SECONDS.sleep(2);
    } catch (InterruptedException e) {
        e.printStackTrace();
    }
    return "zero";
}, executor);
f1.thenRun(new Runnable() {
    @Override
    public void run() {
        System.out.println("then run thread:" + Thread.currentThread().getName());
        System.out.println("finished");
    }
});
TimeUnit.SECONDS.sleep(10);
  • Output with an executor

f1 thread:pool-1-thread-1
then run thread:pool-1-thread-1
finished
  • Output without an executor

f1 thread:ForkJoinPool.commonPool-worker-1
then run thread:ForkJoinPool.commonPool-worker-1
finished

Summary

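thenRun() without the Async suffix still does not block the caller, but its action has no fixed thread: it may run on the main thread, a commonPool thread, or an executor thread. If the previous stage has already completed when thenRun() is called, the action runs immediately on the calling thread (main in the tests above); otherwise it runs on the thread that completes the previous stage.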
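
The two cases in the summary can be reproduced without relying on timing. This is a minimal sketch: the class name ThenRunThreadDemo and the thread name "completer" are assumptions made for the example, and the future is completed by hand so that the order of registration and completion is fixed.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; not part of the original tests.
class ThenRunThreadDemo {

    // Case 1: the future is already complete when thenRun is called,
    // so the action runs right away on the calling thread.
    static String onCompletedFuture() {
        String[] name = new String[1];
        CompletableFuture.completedFuture("zero")
                .thenRun(() -> name[0] = Thread.currentThread().getName());
        return name[0];
    }

    // Case 2: thenRun is registered before completion; in this sketch
    // the action then runs on the thread that calls complete().
    static String onPendingFuture() {
        String[] name = new String[1];
        CompletableFuture<String> pending = new CompletableFuture<>();
        CompletableFuture<Void> dependent =
                pending.thenRun(() -> name[0] = Thread.currentThread().getName());
        new Thread(() -> pending.complete("zero"), "completer").start();
        dependent.join(); // wait until the action has run
        return name[0];
    }

    public static void main(String[] args) {
        System.out.println("already completed: " + onCompletedFuture());
        System.out.println("completed later:   " + onPendingFuture());
    }
}
```

Run from main, the first line reports main and the second reports completer, which matches the "without sleep" and "with sleep" outputs above.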
