Spring中異步註解@Async的使用、原理及使用時可能致使的問題

前言

其實最近都在研究事務相關的內容,之因此寫這麼一篇文章是由於前面寫了一篇關於循環依賴的文章:程序員

面試必殺技,講一講Spring中的循環依賴web

而後,不少同窗碰到了下面這個問題,添加了Spring提供的一個異步註解@Async循環依賴沒法被解決了,下面是一些讀者的留言跟羣裏同窗碰到的問題:面試

image-20200719200303749
image-20200719200303749
image-20200719200244719
image-20200719200244719

本着講一個知識點就要講明白、講透徹的原則,我決定單獨寫一篇這樣的文章對@Async這個註解作一下詳細的介紹,這個註解帶來的問題遠遠不止循環依賴這麼簡單,若是對它不夠熟悉的話建議慎用。spring

文章要點

image-20200719201511174
image-20200719201511174

@Async的基本使用

這個註解的做用在於可讓被標註的方法異步執行,可是有兩個前提條件緩存

  1. 配置類上添加 @EnableAsync註解
  2. 須要異步執行的方法的所在類由Spring管理
  3. 須要異步執行的方法上添加了 @Async註解

咱們經過一個Demo體會下這個註解的做用吧服務器

第一步,配置類上開啓異步:微信

@EnableAsync
@Configuration @ComponentScan("com.dmz.spring.async") public class Config {  } 複製代碼

第二步,異步

@Component  // 這個類自己要被Spring管理
public class DmzAsyncService {   @Async // 添加註解表示這個方法要異步執行  public void testAsync(){  try {  TimeUnit.SECONDS.sleep(1);  } catch (InterruptedException e) {  e.printStackTrace();  }  System.out.println("testAsync invoked");  } } 複製代碼

第三步,測試異步執行async

public class Main {
 public static void main(String[] args) {  AnnotationConfigApplicationContext ac = new AnnotationConfigApplicationContext(Config.class);  DmzAsyncService bean = ac.getBean(DmzAsyncService.class);  bean.testAsync();  System.out.println("main函數執行完成");  } } // 程序執行結果以下: // main函數執行完成 // testAsync invoked 複製代碼

經過上面的例子咱們能夠發現,DmzAsyncService中的testAsync方法是異步執行的,那麼這背後的原理是什麼呢?咱們接着分析編輯器

原理分析

咱們在分析某一個技術的時候,最重要的事情是,必定必定要找到代碼的入口,像Spring這種都很明顯,入口一定是在@EnableAsync這個註解上面,咱們來看看這個註解幹了啥事(本文基於5.2.x版本)

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME) @Documented // 這裏是重點,導入了一個ImportSelector @Import(AsyncConfigurationSelector.class) public @interface EnableAsync {   // 這個配置可讓程序員配置須要被檢查的註解,默認狀況下檢查的就是@Async註解  Class<? extends Annotation> annotation() default Annotation.class;   // 默認使用jdk代理  boolean proxyTargetClass() default false;   // 默認使用Spring AOP  AdviceMode mode() default AdviceMode.PROXY;   // 在後續分析咱們會發現,這個註解實際往容器中添加了一個  // AsyncAnnotationBeanPostProcessor,這個後置處理器實現了Ordered接口  // 這個配置主要表明了AsyncAnnotationBeanPostProcessor執行的順序  int order() default Ordered.LOWEST_PRECEDENCE; }  複製代碼

上面這個註解作的最重要的事情就是導入了一個AsyncConfigurationSelector,這個類的源碼以下:

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {
  private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME =  "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration";   @Override  @Nullable  public String[] selectImports(AdviceMode adviceMode) {  switch (adviceMode) {  // 默認會使用SpringAOP進行代理  case PROXY:  return new String[] {ProxyAsyncConfiguration.class.getName()};  case ASPECTJ:  return new String[] {ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME};  default:  return null;  }  }  } 複製代碼

這個類的做用是像容器中註冊了一個ProxyAsyncConfiguration,這個類的繼承關係以下:

image-20200719220316319
image-20200719220316319

咱們先看下它的父類AbstractAsyncConfiguration,其源碼以下:

@Configuration
public abstract class AbstractAsyncConfiguration implements ImportAware {   @Nullable  protected AnnotationAttributes enableAsync;   @Nullable  protected Supplier<Executor> executor;   @Nullable  protected Supplier<AsyncUncaughtExceptionHandler> exceptionHandler;   // 這裏主要就是檢查將其導入的類上是否有EnableAsync註解  // 若是沒有的話就報錯  @Override  public void setImportMetadata(AnnotationMetadata importMetadata) {  this.enableAsync = AnnotationAttributes.fromMap(  importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false));  if (this.enableAsync == null) {  throw new IllegalArgumentException(  "@EnableAsync is not present on importing class " + importMetadata.getClassName());  }  }   // 將容器中配置的AsyncConfigurer注入  // 異步執行嘛,因此咱們能夠配置使用的線程池  // 另外也能夠配置異常處理器  @Autowired(required = false)  void setConfigurers(Collection<AsyncConfigurer> configurers) {  if (CollectionUtils.isEmpty(configurers)) {  return;  }  if (configurers.size() > 1) {  throw new IllegalStateException("Only one AsyncConfigurer may exist");  }  AsyncConfigurer configurer = configurers.iterator().next();  this.executor = configurer::getAsyncExecutor;  this.exceptionHandler = configurer::getAsyncUncaughtExceptionHandler;  }  } 複製代碼

再來看看ProxyAsyncConfiguration這個類的源碼

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {   @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)  @Role(BeanDefinition.ROLE_INFRASTRUCTURE)  public AsyncAnnotationBeanPostProcessor asyncAdvisor() {  AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();  // 將經過AsyncConfigurer配置好的線程池跟異常處理器設置到這個後置處理器中  bpp.configure(this.executor, this.exceptionHandler);  Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");  if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {  bpp.setAsyncAnnotationType(customAsyncAnnotation);  }  bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));  bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));  return bpp;  }  } 複製代碼

這個類自己是一個配置類,它的做用是向容器中添加一個AsyncAnnotationBeanPostProcessor。到這一步咱們基本上就能夠明白了,@Async註解的就是經過AsyncAnnotationBeanPostProcessor這個後置處理器生成一個代理對象來實現異步的,接下來咱們就具體看看AsyncAnnotationBeanPostProcessor是如何生成代理對象的,咱們主要關注一下幾點便可:

  1. 是在生命週期的哪一步完成的代理?
  2. 切點的邏輯是怎麼樣的?它會對什麼樣的類進行攔截?
  3. 通知的邏輯是怎麼樣的?是如何實現異步的?

基於上面幾個問題,咱們進行逐一分析

是在生命週期的哪一步完成的代理?

咱們抓住重點,AsyncAnnotationBeanPostProcessor是一個後置處理器器,按照咱們對Spring的瞭解,大機率是在這個後置處理器的postProcessAfterInitialization方法中完成了代理,直接定位到這個方法,這個方法位於父類AbstractAdvisingBeanPostProcessor中,具體代碼以下:

public Object postProcessAfterInitialization(Object bean, String beanName) {
 // 沒有通知,或者是AOP的基礎設施類,那麼不進行代理  if (this.advisor == null || bean instanceof AopInfrastructureBean) {  return bean;  }   // 對已經被代理的類,再也不生成代理,只是將通知添加到代理類的邏輯中  // 這裏經過beforeExistingAdvisors決定是將通知添加到全部通知以前仍是添加到全部通知以後  // 在使用@Async註解的時候,beforeExistingAdvisors被設置成了true  // 意味着整個方法及其攔截邏輯都會異步執行  if (bean instanceof Advised) {  Advised advised = (Advised) bean;  if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) {  if (this.beforeExistingAdvisors) {  advised.addAdvisor(0, this.advisor);  }  else {  advised.addAdvisor(this.advisor);  }  return bean;  }  }   // 判斷須要對哪些Bean進行來代理  if (isEligible(bean, beanName)) {  ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName);  if (!proxyFactory.isProxyTargetClass()) {  evaluateProxyInterfaces(bean.getClass(), proxyFactory);  }  proxyFactory.addAdvisor(this.advisor);  customizeProxyFactory(proxyFactory);  return proxyFactory.getProxy(getProxyClassLoader());  }  return bean; } 複製代碼

果不其然,確實是在這個方法中完成的代理。接着咱們就要思考,切點的過濾規則是什麼呢?

切點的邏輯是怎麼樣的?

其實也不難猜到確定就是類上添加了@Async註解或者類中含有被@Async註解修飾的方法。基於此,咱們看看這個isEligible這個方法的實現邏輯,這個方位位於AbstractBeanFactoryAwareAdvisingPostProcessor中,也是AsyncAnnotationBeanPostProcessor的父類,對應代碼以下:

// AbstractBeanFactoryAwareAdvisingPostProcessor的isEligible方法
// 調用了父類 protected boolean isEligible(Object bean, String beanName) {  return (!AutoProxyUtils.isOriginalInstance(beanName, bean.getClass()) &&  super.isEligible(bean, beanName)); }  protected boolean isEligible(Object bean, String beanName) {  return isEligible(bean.getClass()); }  protected boolean isEligible(Class<?> targetClass) {  Boolean eligible = this.eligibleBeans.get(targetClass);  if (eligible != null) {  return eligible;  }  if (this.advisor == null) {  return false;  }  // 這裏完成的判斷  eligible = AopUtils.canApply(this.advisor, targetClass);  this.eligibleBeans.put(targetClass, eligible);  return eligible; } 複製代碼

實際上最後就是根據advisor來肯定是否要進行代理,在Spring中AOP相關的API及源碼解析,原來AOP是這樣子的這篇文章中咱們提到過,advisor實際就是一個綁定了切點的通知,那麼AsyncAnnotationBeanPostProcessor這個advisor是何時被初始化的呢?咱們直接定位到AsyncAnnotationBeanPostProcessorsetBeanFactory方法,其源碼以下:

public void setBeanFactory(BeanFactory beanFactory) {
 super.setBeanFactory(beanFactory);   // 在這裏new了一個AsyncAnnotationAdvisor  AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler);  if (this.asyncAnnotationType != null) {  advisor.setAsyncAnnotationType(this.asyncAnnotationType);  }  advisor.setBeanFactory(beanFactory);  // 完成了初始化  this.advisor = advisor; } 複製代碼

咱們來看看AsyncAnnotationAdvisor中的切點匹配規程是怎麼樣的,直接定位到這個類的buildPointcut方法中,其源碼以下:

protected Pointcut buildPointcut(Set<Class<? extends Annotation>> asyncAnnotationTypes) {
 ComposablePointcut result = null;  for (Class<? extends Annotation> asyncAnnotationType : asyncAnnotationTypes) {  // 就是根據這兩個匹配器進行匹配的  Pointcut cpc = new AnnotationMatchingPointcut(asyncAnnotationType, true);  Pointcut mpc = new AnnotationMatchingPointcut(null, asyncAnnotationType, true);  if (result == null) {  result = new ComposablePointcut(cpc);  }  else {  result.union(cpc);  }  result = result.union(mpc);  }  return (result != null ? result : Pointcut.TRUE); } 複製代碼

代碼很簡單,就是根據cpc跟mpc兩個匹配器來進行匹配的,第一個是檢查類上是否有@Async註解,第二個是檢查方法是是否有@Async註解。

那麼,到如今爲止,咱們已經知道了它在什麼時候建立代理,會爲何對象建立代理,最後咱們還須要解決一個問題,代理的邏輯是怎麼樣的,異步究竟是如何實現的?

通知的邏輯是怎麼樣的?是如何實現異步的?

前面也提到了advisor是一個綁定了切點的通知,前面分析了它的切點,那麼如今咱們就來看看它的通知邏輯,直接定位到AsyncAnnotationAdvisor中的buildAdvice方法,源碼以下:

protected Advice buildAdvice(  @Nullable Supplier<Executor> executor, @Nullable Supplier<AsyncUncaughtExceptionHandler> exceptionHandler) {   AnnotationAsyncExecutionInterceptor interceptor = new AnnotationAsyncExecutionInterceptor(null);  interceptor.configure(executor, exceptionHandler);  return interceptor; } 複製代碼

簡單吧,加了一個攔截器而已,對於interceptor類型的對象,咱們關注它的核心方法invoke就好了,代碼以下:

public Object invoke(final MethodInvocation invocation) throws Throwable {
 Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);  Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);  final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);   // 異步執行嘛,先獲取到一個線程池  AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);  if (executor == null) {  throw new IllegalStateException(  "No executor specified and no default executor set on AsyncExecutionInterceptor either");  }   // 而後將這個方法封裝成一個 Callable對象傳入到線程池中執行  Callable<Object> task = () -> {  try {  Object result = invocation.proceed();  if (result instanceof Future) {  return ((Future<?>) result).get();  }  }  catch (ExecutionException ex) {  handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());  }  catch (Throwable ex) {  handleError(ex, userDeclaredMethod, invocation.getArguments());  }  return null;  };  // 將任務提交到線程池  return doSubmit(task, executor, invocation.getMethod().getReturnType()); } 複製代碼

致使的問題及解決方案

問題1:循環依賴報錯

就像在這張圖裏這個讀者問的問題,image-20200719200303749

分爲兩點回答:

第一:循環依賴爲何不能被解決?

這個問題其實很簡單,在《面試必殺技,講一講Spring中的循環依賴》這篇文章中我從兩個方面分析了循環依賴的處理流程

  1. 簡單對象間的循環依賴處理
  2. AOP對象間的循環依賴處理

按照這種思路,@Async註解致使的循環依賴應該屬於AOP對象間的循環依賴,也應該能被處理。可是,重點來了,解決AOP對象間循環依賴的核心方法是三級緩存,以下:

image-20200706105535307
image-20200706105535307

在三級緩存緩存了一個工廠對象,這個工廠對象會調用getEarlyBeanReference方法來獲取一個早期的代理對象的引用,其源碼以下:

protected Object getEarlyBeanReference(String beanName, RootBeanDefinition mbd, Object bean) {
 Object exposedObject = bean;  if (!mbd.isSynthetic() && hasInstantiationAwareBeanPostProcessors()) {  for (BeanPostProcessor bp : getBeanPostProcessors()) {  // 看到這個判斷了嗎,經過@EnableAsync導入的後置處理器  // AsyncAnnotationBeanPostProcessor根本就不是一個SmartInstantiationAwareBeanPostProcessor  // 這就意味着即便咱們經過AsyncAnnotationBeanPostProcessor建立了一個代理對象  // 可是早期暴露出去的用於給別的Bean進行注入的那個對象仍是原始對象  if (bp instanceof SmartInstantiationAwareBeanPostProcessor) {  SmartInstantiationAwareBeanPostProcessor ibp = (SmartInstantiationAwareBeanPostProcessor) bp;  exposedObject = ibp.getEarlyBeanReference(exposedObject, beanName);  }  }  }  return exposedObject; } 複製代碼

看完上面的代碼循環依賴的問題就很明顯了,由於早期暴露的對象跟最終放入容器中的對象不是同一個,因此報錯了。報錯的具體位置我在你知道Spring是怎麼將AOP應用到Bean的生命週期中的嗎? 文章末尾已經分析過了,本文再也不贅述

image-20200720152830307
image-20200720152830307

解決方案

就以上面讀者給出的Demo爲例,只須要在爲B注入A時添加一個@Lazy註解便可

@Component
public class B implements BService {   @Autowired  @Lazy  private A a;   public void doSomething() {  } } 複製代碼

這個註解的做用在於,當爲B注入A時,會爲A生成一個代理對象注入到B中,當真正調用代理對象的方法時,底層會調用getBean(a)去建立A對象,而後調用方法,這個註解的處理時機是在org.springframework.beans.factory.support.DefaultListableBeanFactory#resolveDependency方法中,處理這個註解的代碼位於org.springframework.context.annotation.ContextAnnotationAutowireCandidateResolver#buildLazyResolutionProxy,這些代碼其實都在我以前的文章中分析過了

Spring雜談 | Spring中的AutowireCandidateResolver

談談Spring中的對象跟Bean,你知道Spring怎麼建立對象的嗎?

因此本文再也不作詳細分析

問題2:默認線程池不會複用線程

我以爲這是這個註解最坑的地方,沒有之一!咱們來看看它默認使用的線程池是哪一個,在前文的源碼分析中,咱們能夠看到決定要使用線程池的方法是org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor。其源碼以下:

protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
 AsyncTaskExecutor executor = this.executors.get(method);  if (executor == null) {  Executor targetExecutor;  // 能夠在@Async註解中配置線程池的名字  String qualifier = getExecutorQualifier(method);  if (StringUtils.hasLength(qualifier)) {  targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);  }  else {  // 獲取默認的線程池  targetExecutor = this.defaultExecutor.get();  }  if (targetExecutor == null) {  return null;  }  executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?  (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));  this.executors.put(method, executor);  }  return executor; } 複製代碼

最終會調用到org.springframework.aop.interceptor.AsyncExecutionInterceptor#getDefaultExecutor這個方法中

protected Executor getDefaultExecutor(@Nullable BeanFactory beanFactory) {
 Executor defaultExecutor = super.getDefaultExecutor(beanFactory);  return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); } 複製代碼

能夠看到,它默認使用的線程池是SimpleAsyncTaskExecutor。咱們不看這個類的源碼,只看它上面的文檔註釋,以下:

image-20200720160047340
image-20200720160047340

主要說了三點

  1. 爲每一個任務新起一個線程
  2. 默認線程數不作限制
  3. 不復用線程

就這三點,你還敢用嗎?只要你的任務耗時長一點,說不定服務器就給你來個OOM

解決方案

最好的辦法就是使用自定義的線程池,主要有這麼幾種配置方法

  1. 在以前的源碼分析中,咱們能夠知道,能夠經過 AsyncConfigurer來配置使用的線程池

以下:

public class DmzAsyncConfigurer implements AsyncConfigurer {
 @Override  public Executor getAsyncExecutor() {  // 建立自定義的線程池  } } 複製代碼
  1. 直接在@Async註解中配置要使用的線程池的名稱

以下:

public class A implements AService {
  private B b;   @Autowired  public void setB(B b) {  System.out.println(b);  this.b = b;  }   @Async("dmzExecutor")  public void doSomething() {  } } 複製代碼
@EnableAsync
@Configuration @ComponentScan("com.dmz.spring.async") @Aspect public class Config {  @Bean("dmzExecutor")  public Executor executor(){  // 建立自定義的線程池  return executor;  } } 複製代碼

總結

本文主要介紹了Spring中異步註解的使用、原理及可能碰到的問題,針對每一個問題文中也給出了方案。但願經過這篇文章能幫助你完全掌握@Async註解的使用,知其然並知其因此然!

文章有幫到你的話,記得點個贊哈~ 若是本文對你由幫助的話,記得點個贊吧!也歡迎關注個人公衆號,微信搜索:程序員DMZ,或者掃描下方二維碼,跟着我一塊兒認認真真學Java,踏踏實實作一個coder。

我叫DMZ,一個在學習路上匍匐前行的小菜鳥!

相關文章
相關標籤/搜索