java 8 CompletableFuture (1)

組合式異步編程java

 

package com.example.demo.future;

import org.junit.jupiter.api.Test;

import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class FutureTest {

    /**
     * 運程數據
     *
     * @return
     */
    private static String data() {
        try {

            Thread.sleep(1020); //擬處理遠程處理時間
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return String.format("{id:%s,orderNo:'%s'}", 
               Math.round(Math.random() * 10000), UUID.randomUUID().toString());
    }

    /**
     * 獲取遠程的訂單
     *
     * @return
     */
    Future<String> getRemoteOrder() {
        ExecutorService executorService = Executors.newCachedThreadPool();
        return executorService.submit(FutureTest::data);
    }

    /**
     * 第二種實現
     *
     * @return
     */
    Future<String> getRemoteOrderII() {
        CompletableFuture<String> future = new CompletableFuture<>();
        new Thread(() -> {
            try {
                future.complete(data());  //正常獲取值
            } catch (Exception e) {
                future.completeExceptionally(e);    //以異常拋出結束
            }
        }).start();
        return future;
    }

    /**
     * 第三種實現
     * 這種方法徹底等價於第二種 ,低層會交由 ForkJoinPool 去執行。你也能夠使用本身的 Executor
     *
     * @return
     */
    Future<String> getRemoteOrderIII() {
        return CompletableFuture.supplyAsync(FutureTest::data);
    }

    @Test
    public void test1() throws InterruptedException, ExecutionException, TimeoutException {

        // 傳統方法
        Future<String> userDataFuture = getRemoteOrder();
        userDataFuture.isDone();  //是否請求完
        //   orderFuture.get();   //禁止這樣寫,防止線程非正常結束後,這裏將會無限阻塞
        String i = userDataFuture.get(10, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回結果


        //CompletableFuture 方法
        Future<String> orderFuture = getRemoteOrderII();
        orderFuture.get(5, TimeUnit.SECONDS);  //最多阻塞10秒,就算未完成,也返回結果


    }

    /**
     * 併發測試1
     */
    @Test
    public void test2() {
        //順序進行
        long beginTime = System.currentTimeMillis();

        List<String> orders = IntStream.range(1, 5)
                .mapToObj(i -> data())
                .collect(Collectors.toList());

        long endTime = System.currentTimeMillis();

        System.out.println("串行發費時間 :" + (endTime - beginTime) + " ms"); //4090 ms

        //並行
        beginTime = System.currentTimeMillis();

        orders = IntStream.range(1, 5)
                .parallel()
                .mapToObj(i -> data())
                .collect(Collectors.toList());

        endTime = System.currentTimeMillis();

        System.out.println("並行發費時間 :" + (endTime - beginTime) + " ms"); //1027 ms


        //並行,第二種方式
        beginTime = System.currentTimeMillis();

        List<CompletableFuture<String>> futures = IntStream.range(1, 5)
                .mapToObj(i -> CompletableFuture.supplyAsync(FutureTest::data))
                .collect(Collectors.toList());

        orders = futures.stream().map(CompletableFuture::join) 
 //CompletableFuture.join 與 CompletableFuture.get 同樣,只是join不會拋出任何檢測到的異常
                .collect(Collectors.toList());

        endTime = System.currentTimeMillis();


        System.out.println("並行II發費時間 :" + (endTime - beginTime) + " ms"); //1023 ms
    }

   

}

 

1.並行第二種式第一種都是使用ForkJoin,因此速度都差很少。但第二種能夠設置你本身的線程執行器,改變線程池的數量
  2.線程池大小與處理器的利用率之比能夠使用下面的公式進行估算:
  N(threads) = N(cpu) * U(cpu) * (1+W/C)
   N(cpu): 是處理器的核的數目,能夠 經過Runtime.getRuntime().availableProcessors() 獲得
   U(cpu):  是指望的cpu利用率(該值應該介於 0 - 1之間)
   W/C :    是等待時間 與計算時間的比率
相關文章
相關標籤/搜索