本文主要聊聊spring中的async註解。html
@EnableAsync(proxyTargetClass = true) @Configuration public class AsyncConfig implements AsyncConfigurer{ private static final Logger LOGGER = LoggerFactory.getLogger(AsyncConfig.class); @Override public Executor getAsyncExecutor() { return null; } @Override public AsyncUncaughtExceptionHandler getAsyncUncaughtExceptionHandler() { return new AsyncUncaughtExceptionHandler() { @Override public void handleUncaughtException(Throwable throwable, Method method, Object... params) { LOGGER.error(throwable); } }; } }
這裏討論一下getAsyncExecutor這裏定義null的狀況。
spring-context-4.3.9.RELEASE-sources.jar!/org/springframework/scheduling/annotation/AbstractAsyncConfiguration.javajava
@Configuration public abstract class AbstractAsyncConfiguration implements ImportAware { protected AnnotationAttributes enableAsync; protected Executor executor; protected AsyncUncaughtExceptionHandler exceptionHandler; @Override public void setImportMetadata(AnnotationMetadata importMetadata) { this.enableAsync = AnnotationAttributes.fromMap( importMetadata.getAnnotationAttributes(EnableAsync.class.getName(), false)); if (this.enableAsync == null) { throw new IllegalArgumentException( "@EnableAsync is not present on importing class " + importMetadata.getClassName()); } } /** * Collect any {@link AsyncConfigurer} beans through autowiring. */ @Autowired(required = false) void setConfigurers(Collection<AsyncConfigurer> configurers) { if (CollectionUtils.isEmpty(configurers)) { return; } if (configurers.size() > 1) { throw new IllegalStateException("Only one AsyncConfigurer may exist"); } AsyncConfigurer configurer = configurers.iterator().next(); this.executor = configurer.getAsyncExecutor(); this.exceptionHandler = configurer.getAsyncUncaughtExceptionHandler(); } }
這裏從AsyncConfigurer獲取executorspring
spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.javaasync
/** * Intercept the given method invocation, submit the actual calling of the method to * the correct task executor and return immediately to the caller. * @param invocation the method to intercept and make asynchronous * @return {@link Future} if the original method returns {@code Future}; {@code null} * otherwise. */ @Override public Object invoke(final MethodInvocation invocation) throws Throwable { Class<?> targetClass = (invocation.getThis() != null ? AopUtils.getTargetClass(invocation.getThis()) : null); Method specificMethod = ClassUtils.getMostSpecificMethod(invocation.getMethod(), targetClass); final Method userDeclaredMethod = BridgeMethodResolver.findBridgedMethod(specificMethod); AsyncTaskExecutor executor = determineAsyncExecutor(userDeclaredMethod); if (executor == null) { throw new IllegalStateException( "No executor specified and no default executor set on AsyncExecutionInterceptor either"); } Callable<Object> task = new Callable<Object>() { @Override public Object call() throws Exception { try { Object result = invocation.proceed(); if (result instanceof Future) { return ((Future<?>) result).get(); } } catch (ExecutionException ex) { handleError(ex.getCause(), userDeclaredMethod, invocation.getArguments()); } catch (Throwable ex) { handleError(ex, userDeclaredMethod, invocation.getArguments()); } return null; } }; return doSubmit(task, executor, invocation.getMethod().getReturnType()); }
spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionAspectSupport.javaide
** * Determine the specific executor to use when executing the given method. * Should preferably return an {@link AsyncListenableTaskExecutor} implementation. * @return the executor to use (or {@code null}, but just if no default executor is available) */ protected AsyncTaskExecutor determineAsyncExecutor(Method method) { AsyncTaskExecutor executor = this.executors.get(method); if (executor == null) { Executor targetExecutor; String qualifier = getExecutorQualifier(method); if (StringUtils.hasLength(qualifier)) { targetExecutor = findQualifiedExecutor(this.beanFactory, qualifier); } else { targetExecutor = this.defaultExecutor; if (targetExecutor == null) { synchronized (this.executors) { if (this.defaultExecutor == null) { this.defaultExecutor = getDefaultExecutor(this.beanFactory); } targetExecutor = this.defaultExecutor; } } } if (targetExecutor == null) { return null; } executor = (targetExecutor instanceof AsyncListenableTaskExecutor ? (AsyncListenableTaskExecutor) targetExecutor : new TaskExecutorAdapter(targetExecutor)); this.executors.put(method, executor); } return executor; }
@Aync註解有個value能夠標註使用哪一個executor,這裏的getExecutorQualifier就是尋找這個標識。優化
這裏若是defaultExecutor爲null的話,則獲取找默認的executorui
/** * Retrieve or build a default executor for this advice instance. * An executor returned from here will be cached for further use. * <p>The default implementation searches for a unique {@link TaskExecutor} bean * in the context, or for an {@link Executor} bean named "taskExecutor" otherwise. * If neither of the two is resolvable, this implementation will return {@code null}. * @param beanFactory the BeanFactory to use for a default executor lookup * @return the default executor, or {@code null} if none available * @since 4.2.6 * @see #findQualifiedExecutor(BeanFactory, String) * @see #DEFAULT_TASK_EXECUTOR_BEAN_NAME */ protected Executor getDefaultExecutor(BeanFactory beanFactory) { if (beanFactory != null) { try { // Search for TaskExecutor bean... not plain Executor since that would // match with ScheduledExecutorService as well, which is unusable for // our purposes here. TaskExecutor is more clearly designed for it. return beanFactory.getBean(TaskExecutor.class); } catch (NoUniqueBeanDefinitionException ex) { logger.debug("Could not find unique TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { if (logger.isInfoEnabled()) { logger.info("More than one TaskExecutor bean found within the context, and none is named " + "'taskExecutor'. Mark one of them as primary or name it 'taskExecutor' (possibly " + "as an alias) in order to use it for async processing: " + ex.getBeanNamesFound()); } } } catch (NoSuchBeanDefinitionException ex) { logger.debug("Could not find default TaskExecutor bean", ex); try { return beanFactory.getBean(DEFAULT_TASK_EXECUTOR_BEAN_NAME, Executor.class); } catch (NoSuchBeanDefinitionException ex2) { logger.info("No task executor bean found for async processing: " + "no bean of type TaskExecutor and no bean named 'taskExecutor' either"); } // Giving up -> either using local default executor or none at all... } } return null; }
若是工程裏頭沒有定義默認的task executor的話,則獲取bean的時候會拋出NoSuchBeanDefinitionExceptionthis
spring-aop-4.3.9.RELEASE-sources.jar!/org/springframework/aop/interceptor/AsyncExecutionInterceptor.javadebug
protected Executor getDefaultExecutor(BeanFactory beanFactory) { Executor defaultExecutor = super.getDefaultExecutor(beanFactory); return (defaultExecutor != null ? defaultExecutor : new SimpleAsyncTaskExecutor()); }
AsyncExecutionInterceptor重寫了getDefaultExecutor方法,先調用AsyncExecutionAspectSupport的getDefaultExecutor,若是默認的找不到,這裏new一個SimpleAsyncTaskExecutorcode
若是是在AsyncConfigurer定義的executor,沒有受spring託管,貌似是不會在spring context關閉的時候主動shutdown,這個多是個問題。
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { //... }
spring-context-4.3.9.RELEASE-sources.jar!/org/springframework/scheduling/concurrent/ExecutorConfigurationSupport.java
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { //... /** * Perform a shutdown on the underlying ExecutorService. * @see java.util.concurrent.ExecutorService#shutdown() * @see java.util.concurrent.ExecutorService#shutdownNow() * @see #awaitTerminationIfNecessary() */ public void shutdown() { if (logger.isInfoEnabled()) { logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (this.waitForTasksToCompleteOnShutdown) { this.executor.shutdown(); } else { this.executor.shutdownNow(); } awaitTerminationIfNecessary(); } /** * Wait for the executor to terminate, according to the value of the * {@link #setAwaitTerminationSeconds "awaitTerminationSeconds"} property. */ private void awaitTerminationIfNecessary() { if (this.awaitTerminationSeconds > 0) { try { if (!this.executor.awaitTermination(this.awaitTerminationSeconds, TimeUnit.SECONDS)) { if (logger.isWarnEnabled()) { logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } } } catch (InterruptedException ex) { if (logger.isWarnEnabled()) { logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } Thread.currentThread().interrupt(); } } } }
ExecutorConfigurationSupport實現了DisposableBean接口,重寫了destory方法,在裏頭調用shutdown
所以,最好將ThreadPoolTaskExecutor的定義託管給spring,這樣能夠優化關閉。
不是spring託管的executor,須要本身額外去監聽事件,而後優雅關閉
好比
@Async("myTaskExecutor") public void xxxx(){ }
這個則使用指定的myTaskExecutor,而不是AsyncConfigurer中定義的executor。
推薦async註解指定task executor,而後AsyncConfigurer的getAsyncExecutor返回null,讓它去尋找默認的taskExecutor(
本身應用裏頭都默認定義一個taskExecutor給spring託管
)