咱們先來看下面這個Demo。java
pom.xml中maven依賴:web
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>1.5.14.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <dependencies> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.18</version> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> </dependencies>
啓動類SpringBootAsyncApplication.javaspring
@SpringBootApplication @EnableAsync public class SpringBootAsyncApplication { public static void main(String[] args) { SpringApplication.run(SpringBootAsyncApplication.class, args); } }
DemoController.javac#
@RestController @RequestMapping(value = "/demos") @Slf4j public class DemoController { @Autowired private IDemoService demoService; @GetMapping("") public String test(@RequestParam String name) { long start = System.currentTimeMillis(); log.info("start send. ThreadName: {}", Thread.currentThread().getName()); demoService.send(name); long end = System.currentTimeMillis(); log.info("send end, time: {}", (end - start)); return "success"; } }
IDemoService接口實現類DemoServiceImpl.javasegmentfault
@Service @Slf4j public class DemoServiceImpl implements IDemoService { @Async @Override public void send(String name) { try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } log.info("async name={}, ThreadName: {}", name, Thread.currentThread().getName()); } }
啓動項目後,訪問GET http://127.0.0.1:8002/demos?name=test,獲得以下結果:app
2019-04-12 17:17:32.462 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : start send. ThreadName: http-nio-8002-exec-1 2019-04-12 17:17:32.466 INFO 12 --- [nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either 2019-04-12 17:17:32.467 INFO 12 --- [nio-8002-exec-1] c.l.d.s.controller.DemoController : send end, time: 5 2019-04-12 17:17:37.468 INFO 12 --- [cTaskExecutor-1] c.l.d.s.service.impl.DemoServiceImpl : async name=test, ThreadName: SimpleAsyncTaskExecutor-1
經過控制檯日誌打印,咱們能夠看到有兩個線程,一個是主線程,一個是異步的線程。沒等異步的線程執行完,主線程就直接執行完畢,返回響應結果了,這就是異步的效果。less
爲何會拋出下面這段日誌?異步
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
沒有發現用於異步處理的任務執行器,既沒有TaskExecutor類型的bean,也沒有名爲「taskExecutor」的bean。什麼意思?並且異步開啓的線程爲啥前綴是SimpleAsyncTaskExecutor?帶着這些疑問,咱們深刻到源碼中,看看到底發生了什麼。async
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.Async.javamaven
/** * Annotation that marks a method as a candidate for <i>asynchronous</i> execution. * Can also be used at the type level, in which case all of the type's methods are * considered as asynchronous. * * <p>In terms of target method signatures, any parameter types are supported. * However, the return type is constrained to either {@code void} or * {@link java.util.concurrent.Future}. In the latter case, you may declare the * more specific {@link org.springframework.util.concurrent.ListenableFuture} or * {@link java.util.concurrent.CompletableFuture} types which allow for richer * interaction with the asynchronous task and for immediate composition with * further processing steps. * * <p>A {@code Future} handle returned from the proxy will be an actual asynchronous * {@code Future} that can be used to track the result of the asynchronous method * execution. However, since the target method needs to implement the same signature, * it will have to return a temporary {@code Future} handle that just passes a value * through: e.g. Spring's {@link AsyncResult}, EJB 3.1's {@link javax.ejb.AsyncResult}, * or {@link java.util.concurrent.CompletableFuture#completedFuture(Object)}. * * @author Juergen Hoeller * @author Chris Beams * @since 3.0 * @see AnnotationAsyncExecutionInterceptor * @see AsyncAnnotationAdvisor */ @Target({ElementType.METHOD, ElementType.TYPE}) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface Async { /** * 指定異步操做的限定符值 * A qualifier value for the specified asynchronous operation(s). * <p>May be used to determine the target executor to be used when executing this * method, matching the qualifier value (or the bean name) of a specific * {@link java.util.concurrent.Executor Executor} or * {@link org.springframework.core.task.TaskExecutor TaskExecutor} * bean definition. * <p>When specified on a class level {@code @Async} annotation, indicates that the * given executor should be used for all methods within the class. Method level use * of {@code Async#value} always overrides any value set at the class level. * @since 3.1.2 */ String value() default ""; }
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.EnableAsync.java
package org.springframework.scheduling.annotation; import java.lang.annotation.Annotation; import java.lang.annotation.Documented; import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; import java.lang.annotation.Target; import org.springframework.context.annotation.AdviceMode; import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.Import; import org.springframework.core.Ordered; /** * Enables Spring's asynchronous method execution capability, similar to functionality * found in Spring's {@code <task:*>} XML namespace. * * <p>To be used together with @{@link Configuration Configuration} classes as follows, * enabling annotation-driven async processing for an entire Spring application context: * * <pre class="code"> * @Configuration * @EnableAsync * public class AppConfig { * * }</pre> * * {@code MyAsyncBean} is a user-defined type with one or more methods annotated with * either Spring's {@code @Async} annotation, the EJB 3.1 {@code @javax.ejb.Asynchronous} * annotation, or any custom annotation specified via the {@link #annotation} attribute. * The aspect is added transparently for any registered bean, for instance via this * configuration: * * <pre class="code"> * @Configuration * public class AnotherAppConfig { * * @Bean * public MyAsyncBean asyncBean() { * return new MyAsyncBean(); * } * }</pre> * * <p>By default, Spring will be searching for an associated thread pool definition: * either a unique {@link org.springframework.core.task.TaskExecutor} bean in the context, * or an {@link java.util.concurrent.Executor} bean named "taskExecutor" otherwise. If * neither of the two is resolvable, a {@link org.springframework.core.task.SimpleAsyncTaskExecutor} * will be used to process async method invocations. Besides, annotated methods having a * {@code void} return type cannot transmit any exception back to the caller. By default, * such uncaught exceptions are only logged. * * <p>To customize all this, implement {@link AsyncConfigurer} and provide: * <ul> * <li>your own {@link java.util.concurrent.Executor Executor} through the * {@link AsyncConfigurer#getAsyncExecutor getAsyncExecutor()} method, and</li> * <li>your own {@link org.springframework.aop.interceptor.AsyncUncaughtExceptionHandler * AsyncUncaughtExceptionHandler} through the {@link AsyncConfigurer#getAsyncUncaughtExceptionHandler * getAsyncUncaughtExceptionHandler()} * method.</li> * </ul> * * <pre class="code"> * @Configuration * @EnableAsync * public class AppConfig implements AsyncConfigurer { * * @Override * public Executor getAsyncExecutor() { * ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); * executor.setCorePoolSize(7); * executor.setMaxPoolSize(42); * executor.setQueueCapacity(11); * executor.setThreadNamePrefix("MyExecutor-"); * executor.initialize(); * return executor; * } * * @Override * public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { * return MyAsyncUncaughtExceptionHandler(); * } * }</pre> * * <p>If only one item needs to be customized, {@code null} can be returned to * keep the default settings. Consider also extending from {@link AsyncConfigurerSupport} * when possible. * * <p>Note: In the above example the {@code ThreadPoolTaskExecutor} is not a fully managed * Spring bean. Add the {@code @Bean} annotation to the {@code getAsyncExecutor()} method * if you want a fully managed bean. In such circumstances it is no longer necessary to * manually call the {@code executor.initialize()} method as this will be invoked * automatically when the bean is initialized. * * <p>For reference, the example above can be compared to the following Spring XML * configuration: * * <pre class="code"> * {@code * <beans> * * <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/> * * <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/> * * <bean id="asyncBean" class="com.foo.MyAsyncBean"/> * * <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/> * * </beans> * }</pre> * * The above XML-based and JavaConfig-based examples are equivalent except for the * setting of the <em>thread name prefix</em> of the {@code Executor}; this is because * the {@code <task:executor>} element does not expose such an attribute. This * demonstrates how the JavaConfig-based approach allows for maximum configurability * through direct access to actual componentry. * * <p>The {@link #mode} attribute controls how advice is applied: If the mode is * {@link AdviceMode#PROXY} (the default), then the other attributes control the behavior * of the proxying. Please note that proxy mode allows for interception of calls through * the proxy only; local calls within the same class cannot get intercepted that way. * * <p>Note that if the {@linkplain #mode} is set to {@link AdviceMode#ASPECTJ}, then the * value of the {@link #proxyTargetClass} attribute will be ignored. Note also that in * this case the {@code spring-aspects} module JAR must be present on the classpath, with * compile-time weaving or load-time weaving applying the aspect to the affected classes. * There is no proxy involved in such a scenario; local calls will be intercepted as well. * * @author Chris Beams * @author Juergen Hoeller * @author Stephane Nicoll * @author Sam Brannen * @since 3.1 * @see Async * @see AsyncConfigurer * @see AsyncConfigurationSelector */ @Target(ElementType.TYPE) @Retention(RetentionPolicy.RUNTIME) @Documented @Import(AsyncConfigurationSelector.class) public @interface EnableAsync { /** * Indicate the 'async' annotation type to be detected at either class * or method level. * <p>By default, both Spring's @{@link Async} annotation and the EJB 3.1 * {@code @javax.ejb.Asynchronous} annotation will be detected. * <p>This attribute exists so that developers can provide their own * custom annotation type to indicate that a method (or all methods of * a given class) should be invoked asynchronously. */ Class<? extends Annotation> annotation() default Annotation.class; /** * Indicate whether subclass-based (CGLIB) proxies are to be created as opposed * to standard Java interface-based proxies. * <p><strong>Applicable only if the {@link #mode} is set to {@link AdviceMode#PROXY}</strong>. * <p>The default is {@code false}. * <p>Note that setting this attribute to {@code true} will affect <em>all</em> * Spring-managed beans requiring proxying, not just those marked with {@code @Async}. * For example, other beans marked with Spring's {@code @Transactional} annotation * will be upgraded to subclass proxying at the same time. This approach has no * negative impact in practice unless one is explicitly expecting one type of proxy * vs. another — for example, in tests. */ boolean proxyTargetClass() default false; /** * Indicate how async advice should be applied. * <p><b>The default is {@link AdviceMode#PROXY}.</b> * Please note that proxy mode allows for interception of calls through the proxy * only. Local calls within the same class cannot get intercepted that way; an * {@link Async} annotation on such a method within a local call will be ignored * since Spring's interceptor does not even kick in for such a runtime scenario. * For a more advanced mode of interception, consider switching this to * {@link AdviceMode#ASPECTJ}. */ AdviceMode mode() default AdviceMode.PROXY; /** * Indicate the order in which the {@link AsyncAnnotationBeanPostProcessor} * should be applied. * <p>The default is {@link Ordered#LOWEST_PRECEDENCE} in order to run * after all other post-processors, so that it can add an advisor to * existing proxies rather than double-proxy. */ int order() default Ordered.LOWEST_PRECEDENCE; }
@EnableAsync啓用Spring異步方法執行功能,相似於Spring的<task:*>XML命名空間,和@Configuration註解類一塊兒使用,爲整個Spring應用程序上下文啓用註釋驅動的異步處理。
默認狀況下,Spring將會搜索一個關聯的線程池定義:要麼是一個在上下文中惟一的TaskExecutor類型的bean,要麼是一個名叫"taskExecutor"的Executor類型的bean;若是二者都沒法解決,SimpleAsyncTaskExecutor將用於處理異步方法調用。此外,具備void返回類型的帶註釋的方法不能將任何異常傳回調用者。默認狀況下,此類未捕獲異常只會被記錄下來。
爲了定製全部這些,須要實現AsyncConfigurer類,並重寫AsyncConfigurer類中getAsyncExecutor()方法和getAsyncUncaughtExceptionHandler()方法。
@Configuration @EnableAsync public class AppConfig implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor(); executor.setCorePoolSize(7); executor.setMaxPoolSize(42); executor.setQueueCapacity(11); executor.setThreadNamePrefix("MyExecutor-"); executor.initialize(); return executor; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return MyAsyncUncaughtExceptionHandler(); } }
若是隻須要定製一個項目,則能夠返回null以保持默認設置,若是可能的話還能夠考慮從AsyncConfigurerSupport擴展。
注意:在上面的示例中,ThreadPoolTaskExecutor不是一個徹底受管理Spring bean。若是你想要一個徹底受管理的bean,能夠將@Bean註解添加到getAsyncExecutor()方法上;在這種狀況下,就沒有必要再手動調用executor.initialize()方法進行bean初始化了。
@Bean @Override public Executor getAsyncExecutor() { ThreadPoolTaskExecutor executor= new ThreadPoolTaskExecutor(); executor.setCorePoolSize(7); executor.setMaxPoolSize(42); executor.setQueueCapacity(11); executor.setThreadNamePrefix("MyExecutor-"); return executor; }
爲了便於參考,能夠將上面的示例與下面的Spring XML配置進行比較:
<beans> <task:annotation-driven executor="myExecutor" exception-handler="exceptionHandler"/> <task:executor id="myExecutor" pool-size="7-42" queue-capacity="11"/> <bean id="asyncBean" class="com.foo.MyAsyncBean"/> <bean id="exceptionHandler" class="com.foo.MyAsyncUncaughtExceptionHandler"/> </beans>
上述基於xml和基於Java配置的示例除了設置執行器的線程名前綴外,其他都是等價的,這是由於<task:executor>元素沒有暴露這樣的屬性。這演示了基於Java配置的方法如何經過直接訪問實際組件來實現最大的可配置性。
#mode屬性控制應用如何被通知:若是mode是AdviceMode#PROXY(默認值),那麼其餘屬性控制代理的行爲。請注意,代理模式只容許攔截經過代理進行的調用,同一類內的本地調用不能以這種方式被攔截。
注意,若是#mode被設置爲AdviceMode#ASPECTJ,那麼#proxyTargetClass屬性值將被忽略。還要注意,在這種狀況下,spring-aspects模塊JAR必須出如今類路徑上,編譯時或加載時應用切面到受影響的類上。在這種狀況下不涉及代理,本地調用也會被攔截。
上面這些都是EnableAsync類註釋,具體是如何實現的?且看下面。
有沒有發現,EnableAsync類惟一的核心註解就是@Import(AsyncConfigurationSelector.class),咱們來看下它的源碼:
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AsyncConfigurationSelector.java
/** * Selects which implementation of {@link AbstractAsyncConfiguration} should be used based * on the value of {@link EnableAsync#mode} on the importing {@code @Configuration} class. * * @author Chris Beams * @since 3.1 * @see EnableAsync * @see ProxyAsyncConfiguration */ public class AsyncConfigurationSelector extends AdviceModeImportSelector<EnableAsync> { private static final String ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME = "org.springframework.scheduling.aspectj.AspectJAsyncConfiguration"; /** * {@inheritDoc} * @return {@link ProxyAsyncConfiguration} or {@code AspectJAsyncConfiguration} for * {@code PROXY} and {@code ASPECTJ} values of {@link EnableAsync#mode()}, respectively */ @Override public String[] selectImports(AdviceMode adviceMode) { switch (adviceMode) { case PROXY: return new String[] { ProxyAsyncConfiguration.class.getName() }; case ASPECTJ: return new String[] { ASYNC_EXECUTION_ASPECT_CONFIGURATION_CLASS_NAME }; default: return null; } } }
根據導入@Configuration類上EnableAsync#mode的值選擇AbstractAsyncConfiguration的哪一個實現類應該被使用。
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.ProxyAsyncConfiguration.java
/** * {@code @Configuration} class that registers the Spring infrastructure beans necessary * to enable proxy-based asynchronous method execution. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1 * @see EnableAsync * @see AsyncConfigurationSelector */ @Configuration @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public class ProxyAsyncConfiguration extends AbstractAsyncConfiguration { /** * 定義了一個AsyncAnnotationBeanPostProcessor類bean */ @Bean(name = TaskManagementConfigUtils.ASYNC_ANNOTATION_PROCESSOR_BEAN_NAME) @Role(BeanDefinition.ROLE_INFRASTRUCTURE) public AsyncAnnotationBeanPostProcessor asyncAdvisor() { Assert.notNull(this.enableAsync, "@EnableAsync annotation metadata was not injected"); // 新建一個異步註解bean後處理器 AsyncAnnotationBeanPostProcessor bpp = new AsyncAnnotationBeanPostProcessor(); // 獲取@EnableAsync中用戶自定義annotation Class<? extends Annotation> customAsyncAnnotation = this.enableAsync.getClass("annotation"); // 若是不是默認註解,則設置異步註解配置 if (customAsyncAnnotation != AnnotationUtils.getDefaultValue(EnableAsync.class, "annotation")) { bpp.setAsyncAnnotationType(customAsyncAnnotation); } // 設置線程任務執行器 if (this.executor != null) { bpp.setExecutor(this.executor); } // 設置異常處理器 if (this.exceptionHandler != null) { bpp.setExceptionHandler(this.exceptionHandler); } // 設置是否升級到CGLIB子類代理,默認不開啓 bpp.setProxyTargetClass(this.enableAsync.getBoolean("proxyTargetClass")); // 設置執行優先級,默認最後執行 bpp.setOrder(this.enableAsync.<Integer>getNumber("order")); return bpp; } }
spring-context-4.3.18.RELEASE.jar!org.springframework.scheduling.annotation.AbstractAsyncConfiguration.java
/** * Abstract base {@code Configuration} class providing common structure for enabling * Spring's asynchronous method execution capability. * * @author Chris Beams * @author Stephane Nicoll * @since 3.1 * @see EnableAsync */ @Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { // 註解屬性 protected AnnotationAttributes enableAsync; // 線程任務執行器 protected Executor executor; // 異常處理器 protected AsyncUncaughtExceptionHandler exceptionHandler; @Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } /** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer.getAsyncExecutor(); this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler(); } }
能夠看到接口org.springframework.scheduling.annotation.AsyncConfigurer的惟一實現類org.springframework.scheduling.annotation.AsyncConfigurerSupport:
/** * A convenience {@link AsyncConfigurer} that implements all methods * so that the defaults are used. Provides a backward compatible alternative * of implementing {@link AsyncConfigurer} directly. * * @author Stephane Nicoll * @since 4.1 */ public class AsyncConfigurerSupport implements AsyncConfigurer { @Override public Executor getAsyncExecutor() { return null; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return null; } }
這就是上面註釋講的,能夠經過實現AsyncConfigurer接口實現默認線程池和異常處理的定製化。
回到ProxyAsyncConfiguration類,這是一個Configuration類,在其中經過@bean注入了AsyncAnnotationBeanPostProcessor 類。
AsyncAnnotationBeanPostProcessor這個類的Bean初始化時,重寫了BeanFactoryAware接口setBeanFactory方法,對AsyncAnnotationAdvisor異步註解切面進行了構造。
@Override public void setBeanFactory(BeanFactory beanFactory) { super.setBeanFactory(beanFactory); AsyncAnnotationAdvisor advisor = new AsyncAnnotationAdvisor(this.executor, this.exceptionHandler); if (this.asyncAnnotationType != null) { advisor.setAsyncAnnotationType(this.asyncAnnotationType); } advisor.setBeanFactory(beanFactory); this.advisor = advisor; }
它會在普通bean屬性以後、初始化回調(如InitializingBean#afterPropertiesSet() 或者一個自定義初始化方法)以前被調用。
AsyncAnnotationBeanPostProcessor的後置bean處理是經過其父類AbstractAdvisingBeanPostProcessor來實現的,該類實現了BeanPostProcessor接口,重寫postProcessAfterInitialization方法。
@Override public Object postProcessAfterInitialization(Object bean, String beanName) { if (bean instanceof AopInfrastructureBean) { // Ignore AOP infrastructure such as scoped proxies. return bean; } //把Advisor添加進bean ProxyFactory-》AdvisedSupport-》Advised if (bean instanceof Advised) { Advised advised = (Advised) bean; if (!advised.isFrozen() && isEligible(AopUtils.getTargetClass(bean))) { // Add our local Advisor to the existing proxy's Advisor chain... if (this.beforeExistingAdvisors) { advised.addAdvisor(0, this.advisor); } else { advised.addAdvisor(this.advisor); } return bean; } } //構造ProxyFactory代理工廠,添加代理的接口,設置切面,最後返回代理類:AopProxy if (isEligible(bean, beanName)) { ProxyFactory proxyFactory = prepareProxyFactory(bean, beanName); if (!proxyFactory.isProxyTargetClass()) { evaluateProxyInterfaces(bean.getClass(), proxyFactory); } proxyFactory.addAdvisor(this.advisor); customizeProxyFactory(proxyFactory); return proxyFactory.getProxy(getProxyClassLoader()); } // No async proxy needed. return bean; }
JDK動態代理類JdkDynamicAopProxy實現AopProxy接口,最終執行的是InvocationHandler接口的invoke方法。
/** * Implementation of {@code InvocationHandler.invoke}. * <p>Callers will see exactly the exception thrown by the target, * unless a hook method throws an exception. */ @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { MethodInvocation invocation; Object oldProxy = null; boolean setProxyContext = false; TargetSource targetSource = this.advised.targetSource; Class<?> targetClass = null; Object target = null; try { if (!this.equalsDefined && AopUtils.isEqualsMethod(method)) { // The target does not implement the equals(Object) method itself. return equals(args[0]); } else if (!this.hashCodeDefined && AopUtils.isHashCodeMethod(method)) { // The target does not implement the hashCode() method itself. return hashCode(); } else if (method.getDeclaringClass() == DecoratingProxy.class) { // There is only getDecoratedClass() declared -> dispatch to proxy config. return AopProxyUtils.ultimateTargetClass(this.advised); } else if (!this.advised.opaque && method.getDeclaringClass().isInterface() && method.getDeclaringClass().isAssignableFrom(Advised.class)) { // Service invocations on ProxyConfig with the proxy config... return AopUtils.invokeJoinpointUsingReflection(this.advised, method, args); } Object retVal; if (this.advised.exposeProxy) { // Make invocation available if necessary. oldProxy = AopContext.setCurrentProxy(proxy); setProxyContext = true; } // May be null. Get as late as possible to minimize the time we "own" the target, // in case it comes from a pool. target = targetSource.getTarget(); if (target != null) { targetClass = target.getClass(); } // Get the interception chain for this method. List<Object> chain = this.advised.getInterceptorsAndDynamicInterceptionAdvice(method, targetClass); // Check whether we have any advice. If we don't, we can fallback on direct // reflective invocation of the target, and avoid creating a MethodInvocation. if (chain.isEmpty()) { // We can skip creating a MethodInvocation: just invoke the target directly // Note that the final invoker must be an InvokerInterceptor so we know it does // nothing but a reflective operation on the target, and no hot swapping or fancy proxying. Object[] argsToUse = AopProxyUtils.adaptArgumentsIfNecessary(method, args); retVal = AopUtils.invokeJoinpointUsingReflection(target, method, argsToUse); } else { // We need to create a method invocation... invocation = new ReflectiveMethodInvocation(proxy, target, method, args, targetClass, chain); // Proceed to the joinpoint through the interceptor chain. retVal = invocation.proceed(); } // Massage return value if necessary. Class<?> returnType = method.getReturnType(); if (retVal != null && retVal == target && returnType != Object.class && returnType.isInstance(proxy) && !RawTargetAccess.class.isAssignableFrom(method.getDeclaringClass())) { // Special case: it returned "this" and the return type of the method // is type-compatible. Note that we can't help if the target sets // a reference to itself in another returned object. retVal = proxy; } else if (retVal == null && returnType != Void.TYPE && returnType.isPrimitive()) { throw new AopInvocationException( "Null return value from advice does not match primitive return type for: " + method); } return retVal; } finally { if (target != null && !targetSource.isStatic()) { // Must have come from TargetSource. targetSource.releaseTarget(target); } if (setProxyContext) { // Restore old proxy. AopContext.setCurrentProxy(oldProxy); } } }
@Async註解的攔截器是AsyncExecutionInterceptor,它繼承了MethodInterceptor接口。而MethodInterceptor就是AOP規範中的Advice(切點的處理器)。chain不爲空,執行第二個分支,構造ReflectiveMethodInvocation,而後執行proceed方法。
@Override public Object proceed() throws Throwable { // We start with an index of -1 and increment early. if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) { return invokeJoinpoint(); } Object interceptorOrInterceptionAdvice = this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex); if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) { // Evaluate dynamic method matcher here: static part will already have // been evaluated and found to match. InterceptorAndDynamicMethodMatcher dm = (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice; if (dm.methodMatcher.matches(this.method, this.targetClass, this.arguments)) { return dm.interceptor.invoke(this); } else { // Dynamic matching failed. // Skip this interceptor and invoke the next in the chain. return proceed(); } } else { // It's an interceptor, so we just invoke it: The pointcut will have // been evaluated statically before this object was constructed. return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this); } }
核心方法是InterceptorAndDynamicMethodMatcher.interceptor.invoke(this),實際就是執行了AsyncExecutionInterceptor.invoke。
/** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; } }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.java
/** * Determine the specific executor to use when executing the given method. * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. * @return the executor to use (or {@code null}, but just if no default executor is available) */ protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
@Aync註解有個value能夠標註使用哪一個executor,這裏的getExecutorQualifier就是尋找這個標識。這裏若是defaultExecutor爲null的話,則獲取找默認的executor。
/** * Retrieve or build a default executor for this advice instance. * An executor returned from here will be cached for further use. * <p>The default implementation searches for a unique {@link TaskExecutor} bean * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable, this implementation will return {@code null}. * @param beanFactory the BeanFactory to use for a default executor lookup * @return the default executor, or {@code null} if none available * @since 4.2.6 * @see #findQualifiedExecutor(BeanFactory, String) * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ protected Executor getDefaultExecutor(BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; }
若是工程裏頭沒有定義默認的task executor的話,則獲取bean的時候會拋出NoSuchBeanDefinitionException。這就是爲何上面例子中會拋出以下錯誤的緣由。
[nio-8002-exec-1] .s.a.AnnotationAsyncExecutionInterceptor : No task executor bean found for async processing: no bean of type TaskExecutor and no bean named 'taskExecutor' either
spring-aop-4.3.18.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.java
/** * This implementation searches for a unique {@link org.springframework.core.task.TaskExecutor} * bean in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable (e.g. if no {@code BeanFactory} was configured at all), * this implementation falls back to a newly created {@link SimpleAsyncTaskExecutor} instance * for local use if no default could be found. * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ @Override protected Executor getDefaultExecutor(BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
AsyncExecutionInterceptor重寫了getDefaultExecutor方法,先調用AsyncExecutionAspectSupport的getDefaultExecutor,若是默認的找不到,這裏new一個SimpleAsyncTaskExecutor。這也是爲何上面例子中出現SimpleAsyncTaskExecutor線程前綴的緣由。
總體流程大致可梳理爲兩條線:
1.從註解開始:@EnableAsync--》ProxyAsyncConfiguration類構造一個bean(類型:AsyncAnnotationBeanPostProcessor)
2.從AsyncAnnotationBeanPostProcessor這個類的bean的生命週期走:AOP-Advisor切面初始化(setBeanFactory())--》AOP-生成代理類AopProxy(postProcessAfterInitialization())--》AOP-切點執行(InvocationHandler.invoke)
參考
https://segmentfault.com/a/1190000011339882
https://blog.csdn.net/qq_39470742/article/details/83382338
https://blog.csdn.net/fenglongmiao/article/details/82429460
https://www.baeldung.com/spring-async