spring context源碼解析之@Async

1、背景java

本文轉自「天河聊技術」微信公衆號spring

你們項目中用到異步、多線程的場景不少,使用最多的場景仍是主動對象模式,就是主線程開啓一個線程池去任務分發,任務執行完成以後,關閉線程池,可是有的場景則須要部分代碼異步執行的效果,簡單的說就是有一個能夠複用的線程池能夠複用,直接new Thread固然也能夠,不推薦,沒有線程池的可靠性好,若是這個時候再建立一個線程池用完再關閉代碼是否是有點重,維護性也很差,@Async這個註解就是爲了解決這個問題,只須要在bean的方法上加上這和註解便可,就能夠實現異步。緩存

 

2、正文微信

先簡單介紹下怎麼使用多線程

一、先在程序上加上@EnableAsync這個註解,能夠指定執行器,若是不指定就從beanFactory中獲取taskExecutor這個執行器異步

@Async(value = "executor")
    public void test(){

二、若是想自定義執行器,實現這個接口org.springframework.scheduling.annotation.AsyncConfigurer或者繼承org.springframework.scheduling.annotation.AsyncConfigurerSupport這個類async

public class AsyncExecutor implements AsyncConfigurer {

    @Override
    public Executor getAsyncExecutor() {
//        自定義線程池
        return Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
    }

 

源碼實現ide

@EnableAsync這個註解作什麼了什麼事情ui

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Import(AsyncConfigurationSelector.class)
public @interface EnableAsync {

能夠看到@Import引入了AsyncConfigurationSelector這個類,前面介紹spring ioc源碼解析的時候這個@Import這個註解會把引入的類進行bean定義解析後註冊到beanFactory然後進行實例化。this

 

咱們看到這個selector又繼承了這個類

public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> {

spring切面攔截到這個註解後進行下一步操做。

 

org.springframework.scheduling.annotation.AsyncConfigurationSelector#selectImports

有兩種通知模式實現。

 

咱們能夠看到@EnableAsync這個註解中是能夠指定mode屬性的,若是不指定默認設置是PROXY

AdviceMode mode() default AdviceMode.PROXY;

咱們這裏有以PROXY爲例跟蹤下。

 

會加載到這個類

org.springframework.scheduling.annotation.ProxyAsyncConfiguration

@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration {

   @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME)
   @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
   public AsyncAnnotationBeanPostProcessor asyncAdvisor() {
      Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected");
//    建立AsyncAnnotationBeanPostProcessor,BeanPostProcessor 前面spring ioc源碼解析部分介紹過是對bean初始化過程的管理
      AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor();
      Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation");
      if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) {
         bpp.setAsyncAnnotationType(customAsyncAnnotation);
      }
//    對線程池執行器進行賦值
      if (this.executor != null) {
         bpp.setExecutor(this.executor);
      }
      if (this.exceptionHandler != null) {
         bpp.setExceptionHandler(this.exceptionHandler);
      }
//    這裏默認是jdk的動態代理,能夠@EnableAsync這個註解的屬性值,也能夠指定
      bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass"));
      bpp.setOrder(this.enableAsync.<Integer>getNumber("order"));
      return bpp;
   }

}

進入

org.springframework.scheduling.annotation.AbstractAsyncConfiguration

@Autowired(required = false)
   void setConfigurers(Collection<AsyncConfigurer> configurers) {
      if (CollectionUtils.isEmpty(configurers)) {
         return;
      }
      if (configurers.size() > 1) {
         throw new IllegalStateException("Only one AsyncConfigurer may exist");
      }
      AsyncConfigurer configurer = configurers.iterator().next();
//    獲取線程池
      this.executor = configurer.getAsyncExecutor();
      this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler();
   }

這個方法對線程進行賦值。

咱們能夠看到這個接口org.springframework.scheduling.annotation.AsyncConfigurer的惟一實現類org.springframework.scheduling.annotation.AsyncConfigurerSupport

public class AsyncConfigurerSupport implements AsyncConfigurer {

   @Override
   public Executor getAsyncExecutor() {
      return null;
   }

   @Override
   @Nullable
   public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() {
      return null;
   }

}

是一個空實現,多以想要自定義本身的線程池,實現這個接口org.springframework.scheduling.annotation.AsyncConfigurer或者繼承org.springframework.scheduling.annotation.AsyncConfigurerSupport重寫父類的方法。

 

異步方法返回值支持void、java.util.concurrent.Future、org.springframework.util.concurrent.ListenableFuture、java.util.concurrent.CompletableFuture

 

進入

org.springframework.aop.interceptor.AsyncExecutionInterceptor

找到

org.springframework.aop.interceptor.AsyncExecutionInterceptor#invoke這個方法,執行目標方法的切面方法

@Override
   @Nullable
   public Object invoke(final MethodInvocation invocation) throws Throwable {
      Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null);
//    獲取要執行的方法
      Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass);
      final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod);
//    獲取執行器
      AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod);
      if (executor == null) {
         throw new IllegalStateException(
               "No executor specified and no default executor set on AsyncExecutionInterceptor either");
      }

      Callable<Object> task = () -> {
         try {
            Object result = invocation.proceed();
            if (result instanceof Future) {
               return ((Future<?>) result).get();
            }
         }
         catch (ExecutionException ex) {
            handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments());
         }
         catch (Throwable ex) {
            handleError(ex, userDeclaredMethod, invocation.getArguments());
         }
         return null;
      };

      return doSubmit(task, executor, invocation.getMethod().getReturnType());
   }

進入

org.springframework.aop.interceptor.AsyncExecutionAspectSupport#determineAsyncExecutor

@Nullable
   protected AsyncTaskExecutor determineAsyncExecutor(Method method) {
//    先從緩存中獲取執行器
      AsyncTaskExecutor executor = this.executors.get(method);
      if (executor == null) {
         Executor targetExecutor;
//       從@Async註解中指定的屬性值獲取執行器
         String qualifier = getExecutorQualifier(method);
         if (StringUtils.hasLength(qualifier)) {
            targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier);
         }
         else {
            targetExecutor = this.defaultExecutor;
            if (targetExecutor == null) {
               synchronized (this.executors) {
                  if (this.defaultExecutor == null) {
//                   從beanFactory中獲取taskExecutor執行器
                     this.defaultExecutor = getDefaultExecutor(this.beanFactory);
                  }
                  targetExecutor = this.defaultExecutor;
               }
            }
         }
         if (targetExecutor == null) {
            return null;
         }
         executor = (targetExecutor instanceof AsyncListenableTaskExecutor ?
               (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor));
//       放入緩存
         this.executors.put(method, executor);
      }
      return executor;
   }

若是@Async指定了執行器,就會使用這個執行器,若是沒有指定就從beanFactory中獲取taskExecutor這個執行器。

 

 

3、最後

本次源碼解析到這裏,僅供參考。

相關文章
相關標籤/搜索