CompletableFuture

CompletableFuture

  • 建立異步計算,並獲取計算結果
  • 使用非阻塞操做提高吞量
  • 設計和實現異步 API
  • 以異步的方式使用同步的 API
  • 對兩個或多個異步操做進行流水線和合並操做
  • 處理異步操做的完成狀態

現狀

常常出如今等待某條 SQL 執行完成後,再繼續執行下一條 SQL ,而這兩條 SQL 自己是並沒有關係的,能夠同時進行執行的。咱們但願可以兩條 SQL 同時進行處理,而不是等待其中的某一條 SQL 完成後,再繼續下一條。java

由此能夠擴展,在不少任務下,咱們須要執行兩個任務但這兩個任務並無先後的關聯關係,咱們也但願兩個任務可以同時執行,而後再將執行結果匯聚就能夠了。數據庫

Future

Future 的功能

future 經過提交一個 callable 任務給線程池,線程池後臺啓動其餘線程去執行,而後再調用 get() 方法獲取結果編程

private void test() {
  ExecutorService executor = Executors.newCachedThreadPool();
  Future<Integer> future = executor.submit(() -> sleep(1));
  try {
    Integer integer = future.get(3, TimeUnit.SECONDS);
    System.out.println(integer);
  } catch (InterruptedException e) {
    // 當前線在等待中被中斷
    e.printStackTrace();
  } catch (ExecutionException e) {
    // 任務執行中的異常
    e.printStackTrace();
  } catch (TimeoutException e) {
    // 超時
    e.printStackTrace();
  }
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return 1;
}
複製代碼

該方式存在的問題,若是 sleep 執行超過 3 秒鐘,future 將沒法拿到返回結果。固然,Future 提供了一個無參的get 方法,能夠一直等待結果。不過仍是建議使用帶超時參數的 get 方法,同時定義好超時的處理方法。緩存

Future 不具有的功能

  • 將兩個異步計算合併爲一個,這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果。
  • 等待 Future 集合中的全部任務都完成。
  • 僅等待 Future 集合中最快結束的任務完成,並返回它的結果。
  • 經過編程方式完成一個 Future 任務的執行。
  • 應對 Future 的完成事件,即當 Future 的完成事件發生時會收到通知,並能使用 Future 計算的結果進行下一步的操做,不僅是簡單地阻塞等待操做的結果。

CompletableFuture

  • 提供異步 API
  • 同步變異步
  • 以響應式方式處理異步操做的完成事件

同步

調用某個方法,調用方在被調用方運行的過程當中會等待,直到被調用方運行結束後返回,調用方取得被調用方的返回值並繼續運行。即便調用方和被調用方不在同一個線程中運行,調用方仍是須要等待被調用方結束才運行,這就是阻塞式調用。併發

異步

異步 API 調用後會直接返回,將計算任務交給其餘線程來進行。其餘線程執行完成後,再將結果返回給調用方。less

使用異步 API異步

public void test(){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(1);
    completableFuture.complete(sleep);
  }).start();
  CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
  new Thread(() -> {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  }).start();
  Integer integer = null;
  Integer integer1 = null;
  try {
    integer = completableFuture.get();
    integer1 = completableFuture1.get();
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." + integer1);

  Instant end = Instant.now();

  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}	

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  return timeout;
}
複製代碼

異步處理async

上面代碼的問題是,若是在線程內發生了異常,如何在外部的調用中被發現,同時去處理呢?正常的狀況是,線程內發生異常,會直接被封鎖在線程內,而最終線程會被殺死,那麼 get 方法一直會阻塞。測試

此時就不該該使用 get() 方法,而是使用帶有超時參數的 get 方法,而且在線程內,將異常傳遞迴調用方。ui

new Thread(() -> {
  try {
    int sleep = sleep(2);
    completableFuture1.complete(sleep);
  } catch (Exception e) {
    completableFuture1.completeExceptionally(e);
  }
}).start();
複製代碼

completableFuture1.completeExceptionally(e); 將異常傳遞出來,在 ExecutionException 中會被捕獲,而後對其進行處理便可。

try {
  integer = completableFuture.get();
  integer1 = completableFuture1.get();
} catch (InterruptedException e) {
  e.printStackTrace();
} catch (ExecutionException e) {
  e.printStackTrace();
}
複製代碼

示例:

public void test(){
  CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
  new Thread(() -> {
    try {
      throw new RuntimeException("故意拋出的異常...");
    } catch (Exception e) {
      completableFuture.completeExceptionally(e);
    }
  }).start();
  Integer integer = null;
  try {
    integer = completableFuture.get(3, TimeUnit.SECONDS);
  } catch (InterruptedException e) {
    e.printStackTrace();
  } catch (ExecutionException e) {
    e.printStackTrace();
  } catch (TimeoutException e) {
    e.printStackTrace();
  }
  System.out.println(integer + "....CompletableFuture.." );
  Instant end = Instant.now();
  Duration duration = Duration.between(start, end);
  long l = duration.toMillis();
  System.err.println(l);
}
複製代碼

此時會收到的異常:

java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意拋出的異常...
	at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
	at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
	at com.example.demo.me.sjl.service.UserService.test(UserService.java:92)
	at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at ....
複製代碼

使用工廠方法 supplyAsync 建立 CompletableFuture

CompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1));
複製代碼

相比於 new 的方式,更加優雅、簡潔,而且不用顯式的建立線程(new Thread) 操做。默認會交由 ForkJoinPoll 池中的某個執行線程運行,同時也提供了重載的方法,指定 Executor 。

public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
  return asyncSupplyStage(asyncPool, supplier);
}
複製代碼

如何肯定默認的線程數量:

  • 若是配置了系統屬性 java.util.concurrent.ForkJoinPool.common.parallelism 則取該值,轉換成 int 做爲線程數量

    String pp = System.getProperty
                    ("java.util.concurrent.ForkJoinPool.common.parallelism");
    if (pp != null)
                    parallelism = Integer.parseInt(pp);
    複製代碼
  • 沒有配置該值,則取 Runtime.getRuntime().availableProcessors() 值做爲線程數量

    if (parallelism < 0 && // default 1 less than #cores
                (parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
                parallelism = 1;
    複製代碼

    parallelism 初始值爲 -1

調整線程池的大小

N_{threads} = N_{cpu} * U_{cpu} * (1+W/C)
  • 其中:$$N_{cppu}$$ 是處理器的核的數目,能夠經過 Runtime.getRuntime().availableProcessors 獲得

  • U_{cpu}$$ 是指望的 CPU 利用率,該值介於 0-1之間

上面是一個參考公式《Java併發編程實戰》(mng.bz/979c )

這是計算出的理論值,不過咱們在使用時,須要考慮實際狀況,好比我有 5 個並行任務,那麼我須要開啓 5 個線程來分別進行執行,多了會千萬浪費,少了達不到併發的效果。此時咱們須要 5 個線程。

插入數據庫時

public void save(){
    CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(1112)
        .userName("施傑靈")
        .password("abc1213")
        .birthday("2018-08-08")
        .createUser("1")
        .createTime(LocalDateTime.now())
        .updateUser("2")
        .updateTime(LocalDateTime.now())
        .build();
      return userRepository.save(entity);
    });

    CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
      UserEntity entity = UserEntity.builder()
        .id(223)
        .userName("施傑靈1")
        .password("abc12131")
        .birthday("2018-08-18")
        .createUser("11")
        .createTime(LocalDateTime.now())
        .updateUser("21")
        .updateTime(LocalDateTime.now())
        .build();
      if (true) {
        throw new RuntimeException("故意拋出的異常...");
      }
      return userRepository.save(entity);
    });

    System.out.println(completableFuture.join());
    System.out.println(completableFuture1.join());
}
複製代碼

測試結果,上面那條數據正常插入到數據庫中,下面的數據插入失敗。事務並無回滾。

將兩個異步計算合併爲一個,依賴 (thenCompose)

將兩個異步計算合併爲一個,這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果

public void test(){
  CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCompose(
    	(x) -> CompletableFuture.supplyAsync(() -> sleep(x))
  	);
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}
複製代碼

從上面的代碼中,能夠看到在進行計算的時候,是使用到了前面的返回值 x ,整個任務的運行時間是 4 秒。

將兩個異步計算合併爲一個,不管是否依賴 (thenCombine)

public void test() {
  CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombine(
    	CompletableFuture.supplyAsync(() -> sleep(1)),
    	(t1, t2) -> t1 + t2
  );
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }
  return timeout;
}
複製代碼
  • thenCombine
  • thenCombineAsync

兩個方法接收的參數是一致的,區別在於他們接收的第二個參數:BiFunction 是否會在提交到線程池中,由另一個任務以異步的方式執行。thenCombine 不會以異步方式執行 BiFunctionthenCombineAsync 會以異步的方式執行。

什麼時候使用 Async 後綴的方法?

當咱們進行合併的方法是一個耗時的方法時,就儘量的考慮使用 Async 後綴的方法。

在插入數據庫時

咱們日常的操做是,插入數據庫時,若是兩個操做中,其中一個操做發生異常,是否會回滾?

@Transactional(rollbackFor = Exception.class)
public void save() {
  CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(111)
      .userName("施傑靈")
      .password("abc1213")
      .birthday("2018-08-08")
      .createUser("1")
      .createTime(LocalDateTime.now())
      .updateUser("2")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }).thenCombine(CompletableFuture.supplyAsync(() -> {
    UserEntity entity = UserEntity.builder()
      .id(222)
      .userName("施傑靈1")
      .password("abc12131")
      .birthday("2018-08-18")
      .createUser("11")
      .createTime(LocalDateTime.now())
      .updateUser("21")
      .updateTime(LocalDateTime.now())
      .build();
    return userRepository.save(entity);
  }), (a, b) -> {
    System.out.println(a);
    System.out.println(b);
    return a;
  });

  UserEntity join = completableFuture.join();
  System.out.println(join);
}

複製代碼

通過實際測試,第二個任務拋出異常,是會回滾的。

CompletableFuture 的 Completion 事件

Java 8的CompletableFuture 經過thenAccept 方法提供了這一功能,它接收CompletableFuture 執行完畢後的返回值作參數。

public void test() {
  CompletableFuture.supplyAsync(() -> sleep(2))
    .thenCombineAsync(
    CompletableFuture.supplyAsync(() -> sleep(1)),
    (t1, t2) -> t1 + t2
  ).thenAccept((t) -> System.out.println(t + "------"));
}

private int sleep(int timeout) {
  try {
    TimeUnit.SECONDS.sleep(timeout);
  } catch (InterruptedException e) {
    e.printStackTrace();
  }

  return timeout;
}
複製代碼

線程池(ThreadPoolExecutor)

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
複製代碼
  • int corePoolSize : 核心池的大小,這個參數與後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;

  • int maximumPoolSize : 線程池最大線程數,它表示在線程池中最多能建立多少個線程;

  • long keepAliveTime : 表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize:即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize;可是若是調用了**allowCoreThreadTimeOut(boolean)**方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;

  • TimeUnit unit : 參數keepAliveTime的時間單位

  • BlockingQueue<Runnable> workQueue : 一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇

    • ArrayBlockingQueue

    • LinkedBlockingQueue

  • PriorityBlockingQueue

    • SynchronousQueue

    ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • ThreadFactory threadFactory

  • RejectedExecutionHandler handler : 實現RejectedExecutionHandler接口,可自定義處理器

相關文章
相關標籤/搜索