最近在代碼中使用@Async來進行異步抄單,進行壓測時出現了OOM問題,經過日誌信息看到了:unable to create new native threadreact
初步懷疑是建立的線程太多致使的,使用jstack 線程PID 分析打印的日誌,發現大量線程處於Runnable狀態,基本能夠肯定是建立線程太多致使的。緩存
出問題的服務是報案的服務,在進行抄單時在方法上使用了@Asycn這個註解,進行異步抄單,經過trace-log日誌能夠看出建立了不少的Thread,通過簡單的瞭解@Async異步配置使用的是SimpleAsyncTaskExecutor,該線程池默認來一個任務建立一個線程,在壓測的狀況下,會有大量的請求去抄單,這時會不斷建立大量的線程,極有可能壓爆服務器內存。bash
藉此機會學習一下SimpleAsyncTaskExecutor的源碼,SimpleAsyncTaskExecutor提供了限流機制,經過concurrencyLimit屬性來控制開關,當concurrencyLimit>=0時開啓限流機制,默認關閉限流機制即concurrencyLimit=-1,當關閉狀況下,會不斷建立新的線程來處理任務,核心代碼以下:服務器
@Override
public void execute(Runnable task, long startTimeout) {
Assert.notNull(task, "Runnable must not be null");
Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
this.concurrencyThrottle.beforeAccess();
doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
}
else {
doExecute(taskToUse);
}
}
複製代碼
SimpleAsyncTaskExecutor限流實現:異步
首選任務進來,會循環判斷當前執行線程數是否超過concurrencyLimit,若是超了,則當前線程調用wait方法,釋放monitor對象鎖,進入等待async
protected void beforeAccess() {
if (this.concurrencyLimit == NO_CONCURRENCY) {
throw new IllegalStateException(
"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
}
if (this.concurrencyLimit > 0) {
boolean debug = logger.isDebugEnabled();
synchronized (this.monitor) {
boolean interrupted = false;
while (this.concurrencyCount >= this.concurrencyLimit) {
if (interrupted) {
throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
"but concurrency limit still does not allow for entering");
}
if (debug) {
logger.debug("Concurrency count " + this.concurrencyCount +
" has reached limit " + this.concurrencyLimit + " - blocking");
}
try {
this.monitor.wait();
}
catch (InterruptedException ex) {
// Re-interrupt current thread, to allow other threads to react.
Thread.currentThread().interrupt();
interrupted = true;
}
}
if (debug) {
logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
}
this.concurrencyCount++;
}
}
}
複製代碼
線程任務執行完畢後,當執行線程數會減一,會調用monitor對象的notify方法,喚醒等待狀態下的線程,等待狀態下的線程會競爭monitor鎖,競爭到,會繼續執行線程任務。ide
protected void afterAccess() {
if (this.concurrencyLimit >= 0) {
synchronized (this.monitor) {
this.concurrencyCount--;
if (logger.isDebugEnabled()) {
logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
}
this.monitor.notify();
}
}
}
複製代碼
最終的解決辦法:使用自定義線程池學習
@Configuration
@Slf4j
public class AppConfig implements AsyncConfigurer {
public static final String
ASYNC_EXECUTOR_NAME = "asyncExecutor";
@Bean(name = ASYNC_EXECUTOR_NAME)
@Override
public Executor getAsyncExecutor() {
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
//核心線程數,默認值爲1
threadPoolTaskExecutor.setCorePoolSize(10);
//最大線程數
threadPoolTaskExecutor.setMaxPoolSize(20);
//緩存隊列
threadPoolTaskExecutor.setQueueCapacity(256);
//超出核心線程數以外的線程在空閒時間最大的存活時間
threadPoolTaskExecutor.setKeepAliveSeconds(60);
threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
//線程的前綴
threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
//是否等待全部的線程關閉以後採起關閉線程池,默認是false
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
//等待時長
threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
//拒絕策略
threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
threadPoolTaskExecutor.initialize();
return threadPoolTaskExecutor;
}
/**
* <h2>定義異步任務異常處理類</h2>
*
* @return
*/
@Override
public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
return new AsyncExceptionHandler();
}
class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {
@Override
public void handleUncaughtException(Throwable ex, Method method, Object... params) {
log.info("AsycError:{},Method :{},Param:{}", ex.getMessage(), method.getName(), JSON.toJSON(params));
ex.printStackTrace();
//TODO 發送郵件或者發送短信
}
}
}
複製代碼
就不會出現一直建立Thread的狀況,致使OOM。ui