Spring@Async異步線程的用法

最近在代碼中使用@Async來進行異步抄單,進行壓測時出現了OOM問題,經過日誌信息看到了:unable to create new native threadreact

1、內存溢出的三種類型:

  • 一、第一種OutOfMemoryError:PermGen space,發生這種問題的緣由是程序中使用了大量的jar或class
  • 二、第二種OutOfMemoryError:Java heap space ,發生這種問題的緣由是Java虛擬機建立的對象太多
  • 三、第三種OutOfMemoryError:unable to create new native thread,建立線程太多,佔用內存多大

2、問題分析

初步分析

初步懷疑是建立的線程太多致使的,使用jstack 線程PID 分析打印的日誌,發現大量線程處於Runnable狀態,基本能夠肯定是建立線程太多致使的。緩存

代碼分析

出問題的服務是報案的服務,在進行抄單時在方法上使用了@Asycn這個註解,進行異步抄單,經過trace-log日誌能夠看出建立了不少的Thread,通過簡單的瞭解@Async異步配置使用的是SimpleAsyncTaskExecutor,該線程池默認來一個任務建立一個線程,在壓測的狀況下,會有大量的請求去抄單,這時會不斷建立大量的線程,極有可能壓爆服務器內存。bash

藉此機會學習一下SimpleAsyncTaskExecutor的源碼,SimpleAsyncTaskExecutor提供了限流機制,經過concurrencyLimit屬性來控制開關,當concurrencyLimit>=0時開啓限流機制,默認關閉限流機制即concurrencyLimit=-1,當關閉狀況下,會不斷建立新的線程來處理任務,核心代碼以下:服務器

@Override
	public void execute(Runnable task, long startTimeout) {
		Assert.notNull(task, "Runnable must not be null");
		Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
		if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
			this.concurrencyThrottle.beforeAccess();
			doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
		}
		else {
			doExecute(taskToUse);
		}
	}
複製代碼

SimpleAsyncTaskExecutor限流實現:異步

首選任務進來,會循環判斷當前執行線程數是否超過concurrencyLimit,若是超了,則當前線程調用wait方法,釋放monitor對象鎖,進入等待async

protected void beforeAccess() {
		if (this.concurrencyLimit == NO_CONCURRENCY) {
			throw new IllegalStateException(
					"Currently no invocations allowed - concurrency limit set to NO_CONCURRENCY");
		}
		if (this.concurrencyLimit > 0) {
			boolean debug = logger.isDebugEnabled();
			synchronized (this.monitor) {
				boolean interrupted = false;
				while (this.concurrencyCount >= this.concurrencyLimit) {
					if (interrupted) {
						throw new IllegalStateException("Thread was interrupted while waiting for invocation access, " +
								"but concurrency limit still does not allow for entering");
					}
					if (debug) {
						logger.debug("Concurrency count " + this.concurrencyCount +
								" has reached limit " + this.concurrencyLimit + " - blocking");
					}
					try {
						this.monitor.wait();
					}
					catch (InterruptedException ex) {
						// Re-interrupt current thread, to allow other threads to react.
						Thread.currentThread().interrupt();
						interrupted = true;
					}
				}
				if (debug) {
					logger.debug("Entering throttle at concurrency count " + this.concurrencyCount);
				}
				this.concurrencyCount++;
			}
		}
	}
複製代碼

線程任務執行完畢後,當執行線程數會減一,會調用monitor對象的notify方法,喚醒等待狀態下的線程,等待狀態下的線程會競爭monitor鎖,競爭到,會繼續執行線程任務。ide

protected void afterAccess() {
		if (this.concurrencyLimit >= 0) {
			synchronized (this.monitor) {
				this.concurrencyCount--;
				if (logger.isDebugEnabled()) {
					logger.debug("Returning from throttle at concurrency count " + this.concurrencyCount);
				}
				this.monitor.notify();
			}
		}
	}
複製代碼

最終的解決辦法:使用自定義線程池學習

@Configuration
@Slf4j
public class AppConfig implements AsyncConfigurer {
    public static final String
            ASYNC_EXECUTOR_NAME = "asyncExecutor";


    @Bean(name = ASYNC_EXECUTOR_NAME)
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        //核心線程數,默認值爲1
        threadPoolTaskExecutor.setCorePoolSize(10);
        //最大線程數
        threadPoolTaskExecutor.setMaxPoolSize(20);
        //緩存隊列
        threadPoolTaskExecutor.setQueueCapacity(256);
        //超出核心線程數以外的線程在空閒時間最大的存活時間
        threadPoolTaskExecutor.setKeepAliveSeconds(60);
        threadPoolTaskExecutor.setTaskDecorator(new ContextDecorator());
        //線程的前綴
        threadPoolTaskExecutor.setThreadNamePrefix("AsyncThread-");
        //是否等待全部的線程關閉以後採起關閉線程池,默認是false
        threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
        //等待時長
        threadPoolTaskExecutor.setAwaitTerminationSeconds(60);
        //拒絕策略
        threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }


    /**
     * <h2>定義異步任務異常處理類</h2>
     *
     * @return
     */
    @Override
    public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
        return new AsyncExceptionHandler();
    }

    class AsyncExceptionHandler implements AsyncUncaughtExceptionHandler {

        @Override
        public void handleUncaughtException(Throwable ex, Method method, Object... params) {
            log.info("AsycError:{},Method :{},Param:{}", ex.getMessage(), method.getName(), JSON.toJSON(params));
            ex.printStackTrace();
            //TODO 發送郵件或者發送短信
        }
    }
}
複製代碼

就不會出現一直建立Thread的狀況,致使OOM。ui

相關文章
相關標籤/搜索