常常出如今等待某條 SQL 執行完成後,再繼續執行下一條 SQL ,而這兩條 SQL 自己是並沒有關係的,能夠同時進行執行的。咱們但願可以兩條 SQL 同時進行處理,而不是等待其中的某一條 SQL 完成後,再繼續下一條。java
由此能夠擴展,在不少任務下,咱們須要執行兩個任務但這兩個任務並無先後的關聯關係,咱們也但願兩個任務可以同時執行,而後再將執行結果匯聚就能夠了。數據庫
future 經過提交一個 callable 任務給線程池,線程池後臺啓動其餘線程去執行,而後再調用 get() 方法獲取結果編程
private void test() {
ExecutorService executor = Executors.newCachedThreadPool();
Future<Integer> future = executor.submit(() -> sleep(1));
try {
Integer integer = future.get(3, TimeUnit.SECONDS);
System.out.println(integer);
} catch (InterruptedException e) {
// 當前線在等待中被中斷
e.printStackTrace();
} catch (ExecutionException e) {
// 任務執行中的異常
e.printStackTrace();
} catch (TimeoutException e) {
// 超時
e.printStackTrace();
}
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return 1;
}
複製代碼
該方式存在的問題,若是 sleep 執行超過 3 秒鐘,future 將沒法拿到返回結果。固然,Future 提供了一個無參的get 方法,能夠一直等待結果。不過仍是建議使用帶超時參數的 get 方法,同時定義好超時的處理方法。緩存
調用某個方法,調用方在被調用方運行的過程當中會等待,直到被調用方運行結束後返回,調用方取得被調用方的返回值並繼續運行。即便調用方和被調用方不在同一個線程中運行,調用方仍是須要等待被調用方結束才運行,這就是阻塞式調用。併發
異步 API 調用後會直接返回,將計算任務交給其餘線程來進行。其餘線程執行完成後,再將結果返回給調用方。less
使用異步 API異步
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(1);
completableFuture.complete(sleep);
}).start();
CompletableFuture<Integer> completableFuture1 = new CompletableFuture<>();
new Thread(() -> {
int sleep = sleep(2);
completableFuture1.complete(sleep);
}).start();
Integer integer = null;
Integer integer1 = null;
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." + integer1);
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
複製代碼
異步處理async
上面代碼的問題是,若是在線程內發生了異常,如何在外部的調用中被發現,同時去處理呢?正常的狀況是,線程內發生異常,會直接被封鎖在線程內,而最終線程會被殺死,那麼 get 方法一直會阻塞。測試
此時就不該該使用 get() 方法,而是使用帶有超時參數的 get 方法,而且在線程內,將異常傳遞迴調用方。ui
new Thread(() -> {
try {
int sleep = sleep(2);
completableFuture1.complete(sleep);
} catch (Exception e) {
completableFuture1.completeExceptionally(e);
}
}).start();
複製代碼
completableFuture1.completeExceptionally(e);
將異常傳遞出來,在 ExecutionException
中會被捕獲,而後對其進行處理便可。
try {
integer = completableFuture.get();
integer1 = completableFuture1.get();
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
複製代碼
示例:
public void test(){
CompletableFuture<Integer> completableFuture = new CompletableFuture<>();
new Thread(() -> {
try {
throw new RuntimeException("故意拋出的異常...");
} catch (Exception e) {
completableFuture.completeExceptionally(e);
}
}).start();
Integer integer = null;
try {
integer = completableFuture.get(3, TimeUnit.SECONDS);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
System.out.println(integer + "....CompletableFuture.." );
Instant end = Instant.now();
Duration duration = Duration.between(start, end);
long l = duration.toMillis();
System.err.println(l);
}
複製代碼
此時會收到的異常:
java.util.concurrent.ExecutionException: java.lang.RuntimeException: 故意拋出的異常...
at java.util.concurrent.CompletableFuture.reportGet(CompletableFuture.java:357)
at java.util.concurrent.CompletableFuture.get(CompletableFuture.java:1915)
at com.example.demo.me.sjl.service.UserService.test(UserService.java:92)
at com.example.demo.me.sjl.controller.UserController.test(UserController.java:20)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at ....
複製代碼
supplyAsync
建立 CompletableFutureCompletableFuture<Integer> completableFuture = CompletableFuture.supplyAsync(() -> sleep(1));
複製代碼
相比於 new 的方式,更加優雅、簡潔,而且不用顯式的建立線程(new Thread) 操做。默認會交由 ForkJoinPoll
池中的某個執行線程運行,同時也提供了重載的方法,指定 Executor 。
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) {
return asyncSupplyStage(asyncPool, supplier);
}
複製代碼
如何肯定默認的線程數量:
若是配置了系統屬性 java.util.concurrent.ForkJoinPool.common.parallelism
則取該值,轉換成 int 做爲線程數量
String pp = System.getProperty
("java.util.concurrent.ForkJoinPool.common.parallelism");
if (pp != null)
parallelism = Integer.parseInt(pp);
複製代碼
沒有配置該值,則取 Runtime.getRuntime().availableProcessors()
值做爲線程數量
if (parallelism < 0 && // default 1 less than #cores
(parallelism = Runtime.getRuntime().availableProcessors() - 1) <= 0)
parallelism = 1;
複製代碼
parallelism 初始值爲 -1
調整線程池的大小
其中:$$N_{cppu}$$ 是處理器的核的數目,能夠經過 Runtime.getRuntime().availableProcessors
獲得
上面是一個參考公式《Java併發編程實戰》(mng.bz/979c )
這是計算出的理論值,不過咱們在使用時,須要考慮實際狀況,好比我有 5 個並行任務,那麼我須要開啓 5 個線程來分別進行執行,多了會千萬浪費,少了達不到併發的效果。此時咱們須要 5 個線程。
public void save(){
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(1112)
.userName("施傑靈")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
});
CompletableFuture<UserEntity> completableFuture1 = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(223)
.userName("施傑靈1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
if (true) {
throw new RuntimeException("故意拋出的異常...");
}
return userRepository.save(entity);
});
System.out.println(completableFuture.join());
System.out.println(completableFuture1.join());
}
複製代碼
測試結果,上面那條數據正常插入到數據庫中,下面的數據插入失敗。事務並無回滾。
將兩個異步計算合併爲一個,這兩個異步計算之間相互獨立,同時第二個又依賴於第一個的結果
public void test(){
CompletableFuture<Integer> compose = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCompose(
(x) -> CompletableFuture.supplyAsync(() -> sleep(x))
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
複製代碼
從上面的代碼中,能夠看到在進行計算的時候,是使用到了前面的返回值 x
,整個任務的運行時間是 4 秒。
public void test() {
CompletableFuture<Integer> combine = CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombine(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
);
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
複製代碼
兩個方法接收的參數是一致的,區別在於他們接收的第二個參數:BiFunction
是否會在提交到線程池中,由另一個任務以異步的方式執行。thenCombine
不會以異步方式執行 BiFunction
而 thenCombineAsync
會以異步的方式執行。
什麼時候使用 Async 後綴的方法?
當咱們進行合併的方法是一個耗時的方法時,就儘量的考慮使用 Async 後綴的方法。
咱們日常的操做是,插入數據庫時,若是兩個操做中,其中一個操做發生異常,是否會回滾?
@Transactional(rollbackFor = Exception.class)
public void save() {
CompletableFuture<UserEntity> completableFuture = CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(111)
.userName("施傑靈")
.password("abc1213")
.birthday("2018-08-08")
.createUser("1")
.createTime(LocalDateTime.now())
.updateUser("2")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}).thenCombine(CompletableFuture.supplyAsync(() -> {
UserEntity entity = UserEntity.builder()
.id(222)
.userName("施傑靈1")
.password("abc12131")
.birthday("2018-08-18")
.createUser("11")
.createTime(LocalDateTime.now())
.updateUser("21")
.updateTime(LocalDateTime.now())
.build();
return userRepository.save(entity);
}), (a, b) -> {
System.out.println(a);
System.out.println(b);
return a;
});
UserEntity join = completableFuture.join();
System.out.println(join);
}
複製代碼
通過實際測試,第二個任務拋出異常,是會回滾的。
Java 8的CompletableFuture 經過thenAccept 方法提供了這一功能,它接收CompletableFuture 執行完畢後的返回值作參數。
public void test() {
CompletableFuture.supplyAsync(() -> sleep(2))
.thenCombineAsync(
CompletableFuture.supplyAsync(() -> sleep(1)),
(t1, t2) -> t1 + t2
).thenAccept((t) -> System.out.println(t + "------"));
}
private int sleep(int timeout) {
try {
TimeUnit.SECONDS.sleep(timeout);
} catch (InterruptedException e) {
e.printStackTrace();
}
return timeout;
}
複製代碼
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) ;
複製代碼
int corePoolSize : 核心池的大小,這個參數與後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
int maximumPoolSize : 線程池最大線程數,它表示在線程池中最多能建立多少個線程;
long keepAliveTime : 表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize:即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize;可是若是調用了**allowCoreThreadTimeOut(boolean)**方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;
TimeUnit unit : 參數keepAliveTime的時間單位
BlockingQueue<Runnable> workQueue : 一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇
ArrayBlockingQueue
LinkedBlockingQueue
PriorityBlockingQueue
ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
ThreadFactory threadFactory
RejectedExecutionHandler handler : 實現RejectedExecutionHandler接口,可自定義處理器