關於異步的好處我在這裏就很少說了,自從servlet3.1規範發佈以來,控制層的異步處理也愈來愈多的被人說起。而Spring5的webflux誕生也意味着Spring全方位對異步提供了支持。其實早在SpringMVC3.2版本就開始支持異步了,那麼這篇文章咱們就來探討一下SpringMVC使用異步的方式。java
DeferredResult這個類表明延遲結果,咱們先看一看spring的API文檔給咱們的解釋:web
{@code DeferredResult} provides an alternative to using a {@link Callable} for asynchronous request processing. While a {@code Callable} is executed concurrently on behalf of the application, with a {@code DeferredResult} the application can produce the result from a thread of its choice.spring
根據文檔說明DeferredResult
能夠替代Callable
來進行異步的請求處理。只不過這個類能夠從其餘線程裏拿到對應的結果。當使用DeferredResult
,咱們能夠將DefferedResult的類型並將其保存到能夠獲取到該對象的地方,好比說隊列或者集合當中,這樣方便其它線程可以取到並設置DefferedResult
的值。tomcat
咱們先定義一個Controller,代碼內容以下:mvc
package com.bdqn.lyrk.ssm.study.web.controller; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.BlockingQueue; import java.util.concurrent.Callable; /** * 異步任務的控制器 * * @author chen.nie * @date 2018/8/2 **/ @RestController public class AsyncController { private BlockingQueue<DeferredResult<String>> blockingQueue = new ArrayBlockingQueue(1024); /** * 返回值是DeferredResult類型,若是沒有結果請求阻塞 * * @return */ @GetMapping("/quotes") public DeferredResult<String> quotes() { //指定超時時間,及出錯時返回的值 DeferredResult<String> result = new DeferredResult(3000L,"error"); blockingQueue.add(result); return result; } /** * 另一個請求(新的線程)設置值 * * @throws InterruptedException */ @GetMapping("take") public void take() throws InterruptedException { DeferredResult<String> result = blockingQueue.take(); result.setResult("route"); } @GetMapping public Callable<String> callable() { return () -> "callable"; } }
控制器能夠從不一樣的線程異步生成返回值,例如響應外部事件(JMS消息)、計劃任務等,那麼在這裏我先使用另一個請求來模擬這個過程
此時咱們啓動tomcat,先訪問地址http://localhost:8080/quotes ,此時咱們會看到發送的請求因爲等待響應遭到了阻塞:
app
當在規定時間內訪問http://localhost:8080/take 時,則能成功顯示結果:
框架
根據官網描述:異步
DeferredResult processing:async
將Controller返回的DeferredResult
值保存到內存隊列或集合當中,緊接着SpringMVC調用HttpServletRequest
的startAsync()
方法,與此同時DispatcherServlet
和全部配置的Filter
退出當前的請求線程(不過響應時開放的),當其餘線程裏設置DeferredResult的值時將從新發送請求,此時DispatcherServlet使用異步生成的返回值繼續處理。ide
在這裏一切的一切還須要經過源代碼來解釋:
DispatcherServlet
處理時,會試着獲取一個WebAsyncManager
對象protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception { HttpServletRequest processedRequest = request; HandlerExecutionChain mappedHandler = null; boolean multipartRequestParsed = false; WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request); try { // ......省略部分代碼 // 執行子控制器的方法 mv = ha.handle(processedRequest, response, mappedHandler.getHandler()); //若是當前的請求須要異步處理,則終止當前請求,可是響應是開放的 if (asyncManager.isConcurrentHandlingStarted()) { return; } //....省略部分代碼 } catch (Exception ex) { triggerAfterCompletion(processedRequest, response, mappedHandler, ex); } catch (Throwable err) { triggerAfterCompletion(processedRequest, response, mappedHandler, new NestedServletException("Handler processing failed", err)); } finally { if (asyncManager.isConcurrentHandlingStarted()) { // Instead of postHandle and afterCompletion if (mappedHandler != null) { mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response); } } else { // Clean up any resources used by a multipart request. if (multipartRequestParsed) { cleanupMultipart(processedRequest); } } } }
HandlerMethodReturnValueHandler
接口處理的,其中有一個實現類是DeferredResultMethodReturnValueHandler
,關鍵代碼以下:package org.springframework.web.servlet.mvc.method.annotation; import java.util.HashMap; import java.util.Map; import java.util.concurrent.CompletionStage; import java.util.function.BiFunction; import org.springframework.core.MethodParameter; import org.springframework.lang.UsesJava8; import org.springframework.util.Assert; import org.springframework.util.ClassUtils; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback; import org.springframework.web.context.request.NativeWebRequest; import org.springframework.web.context.request.async.DeferredResult; import org.springframework.web.context.request.async.WebAsyncUtils; import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler; import org.springframework.web.method.support.ModelAndViewContainer; /** * Handler for return values of type {@link DeferredResult}, {@link ListenableFuture}, * {@link CompletionStage} and any other async type with a {@link #getAdapterMap() * registered adapter}. * * @author Rossen Stoyanchev * @since 3.2 */ @SuppressWarnings("deprecation") public class DeferredResultMethodReturnValueHandler implements AsyncHandlerMethodReturnValueHandler { //存放DeferredResult的適配集合 private final Map<Class<?>, DeferredResultAdapter> adapterMap; public DeferredResultMethodReturnValueHandler() { this.adapterMap = new HashMap<Class<?>, DeferredResultAdapter>(5); this.adapterMap.put(DeferredResult.class, new SimpleDeferredResultAdapter()); this.adapterMap.put(ListenableFuture.class, new ListenableFutureAdapter()); if (ClassUtils.isPresent("java.util.concurrent.CompletionStage", getClass().getClassLoader())) { this.adapterMap.put(CompletionStage.class, new CompletionStageAdapter()); } } /** * Return the map with {@code DeferredResult} adapters. * <p>By default the map contains adapters for {@code DeferredResult}, which * simply downcasts, {@link ListenableFuture}, and {@link CompletionStage}. * @return the map of adapters * @deprecated in 4.3.8, see comments on {@link DeferredResultAdapter} */ @Deprecated public Map<Class<?>, DeferredResultAdapter> getAdapterMap() { return this.adapterMap; } private DeferredResultAdapter getAdapterFor(Class<?> type) { for (Class<?> adapteeType : getAdapterMap().keySet()) { if (adapteeType.isAssignableFrom(type)) { return getAdapterMap().get(adapteeType); } } return null; } @Override public boolean supportsReturnType(MethodParameter returnType) { return (getAdapterFor(returnType.getParameterType()) != null); } @Override public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) { return (returnValue != null && (getAdapterFor(returnValue.getClass()) != null)); } @Override public void handleReturnValue(Object returnValue, MethodParameter returnType, ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception { if (returnValue == null) { mavContainer.setRequestHandled(true); return; } //根據返回值的類型獲取對應的DeferredResult適配器 DeferredResultAdapter adapter = getAdapterFor(returnValue.getClass()); if (adapter == null) { throw new IllegalStateException( "Could not find DeferredResultAdapter for return value type: " + returnValue.getClass()); } DeferredResult<?> result = adapter.adaptToDeferredResult(returnValue); //開啓異步請求 WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer); } }
在這裏咱們關注handleReturnValue
的方法,在通過適配包裝後獲取DeferredResult
開啓了異步之旅
WebAsyncManager
的startDeferredResultProcessing
方法/** * Start concurrent request processing and initialize the given * {@link DeferredResult} with a {@link DeferredResultHandler} that saves * the result and dispatches the request to resume processing of that * result. The {@code AsyncWebRequest} is also updated with a completion * handler that expires the {@code DeferredResult} and a timeout handler * assuming the {@code DeferredResult} has a default timeout result. * @param deferredResult the DeferredResult instance to initialize * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} * @throws Exception if concurrent processing failed to start * @see #getConcurrentResult() * @see #getConcurrentResultContext() */ public void startDeferredResultProcessing( final DeferredResult<?> deferredResult, Object... processingContext) throws Exception { Assert.notNull(deferredResult, "DeferredResult must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); //設置超時時間 Long timeout = deferredResult.getTimeoutValue(); if (timeout != null) { this.asyncWebRequest.setTimeout(timeout); } //獲取全部的延遲結果攔截器 List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>(); interceptors.add(deferredResult.getInterceptor()); interceptors.addAll(this.deferredResultInterceptors.values()); interceptors.add(timeoutDeferredResultInterceptor); final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors); this.asyncWebRequest.addTimeoutHandler(new Runnable() { @Override public void run() { try { interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } } }); this.asyncWebRequest.addCompletionHandler(new Runnable() { @Override public void run() { interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult); } }); interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult); //開始異步處理 startAsyncProcessing(processingContext); try { interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult); deferredResult.setResultHandler(new DeferredResultHandler() { @Override public void handleResult(Object result) { result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result); //設置結果並轉發 setConcurrentResultAndDispatch(result); } }); } catch (Throwable ex) { setConcurrentResultAndDispatch(ex); } } private void startAsyncProcessing(Object[] processingContext) { clearConcurrentResult(); this.concurrentResultContext = processingContext; //其實是執行的是HttpServletRequest對應方法 this.asyncWebRequest.startAsync(); if (logger.isDebugEnabled()) { HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class); String requestUri = urlPathHelper.getRequestUri(request); logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]"); } }
在這裏首先收集全部配置好的DeferredResultProcessingInterceptor
,而後設置asyncRequest的超時處理,完成時的處理等,同時會分階段執行攔截器中的各個方法。在這裏真的佩服Spring框架的擴展機制作的實在是太好了。最後咱們關注一下以下代碼:
deferredResult.setResultHandler(new DeferredResultHandler() { @Override public void handleResult(Object result) { result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result); //設置結果並轉發 setConcurrentResultAndDispatch(result); } });
其最終仍是要調用AsyncWebRequest
接口中的dispatch
方法進行轉發,讓DispatcherServlet
從新處理異步結果:
/** * Dispatch the request to the container in order to resume processing after * concurrent execution in an application thread. */ void dispatch();
其實在這裏都是封裝自HttpServletRequest
的異步操做,咱們能夠看一下StandardServletAsyncWebRequest
的類結構圖:
咱們能夠在其父類ServletRequestAttributes
裏找到對應的實現:
private final HttpServletRequest request; /** * Exposes the native {@link HttpServletRequest} that we're wrapping. */ public final HttpServletRequest getRequest() { return this.request; }
最後我在貼出一段StandardServletAsyncWebRequest
代碼,你們就應該知道整個異步是怎麼執行的了:
//java.servlet.AsnycContext private AsyncContext asyncContext; @Override public void startAsync() { Assert.state(getRequest().isAsyncSupported(), "Async support must be enabled on a servlet and for all filters involved " + "in async request processing. This is done in Java code using the Servlet API " + "or by adding \"<async-supported>true</async-supported>\" to servlet and " + "filter declarations in web.xml."); Assert.state(!isAsyncComplete(), "Async processing has already completed"); if (isAsyncStarted()) { return; } this.asyncContext = getRequest().startAsync(getRequest(), getResponse()); this.asyncContext.addListener(this); if (this.timeout != null) { this.asyncContext.setTimeout(this.timeout); } } @Override public void dispatch() { Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext"); this.asyncContext.dispatch(); }
使用Callable
做爲返回值來實現異步與DeferredResult
相似,咱們先看一看官網描述的具體流程:
Callable processing:
流程上大致與DeferredResult
相似,只不過Callable
是由TaskExecutor
來處理的,而TaskExecutor
繼承自java.util.concurrent.Executor
。咱們來看一下它的源代碼,它也是在WebAysncManager
中處理的:
/** * Use the given {@link WebAsyncTask} to configure the task executor as well as * the timeout value of the {@code AsyncWebRequest} before delegating to * {@link #startCallableProcessing(Callable, Object...)}. * @param webAsyncTask a WebAsyncTask containing the target {@code Callable} * @param processingContext additional context to save that can be accessed * via {@link #getConcurrentResultContext()} * @throws Exception if concurrent processing failed to start */ public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception { Assert.notNull(webAsyncTask, "WebAsyncTask must not be null"); Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null"); Long timeout = webAsyncTask.getTimeout(); if (timeout != null) { this.asyncWebRequest.setTimeout(timeout); } AsyncTaskExecutor executor = webAsyncTask.getExecutor(); if (executor != null) { this.taskExecutor = executor; } List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>(); interceptors.add(webAsyncTask.getInterceptor()); interceptors.addAll(this.callableInterceptors.values()); interceptors.add(timeoutCallableInterceptor); final Callable<?> callable = webAsyncTask.getCallable(); final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors); this.asyncWebRequest.addTimeoutHandler(new Runnable() { @Override public void run() { logger.debug("Processing timeout"); Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable); if (result != CallableProcessingInterceptor.RESULT_NONE) { setConcurrentResultAndDispatch(result); } } }); this.asyncWebRequest.addCompletionHandler(new Runnable() { @Override public void run() { interceptorChain.triggerAfterCompletion(asyncWebRequest, callable); } }); interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable); startAsyncProcessing(processingContext); //啓動線程池的異步處理 try { this.taskExecutor.submit(new Runnable() { @Override public void run() { Object result = null; try { interceptorChain.applyPreProcess(asyncWebRequest, callable); result = callable.call(); } catch (Throwable ex) { result = ex; } finally { result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result); } //設置當前的結果並轉發 setConcurrentResultAndDispatch(result); } }); } catch (RejectedExecutionException ex) { Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex); setConcurrentResultAndDispatch(result); throw ex; } }
對比DeferredResult
,在這裏剛開始也是添加攔截器,只不過攔截器的名稱是CallableProcessingInterceptor
,同時也須要設置WebAsyncRequest的超時處理,完成時處理的響應操做。這其中最大的區別就是使用TaskExecutor
來對Callable
進行異步處理