本文已經收錄自 springboot-guide : github.com/Snailclimb/… (Spring Boot 核心知識點整理。 基於 Spring Boot 2.19+。)java
經過本文你能夠了解到下面這些知識點:git
ThreadPoolTaskExecutor
飽和策略;異步編程在處理耗時操做以及多任務處理的場景下很是有用,咱們能夠更好的讓咱們的系統利用好機器的 CPU 和 內存,提升它們的利用率。多線程設計模式有不少種,Future模式是多線程開發中很是常見的一種設計模式,本文也是基於這種模式來講明 SpringBoot 對於異步編程的知識。github
實戰以前我先簡單介紹一下 Future 模式的核心思想 吧!。spring
Future 模式的核心思想是 異步調用 。當咱們執行一個方法時,假如這個方法中有多個耗時的任務須要同時去作,並且又不着急等待這個結果時可讓客戶端當即返回而後,後臺慢慢去計算任務。固然你也能夠選擇等這些任務都執行完了,再返回給客戶端。這個在 Java 中都有很好的支持,我在後面的示例程序中會詳細對比這兩種方式的區別。數據庫
若是咱們須要在 SpringBoot 實現異步編程的話,經過 Spring 提供的兩個註解會讓這件事情變的很是簡單。編程
@EnableAsync
:經過在配置類或者Main類上加@EnableAsync開啓對異步方法的支持。@Async
能夠做用在類上或者方法上,做用在類上表明這個類的全部方法都是異步方法。不少人對於 TaskExecutor 不是太瞭解,因此咱們花一點篇幅先介紹一下這個東西。從名字就能看出它是任務的執行者,它領導執行着線程來處理任務,就像司令官同樣,而咱們的線程就比如一隻只軍隊同樣,這些軍隊能夠異步對敵人進行打擊👊。設計模式
Spring 提供了TaskExecutor
接口做爲任務執行者的抽象,它和java.util.concurrent
包下的Executor
接口很像。稍微不一樣的 TaskExecutor
接口用到了 Java 8 的語法@FunctionalInterface
聲明這個接口是一個函數式接口。springboot
org.springframework.core.task.TaskExecutor
bash
@FunctionalInterface
public interface TaskExecutor extends Executor {
void execute(Runnable var1);
}
複製代碼
若是沒有自定義Executor, Spring 將建立一個 SimpleAsyncTaskExecutor
並使用它。多線程
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.AsyncConfigurer;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import java.util.concurrent.Executor;
/** @author shuang.kou */
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
private static final int CORE_POOL_SIZE = 6;
private static final int MAX_POOL_SIZE = 10;
private static final int QUEUE_CAPACITY = 100;
@Bean
public Executor taskExecutor() {
// Spring 默認配置是核心線程數大小爲1,最大線程容量大小不受限制,隊列容量也不受限制。
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
// 核心線程數
executor.setCorePoolSize(CORE_POOL_SIZE);
// 最大線程數
executor.setMaxPoolSize(MAX_POOL_SIZE);
// 隊列大小
executor.setQueueCapacity(QUEUE_CAPACITY);
// 當最大池已滿時,此策略保證不會丟失任務請求,可是可能會影響應用程序總體性能。
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.setThreadNamePrefix("My ThreadPoolTaskExecutor-");
executor.initialize();
return executor;
}
}
複製代碼
ThreadPoolTaskExecutor
常見概念:
通常狀況下不會將隊列大小設爲:Integer.MAX_VALUE
,也不會將核心線程數和最大線程數設爲一樣的大小,這樣的話最大線程數的設置都沒什麼意義了,你也沒法肯定當前 CPU 和內存利用率具體狀況如何。
若是隊列已滿而且當前同時運行的線程數達到最大線程數的時候,若是再有新任務過來會發生什麼呢?
Spring 默認使用的是 ThreadPoolExecutor.AbortPolicy
。在Spring的默認狀況下,ThreadPoolExecutor
將拋出 RejectedExecutionException
來拒絕新來的任務 ,這表明你將丟失對這個任務的處理。 對於可伸縮的應用程序,建議使用 ThreadPoolExecutor.CallerRunsPolicy
。當最大池被填滿時,此策略爲咱們提供可伸縮隊列。
ThreadPoolTaskExecutor
飽和策略定義:
若是當前同時運行的線程數量達到最大線程數量時,ThreadPoolTaskExecutor
定義一些策略:
RejectedExecutionException
來拒絕新任務的處理。下面模擬一個查找對應字符開頭電影的方法,咱們給這個方法加上了@Async
註解來告訴 Spring 它是一個異步的方法。另外,這個方法的返回值 CompletableFuture.completedFuture(results)
這表明咱們須要返回結果,也就是說程序必須把任務執行完成以後再返回給用戶。
請留意completableFutureTask
方法中的第一行打印日誌這句代碼,後面分析程序中會用到,很重要!
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;
/** @author shuang.kou */
@Service
public class AsyncService {
private static final Logger logger = LoggerFactory.getLogger(AsyncService.class);
private List<String> movies =
new ArrayList<>(
Arrays.asList(
"Forrest Gump",
"Titanic",
"Spirited Away",
"The Shawshank Redemption",
"Zootopia",
"Farewell ",
"Joker",
"Crawl"));
/** 示範使用:找到特定字符/字符串開頭的電影 */
@Async
public CompletableFuture<List<String>> completableFutureTask(String start) {
// 打印日誌
logger.warn(Thread.currentThread().getName() + "start this task!");
// 找到特定字符/字符串開頭的電影
List<String> results =
movies.stream().filter(movie -> movie.startsWith(start)).collect(Collectors.toList());
// 模擬這是一個耗時的任務
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
//返回一個已經用給定值完成的新的CompletableFuture。
return CompletableFuture.completedFuture(results);
}
}
複製代碼
/** @author shuang.kou */
@RestController
@RequestMapping("/async")
public class AsyncController {
@Autowired
AsyncService asyncService;
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
//開始時間
long start = System.currentTimeMillis();
// 開始執行大量的異步任務
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
// CompletableFuture.join()方法能夠獲取他們的結果並將結果鏈接起來
List<List<String>> results = completableFutureList.stream().map(CompletableFuture::join).collect(Collectors.toList());
// 打印結果以及運行程序運行花費時間
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return results.toString();
}
}
複製代碼
請求這個接口,控制檯打印出下面的內容:
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 13:50:17.007 WARN 18793 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
Elapsed time: 1010
複製代碼
首先咱們能夠看處處理全部任務花費的時間大概是 1 s。這與咱們自定義的 ThreadPoolTaskExecutor
有關,咱們配置的核心線程數是 6 ,而後經過經過下面的代碼模擬分配了 6 個任務給系統執行。這樣每一個線程都會被分配到一個任務,每一個任務執行花費時間是 1 s ,因此處理 6 個任務的總花費時間是 1 s。
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
List<CompletableFuture<List<String>>> completableFutureList =
words.stream()
.map(word -> asyncService.completableFutureTask(word))
.collect(Collectors.toList());
複製代碼
你能夠本身驗證一下,試着去把核心線程數的數量改成 3 ,再次請求這個接口你會發現處理全部任務花費的時間大概是 2 s。
另外,從上面的運行結果能夠看出,當全部任務執行完成以後才返回結果。這種狀況對應於咱們須要返回結果給客戶端請求的狀況下,假如咱們不須要返回任務執行結果給客戶端的話呢? 就好比咱們上傳一個大文件到系統,上傳以後只要大文件格式符合要求咱們就上傳成功。普通狀況下咱們須要等待文件上傳完畢再返回給用戶消息,可是這樣會很慢。採用異步的話,當用戶上傳以後就立馬返回給用戶消息,而後系統再默默去處理上傳任務。這樣也會增長一點麻煩,由於文件可能會上傳失敗,因此係統也須要一點機制來補償這個問題,好比當上傳遇到問題的時候,發消息通知用戶。
下面會演示一下客戶端不須要返回結果的狀況:
將completableFutureTask
方法變爲 void 類型
@Async
public void completableFutureTask(String start) {
......
//這裏多是系統對任務執行結果的處理,好比存入到數據庫等等......
//doSomeThingWithResults(results);
}
複製代碼
Controller 代碼修改以下:
@GetMapping("/movies")
public String completableFutureTask() throws ExecutionException, InterruptedException {
// Start the clock
long start = System.currentTimeMillis();
// Kick of multiple, asynchronous lookups
List<String> words = Arrays.asList("F", "T", "S", "Z", "J", "C");
words.stream()
.forEach(word -> asyncService.completableFutureTask(word));
// Wait until they are all done
// Print results, including elapsed time
System.out.println("Elapsed time: " + (System.currentTimeMillis() - start));
return "Done";
}
複製代碼
請求這個接口,控制檯打印出下面的內容:
Elapsed time: 0
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-4] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-4start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-3] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-3start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-2] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-2start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-1] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-1start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-6] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-6start this task!
2019-10-01 14:02:44.052 WARN 19051 --- [lTaskExecutor-5] g.j.a.service.AsyncService : My ThreadPoolTaskExecutor-5start this task!
複製代碼
能夠看到系統會直接返回給用戶結果,而後系統才真正開始執行任務。