springmvc的異步處理

     關於異步的好處我在這裏就很少說了,自從servlet3.1規範發佈以來,控制層的異步處理也愈來愈多的被人說起。而Spring5的webflux誕生也意味着Spring全方位對異步提供了支持。其實早在SpringMVC3.2版本就開始支持異步了,那麼這篇文章咱們就來探討一下SpringMVC使用異步的方式。java

1、DeferredResult

     DeferredResult這個類表明延遲結果,咱們先看一看spring的API文檔給咱們的解釋:web

{@code DeferredResult} provides an alternative to using a {@link Callable} for asynchronous request processing. While a {@code Callable} is executed concurrently on behalf of the application, with a {@code DeferredResult} the application can produce the result from a thread of its choice.spring

     根據文檔說明DeferredResult能夠替代Callable來進行異步的請求處理。只不過這個類能夠從其餘線程裏拿到對應的結果。當使用DeferredResult,咱們能夠將DefferedResult的類型並將其保存到能夠獲取到該對象的地方,好比說隊列或者集合當中,這樣方便其它線程可以取到並設置DefferedResult的值。tomcat

1.一、示例

     咱們先定義一個Controller,代碼內容以下:mvc

package com.bdqn.lyrk.ssm.study.web.controller;

import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;
import org.springframework.web.context.request.async.DeferredResult;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Callable;

/**
 * 異步任務的控制器
 *
 * @author chen.nie
 * @date 2018/8/2
 **/
@RestController
public class AsyncController {

    private BlockingQueue<DeferredResult<String>> blockingQueue = new ArrayBlockingQueue(1024);

    /**
     * 返回值是DeferredResult類型,若是沒有結果請求阻塞
     *
     * @return
     */
    @GetMapping("/quotes")
    public DeferredResult<String> quotes() {
        //指定超時時間,及出錯時返回的值
        DeferredResult<String> result = new DeferredResult(3000L,"error");
        blockingQueue.add(result);
        return result;
    }

    /**
     * 另一個請求(新的線程)設置值
     *
     * @throws InterruptedException
     */

    @GetMapping("take")
    public void take() throws InterruptedException {
        DeferredResult<String> result = blockingQueue.take();
        result.setResult("route");
    }

    @GetMapping
    public Callable<String> callable() {
        return () -> "callable";
    }


}

     控制器能夠從不一樣的線程異步生成返回值,例如響應外部事件(JMS消息)、計劃任務等,那麼在這裏我先使用另一個請求來模擬這個過程
     此時咱們啓動tomcat,先訪問地址http://localhost:8080/quotes ,此時咱們會看到發送的請求因爲等待響應遭到了阻塞:
A00F6B02_D6B9_4B5A_AE5F_2397E6E9CD87app

     當在規定時間內訪問http://localhost:8080/take 時,則能成功顯示結果:
75E7C6AC_446B_424A_857C_CFBAACF93120框架

1.二、DeferredResult處理流程

根據官網描述:異步

DeferredResult processing:async

  • Controller returns a DeferredResult and saves it in some in-memory queue or list where it can be accessed.
  • Spring MVC calls request.startAsync().
  • Meanwhile the DispatcherServlet and all configured Filter’s exit the request processing thread but the response remains open.
  • The application sets the DeferredResult from some thread and Spring MVC dispatches the request back to the Servlet container.
  • The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value.

     將Controller返回的DeferredResult值保存到內存隊列或集合當中,緊接着SpringMVC調用HttpServletRequeststartAsync()方法,與此同時DispatcherServlet和全部配置的Filter退出當前的請求線程(不過響應時開放的),當其餘線程裏設置DeferredResult的值時將從新發送請求,此時DispatcherServlet使用異步生成的返回值繼續處理。ide

     在這裏一切的一切還須要經過源代碼來解釋:

  • 當一個請求被DispatcherServlet處理時,會試着獲取一個WebAsyncManager對象
protected void doDispatch(HttpServletRequest request, HttpServletResponse response) throws Exception {
        HttpServletRequest processedRequest = request;
        HandlerExecutionChain mappedHandler = null;
        boolean multipartRequestParsed = false;

        WebAsyncManager asyncManager = WebAsyncUtils.getAsyncManager(request);
        try {
          // ......省略部分代碼
          // 執行子控制器的方法
          mv = ha.handle(processedRequest, response, mappedHandler.getHandler());
        //若是當前的請求須要異步處理,則終止當前請求,可是響應是開放的
          if (asyncManager.isConcurrentHandlingStarted()) {
              return;
          }
        //....省略部分代碼
       }
        catch (Exception ex) {
            triggerAfterCompletion(processedRequest, response, mappedHandler, ex);
        }
        catch (Throwable err) {
            triggerAfterCompletion(processedRequest, response, mappedHandler,
                new NestedServletException("Handler processing failed", err));
        }
        finally {
            if (asyncManager.isConcurrentHandlingStarted()) {
                // Instead of postHandle and afterCompletion
                if (mappedHandler != null) {
                    mappedHandler.applyAfterConcurrentHandlingStarted(processedRequest, response);
                }
            }
            else {
                // Clean up any resources used by a multipart request.
                if (multipartRequestParsed) {
                    cleanupMultipart(processedRequest);
                }
            }
        }
}
  • 對於每個子控制器的方法返回值,都是HandlerMethodReturnValueHandler接口處理的,其中有一個實現類是DeferredResultMethodReturnValueHandler,關鍵代碼以下:
package org.springframework.web.servlet.mvc.method.annotation;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletionStage;
import java.util.function.BiFunction;

import org.springframework.core.MethodParameter;
import org.springframework.lang.UsesJava8;
import org.springframework.util.Assert;
import org.springframework.util.ClassUtils;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.context.request.NativeWebRequest;
import org.springframework.web.context.request.async.DeferredResult;
import org.springframework.web.context.request.async.WebAsyncUtils;
import org.springframework.web.method.support.AsyncHandlerMethodReturnValueHandler;
import org.springframework.web.method.support.ModelAndViewContainer;

/**
 * Handler for return values of type {@link DeferredResult}, {@link ListenableFuture},
 * {@link CompletionStage} and any other async type with a {@link #getAdapterMap()
 * registered adapter}.
 *
 * @author Rossen Stoyanchev
 * @since 3.2
 */
@SuppressWarnings("deprecation")
public class DeferredResultMethodReturnValueHandler implements AsyncHandlerMethodReturnValueHandler {

    //存放DeferredResult的適配集合
    private final Map<Class<?>, DeferredResultAdapter> adapterMap;


    public DeferredResultMethodReturnValueHandler() {
        this.adapterMap = new HashMap<Class<?>, DeferredResultAdapter>(5);
        this.adapterMap.put(DeferredResult.class, new SimpleDeferredResultAdapter());
        this.adapterMap.put(ListenableFuture.class, new ListenableFutureAdapter());
        if (ClassUtils.isPresent("java.util.concurrent.CompletionStage", getClass().getClassLoader())) {
            this.adapterMap.put(CompletionStage.class, new CompletionStageAdapter());
        }
    }


    /**
     * Return the map with {@code DeferredResult} adapters.
     * <p>By default the map contains adapters for {@code DeferredResult}, which
     * simply downcasts, {@link ListenableFuture}, and {@link CompletionStage}.
     * @return the map of adapters
     * @deprecated in 4.3.8, see comments on {@link DeferredResultAdapter}
     */
    @Deprecated
    public Map<Class<?>, DeferredResultAdapter> getAdapterMap() {
        return this.adapterMap;
    }

    private DeferredResultAdapter getAdapterFor(Class<?> type) {
        for (Class<?> adapteeType : getAdapterMap().keySet()) {
            if (adapteeType.isAssignableFrom(type)) {
                return getAdapterMap().get(adapteeType);
            }
        }
        return null;
    }


    @Override
    public boolean supportsReturnType(MethodParameter returnType) {
        return (getAdapterFor(returnType.getParameterType()) != null);
    }

    @Override
    public boolean isAsyncReturnValue(Object returnValue, MethodParameter returnType) {
        return (returnValue != null && (getAdapterFor(returnValue.getClass()) != null));
    }

    @Override
    public void handleReturnValue(Object returnValue, MethodParameter returnType,
            ModelAndViewContainer mavContainer, NativeWebRequest webRequest) throws Exception {

        if (returnValue == null) {
            mavContainer.setRequestHandled(true);
            return;
        }
       //根據返回值的類型獲取對應的DeferredResult適配器
        DeferredResultAdapter adapter = getAdapterFor(returnValue.getClass());
        if (adapter == null) {
            throw new IllegalStateException(
                    "Could not find DeferredResultAdapter for return value type: " + returnValue.getClass());
        }
        DeferredResult<?> result = adapter.adaptToDeferredResult(returnValue);
        //開啓異步請求
        WebAsyncUtils.getAsyncManager(webRequest).startDeferredResultProcessing(result, mavContainer);
    }

}

     在這裏咱們關注handleReturnValue的方法,在通過適配包裝後獲取DeferredResult開啓了異步之旅

  • 緊接着咱們關注一下WebAsyncManagerstartDeferredResultProcessing方法
/**
     * Start concurrent request processing and initialize the given
     * {@link DeferredResult} with a {@link DeferredResultHandler} that saves
     * the result and dispatches the request to resume processing of that
     * result. The {@code AsyncWebRequest} is also updated with a completion
     * handler that expires the {@code DeferredResult} and a timeout handler
     * assuming the {@code DeferredResult} has a default timeout result.
     * @param deferredResult the DeferredResult instance to initialize
     * @param processingContext additional context to save that can be accessed
     * via {@link #getConcurrentResultContext()}
     * @throws Exception if concurrent processing failed to start
     * @see #getConcurrentResult()
     * @see #getConcurrentResultContext()
     */
    public void startDeferredResultProcessing(
            final DeferredResult<?> deferredResult, Object... processingContext) throws Exception {

        Assert.notNull(deferredResult, "DeferredResult must not be null");
        Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");
        //設置超時時間
        Long timeout = deferredResult.getTimeoutValue();
        if (timeout != null) {
            this.asyncWebRequest.setTimeout(timeout);
        }

        //獲取全部的延遲結果攔截器
        List<DeferredResultProcessingInterceptor> interceptors = new ArrayList<DeferredResultProcessingInterceptor>();
        interceptors.add(deferredResult.getInterceptor());
        interceptors.addAll(this.deferredResultInterceptors.values());
        interceptors.add(timeoutDeferredResultInterceptor);

        final DeferredResultInterceptorChain interceptorChain = new DeferredResultInterceptorChain(interceptors);
       
        this.asyncWebRequest.addTimeoutHandler(new Runnable() {
            @Override
            public void run() {
                try {
                    interceptorChain.triggerAfterTimeout(asyncWebRequest, deferredResult);
                }
                catch (Throwable ex) {
                    setConcurrentResultAndDispatch(ex);
                }
            }
        });

        this.asyncWebRequest.addCompletionHandler(new Runnable() {
            @Override
            public void run() {
                interceptorChain.triggerAfterCompletion(asyncWebRequest, deferredResult);
            }
        });

        interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, deferredResult);
         //開始異步處理
        startAsyncProcessing(processingContext);

        try {
            interceptorChain.applyPreProcess(this.asyncWebRequest, deferredResult);
            deferredResult.setResultHandler(new DeferredResultHandler() {
                @Override
                public void handleResult(Object result) {
                    result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result);
                    //設置結果並轉發
                    setConcurrentResultAndDispatch(result);
                }
            });
        }
        catch (Throwable ex) {
            setConcurrentResultAndDispatch(ex);
        }
    }

    private void startAsyncProcessing(Object[] processingContext) {
        clearConcurrentResult();
        this.concurrentResultContext = processingContext;
        //其實是執行的是HttpServletRequest對應方法
        this.asyncWebRequest.startAsync();

        if (logger.isDebugEnabled()) {
            HttpServletRequest request = this.asyncWebRequest.getNativeRequest(HttpServletRequest.class);
            String requestUri = urlPathHelper.getRequestUri(request);
            logger.debug("Concurrent handling starting for " + request.getMethod() + " [" + requestUri + "]");
        }
    }

     在這裏首先收集全部配置好的DeferredResultProcessingInterceptor,而後設置asyncRequest的超時處理,完成時的處理等,同時會分階段執行攔截器中的各個方法。在這裏真的佩服Spring框架的擴展機制作的實在是太好了。最後咱們關注一下以下代碼:

deferredResult.setResultHandler(new DeferredResultHandler() {
                @Override
                public void handleResult(Object result) {
                    result = interceptorChain.applyPostProcess(asyncWebRequest, deferredResult, result);
                    //設置結果並轉發
                    setConcurrentResultAndDispatch(result);
                }
            });

     其最終仍是要調用AsyncWebRequest接口中的dispatch方法進行轉發,讓DispatcherServlet從新處理異步結果:

/**
     * Dispatch the request to the container in order to resume processing after
     * concurrent execution in an application thread.
     */
    void dispatch();

     其實在這裏都是封裝自HttpServletRequest的異步操做,咱們能夠看一下StandardServletAsyncWebRequest的類結構圖:StandardServletAsyncWebRequest

     咱們能夠在其父類ServletRequestAttributes裏找到對應的實現:

private final HttpServletRequest request;
/**
     * Exposes the native {@link HttpServletRequest} that we're wrapping.
     */
    public final HttpServletRequest getRequest() {
        return this.request;
    }

     最後我在貼出一段StandardServletAsyncWebRequest代碼,你們就應該知道整個異步是怎麼執行的了:

//java.servlet.AsnycContext
    private AsyncContext asyncContext;
  
    @Override
    public void startAsync() {
        Assert.state(getRequest().isAsyncSupported(),
                "Async support must be enabled on a servlet and for all filters involved " +
                "in async request processing. This is done in Java code using the Servlet API " +
                "or by adding \"<async-supported>true</async-supported>\" to servlet and " +
                "filter declarations in web.xml.");
        Assert.state(!isAsyncComplete(), "Async processing has already completed");

        if (isAsyncStarted()) {
            return;
        }
        this.asyncContext = getRequest().startAsync(getRequest(), getResponse());
        this.asyncContext.addListener(this);
        if (this.timeout != null) {
            this.asyncContext.setTimeout(this.timeout);
        }
    }

    @Override
    public void dispatch() {
        Assert.notNull(this.asyncContext, "Cannot dispatch without an AsyncContext");
        this.asyncContext.dispatch();
    }

2、使用Callable做爲返回值

     使用Callable做爲返回值來實現異步與DeferredResult相似,咱們先看一看官網描述的具體流程:

Callable processing:

  • Controller returns a Callable.
  • Spring MVC calls request.startAsync() and submits the Callable to a TaskExecutor for processing in a separate thread.
  • Meanwhile the DispatcherServlet and all Filter’s exit the Servlet container thread but the response remains open.
  • Eventually the Callable produces a result and Spring MVC dispatches the request back to the Servlet container to complete processing.
  • The DispatcherServlet is invoked again and processing resumes with the asynchronously produced return value from the Callable.

     流程上大致與DeferredResult相似,只不過Callable是由TaskExecutor來處理的,而TaskExecutor繼承自java.util.concurrent.Executor。咱們來看一下它的源代碼,它也是在WebAysncManager中處理的:

/**
     * Use the given {@link WebAsyncTask} to configure the task executor as well as
     * the timeout value of the {@code AsyncWebRequest} before delegating to
     * {@link #startCallableProcessing(Callable, Object...)}.
     * @param webAsyncTask a WebAsyncTask containing the target {@code Callable}
     * @param processingContext additional context to save that can be accessed
     * via {@link #getConcurrentResultContext()}
     * @throws Exception if concurrent processing failed to start
     */
    public void startCallableProcessing(final WebAsyncTask<?> webAsyncTask, Object... processingContext) throws Exception {
        Assert.notNull(webAsyncTask, "WebAsyncTask must not be null");
        Assert.state(this.asyncWebRequest != null, "AsyncWebRequest must not be null");

        Long timeout = webAsyncTask.getTimeout();
        if (timeout != null) {
            this.asyncWebRequest.setTimeout(timeout);
        }

        AsyncTaskExecutor executor = webAsyncTask.getExecutor();
        if (executor != null) {
            this.taskExecutor = executor;
        }

        List<CallableProcessingInterceptor> interceptors = new ArrayList<CallableProcessingInterceptor>();
        interceptors.add(webAsyncTask.getInterceptor());
        interceptors.addAll(this.callableInterceptors.values());
        interceptors.add(timeoutCallableInterceptor);

        final Callable<?> callable = webAsyncTask.getCallable();
        final CallableInterceptorChain interceptorChain = new CallableInterceptorChain(interceptors);

        this.asyncWebRequest.addTimeoutHandler(new Runnable() {
            @Override
            public void run() {
                logger.debug("Processing timeout");
                Object result = interceptorChain.triggerAfterTimeout(asyncWebRequest, callable);
                if (result != CallableProcessingInterceptor.RESULT_NONE) {
                    setConcurrentResultAndDispatch(result);
                }
            }
        });

        this.asyncWebRequest.addCompletionHandler(new Runnable() {
            @Override
            public void run() {
                interceptorChain.triggerAfterCompletion(asyncWebRequest, callable);
            }
        });

        interceptorChain.applyBeforeConcurrentHandling(this.asyncWebRequest, callable);
        startAsyncProcessing(processingContext);
        //啓動線程池的異步處理
        try {
            this.taskExecutor.submit(new Runnable() {
                @Override
                public void run() {
                    Object result = null;
                    try {
                        interceptorChain.applyPreProcess(asyncWebRequest, callable);
                        result = callable.call();
                    }
                    catch (Throwable ex) {
                        result = ex;
                    }
                    finally {
                        result = interceptorChain.applyPostProcess(asyncWebRequest, callable, result);
                    }
                    //設置當前的結果並轉發
                    setConcurrentResultAndDispatch(result);
                }
            });
        }
        catch (RejectedExecutionException ex) {
            Object result = interceptorChain.applyPostProcess(this.asyncWebRequest, callable, ex);
            setConcurrentResultAndDispatch(result);
            throw ex;
        }
    }

     對比DeferredResult,在這裏剛開始也是添加攔截器,只不過攔截器的名稱是CallableProcessingInterceptor,同時也須要設置WebAsyncRequest的超時處理,完成時處理的響應操做。這其中最大的區別就是使用TaskExecutor來對Callable進行異步處理

相關文章
相關標籤/搜索