Spring Boot Series, Part 2: A Summary of Spring @Async Asynchronous Thread Pool Usage

1. TaskExecutor

TaskExecutor is Spring's interface for asynchronous thread pools; in essence it is a java.util.concurrent.Executor.

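Spring ships with the following asynchronous thread pool implementations:
1. SimpleAsyncTaskExecutor: not a real thread pool. This class does not reuse threads; every invocation starts a new thread.
2. SyncTaskExecutor: this class does not execute asynchronously; each call runs synchronously. Use it only where multithreading is not needed.
3. ConcurrentTaskExecutor: an adapter for Executor; not recommended. Consider it only when ThreadPoolTaskExecutor does not meet your requirements.
4. SimpleThreadPoolTaskExecutor: a subclass of Quartz's SimpleThreadPool. It is needed only when the same thread pool is shared by Quartz and non-Quartz components.
5. ThreadPoolTaskExecutor: the most commonly used and the recommended choice. In essence it wraps java.util.concurrent.ThreadPoolExecutor.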
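
To make the difference between the first two implementations concrete, here is a minimal sketch that uses only the JDK's java.util.concurrent.Executor, not Spring's classes. The class name ExecutorStyles and both executors are illustrative stand-ins: one starts a new thread per task (the behavior described for SimpleAsyncTaskExecutor), the other runs the task on the caller's thread (the behavior described for SyncTaskExecutor).

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;

public class ExecutorStyles {
    // Stand-in for SimpleAsyncTaskExecutor: a new thread for every task, no reuse.
    static final Executor THREAD_PER_TASK = task -> new Thread(task, "per-task").start();

    // Stand-in for SyncTaskExecutor: the task runs on the calling thread.
    static final Executor SYNCHRONOUS = Runnable::run;

    // Runs one task on the given executor and returns the name of the thread that ran it.
    static String threadUsedBy(Executor executor) {
        AtomicReference<String> name = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        executor.execute(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            done.await(); // wait until the task has actually run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("thread-per-task ran on: " + threadUsedBy(THREAD_PER_TASK));
        System.out.println("synchronous ran on:     " + threadUsedBy(SYNCHRONOUS));
    }
}
```

The synchronous executor reports the caller's own thread name, which is why it is only suitable where no multithreading is wanted.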

2. @Async

Spring defines asynchronous tasks through the @Async annotation.

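There are three kinds of asynchronous methods:
1. The simplest asynchronous call, with a void return type
2. An asynchronous call with parameters; asynchronous methods can accept arguments
3. An asynchronous call that returns a Future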

See the code for details:

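import java.util.concurrent.Future;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.AsyncResult;
import org.springframework.stereotype.Component;

@Component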
public class AsyncDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncDemo.class);

    /**
     * The simplest asynchronous call, with a void return type.
     */
    @Async
    public void asyncInvokeSimplest() {
        log.info("asyncSimplest");
    }

    /**
     * Asynchronous call with a parameter; asynchronous methods can accept arguments.
     * 
     * @param s the value to log
     */
    @Async
    public void asyncInvokeWithParameter(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
    }

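    /**
     * Asynchronous call that returns a Future.
     * 
     * @param i the value appended to the result
     * @return a Future holding "success:" + i, or "error" if the sleep is interrupted
     */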
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        }
        return future;
    }

}

The asynchronous methods above are invoked just like ordinary methods:

asyncDemo.asyncInvokeSimplest();
asyncDemo.asyncInvokeWithParameter("test");
Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
System.out.println(future.get());
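
The last two lines rely on Future.get() blocking until the asynchronous result is ready. The following is a minimal sketch of that behavior using a plain JDK ExecutorService rather than Spring's @Async proxy; the class name FutureGetSketch and the latch are hypothetical and exist only to make the ordering deterministic.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureGetSketch {
    // Submits a task that cannot finish until released, records whether the Future
    // was still pending right after submission, then collects the value with get().
    static String run() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch release = new CountDownLatch(1);
        try {
            Future<String> future = pool.submit(() -> {
                release.await();      // the task is held here until the caller releases it
                return "success:100";
            });
            String state = future.isDone() ? "done" : "pending";
            release.countDown();      // let the task finish
            return state + "-then-" + future.get(); // get() blocks until the value is ready
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "pending-then-success:100"
    }
}
```

Submission returns immediately with a pending Future; only get() waits for the result.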

3. Enabling asynchronous support in Spring

Spring offers two ways to enable the configuration:
1. Annotations
2. XML

3.1 Using annotations

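To enable asynchronous methods, the following configuration is also required:
1. @EnableAsync: this annotation turns on asynchronous invocation.
2. The method public AsyncTaskExecutor taskExecutor() defines a custom thread pool whose thread name prefix is "Anno-Executor". If it is not defined, the system default thread pool is used.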

@SpringBootApplication
@EnableAsync // enable asynchronous invocation
public class AsyncApplicationWithAnnotation {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAnnotation.class);

    /**
     * Custom asynchronous thread pool.
     * @return the executor used for @Async methods
     */
    @Bean
    public AsyncTaskExecutor taskExecutor() {  
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); 
        executor.setThreadNamePrefix("Anno-Executor");
        executor.setMaxPoolSize(10);  

        // Set the rejection policy
        executor.setRejectedExecutionHandler(new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                // .....
            }
        });
        // Alternatively, use a predefined rejection handler
        // executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());

        return executor;  
    } 

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithAnnotation.class, args);
    }
}

The asynchronous methods above are invoked just like ordinary methods:

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAnnotation.class)
public class AsyncApplicationWithAnnotationTests {
    @Autowired
    private AsyncDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithParameter("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}

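Run the test case; the output is as follows.
The main thread is named main, while the asynchronous methods run on Anno-Executor1, which shows that the asynchronous thread pool is in effect: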

2017-03-28 20:00:07.731  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncSimplest
2017-03-28 20:00:07.732  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncInvokeWithParameter, parementer=test
2017-03-28 20:00:07.751  INFO 5144 --- [ Anno-Executor1] c.hry.spring.async.annotation.AsyncDemo  : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:00:08.757  INFO 5144 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Tue Mar 28 20:00:06 CST 2017]; root of context hierarchy

3.2 Using XML

Bean configuration file: spring_async.xml
1. The thread name prefix is xmlExecutor
2. Enable the asynchronous thread pool configuration

    <!-- Equivalent to @EnableAsync; the executor attribute selects the thread pool -->
    <task:annotation-driven executor="xmlExecutor"/>
    <!-- The id sets the name prefix of the threads created by the pool -->
    <task:executor
        id="xmlExecutor"
        pool-size="5-25"
        queue-capacity="100"
        keep-alive="120"
        rejection-policy="CALLER_RUNS"/>

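Thread pool parameters:
1. ‘id’: the prefix of the thread names
2. ‘pool-size’: the size of the thread pool. Accepts either a range "min-max" or a single fixed value (in which case the core and max sizes are the same)
3. ‘queue-capacity’: the length of the waiting queue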
○ The main idea is that when a task is submitted, the executor will first try to use a free thread if the number of active threads is currently less than the core size. 
○ If the core size has been reached, then the task will be added to the queue as long as its capacity has not yet been reached. 
○ Only then, if the queue’s capacity has been reached, will the executor create a new thread beyond the core size. 
○ If the max size has also been reached, then the executor will reject the task. 
○ By default, the queue is unbounded, but this is rarely the desired configuration because it can lead to OutOfMemoryErrors if enough tasks are added to that queue while all pool threads are busy. 
4. ‘rejection-policy’: the policy for handling rejected tasks
○ In the default ThreadPoolExecutor.AbortPolicy, the handler throws a runtime RejectedExecutionException upon rejection. 
○ In ThreadPoolExecutor.CallerRunsPolicy, the thread that invokes execute itself runs the task. This provides a simple feedback control mechanism that will slow down the rate that new tasks are submitted. 
○ In ThreadPoolExecutor.DiscardPolicy, a task that cannot be executed is simply dropped. 
○ In ThreadPoolExecutor.DiscardOldestPolicy, if the executor is not shut down, the task at the head of the work queue is dropped, and then execution is retried (which can fail again, causing this to be repeated.) 
5. ‘keep-alive’: thread keep-alive time (in seconds)
This setting determines the time limit (in seconds) for which threads may remain idle before being terminated. If there are more than the core number of threads currently in the pool, after waiting this amount of time without processing a task, excess threads will get terminated. A time value of zero will cause excess threads to terminate immediately after executing a task without remaining follow-up work in the task queue.
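
The interaction between core size, queue capacity, max size, and rejection can be observed directly on the underlying java.util.concurrent.ThreadPoolExecutor. The sketch below is an illustration with deliberately tiny, made-up numbers (core 1, max 2, queue capacity 1) rather than the 5-25/100 values from the XML; the class name PoolSizingSketch is hypothetical, and it uses the default AbortPolicy instead of CALLER_RUNS so that the rejection is visible as an exception.

```java
import java.util.Arrays;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class PoolSizingSketch {
    // Submits four blocking tasks and returns
    // {pool size after task 1, after task 2, after task 3, 1 if task 4 was rejected}.
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 120, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)); // default AbortPolicy
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the sketch is done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            pool.execute(blocker);         // below core size: a core thread is started
            seen[0] = pool.getPoolSize();
            pool.execute(blocker);         // core thread busy: the task waits in the queue
            seen[1] = pool.getPoolSize();
            pool.execute(blocker);         // queue full: a thread beyond the core size is created
            seen[2] = pool.getPoolSize();
            try {
                pool.execute(blocker);     // max size reached and queue full: rejected
            } catch (RejectedExecutionException e) {
                seen[3] = 1;
            }
        } finally {
            release.countDown();           // unblock the workers
            pool.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(observe())); // prints [1, 1, 2, 1]
    }
}
```

Note that the pool grows past its core size only once the queue is full, which is why an unbounded queue never uses more than the core number of threads.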

Application class that loads the asynchronous thread pool:

@SpringBootApplication
@ImportResource("classpath:/async/spring_async.xml")
public class AsyncApplicationWithXML {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithXML.class);

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithXML.class, args);
    }
}

Test case

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithXML.class)
public class AsyncApplicationWithXMLTest {
    @Autowired
    private AsyncDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithParameter("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}

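Run the test case; the output is as follows.
The main thread is named main, while the asynchronous methods run on threads named xmlExecutor-x, which shows that the asynchronous thread pool is in effect: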

2017-03-28 20:12:10.540  INFO 12948 --- [           main] c.h.s.a.xml.AsyncApplicationWithXMLTest  : Started AsyncApplicationWithXMLTest in 1.441 seconds (JVM running for 2.201)
2017-03-28 20:12:10.718  INFO 12948 --- [  xmlExecutor-2] com.hry.spring.async.xml.AsyncDemo       : asyncInvokeWithParameter, parementer=test
2017-03-28 20:12:10.721  INFO 12948 --- [  xmlExecutor-1] com.hry.spring.async.xml.AsyncDemo       : asyncSimplest
2017-03-28 20:12:10.722  INFO 12948 --- [  xmlExecutor-3] com.hry.spring.async.xml.AsyncDemo       : asyncInvokeReturnFuture, parementer=100
success:100
2017-03-28 20:12:11.729  INFO 12948 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@71809907: startup date [Tue Mar 28 20:12:09 CST 2017]; root of context hierarchy

4. Exception handling for asynchronous methods

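A method may throw an exception when it is invoked. For asynchronous methods there are two main ways to handle exceptions:
1. For asynchronous methods that return a Future: a) catch the exception when calling get() on the Future; or b) catch the exception directly inside the asynchronous method
2. For asynchronous methods that return void: handle the exception through AsyncUncaughtExceptionHandler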
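
Option 1a can be sketched with a plain JDK ExecutorService: an exception thrown inside the task is stored in the Future and resurfaces at get(), wrapped in an ExecutionException. This is an illustration under the assumption that a Future returned from an @Async method behaves the same way at get(); the class name FutureExceptionSketch is hypothetical.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class FutureExceptionSketch {
    // Submits a task that always fails and describes what the caller sees at get().
    static String causeSeenByCaller() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> failing = () -> {
                throw new IllegalArgumentException("test");
            };
            Future<String> future = pool.submit(failing);
            try {
                return future.get(); // rethrows the task's failure as ExecutionException
            } catch (ExecutionException e) {
                Throwable cause = e.getCause(); // the original exception from the task
                return cause.getClass().getSimpleName() + ": " + cause.getMessage();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(causeSeenByCaller()); // prints "IllegalArgumentException: test"
    }
}
```

If the caller never invokes get(), the failure stays hidden inside the Future, so one of the two options in item 1 should always be applied.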

AsyncExceptionDemo:

@Component
public class AsyncExceptionDemo {
    private static final Logger log = LoggerFactory.getLogger(AsyncExceptionDemo.class);

    /**
     * The simplest asynchronous call, with a void return type.
     */
    @Async
    public void asyncInvokeSimplest() {
        log.info("asyncSimplest");
    }

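    /**
     * Asynchronous call with a parameter; asynchronous methods can accept arguments.
     * Because the return type is void, the exception is handled by AsyncUncaughtExceptionHandler.
     * @param s the message of the exception that is thrown
     */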
    @Async
    public void asyncInvokeWithException(String s) {
        log.info("asyncInvokeWithParameter, parementer={}", s);
        throw new IllegalArgumentException(s);
    }

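    /**
     * Asynchronous call that returns a Future.
     * Because the return type is Future, the exception is not handled by AsyncUncaughtExceptionHandler;
     * we must either catch and handle it inside the method,
     * or have the caller catch it when calling Future.get().
     * 
     * @param i the value to log
     * @return a Future holding the outcome
     */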
    @Async
    public Future<String> asyncInvokeReturnFuture(int i) {
        log.info("asyncInvokeReturnFuture, parementer={}", i);
        Future<String> future;
        try {
            Thread.sleep(1000 * 1);
            future = new AsyncResult<String>("success:" + i);
            throw new IllegalArgumentException("a");
        } catch (InterruptedException e) {
            future = new AsyncResult<String>("error");
        } catch(IllegalArgumentException e){
            future = new AsyncResult<String>("error-IllegalArgumentException");
        }
        return future;
    }

}

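Implement the AsyncConfigurer interface for finer-grained control over the asynchronous thread pool:
a) Create your own thread pool
b) Supply the AsyncUncaughtExceptionHandler class that handles exceptions thrown by void methods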

/**
 * Customizes the asynchronous thread pool, including exception handling, by implementing AsyncConfigurer.
 * 
 * @author hry
 *
 */
@Service
public class MyAsyncConfigurer implements AsyncConfigurer{
    private static final Logger log = LoggerFactory.getLogger(MyAsyncConfigurer.class);

    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPool = new ThreadPoolTaskExecutor();  
        threadPool.setCorePoolSize(1);  
        threadPool.setMaxPoolSize(1);  
        threadPool.setWaitForTasksToCompleteOnShutdown(true);  
        threadPool.setAwaitTerminationSeconds(60 * 15);  
        threadPool.setThreadNamePrefix("MyAsync-");
        threadPool.initialize();
        return threadPool;  
    }

    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
         return new MyAsyncExceptionHandler();  
    }

    /**
     * Custom exception handler class.
     * @author hry
     *
     */
    class MyAsyncExceptionHandler implements AsyncUncaughtExceptionHandler {  

        @Override  
        public void handleUncaughtException(Throwable throwable, Method method, Object... obj) {  
            log.info("Exception message - " + throwable.getMessage());  
            log.info("Method name - " + method.getName());  
            for (Object param : obj) {  
                log.info("Parameter value - " + param);  
            }  
        }  

    } 

}

Application class:

@SpringBootApplication
@EnableAsync // enable asynchronous invocation
public class AsyncApplicationWithAsyncConfigurer {
    private static final Logger log = LoggerFactory.getLogger(AsyncApplicationWithAsyncConfigurer.class);

    public static void main(String[] args) {
        log.info("Start AsyncApplication.. ");
        SpringApplication.run(AsyncApplicationWithAsyncConfigurer.class, args);
    }


}

Test code

@RunWith(SpringRunner.class)
@SpringBootTest(classes=AsyncApplicationWithAsyncConfigurer.class)
public class AsyncApplicationWithAsyncConfigurerTests {
    @Autowired
    private AsyncExceptionDemo asyncDemo;

    @Test
    public void contextLoads() throws InterruptedException, ExecutionException {
        asyncDemo.asyncInvokeSimplest();
        asyncDemo.asyncInvokeWithException("test");
        Future<String> future = asyncDemo.asyncInvokeReturnFuture(100);
        System.out.println(future.get());
    }
}

Run the test case.
MyAsyncConfigurer handles the exception thrown when asyncInvokeWithException is called on the AsyncExceptionDemo object:

2017-04-02 16:01:45.591  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncSimplest
2017-04-02 16:01:45.605  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncInvokeWithParameter, parementer=test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Exception message - test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Method name - asyncInvokeWithException
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.async.exception.MyAsyncConfigurer  : Parameter value - test
2017-04-02 16:01:45.608  INFO 11152 --- [      MyAsync-1] c.h.s.a.exception.AsyncExceptionDemo     : asyncInvokeReturnFuture, parementer=100
error-IllegalArgumentException
2017-04-02 16:01:46.656  INFO 11152 --- [       Thread-2] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@47af7f3d: startup date [Sun Apr 02 16:01:44 CST 2017]; root of context hierarchy

5. Source code location
