springboot線程池的使用和擴展

咱們經常使用ThreadPoolExecutor提供的線程池服務,springboot框架提供了@Async註解,幫助咱們更方便的將業務邏輯提交到線程池中異步執行,今天咱們就來實戰體驗這個線程池服務;css

本文地址:http://blog.csdn.net/boling_cavalry/article/details/79120268java

實戰環境

  1. windowns10;
  2. jdk1.8;
  3. springboot 1.5.9.RELEASE;
  4. 開發工具:IntelliJ IDEA;

實戰源碼

本次實戰的源碼能夠在個人GitHub下載,地址:git@github.com:zq2599/blog_demos.git,項目主頁:https://github.com/zq2599/blog_demosgit

這裏面有多個工程,本次用到的工程爲threadpooldemoserver,以下圖紅框所示: 
這裏寫圖片描述github

實戰步驟梳理

本次實戰的步驟以下: 
1. 建立springboot工程; 
2. 建立Service層的接口和實現; 
3. 建立controller,開發一個http服務接口,裏面會調用service層的服務; 
4. 建立線程池的配置; 
5. 將Service層的服務異步化,這樣每次調用都會都被提交到線程池異步執行; 
6. 擴展ThreadPoolTaskExecutor,在提交任務到線程池的時候能夠觀察到當前線程池的狀況;web

建立springboot工程

用IntelliJ IDEA建立一個springboot的web工程threadpooldemoserver,pom.xml內容以下:spring

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.bolingcavalry</groupId> <artifactId>threadpooldemoserver</artifactId> <version>0.0.1-SNAPSHOT</version> <packaging>jar</packaging> <name>threadpooldemoserver</name> <description>Demo project for Spring Boot</description> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.9.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding> <java.version>1.8</java.version> </properties> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>

建立Service層的接口和實現

建立一個service層的接口AsyncService,以下:shell

public interface AsyncService { /** * 執行異步任務 */ void executeAsync(); }

對應的AsyncServiceImpl,實現以下:apache

@Service public class AsyncServiceImpl implements AsyncService { private static final Logger logger = LoggerFactory.getLogger(AsyncServiceImpl.class); @Override public void executeAsync() { logger.info("start executeAsync"); try{ Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } logger.info("end executeAsync"); } }

這個方法作的事情很簡單:sleep了一秒鐘;瀏覽器

建立controller

建立一個controller爲Hello,裏面定義一個http接口,作的事情是調用Service層的服務,以下:tomcat

@RestController public class Hello { private static final Logger logger = LoggerFactory.getLogger(Hello.class); @Autowired private AsyncService asyncService; @RequestMapping("/") public String submit(){ logger.info("start submit"); //調用service層的任務 asyncService.executeAsync(); logger.info("end submit"); return "success"; } }

至此,咱們已經作好了一個http請求的服務,裏面作的事情實際上是同步的,接下來咱們就開始配置springboot的線程池服務,將service層作的事情都提交到線程池中去處理;

springboot的線程池配置

建立一個配置類ExecutorConfig,用來定義如何建立一個ThreadPoolTaskExecutor,要使用@Configuration和@EnableAsync這兩個註解,表示這是個配置類,而且是線程池的配置類,以下所示:

@Configuration @EnableAsync public class ExecutorConfig { private static final Logger logger = LoggerFactory.getLogger(ExecutorConfig.class); @Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(5); //配置最大線程數 executor.setMaxPoolSize(5); //配置隊列大小 executor.setQueueCapacity(99999); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; } }

注意,上面的方法名稱爲asyncServiceExecutor,稍後立刻用到;

將Service層的服務異步化

打開AsyncServiceImpl.java,在executeAsync方法上增長註解@Async(「asyncServiceExecutor」),asyncServiceExecutor是前面ExecutorConfig.java中的方法名,代表executeAsync方法進入的線程池是asyncServiceExecutor方法建立的,以下:

@Override @Async("asyncServiceExecutor") public void executeAsync() { logger.info("start executeAsync"); try{ Thread.sleep(1000); }catch(Exception e){ e.printStackTrace(); } logger.info("end executeAsync"); }

驗證效果

  1. 將這個springboot運行起來(pom.xml所在文件夾下執行mvn spring-boot:run);
  2. 在瀏覽器輸入:http://localhost:8080
  3. 在瀏覽器用F5按鈕快速多刷新幾回;
  4. 在springboot的控制檯看見日誌以下:
2018-01-21 22:43:18.630 INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello : start submit 2018-01-21 22:43:18.630 INFO 14824 --- [nio-8080-exec-8] c.b.t.controller.Hello : end submit 2018-01-21 22:43:18.929 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:18.930 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : start executeAsync 2018-01-21 22:43:19.005 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:19.006 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : start executeAsync 2018-01-21 22:43:19.175 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:19.175 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : start executeAsync 2018-01-21 22:43:19.326 INFO 14824 --- [async-service-4] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:19.495 INFO 14824 --- [async-service-5] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:19.930 INFO 14824 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:20.006 INFO 14824 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 22:43:20.191 INFO 14824 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync

 

如上日誌所示,咱們能夠看到controller的執行線程是」nio-8080-exec-8」,這是tomcat的執行線程,而service層的日誌顯示線程名爲「async-service-1」,顯然已經在咱們配置的線程池中執行了,而且每次請求中,controller的起始和結束日誌都是連續打印的,代表每次請求都快速響應了,而耗時的操做都留給線程池中的線程去異步執行;

擴展ThreadPoolTaskExecutor

雖然咱們已經用上了線程池,可是還不清楚線程池當時的狀況,有多少線程在執行,多少在隊列中等待呢?這裏我建立了一個ThreadPoolTaskExecutor的子類,在每次提交線程的時候都會將當前線程池的運行情況打印出來,代碼以下:

public class VisiableThreadPoolTaskExecutor extends ThreadPoolTaskExecutor { private static final Logger logger = LoggerFactory.getLogger(VisiableThreadPoolTaskExecutor.class); private void showThreadPoolInfo(String prefix){ ThreadPoolExecutor threadPoolExecutor = getThreadPoolExecutor(); if(null==threadPoolExecutor){ return; } logger.info("{}, {},taskCount [{}], completedTaskCount [{}], activeCount [{}], queueSize [{}]", this.getThreadNamePrefix(), prefix, threadPoolExecutor.getTaskCount(), threadPoolExecutor.getCompletedTaskCount(), threadPoolExecutor.getActiveCount(), threadPoolExecutor.getQueue().size()); } @Override public void execute(Runnable task) { showThreadPoolInfo("1. do execute"); super.execute(task); } @Override public void execute(Runnable task, long startTimeout) { showThreadPoolInfo("2. do execute"); super.execute(task, startTimeout); } @Override public Future<?> submit(Runnable task) { showThreadPoolInfo("1. do submit"); return super.submit(task); } @Override public <T> Future<T> submit(Callable<T> task) { showThreadPoolInfo("2. do submit"); return super.submit(task); } @Override public ListenableFuture<?> submitListenable(Runnable task) { showThreadPoolInfo("1. do submitListenable"); return super.submitListenable(task); } @Override public <T> ListenableFuture<T> submitListenable(Callable<T> task) { showThreadPoolInfo("2. do submitListenable"); return super.submitListenable(task); } }

如上所示,showThreadPoolInfo方法中將任務總數、已完成數、活躍線程數,隊列大小都打印出來了,而後Override了父類的execute、submit等方法,在裏面調用showThreadPoolInfo方法,這樣每次有任務被提交到線程池的時候,都會將當前線程池的基本狀況打印到日誌中;

修改ExecutorConfig.java的asyncServiceExecutor方法,將ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor()改成ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(),以下所示:

@Bean public Executor asyncServiceExecutor() { logger.info("start asyncServiceExecutor"); //使用VisiableThreadPoolTaskExecutor ThreadPoolTaskExecutor executor = new VisiableThreadPoolTaskExecutor(); //配置核心線程數 executor.setCorePoolSize(5); //配置最大線程數 executor.setMaxPoolSize(5); //配置隊列大小 executor.setQueueCapacity(99999); //配置線程池中的線程的名稱前綴 executor.setThreadNamePrefix("async-service-"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); //執行初始化 executor.initialize(); return executor; }
  • 1

再次啓動該工程,再瀏覽器反覆刷新http://localhost:8080,看到的日誌以下:

2018-01-21 23:04:56.113 INFO 15580 --- [nio-8080-exec-1] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [99], completedTaskCount [85], activeCount [5], queueSize [9] 2018-01-21 23:04:56.113 INFO 15580 --- [nio-8080-exec-1] c.b.t.controller.Hello : end submit 2018-01-21 23:04:56.225 INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 23:04:56.225 INFO 15580 --- [async-service-1] c.b.t.service.impl.AsyncServiceImpl : start executeAsync 2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello : start submit 2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [100], completedTaskCount [86], activeCount [5], queueSize [9] 2018-01-21 23:04:56.240 INFO 15580 --- [nio-8080-exec-2] c.b.t.controller.Hello : end submit 2018-01-21 23:04:56.298 INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 23:04:56.298 INFO 15580 --- [async-service-2] c.b.t.service.impl.AsyncServiceImpl : start executeAsync 2018-01-21 23:04:56.372 INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello : start submit 2018-01-21 23:04:56.373 INFO 15580 --- [nio-8080-exec-3] c.b.t.e.VisiableThreadPoolTaskExecutor : async-service-, 2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9] 2018-01-21 23:04:56.373 INFO 15580 --- [nio-8080-exec-3] c.b.t.controller.Hello : end submit 2018-01-21 23:04:56.444 INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : end executeAsync 2018-01-21 23:04:56.445 INFO 15580 --- [async-service-3] c.b.t.service.impl.AsyncServiceImpl : start executeAsync

 

注意這一行日誌:2. do submit,taskCount [101], completedTaskCount [87], activeCount [5], queueSize [9]

這說明提交任務到線程池的時候,調用的是submit(Callable task)這個方法,當前已經提交了101個任務,完成了87個,當前有5個線程在處理任務,還剩9個任務在隊列中等待,線程池的基本狀況一路瞭然;

至此,springboot線程池服務的實戰就完成了,但願能幫您在工程中快速實現異步服務;

 

 

spring-boot 方法異步調用,自定義線程池配置使用
 

一、在主類中添加@EnableAsync註解:

@SpringBootApplication @EnableScheduling @EnableAsync public class MySpringBootApplication { private static Logger logger = LoggerFactory.getLogger(MySpringBootApplication.class); public static void main(String[] args) { SpringApplication.run(MySpringBootApplication.class, args); logger.info("My Spring Boot Application Started"); } 

二、建立一個AsyncTask類,在裏面添加兩個用@Async註解的task:

@Component public class AsyncTask { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async public Future<String> doTask1() throws InterruptedException{ logger.info("Task1 started."); long start = System.currentTimeMillis(); Thread.sleep(5000); long end = System.currentTimeMillis(); logger.info("Task1 finished, time elapsed: {} ms.", end-start); return new AsyncResult<>("Task1 accomplished!"); } @Async public Future<String> doTask2() throws InterruptedException{ logger.info("Task2 started."); long start = System.currentTimeMillis(); Thread.sleep(3000); long end = System.currentTimeMillis(); logger.info("Task2 finished, time elapsed: {} ms.", end-start); return new AsyncResult<>("Task2 accomplished!"); } }

三、萬事俱備,開始測試:

public class TaskTests extends BasicUtClass{ @Autowired private AsyncTask asyncTask; @Test public void AsyncTaskTest() throws InterruptedException, ExecutionException { Future<String> task1 = asyncTask.doTask1(); Future<String> task2 = asyncTask.doTask2(); while(true) { if(task1.isDone() && task2.isDone()) { logger.info("Task1 result: {}", task1.get()); logger.info("Task2 result: {}", task2.get()); break; } Thread.sleep(1000); } logger.info("All tasks finished."); } }

測試結果:

2016-12-13 11:12:24,850:INFO main (AsyncExecutionAspectSupport.java:245) - No TaskExecutor bean found for async processing 2016-12-13 11:12:24,864:INFO SimpleAsyncTaskExecutor-1 (AsyncTask.java:22) - Task1 started. 2016-12-13 11:12:24,865:INFO SimpleAsyncTaskExecutor-2 (AsyncTask.java:34) - Task2 started. 2016-12-13 11:12:27,869:INFO SimpleAsyncTaskExecutor-2 (AsyncTask.java:39) - Task2 finished, time elapsed: 3001 ms. 2016-12-13 11:12:29,866:INFO SimpleAsyncTaskExecutor-1 (AsyncTask.java:27) - Task1 finished, time elapsed: 5001 ms. 2016-12-13 11:12:30,853:INFO main (TaskTests.java:23) - Task1 result: Task1 accomplished! 2016-12-13 11:12:30,853:INFO main (TaskTests.java:24) - Task2 result: Task2 accomplished! 2016-12-13 11:12:30,854:INFO main (TaskTests.java:30) - All tasks finished.

能夠看到,沒有自定義的Executor,因此使用缺省的TaskExecutor 。

 

前面是最簡單的使用方法。若是想使用自定義的Executor,能夠按照以下幾步來:

一、新建一個Executor配置類,順便把@EnableAsync註解搬到這裏來:

@Configuration @EnableAsync public class ExecutorConfig { /** Set the ThreadPoolExecutor's core pool size. */ private int corePoolSize = 10; /** Set the ThreadPoolExecutor's maximum pool size. */ private int maxPoolSize = 200; /** Set the capacity for the ThreadPoolExecutor's BlockingQueue. */ private int queueCapacity = 10; @Bean public Executor mySimpleAsync() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("MySimpleExecutor-"); executor.initialize(); return executor; } @Bean public Executor myAsync() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(corePoolSize); executor.setMaxPoolSize(maxPoolSize); executor.setQueueCapacity(queueCapacity); executor.setThreadNamePrefix("MyExecutor-"); // rejection-policy:當pool已經達到max size的時候,如何處理新任務 // CALLER_RUNS:不在新線程中執行任務,而是有調用者所在的線程來執行 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy()); executor.initialize(); return executor; } }

這裏定義了兩個不一樣的Executor,第二個從新設置了pool已經達到max size時候的處理方法;同時指定了線程名字的前綴。

二、自定義Executor的使用:

@Component public class AsyncTask { protected final Logger logger = LoggerFactory.getLogger(this.getClass()); @Async("mySimpleAsync") public Future<String> doTask1() throws InterruptedException{ logger.info("Task1 started."); long start = System.currentTimeMillis(); Thread.sleep(5000); long end = System.currentTimeMillis(); logger.info("Task1 finished, time elapsed: {} ms.", end-start); return new AsyncResult<>("Task1 accomplished!"); } @Async("myAsync") public Future<String> doTask2() throws InterruptedException{ logger.info("Task2 started."); long start = System.currentTimeMillis(); Thread.sleep(3000); long end = System.currentTimeMillis(); logger.info("Task2 finished, time elapsed: {} ms.", end-start); return new AsyncResult<>("Task2 accomplished!"); } }

就是把上面自定義Executor的類名,放進@Async註解中。

三、(測試用例不變)測試結果:

2016-12-13 10:57:11,998:INFO MySimpleExecutor-1 (AsyncTask.java:22) - Task1 started. 2016-12-13 10:57:12,001:INFO MyExecutor-1 (AsyncTask.java:34) - Task2 started. 2016-12-13 10:57:15,007:INFO MyExecutor-1 (AsyncTask.java:39) - Task2 finished, time elapsed: 3000 ms. 2016-12-13 10:57:16,999:INFO MySimpleExecutor-1 (AsyncTask.java:27) - Task1 finished, time elapsed: 5001 ms. 2016-12-13 10:57:17,994:INFO main (TaskTests.java:23) - Task1 result: Task1 accomplished! 2016-12-13 10:57:17,994:INFO main (TaskTests.java:24) - Task2 result: Task2 accomplished! 2016-12-13 10:57:17,994:INFO main (TaskTests.java:30) - All tasks finished. 2016-12-13 10:57:18,064 Thread-3 WARN Unable to register Log4j shutdown hook because JVM is shutting down. Using SimpleLogger

可見,線程名字的前綴變了,兩個task使用了不一樣的線程池了。

參考博客:http://blog.csdn.net/clementad/article/details/53607311

相關文章
相關標籤/搜索