組合式異步編程java
package com.example.demo.future; import org.junit.jupiter.api.Test; import java.util.List; import java.util.UUID; import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; public class FutureTest { /** * 運程數據 * * @return */ private static String data() { try { Thread.sleep(1020); //擬處理遠程處理時間 } catch (InterruptedException e) { e.printStackTrace(); } return String.format("{id:%s,orderNo:'%s'}", Math.round(Math.random() * 10000), UUID.randomUUID().toString()); } /** * 獲取遠程的訂單 * * @return */ Future<String> getRemoteOrder() { ExecutorService executorService = Executors.newCachedThreadPool(); return executorService.submit(FutureTest::data); } /** * 第二種實現 * * @return */ Future<String> getRemoteOrderII() { CompletableFuture<String> future = new CompletableFuture<>(); new Thread(() -> { try { future.complete(data()); //正常獲取值 } catch (Exception e) { future.completeExceptionally(e); //以異常拋出結束 } }).start(); return future; } /** * 第三種實現 * 這種方法徹底等價於第二種 ,低層會交由 ForkJoinPool 去執行。你也能夠使用本身的 Executor * * @return */ Future<String> getRemoteOrderIII() { return CompletableFuture.supplyAsync(FutureTest::data); } @Test public void test1() throws InterruptedException, ExecutionException, TimeoutException { // 傳統方法 Future<String> userDataFuture = getRemoteOrder(); userDataFuture.isDone(); //是否請求完 // orderFuture.get(); //禁止這樣寫,防止線程非正常結束後,這裏將會無限阻塞 String i = userDataFuture.get(10, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回結果 //CompletableFuture 方法 Future<String> orderFuture = getRemoteOrderII(); orderFuture.get(5, TimeUnit.SECONDS); //最多阻塞10秒,就算未完成,也返回結果 } /** * 併發測試1 */ @Test public void test2() { //順序進行 long beginTime = System.currentTimeMillis(); List<String> orders = IntStream.range(1, 5) .mapToObj(i -> data()) .collect(Collectors.toList()); long endTime = System.currentTimeMillis(); System.out.println("串行發費時間 :" + (endTime - beginTime) + " ms"); //4090 ms //並行 beginTime = System.currentTimeMillis(); orders = IntStream.range(1, 5) .parallel() .mapToObj(i -> data()) .collect(Collectors.toList()); endTime = System.currentTimeMillis(); System.out.println("並行發費時間 :" + (endTime - beginTime) + " ms"); //1027 ms //並行,第二種方式 beginTime = System.currentTimeMillis(); List<CompletableFuture<String>> futures = IntStream.range(1, 5) .mapToObj(i -> CompletableFuture.supplyAsync(FutureTest::data)) .collect(Collectors.toList()); orders = futures.stream().map(CompletableFuture::join) //CompletableFuture.join 與 CompletableFuture.get 同樣,只是join不會拋出任何檢測到的異常 .collect(Collectors.toList()); endTime = System.currentTimeMillis(); System.out.println("並行II發費時間 :" + (endTime - beginTime) + " ms"); //1023 ms } }
1.並行第二種式第一種都是使用ForkJoin,因此速度都差很少。但第二種能夠設置你本身的線程執行器,改變線程池的數量 2.線程池大小與處理器的利用率之比能夠使用下面的公式進行估算: N(threads) = N(cpu) * U(cpu) * (1+W/C) N(cpu): 是處理器的核的數目,能夠 經過Runtime.getRuntime().availableProcessors() 獲得 U(cpu): 是指望的cpu利用率(該值應該介於 0 - 1之間) W/C : 是等待時間 與計算時間的比率