@Async異步任務與線程池

寫在前面:本篇文章是關於使用@Async進行異步任務,而且關於線程池作了一個初步的梳理和總結,包括遇到過的一些坑html

在工做中用到的一些線程池

如下代碼已作脫敏處理java

1.newCachedThreadPoolredis

private void startTask(List<String> usersList){
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.submit(()->{
		//do someting
        });
    }

複製代碼

2.newScheduledThreadPool算法

@Configuration
public class ScheduleConfig implements SchedulingConfigurer {

    @Override
    public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
        //固然了,這裏設置的線程池是corePoolSize也是很關鍵了,本身根據業務需求設定
        taskRegistrar.setScheduler(Executors.newScheduledThreadPool(10));
    }

}

複製代碼

若是在idea中安裝了阿里規範插件,就會發現上面兩種建立線程池的方式都會報紅。緣由是:spring

線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。 說明:Executors返回的線程池對象的弊端以下:數據庫

  1. FixedThreadPool和SingleThreadPool:json

    容許的請求隊列長度爲Integer.MAX_VALUE,可能會堆積大量的請求,從而致使OOM。c#

  2. CachedThreadPool:緩存

    容許的建立線程數量爲Integer.MAX_VALUE,可能會建立大量的線程,從而致使OOM。markdown

其實這裏CachedThreadPool和newScheduledThreadPool是同樣的,都是由於最大線程數被設置成了Integer.MAX_VALUE。

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }
複製代碼
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
複製代碼

在源碼中能夠看的出newCachedThreadPool使用的是synchronousqueue隊列,也能夠看做是一個長度爲1的BlockingQueue因此,再加上最大容許線程數爲Integer.MAX_VALUE,就致使可能會建立大量線程致使OOM。

同理ScheduledThreadPoolExecutor使用的是DelayedWorkQueue,初始化大小爲16。當隊列滿後就會建立新線程,就致使可能會建立大量線程致使OOM。

咱們不妨實際來測試一下,以newCachedThreadPool爲例,jvm參數-Xms64m -Xmx192m -Xss1024K -XX:MetaspaceSize=64m -XX:MaxMetaspaceSize=128m。

@PostMapping("/newCachedThreadPoolExample")
    @ResponseBody
    public void newCachedThreadPoolExample(){
        ExecutorService executorService = Executors.newCachedThreadPool();
        while (true){
            executorService.submit(()->{
                log.info("submit:"+LocalDateTime.now());
                try {
                    Thread.sleep(1000);
                }catch (InterruptedException e){
                    e.printStackTrace();
                }
            });
        }

    }

複製代碼

剛啓動時的狀況:

x1.png

請求接口後就開始爆炸

x2.png 而後就開始卡着不動了

x3.png

比較尷尬的是一直沒有出現報錯OOM的狀況,就直接卡死了。

x4.png 總結

以上的線程池雖然能夠在外部限制的狀況下避免OOM等狀況,可是仍是建議儘可能根據本身的業務狀況自定義線程池。

使用@Async快速建立一個異步任務

1. application.yml

這裏是線程池相關配置,先不詳細說,同理能夠在代碼裏面配置config。

線程池緩衝隊列的選擇

以上發生的問題大多數都和線程池的緩衝隊列有關,選擇一個符合本身業務特色的緩衝隊列也十分重要。

x5'.png

spring:
  task:
    execution:
      pool:
        # 最大線程數
        max-size: 16
        # 核心線程數
        core-size: 16
        # 存活時間
        keep-alive: 10s
        # 隊列大小
        queue-capacity: 100
        # 是否容許核心線程超時
        allow-core-thread-timeout: true
      # 線程名稱前綴
      thread-name-prefix: async-task-

複製代碼

2.ThreadpoolApplication

這裏須要在 Application上添加 @EnableAsync註解,開啓異步任務。若是是選擇在代碼裏面寫config,則須要在config文件上添加@EnableAsync註解。

@EnableAsync
@SpringBootApplication
public class ThreadpoolApplication {

    public static void main(String[] args) {
        SpringApplication.run(ThreadpoolApplication.class, args);
    }

}
複製代碼

3.AsyncTask

編寫一個異步任務處理類,在須要開啓異步的方法上面添加@Async

@Component
@Slf4j
public class AsyncTask {
    @Async
    public void asyncRun() throws InterruptedException {
        Thread.sleep(10);
        log.info(Thread.currentThread().getName()+":處理完成");
    }
}

複製代碼

4.AsyncService

編寫一個調用異步方法的service

@Service
@Slf4j
public class AsyncService {
    @Autowired
    private AsyncTask asyncTask;

    public void asyncSimpleExample() {
        try {
            log.info("service start");
            asyncTask.asyncRun();
            log.info("service end");
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }


}
複製代碼

5.AsyncController

編寫一個Controller去調用AsyncService

/** * @author kurtl */
@Controller
@RequestMapping("/")
public class AsyncController {
    @Autowired
    private AsyncService asyncService;
    @PostMapping("/asyncSimpleExample")
    @ResponseBody
    public void asyncSimpleExample(){
        asyncService.asyncSimpleExample();
    }
}

複製代碼

最後請求這個接口

x6.png

能夠看到,先輸出了asyncSimpleExample裏面打印的service start與service end,表示service方法先執行完畢了,而異步方法則在調用後進行了一個sleep,service沒有同步等待sleep完成,而是直接返回,表示這個是異步任務。至此咱們已經經過@Async成功建立的異步任務。

關於@Async和@EnableAsync的原理

我的以爲源碼中很重要的一部分就是源碼中的註釋,閱讀註釋也能夠幫你快速瞭解源碼的做用等,全部我會把重要的註釋稍微翻譯一下

1.@Async源碼

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface Async {

	/** * A qualifier value for the specified asynchronous operation(s). * <p>May be used to determine the target executor to be used when executing * the asynchronous operation(s), matching the qualifier value (or the bean * name) of a specific {@link java.util.concurrent.Executor Executor} or * {@link org.springframework.core.task.TaskExecutor TaskExecutor} * bean definition. * <p>When specified on a class-level {@code @Async} annotation, indicates that the * given executor should be used for all methods within the class. Method-level use * of {@code Async#value} always overrides any value set at the class level. * @since 3.1.2 */

    /** * 在這些註釋中有三個很是重要的部分 * 1.使用@Async的方法只能返回Void 或者 Future類型 * 2.代表了@Async是經過org.springframework.core.task.TaskExecutor * 或者java.util.concurrent.Executor來建立線程池 * 3.寫了@Async的做用範圍 在類上使用@Async會覆蓋方法上的@Async */

	String value() default "";

}

複製代碼

2.@EnableAsync源碼

/** * Enables Spring's asynchronous method execution capability, similar to functionality * found in Spring's {@code <task:*>} XML namespace. * * <p>To be used together with @{@link Configuration Configuration} classes as follows, * enabling annotation-driven async processing for an entire Spring application context: * * <pre class="code"> * &#064;Configuration * &#064;EnableAsync * public class AppConfig { * * }</pre> * 這裏表示須要聯合@Configuration註解一塊兒使用,因此@EnableAsync應該 * 添加在線程池Config或者SpringBootApplication 上 * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} * annotation, or any custom annotation specified via the {@link #annotation} attribute. * The aspect is added transparently for any registered bean, for instance via this * configuration: * * <pre class="code"> * &#064;Configuration * public class AnotherAppConfig { * * &#064;Bean * public MyAsyncBean asyncBean() { * return new MyAsyncBean(); * } * }</pre> * * <p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * 默認狀況下spring會先搜索TaskExecutor類型的bean或者名字爲 * taskExecutor的Executor類型的bean,都不存在使用 * SimpleAsyncTaskExecutor執行器可是這個SimpleAsyncTaskExecutor實際 * 上是有很大的坑的,建議是自定義一個線程池,這個後面會說 * will be used to process async method invocations. Besides, annotated methods having * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @author Sam Brannen * @since 3.1 * @see Async * @see AsyncConfigurer * @see AsyncConfigurationSelector */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

	/** * Indicate the 'async' annotation type to be detected at either class * or method level. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * <p>This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. */
	Class<? extends Annotation> annotation() default Annotation.class;

	/** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>. * <p>The default is {@code false}. * <p>Note that setting this attribute to {@code true} will affect <em>all</em> * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another &mdash; for example, in tests. * * 這個字段用來表示,是否要建立基於CGLIB的代理,實際上在高版本 * 的spring 上(大概3.x)是自動選擇使用jdk動態代理仍是CGLIB. * 設置爲true時,其它spring管理的bean也會升級到CGLIB代理 */
	boolean proxyTargetClass() default false;

	/** * Indicate how async advice should be applied. * <p><b>The default is {@link AdviceMode#PROXY}.</b> * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. * 這個字段用來標識異步通知的模式,默認PROXY,當這個字段爲 * PROXY的時候,在同一個類中,非異步方法調用異步方法,會致使異 * 步不生效,相反若是,想實現同一個類非異步方法調用異步方法就應 * 該設置爲ASPECTJ */
	AdviceMode mode() default AdviceMode.PROXY;

	/** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. * 標明異步註解bean處理器應該遵循的執行順序,默認最低的優先級 *(Integer.MAX_VALUE,值越小優先級越高) */
	int order() default Ordered.LOWEST_PRECEDENCE;

}


複製代碼

在上面的源碼中,其實最核心的代碼只有一句,@Import(AsyncConfigurationSelector.class),引入了相關的配置。

/** * Selects which implementation of {@link AbstractAsyncConfiguration} should * be used based on the value of {@link EnableAsync#mode} on the importing * {@code @Configuration} class. * * @author Chris Beams * @author Juergen Hoeller * @since 3.1 * @see EnableAsync * @see ProxyAsyncConfiguration */
public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

	private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =
			"org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";


	/** * Returns {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} * for {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, * respectively. */
	/** * 這整個方法其實就是一個選擇器和ImportSelector接口的selectImports()方法很像,基於不一樣的代理模式,加載不一樣的配置類 */
	@Override
	@Nullable
	public String[] selectImports(AdviceMode adviceMode) {

		switch (adviceMode) {
			case PROXY:
				return new String[] {ProxyAsyncConfiguration.class.getName()};
			case ASPECTJ:
				return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};
			default:
				return null;
		}
	}

}

複製代碼

接下來咱們看看默認的ProxyAsyncConfiguration.class

/** * {@code @Configuration} class that registers the Spring infrastructure beans necessary * to enable proxy-based asynchronous method execution. * * @author Chris Beams * @author Stephane Nicoll * @author Juergen Hoeller * @since 3.1 * @see EnableAsync * @see AsyncConfigurationSelector */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
//繼承了AbstractAsyncConfiguration類
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

	@Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
	@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
	public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
		Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
		//初始化AsyncAnnotationBeanPostProcessor類型的bean
		AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
		//設置執行器和異常處理器
		bpp.configure(this.executor, this.exceptionHandler);
		Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
		//設置annotation
		if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
			bpp.setAsyncAnnotationType(customAsyncAnnotation);
		}
		//設置註解屬性
		bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
		bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
		return bpp;
	}

}



複製代碼

這一個類繼承了AbstractAsyncConfiguration類,其實也就作了一件事初始化AsyncAnnotationBeanPostProcessor,@Async註解的就是經過AsyncAnnotationBeanPostProcessor這個後置處理器生成一個代理對象來實現異步的,咱們先看繼承的config。

/** * Abstract base {@code Configuration} class providing common structure for enabling * Spring's asynchronous method execution capability. * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @since 3.1 * @see EnableAsync */
@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {

	@Nullable
	protected AnnotationAttributes enableAsync; //;//enableAsync的註解屬性

	@Nullable
	protected Supplier<Executor> executor; //線程執行器

	@Nullable
	protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler; //異常處理器 和上面的代碼對應


	@Override
	//設置註解屬性
	public void setImportMetadata(AnnotationMetadata importMetadata) {
		this.enableAsync = AnnotationAttributes.fromMap(
				importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));
		if (this.enableAsync == null) {
			throw new IllegalArgumentException(
					"@EnableAsync is not present on importing class " + importMetadata.getClassName());
		}
	}

	/** * Collect any {@link AsyncConfigurer} beans through autowiring. */
	@Autowired(required = false)
	//設置執行器和異常處理器
	void setConfigurers(Collection<AsyncConfigurer> configurers) {
		if (CollectionUtils.isEmpty(configurers)) {
			return;
		}
		if (configurers.size() > 1) {
			throw new IllegalStateException("Only one AsyncConfigurer may exist");
		}
		AsyncConfigurer configurer = configurers.iterator().next();
		this.executor = configurer::getAsyncExecutor;
		this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;
	}

}


複製代碼

整個代碼的結構其實很是明確,咱們回到上一個類,看他設置的bean AsyncAnnotationBeanPostProcessor。這個bean很複雜,因此乾脆先生成類圖。弄清楚baen的生命週期。AsyncAnnotationBeanPostProcessor是一個後置處理器,因此咱們先找父類AbstractAdvisingBeanPostProcessor中。

x9.png

/** * Base class for {@link BeanPostProcessor} implementations that apply a * Spring AOP {@link Advisor} to specific beans. * * @author Juergen Hoeller * @since 3.2 */
@SuppressWarnings("serial")
public abstract class AbstractAdvisingBeanPostProcessor extends ProxyProcessorSupport implements BeanPostProcessor {

	@Nullable
	protected Advisor advisor;

	protected boolean beforeExistingAdvisors = false;

	private final Map<Class<?>, Boolean> eligibleBeans = new ConcurrentHashMap<>(256);



	public void setBeforeExistingAdvisors(boolean beforeExistingAdvisors) {
		this.beforeExistingAdvisors = beforeExistingAdvisors;
	}


	@Override
	public Object postProcessBeforeInitialization(Object bean, String beanName) {
		return bean;
	}

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) {
		 // 沒有通知,或者是AopInfrastructureBean,那麼不進行代理
		if (this.advisor == null || bean instanceof AopInfrastructureBean) {
			// Ignore AOP infrastructure such as scoped proxies.
			return bean;
		}
		// 添加advisor
		if (bean instanceof Advised) {
			Advised advised = (Advised) bean;
			if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {
				// Add our local Advisor to the existing proxy's Advisor chain...
				// 這裏經過beforeExistingAdvisors決定是將通知添加到全部通知以前仍是添加到全部通知以後
				// 默認false 在@Async中被設置爲true
				if (this.beforeExistingAdvisors) {
					advised.addAdvisor(0, this.advisor);
				}
				else {
					advised.addAdvisor(this.advisor);
				}
				return bean;
			}
		}
		//構造ProxyFactory代理工廠
		if (isEligible(bean, beanName)) {
			ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);
			//添加代理的接口
			if (!proxyFactory.isProxyTargetClass()) {
				evaluateProxyInterfaces(bean.getClass(), proxyFactory);
			}
			//設置切面
			proxyFactory.addAdvisor(this.advisor);
			customizeProxyFactory(proxyFactory);
			//返回代理類
			return proxyFactory.getProxy(getProxyClassLoader());
		}

		// No proxy needed.
		return bean;
	}

	//isEligible用於判斷這個類或者這個類中的某個方法是否含有註解
	protected boolean isEligible(Object bean, String beanName) {
		return isEligible(bean.getClass());
	}


}


複製代碼

在上面代碼中能夠看出來,proxyFactory.addAdvisor(this.advisor);這裏持有一個AsyncAnnotationAdvisor類的對象advisor:buildAdvice()方法生成通知,buildPointcut生成切點。定位到這個類的buildPointcut方法中,看看他的切點匹配規則。

@SuppressWarnings("serial")
public class AsyncAnnotationAdvisor extends AbstractPointcutAdvisor implements BeanFactoryAware {

	private Advice advice;

	private Pointcut pointcut;


	/** * Create a new {@code AsyncAnnotationAdvisor} for bean-style configuration. */
	public AsyncAnnotationAdvisor() {
		this((Supplier<Executor>) null, (Supplier<AsyncUncaughtExceptionHandler>) null);
	}


	@SuppressWarnings("unchecked")
	public AsyncAnnotationAdvisor( @Nullable Executor executor, @Nullable AsyncUncaughtExceptionHandler exceptionHandler) {

		this(SingletonSupplier.ofNullable(executor), SingletonSupplier.ofNullable(exceptionHandler));
	}


	@SuppressWarnings("unchecked")
	public AsyncAnnotationAdvisor( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		Set<Class<? extends Annotation>> asyncAnnotationTypes = new LinkedHashSet<>(2);
		asyncAnnotationTypes.add(Async.class);
		try {
			asyncAnnotationTypes.add((Class<? extends Annotation>)
					ClassUtils.forName("javax.ejb.Asynchronous", AsyncAnnotationAdvisor.class.getClassLoader()));
		}
		catch (ClassNotFoundException ex) {
			// If EJB 3.1 API not present, simply ignore.
		}
		this.advice = buildAdvice(executor, exceptionHandler);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}



	public void setAsyncAnnotationType(Class<? extends Annotation> asyncAnnotationType) {
		Assert.notNull(asyncAnnotationType, "'asyncAnnotationType' must not be null");
		Set<Class<? extends Annotation>> asyncAnnotationTypes = new HashSet<>();
		asyncAnnotationTypes.add(asyncAnnotationType);
		this.pointcut = buildPointcut(asyncAnnotationTypes);
	}

	/** * Set the {@code BeanFactory} to be used when looking up executors by qualifier. */
	@Override
	public void setBeanFactory(BeanFactory beanFactory) {
		if (this.advice instanceof BeanFactoryAware) {
			((BeanFactoryAware) this.advice).setBeanFactory(beanFactory);
		}
	}


	@Override
	public Advice getAdvice() {
		return this.advice;
	}

	@Override
	public Pointcut getPointcut() {
		return this.pointcut;
	}

	//構建通知,一個簡單的攔截器
	protected Advice buildAdvice( @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {

		AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);
		interceptor.configure(executor, exceptionHandler);
		return interceptor;
	}


	protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
		ComposablePointcut result = null;
		for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {
			// 根據cpc和mpc 匹配器進行匹配
			//檢查類上是否有@Async註解
			Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);
			//檢查方法是是否有@Async註解。
			Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);
			if (result == null) {
				result = new ComposablePointcut(cpc);
			}
			else {
				result.union(cpc);
			}
			result = result.union(mpc);
		}
		return (result != null ? result : Pointcut.TRUE);
	}

}


複製代碼

再找到它的通知邏輯buildAdvice,就是一個攔截器,生成AnnotationAsyncExecutionInterceptor對象,對於Interceptor,關注它的核心方法invoke就好了。它的父類AsyncExecutionInterceptor重寫了AsyncExecutionInterceptor接口的invoke方法。代碼以下

public class AsyncExecutionInterceptor extends AsyncExecutionAspectSupport implements MethodInterceptor, Ordered {


	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor) {
		super(defaultExecutor);
	}

	public AsyncExecutionInterceptor(@Nullable Executor defaultExecutor, AsyncUncaughtExceptionHandler exceptionHandler) {
		super(defaultExecutor, exceptionHandler);
	}



	@Override
	@Nullable
	//
	public Object invoke(final MethodInvocation invocation) throws Throwable {
		Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
		Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
		final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
		// 獲取到一個線程池
		AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
		if (executor == null) {
			throw new IllegalStateException(
					"No executor specified and no default executor set on AsyncExecutionInterceptor either");
		}
		 // 而後將這個方法封裝成一個 Callable對象傳入到線程池中執行
		Callable<Object> task = () -> {
			try {
				Object result = invocation.proceed();
				if (result instanceof Future) {
					//阻塞等待執行完畢獲得結果
					return ((Future<?>) result).get();
				}
			}
			catch (ExecutionException ex) {
				handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
			}
			catch (Throwable ex) {
				handleError(ex, userDeclaredMethod, invocation.getArguments());
			}
			return null;
		};

		return doSubmit(task, executor, invocation.getMethod().getReturnType());
	}


	@Override
	@Nullable
	protected String getExecutorQualifier(Method method) {
		return null;
	}


	@Override
	@Nullable
	protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
		Executor defaultExecutor = super.getDefaultExecutor(beanFactory);
		return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor());
	}

	@Override
	public int getOrder() {
		return Ordered.HIGHEST_PRECEDENCE;
	}

}


複製代碼

能夠看到,invoke首先是包裝了一個Callable的對象,而後傳入doSubmit,因此代碼的核心就在doSubmit這個方法中。

@Nullable
	protected Object doSubmit(Callable<Object> task, AsyncTaskExecutor executor, Class<?> returnType) {
	//先判斷是否存在CompletableFuture這個類,優先使用CompletableFuture執行任務
		if (CompletableFuture.class.isAssignableFrom(returnType)) {
			return CompletableFuture.supplyAsync(() -> {
				try {
					return task.call();
				}
				catch (Throwable ex) {
					throw new CompletionException(ex);
				}
			}, executor);
		}
		else if (ListenableFuture.class.isAssignableFrom(returnType)) {
			return ((AsyncListenableTaskExecutor) executor).submitListenable(task);
		}
		else if (Future.class.isAssignableFrom(returnType)) {
			return executor.submit(task);
		}
		else {
			executor.submit(task);
			return null;
		}
	}

複製代碼

這裏主要是判斷不一樣的返回值,最終都走進了submit方法,而submit根據線程池的不一樣,其實現也有區別,下面是SimpleAsyncTaskExecutor的實現方式。

/** * Template method for the actual execution of a task. * <p>The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}


複製代碼

@Async的默認線程池

1.使用@Async必定要定義線程池

在上面的源碼中寫的很清楚,默認狀況下spring會先搜索TaskExecutor類型的bean或者名字爲taskExecutor的Executor類型的bean,都不存在使 SimpleAsyncTaskExecutor執行器。可是這個SimpleAsyncTaskExecutor不是真的線程池,這個類不重用線程,每次調用都會建立一個新的線程。頗有可能致使OOM。

@SuppressWarnings("serial")
public class SimpleAsyncTaskExecutor extends CustomizableThreadCreator implements AsyncListenableTaskExecutor, Serializable {

	/** * Permit any number of concurrent invocations: that is, don't throttle concurrency. * @see ConcurrencyThrottleSupport#UNBOUNDED_CONCURRENCY */
	public static final int UNBOUNDED_CONCURRENCY = ConcurrencyThrottleSupport.UNBOUNDED_CONCURRENCY;

	/** * Switch concurrency 'off': that is, don't allow any concurrent invocations. * @see ConcurrencyThrottleSupport#NO_CONCURRENCY */
	public static final int NO_CONCURRENCY = ConcurrencyThrottleSupport.NO_CONCURRENCY;


	/** Internal concurrency throttle used by this executor. */
	private final ConcurrencyThrottleAdapter concurrencyThrottle = new ConcurrencyThrottleAdapter();

	@Nullable
	private ThreadFactory threadFactory;

	@Nullable
	private TaskDecorator taskDecorator;


	/** * Create a new SimpleAsyncTaskExecutor with default thread name prefix. */
	public SimpleAsyncTaskExecutor() {
		super();
	}

	/** * Create a new SimpleAsyncTaskExecutor with the given thread name prefix. * @param threadNamePrefix the prefix to use for the names of newly created threads */
	public SimpleAsyncTaskExecutor(String threadNamePrefix) {
		super(threadNamePrefix);
	}

	/** * Create a new SimpleAsyncTaskExecutor with the given external thread factory. * @param threadFactory the factory to use for creating new Threads */
	public SimpleAsyncTaskExecutor(ThreadFactory threadFactory) {
		this.threadFactory = threadFactory;
	}


	/** * Specify an external factory to use for creating new Threads, * instead of relying on the local properties of this executor. * <p>You may specify an inner ThreadFactory bean or also a ThreadFactory reference * obtained from JNDI (on a Java EE 6 server) or some other lookup mechanism. * @see #setThreadNamePrefix * @see #setThreadPriority */
	public void setThreadFactory(@Nullable ThreadFactory threadFactory) {
		this.threadFactory = threadFactory;
	}

	/** * Return the external factory to use for creating new Threads, if any. */
	@Nullable
	public final ThreadFactory getThreadFactory() {
		return this.threadFactory;
	}


	public final void setTaskDecorator(TaskDecorator taskDecorator) {
		this.taskDecorator = taskDecorator;
	}


	//這裏能夠設置最大線程數,經過限流去限制線程數
	public void setConcurrencyLimit(int concurrencyLimit) {
		this.concurrencyThrottle.setConcurrencyLimit(concurrencyLimit);
	}

	/** * Return the maximum number of parallel accesses allowed. */
	public final int getConcurrencyLimit() {
		return this.concurrencyThrottle.getConcurrencyLimit();
	}

	/** * Return whether this throttle is currently active. * @return {@code true} if the concurrency limit for this instance is active * @see #getConcurrencyLimit() * @see #setConcurrencyLimit */
	public final boolean isThrottleActive() {
		return this.concurrencyThrottle.isThrottleActive();
	}


	/** * Executes the given task, within a concurrency throttle * if configured (through the superclass's settings). * @see #doExecute(Runnable) */
	@Override
	public void execute(Runnable task) {
		execute(task, TIMEOUT_INDEFINITE);
	}

	/** * Executes the given task, within a concurrency throttle * if configured (through the superclass's settings). * <p>Executes urgent tasks (with 'immediate' timeout) directly, * bypassing the concurrency throttle (if active). All other * tasks are subject to throttling. * @see #TIMEOUT_IMMEDIATE * @see #doExecute(Runnable) */
	//
	@Override
	public void execute(Runnable task, long startTimeout) {
		Assert.notNull(task, "Runnable must not be null");
		Runnable taskToUse = (this.taskDecorator != null ? this.taskDecorator.decorate(task) : task);
		if (isThrottleActive() && startTimeout > TIMEOUT_IMMEDIATE) {
			this.concurrencyThrottle.beforeAccess();
			doExecute(new ConcurrencyThrottlingRunnable(taskToUse));
		}
		else {
			doExecute(taskToUse);
		}
	}

	@Override
	public Future<?> submit(Runnable task) {
		FutureTask<Object> future = new FutureTask<>(task, null);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public <T> Future<T> submit(Callable<T> task) {
		FutureTask<T> future = new FutureTask<>(task);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public ListenableFuture<?> submitListenable(Runnable task) {
		ListenableFutureTask<Object> future = new ListenableFutureTask<>(task, null);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	@Override
	public <T> ListenableFuture<T> submitListenable(Callable<T> task) {
		ListenableFutureTask<T> future = new ListenableFutureTask<>(task);
		execute(future, TIMEOUT_INDEFINITE);
		return future;
	}

	/** * Template method for the actual execution of a task. * <p>The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */
	//判斷是否有工廠,沒有的話調用父類建立線程
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}


	/** * Subclass of the general ConcurrencyThrottleSupport class, * making {@code beforeAccess()} and {@code afterAccess()} * visible to the surrounding class. */
	private static class ConcurrencyThrottleAdapter extends ConcurrencyThrottleSupport {

		@Override
		protected void beforeAccess() {
			super.beforeAccess();
		}

		@Override
		protected void afterAccess() {
			super.afterAccess();
		}
	}


	/** * This Runnable calls {@code afterAccess()} after the * target Runnable has finished its execution. */
	private class ConcurrencyThrottlingRunnable implements Runnable {

		private final Runnable target;

		public ConcurrencyThrottlingRunnable(Runnable target) {
			this.target = target;
		}

		@Override
		public void run() {
			try {
				this.target.run();
			}
			finally {
				concurrencyThrottle.afterAccess();
			}
		}
	}

}

複製代碼

最主要的就是這段代碼

/** * Template method for the actual execution of a task. * <p>The default implementation creates a new Thread and starts it. * @param task the Runnable to execute * @see #setThreadFactory * @see #createThread * @see java.lang.Thread#start() */
	//判斷是否有工廠,沒有的話調用父類建立線程
	protected void doExecute(Runnable task) {
		Thread thread = (this.threadFactory != null ? this.threadFactory.newThread(task) : createThread(task));
		thread.start();
	}

複製代碼

這裏並非用線程池,而是直接建立新的線程,因此會大量建立線程致使OOM。其實這個類是能夠經過setConcurrencyLimit設置最大線程數,經過synchronized和wati and notify去進行限流,這裏不展開講。因此結論是在使用@Async必定要設置線程池。

@Async異步失效

如下代碼已作脫敏處理

在看公司代碼的時候,發現這樣一段代碼

public UserVO saveUser(HttpServletRequest request, String source) {
        String token = RequestUtils.getToken(request);
        String uid = checkUserLoginReturnUid(token);
        log.info("註冊登陸, token : {}, uid : {}", token, uid);
        //獲取用戶信息
        User User = getLoginUser(uid);
        if(User == null){
            User = new User();
            //獲取用戶信息
            Map<String,String> userMap = redisTemplateMain.getUserMapByToken(token);
            //保存用戶
            saveUser(User, userMap, source);
            sendUserSystem(Integer.valueOf(userMap.get("id")));
        }
        //用戶信息放進緩存
        setAuth2Redis(User);
        return setUser2Redis(User);
    }


    //通知用戶系統,咱們這邊成功註冊了一個用戶
    @Async
    public void sendUserSystem(Integer userId){
        Map<String,Object> map = new HashMap<>();
        map.put("mainUid", userId);
        map.put("source", "");
        String json = HttpUtil.post(property.userRegisterSendSystem, map);
        log.info("sendUserSystem userId : {}, json : {}", userId, json);
    }

複製代碼

在以前咱們看源碼的時候已經知道了,因爲@Async的AdviceMode默認爲PROXY,因此當調用方和被調用方是在同一個類中,沒法產生切面,@Async沒有被Spring容器管理。 因此這個方法跑了這麼久一直是同步。

咱們能夠寫一個方法去測試一下。

public void asyncInvalid() {
        try {
            log.info("service start");
            asyncInvalidExample();
            log.info("service end");
        }catch (InterruptedException e){
            e.printStackTrace();
        }
    }


    @Async
    public void asyncInvalidExample() throws InterruptedException{
        Thread.sleep(10);
        log.info(Thread.currentThread().getName()+":處理完成");
    }


複製代碼

調用結果很明顯,沒有進行異步操做,而是同步。

x7.1.png

線程池拒絕致使線程丟失

既然線程池都已一個緩衝隊列來保存未被消費的任務,那麼就必定存在隊列被塞滿,致使線程丟失的狀況。咱們寫一段代碼模擬一下。

配置文件

spring:
  task:
    execution:
      pool:
        # 最大線程數
        max-size: 16
        # 核心線程數
        core-size: 16
        # 存活時間
        keep-alive: 10s
        # 隊列大小
        queue-capacity: 100
        # 是否容許核心線程超時
        allow-core-thread-timeout: true
      # 線程名稱前綴
      thread-name-prefix: async-task-

複製代碼

異步方法

@Async
    public void asyncRefuseRun() throws InterruptedException {
        Thread.sleep(5000000);
    }
複製代碼

調用方法

public void asyncRefuseRun() {
        for (int t = 0;t<2000;t++){
            log.info(""+t);
            try {
                asyncTask.asyncRefuseRun();
            }catch (InterruptedException e){
                e.printStackTrace();
            }
        }
    }

複製代碼

這裏我循環了2000個線程,理論上來講當線程到達maxPoolSize + queueCapacity時會進行拒絕,也就是16+100。

x8.png

到了116的時候果真拋出了異常java.util.concurrent.RejectedExecutionException。證實線程執行了它的拒絕策略。

要理解線程池的拒絕策略,先來看看它的接口。

public interface RejectedExecutionHandler {
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}

複製代碼

當線程池出現拒絕的狀況,就會來調用你設置的拒絕策略,將當前提交的任務以及線程池實例自己傳遞給你處理。這裏建議根據本身的業務場景,去實現拒絕策略。

固然若是JDK內置的實現能夠知足當前業務,能夠直接用jdk實現的。

AbortPolicy(停止策略)

這個停止策略就是咱們剛剛演示的,觸發拒絕策略後,直接停止任務,拋出異常,這個也是ThreadPoolExecutor默認實現。

/** * A handler for rejected tasks that throws a * {@code RejectedExecutionException}. */
    public static class AbortPolicy implements RejectedExecutionHandler {
        /** * Creates an {@code AbortPolicy}. */
        public AbortPolicy() { }

        /** * Always throws RejectedExecutionException. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task * @throws RejectedExecutionException always */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            throw new RejectedExecutionException("Task " + r.toString() +
                                                 " rejected from " +
                                                 e.toString());
        }
    }


複製代碼

DiscardPolicy(丟棄策略)

/** * A handler for rejected tasks that silently discards the * rejected task. */
    public static class DiscardPolicy implements RejectedExecutionHandler {
        /** * Creates a {@code DiscardPolicy}. */
        public DiscardPolicy() { }

        /** * Does nothing, which has the effect of discarding task r. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        }
    }

複製代碼

很明顯,啥也不幹,就是一個空實現。

DiscardOldestPolicy(棄老策略)

/** * A handler for rejected tasks that discards the oldest unhandled * request and then retries {@code execute}, unless the executor * is shut down, in which case the task is discarded. */
    public static class DiscardOldestPolicy implements RejectedExecutionHandler {
        /** * Creates a {@code DiscardOldestPolicy} for the given executor. */
        public DiscardOldestPolicy() { }

        /** * Obtains and ignores the next task that the executor * would otherwise execute, if one is immediately available, * and then retries execution of task r, unless the executor * is shut down, in which case task r is instead discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                e.getQueue().poll();
                e.execute(r);
            }
        }
    }

複製代碼

若是線程池未關閉,就彈出隊列頭部的元素,而後嘗試執行。實際上仍是會丟棄任務,若是頭部元素執行失敗,就丟棄了。區別是優先丟棄的是老的元素。

CallerRunsPolicy(調用者運行策略)

/** * A handler for rejected tasks that runs the rejected task * directly in the calling thread of the {@code execute} method, * unless the executor has been shut down, in which case the task * is discarded. */
    public static class CallerRunsPolicy implements RejectedExecutionHandler {
        /** * Creates a {@code CallerRunsPolicy}. */
        public CallerRunsPolicy() { }

        /** * Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. * * @param r the runnable task requested to be executed * @param e the executor attempting to execute this task */
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (!e.isShutdown()) {
                r.run();
            }
        }
    }

複製代碼

當觸發拒絕策略時,判斷線程池有沒有關閉,沒有關閉就由提交任務的當前線程處理。可是當大量提交後就會阻塞線程,致使性能下降。

hutool中的線程池拒絕策略實現

hutool做爲咱們常用的一個工具類,也有線程池工具,咱們不如來看看它是如何實現的。

/** * 構建ThreadPoolExecutor * * @param builder {@link ExecutorBuilder} * @return {@link ThreadPoolExecutor} */
	private static ThreadPoolExecutor build(ExecutorBuilder builder) {
		final int corePoolSize = builder.corePoolSize;
		final int maxPoolSize = builder.maxPoolSize;
		final long keepAliveTime = builder.keepAliveTime;
		final BlockingQueue<Runnable> workQueue;
		if (null != builder.workQueue) {
			workQueue = builder.workQueue;
		} else {
			// corePoolSize爲0則要使用SynchronousQueue避免無限阻塞
			workQueue = (corePoolSize <= 0) ? new SynchronousQueue<>() : new LinkedBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
		}
		final ThreadFactory threadFactory = (null != builder.threadFactory) ? builder.threadFactory : Executors.defaultThreadFactory();
		RejectedExecutionHandler handler = ObjectUtil.defaultIfNull(builder.handler, new ThreadPoolExecutor.AbortPolicy());

		final ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(//
				corePoolSize, //
				maxPoolSize, //
				keepAliveTime, TimeUnit.NANOSECONDS, //
				workQueue, //
				threadFactory, //
				handler//
		);
		if (null != builder.allowCoreThreadTimeOut) {
			threadPoolExecutor.allowCoreThreadTimeOut(builder.allowCoreThreadTimeOut);
		}
		return threadPoolExecutor;
	}

複製代碼

能夠很清晰的看到,會判斷是否傳入線程池拒絕策略,若是沒有就用默認的AbortPolicy。

dubbo中的拒絕策略

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                        " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                        " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
       //省略實現
    }
}

複製代碼

dubbo的策略實現主要就是想讓開發人員知道拒絕任務的狀況以及緣由。它先輸出了線程池的詳細設置參數,以及線程池當前的狀態,還有當前拒絕任務的信息。而後又輸出了當前線程堆棧詳情在dumpJStack中實現,最後拋出RejectedExecutionException。

Netty中的線程池拒絕策略

private static final class NewThreadRunsPolicy implements RejectedExecutionHandler {
        NewThreadRunsPolicy() {
            super();
        }

        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            try {
                final Thread t = new Thread(r, "Temporary task executor");
                t.start();
            } catch (Throwable e) {
                throw new RejectedExecutionException(
                        "Failed to start a new thread", e);
            }
        }
    }

複製代碼

Netty的線程池拒絕策略很像CallerRunsPolicy(調用者運行策略),都是不會直接丟棄任務而是繼續處理任務,不一樣的地方是CallerRunsPolicy(調用者運行策略)是在調用線程繼續處理,而Netty是new了一個新線程去處理。

activeMq中的線程池拒絕策略

new RejectedExecutionHandler() {
                @Override
                public void rejectedExecution(final Runnable r, final ThreadPoolExecutor executor) {
                    try {
                        executor.getQueue().offer(r, 60, TimeUnit.SECONDS);
                    } catch (InterruptedException e) {
                        throw new RejectedExecutionException("Interrupted waiting for BrokerService.worker");
                    }

                    throw new RejectedExecutionException("Timed Out while attempting to enqueue Task.");
                }
            });

複製代碼

activeMq中的策略屬於最大努力執行任務型,當觸發拒絕策略時,在嘗試一分鐘的時間從新將任務塞進任務隊列,當一分鐘超時還沒成功時,就拋出異常。

監控線程池

在開發中,咱們線程池的運行狀態,線程狀態,對咱們來講都很是重要。因此咱們應該把線程池監控起來。 咱們能夠經過擴展beforeExecute、afterExecute和terminated這三個方法去在執行前或執行後增長一些新的操做。用來記錄線程池的狀況。

方法 含義
shutdown() 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計已執行任務、正在執行任務、未執行任務數量
shutdownNow() 任務執行以前,記錄任務開始時間,startTimes這個HashMap以任務的hashCode爲key,開始時間爲值
beforeExecute(Thread t, Runnable r) 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計已執行任務、正在執行任務、未執行任務數量
afterExecute(Runnable r, Throwable t) 任務執行以後,計算任務結束時間。統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、最大容許的線程數、線程空閒時間、線程池是否關閉、線程池是否終止信息
package com.example.threadpool;

import lombok.extern.slf4j.Slf4j;

import java.util.Date;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;

/** * @author kurtl */
@Slf4j
public class ThreadPoolMonitor extends ThreadPoolExecutor {


    /** * 保存任務開始執行的時間,當任務結束時,用任務結束時間減去開始時間計算任務執行時間 */
    private final ConcurrentHashMap<String, Date> startTimes;

    /** * 線程池名稱,通常以業務名稱命名,方便區分 */
    private final String poolName;

    /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閒時間 * @param unit 空閒時間的單位 * @param workQueue 保存被提交任務的隊列 * @param poolName 線程池名稱 */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, String poolName) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
                Executors.defaultThreadFactory(), poolName);
    }


    /** * 調用父類的構造方法,並初始化HashMap和線程池名稱 * * @param corePoolSize 線程池核心線程數 * @param maximumPoolSize 線程池最大線程數 * @param keepAliveTime 線程的最大空閒時間 * @param unit 空閒時間的單位 * @param workQueue 保存被提交任務的隊列 * @param threadFactory 線程工廠 * @param poolName 線程池名稱 */
    public ThreadPoolMonitor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, String poolName) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory);
        this.startTimes = new ConcurrentHashMap<>();
        this.poolName = poolName;
    }

    /** * 線程池延遲關閉時(等待線程池裏的任務都執行完畢),統計線程池狀況 */
    @Override
    public void shutdown() {
        // 統計已執行任務、正在執行任務、未執行任務數量
        log.info("{} Going to shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        super.shutdown();
    }

    /** * 線程池當即關閉時,統計線程池狀況 */
    @Override
    public List<Runnable> shutdownNow() {
        // 統計已執行任務、正在執行任務、未執行任務數量
        log.info("{} Going to immediately shutdown. Executed tasks: {}, Running tasks: {}, Pending tasks: {}",
                this.poolName, this.getCompletedTaskCount(), this.getActiveCount(), this.getQueue().size());
        return super.shutdownNow();
    }

    /** * 任務執行以前,記錄任務開始時間 */
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        startTimes.put(String.valueOf(r.hashCode()), new Date());
    }

    /** * 任務執行以後,計算任務結束時間 */
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        Date startDate = startTimes.remove(String.valueOf(r.hashCode()));
        Date finishDate = new Date();
        long diff = finishDate.getTime() - startDate.getTime();
        // 統計任務耗時、初始線程數、核心線程數、正在執行的任務數量、
        // 已完成任務數量、任務總數、隊列裏緩存的任務數量、池中存在的最大線程數、
        // 最大容許的線程數、線程空閒時間、線程池是否關閉、線程池是否終止
        log.info("{}-pool-monitor: " +
                        "Duration: {} ms, PoolSize: {}, CorePoolSize: {}, Active: {}, " +
                        "Completed: {}, Task: {}, Queue: {}, LargestPoolSize: {}, " +
                        "MaximumPoolSize: {}, KeepAliveTime: {}, isShutdown: {}, isTerminated: {}",
                this.poolName,
                diff, this.getPoolSize(), this.getCorePoolSize(), this.getActiveCount(),
                this.getCompletedTaskCount(), this.getTaskCount(), this.getQueue().size(), this.getLargestPoolSize(),
                this.getMaximumPoolSize(), this.getKeepAliveTime(TimeUnit.MILLISECONDS), this.isShutdown(), this.isTerminated());
    }

    /** * 建立固定線程池,代碼源於Executors.newFixedThreadPool方法,這裏增長了poolName * * @param nThreads 線程數量 * @param poolName 線程池名稱 * @return ExecutorService對象 */
    public static ExecutorService newFixedThreadPool(int nThreads, String poolName) {
        return new ThreadPoolMonitor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(), poolName);
    }

    /** * 建立緩存型線程池,代碼源於Executors.newCachedThreadPool方法,這裏增長了poolName * * @param poolName 線程池名稱 * @return ExecutorService對象 */
    public static ExecutorService newCachedThreadPool(String poolName) {
        return new ThreadPoolMonitor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<>(), poolName);
    }

    /** * 生成線程池所用的線程,只是改寫了線程池默認的線程工廠,傳入線程池名稱,便於問題追蹤 */
    static class EventThreadFactory implements ThreadFactory {
        private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        /** * 初始化線程工廠 * * @param poolName 線程池名稱 */
        EventThreadFactory(String poolName) {
            SecurityManager s = System.getSecurityManager();
            group = Objects.nonNull(s) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
            namePrefix = poolName + "-pool-" + POOL_NUMBER.getAndIncrement() + "-thread-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }
}


複製代碼

線程池負載關注的核心問題是:基於當前線程池參數分配的資源夠不夠。對於這個問題,咱們能夠從事前和事中兩個角度來看。事前,線程池定義了「活躍度」這個概念,來讓用戶在發生Reject異常以前可以感知線程池負載問題,線程池活躍度計算公式爲:線程池活躍度 = activeCount/maximumPoolSize。這個公式表明當活躍線程數趨向於maximumPoolSize的時候,表明線程負載趨高。事中,也能夠從兩方面來看線程池的過載斷定條件,一個是發生了Reject異常,一個是隊列中有等待任務(支持定製閾值)。以上兩種狀況發生了都會觸發告警,告警信息會經過大象推送給服務所關聯的負責人。 ——美團技術文檔

核心線程數 最大線程數 如何配置

如何合理的配置線程池參數,比較廣泛的說法是。

IO密集型 = 2Ncpu(能夠測試後本身控制大小,2Ncpu通常沒問題)(常出現於線程中:數據庫數據交互、文件上傳下載、網絡數據傳輸等等)

計算密集型 = Ncpu(常出現於線程中:複雜算法)

而這種方案沒有考慮多線程池的狀況,實際使用上也有偏離。

x7.png

圖來自美團技術博客

因此參數的設置應該根據本身實際的應用場景定製。

多線程池的使用

通常在實際業務中,咱們會定義不一樣的線程池來處理不一樣的業務。利用咱們以前完成的ThreadPoolMonitor能夠很快的定義不一樣的線程。

ThreadPoolConfig

@EnableAsync
@Configuration
public class ThreadPoolConfig {

    @Bean
    public ThreadPoolExecutor test01(){
        return new ThreadPoolMonitor(16,32,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test01");
    }

    @Bean
    public ThreadPoolExecutor test02(){
        return new ThreadPoolMonitor(8,16,60, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100),"test02");
    }
}


複製代碼

TODO

1.動態線程池 2.基於任務的線程池監控

做者水平有限,如有錯誤遺漏,請指出。

參考文章

1.Java線程池實現原理及其在美團業務中的實踐

2.Java併發(六)線程池監控

3.一次Java線程池誤用(newFixedThreadPool)引起的線上血案和總結

相關文章
相關標籤/搜索