Springboot異步接口調用與多線程線程池使用注意事項說明

目錄html

Java多線程相關說明java

異步調用分爲兩類web

多線程調用方法spring

多線程線程池apache

Springboot實現線程池配置tomcat

AsyncTaskExecutor服務器

ThreadPoolTaskExecutor多線程

Rest風格的異步請求接口併發

@Async註解app

Callable異步返回值

DeferredResult

WebAsyncTask

多線程異步併發異常

模擬Servlet異步請求分發


 

Java多線程相關說明

異步調用分爲兩類

  • 有返回值:用戶須要Callable、Futrue等來接收最終的處理結果,但這個過程是異步非阻塞的。
  • 無返回值:用戶提交請求到接口以後不須要任何返回值,請求到達服務端以後就沒有任何關係了,用戶能夠不等待而去作其餘的業務操做。

多線程調用方法

  • Callable:有返回值的線程方法,call 將會對用戶請求作出結果反饋。
  • Runnable:線程的接口,須要實現run方法。
  • Thread:經過Thread類繼承重寫run方法實現。

多線程線程池

  • Executor(Executors) 提供多種線程調度的線程池方案。
  • AsyncTaskExecutor 異步線程池方案。
  • ThreadPoolTaskExecutor 任務線程池方案。

注意:線程池合理分配能夠實現服務器CPU高效利用,初始化大小,最大大小,排隊隊列的大小都會影響系統業務吞吐量,能夠使用Jmeter壓測接口來驗證。切記:接口異步了並不表明全部的任務線程池都能吸納,若是排隊滿了,線程池會拋異常,因此爲了不拋異常咱們在提交任務到線程池的時候須要加一個本地隊列,切不可將全部請求都懟到線程池。線程池的大小有限制,早晚會到達這個邊界,到達邊界就會異常,因此應該避免。

Springboot實現線程池配置


AsyncTaskExecutor

異步任務線程池,這個從名字上看一眼就知道是Executor的系列接口:

public interface AsyncTaskExecutor extends TaskExecutor {
    long TIMEOUT_IMMEDIATE = 0L;
    long TIMEOUT_INDEFINITE = 9223372036854775807L;

    void execute(Runnable var1, long var2);

    Future<?> submit(Runnable var1);

    <T> Future<T> submit(Callable<T> var1);
}

@FunctionalInterface
public interface TaskExecutor extends Executor {
    void execute(Runnable var1);
}

/** @since 1.5
 * @author Doug Lea
 */
public interface Executor {

    /**
     * Executes the given command at some time in the future.  The command
     * may execute in a new thread, in a pooled thread, or in the calling
     * thread, at the discretion of the {@code Executor} implementation.
     *
     * @param command the runnable task
     * @throws RejectedExecutionException if this task cannot be
     * accepted for execution
     * @throws NullPointerException if command is null
     */
    void execute(Runnable command);
}

在Springboot中能夠做爲配置使用:

@Configuration
public class AsyncTaskExecutorConfig extends WebMvcConfigurerAdapter {

    @Override
    public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
        configurer.setDefaultTimeout(-1);
        configurer.setTaskExecutor(asyncTaskExecutor());
    }

    @Bean("asyncTaskExecutor")
    public AsyncTaskExecutor asyncTaskExecutor() {
        return new SimpleAsyncTaskExecutor("async");
    }

}

ThreadPoolTaskExecutor

這個類是比較經常使用的有界任務線程池。在Springboot中也做爲配置類使用:

@Configuration
public class ThreadPoolTaskExecutorConfig {

    @Value("${executors.threadPoolTaskExecutor.corePoolSize:10}")
    private Integer corePoolSize;

    @Value("${executors.threadPoolTaskExecutor.maxPoolSize:500}")
    private Integer maxPoolSize;

    @Value("${executors.threadPoolTaskExecutor.queueCapacity:500}")
    private Integer queueCapacity;

    @Value("${executors.threadPoolTaskExecutor.keepAliveSeconds:60}")
    private Integer keepAliveSeconds;

    @Value("${executors.threadPoolTaskExecutor.allowCoreThreadTimeOut:true}")
    private Boolean allowCoreThreadTimeOut;

    @Value("${executors.threadPoolTaskExecutor.threadNamePrefix:ThreadPoolTaskExecutor}")
    private String threadNamePrefix;

    @Value("${executors.threadPoolTaskExecutor.awaitTerminationSeconds:10}")
    private Integer awaitTerminationSeconds;

    @Value("${executors.threadPoolTaskExecutor.rejectedExecutionHandler:AbortPolicy}")
    private String rejectedExecutionHandler;

    /**
     * 線程池配置【不能將全部任務都提交到線程會產生線程排隊,即便線程排隊足夠大也不能這樣處理】
     *
     * @return
     */
    @Bean("taskExecutor")
    public ThreadPoolTaskExecutor taskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        // 核心線程數
        taskExecutor.setCorePoolSize(corePoolSize);
        // 最大線程數
        taskExecutor.setMaxPoolSize(maxPoolSize);
        // 隊列容量大小
        taskExecutor.setQueueCapacity(queueCapacity);
        // 是否容許核心線程超時
        taskExecutor.setAllowCoreThreadTimeOut(allowCoreThreadTimeOut);
        // 線程保活時間
        taskExecutor.setKeepAliveSeconds(keepAliveSeconds);
        // 線程命名前綴規則
        taskExecutor.setThreadNamePrefix(threadNamePrefix);
        // 等待終止時間(秒)
        taskExecutor.setAwaitTerminationSeconds(awaitTerminationSeconds);
        /**
         * @see https://blog.csdn.net/pfnie/article/details/52755769
         * 線程池滿了以後如何處理:默認是 new AbortPolicy();
         *
         * (1) ThreadPoolExecutor.AbortPolicy   處理程序遭到拒絕將拋出運行時RejectedExecutionException;
         * (2) ThreadPoolExecutor.CallerRunsPolicy 線程調用運行該任務的 execute 自己。此策略提供簡單的反饋控制機制,可以減緩新任務的提交速度
         * (3) ThreadPoolExecutor.DiscardPolicy  不能執行的任務將被刪除;
         * (4) ThreadPoolExecutor.DiscardOldestPolicy  若是執行程序還沒有關閉,則位於工做隊列頭部的任務將被刪除,而後重試執行程序(若是再次失敗,則重複此過程)。
         */
         switch (rejectedExecutionHandler){
             case "AbortPolicy":
                 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
                 break;
             case "CallerRunsPolicy":
                 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
                 break;
             case "DiscardPolicy":
                 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
                 break;
             case "DiscardOldestPolicy":
                 taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardOldestPolicy());
                 break;
         }
        // 初始化線程池
        taskExecutor.initialize();
        return taskExecutor;
    }
}

上面的參數配置的比較詳細了,這個類全部參數基本都在上面了很簡單。後面的示例用的就是該線程池。

Rest風格的異步請求接口

@Async註解

示例以下:

@Slf4j
@RestController
@RequestMapping("/test")
public class AsyncController {

    /**
     * HTTP心跳發送接口
     *
     * @return
     */
    @RequestMapping("/async/heartBeat.do")
    @Async
    public void heartBeat(XhyPosition position) {
        long start = System.currentTimeMillis();
        try {
            //mobileService.savePositionBySync(position);
            log.info("HTTP心跳發送成功!");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            long end = System.currentTimeMillis();
            log.info("HTTP心跳耗時=" + (end - start));
        }
    }


}

須要在Springboot啓動入口添加@EnableAsync註解。

Callable異步返回值

Callable 類型的返回Spring在底層已經作了代理進行處理,不須要經過線程池提交。

@Slf4j
@RestController
@RequestMapping("/test")
public class CallableController {

    @Autowired
    MobileWebService mobileService;

    /**
     * HTTP心跳發送接口
     *
     * @return
     */
    @RequestMapping("/callable/heartBeat.do")
    public Callable<ResponseEntity> heartBeat(XhyPosition position) {
        Callable<ResponseEntity> responseEntityCallable = new Callable<ResponseEntity>() {
            @Override
            public ResponseEntity call() throws Exception {
                //mobileService.savePositionBySync(position);
                log.info("HTTP心跳發送成功!");
                return ResponseEntity.ok("HTTP心跳發送成功!");
            }
        };
        return responseEntityCallable;
    }


}

但值得注意的是一樣的寫法,沒有返回值的回調線程方法是不執行的,須要經過線程池進行提交。

/**
     * HTTP心跳發送接口
     *
     * @return
     */
    @RequestMapping("/callable/heartBeat2.do")
    public void heartBeat2(XhyPosition position) {
        Callable<ResponseEntity> responseEntityCallable = new Callable<ResponseEntity>() {
            @Override
            public ResponseEntity call() throws Exception {
                //mobileService.savePositionBySync(position);
                log.info("HTTP心跳發送成功!");
                return ResponseEntity.ok("HTTP心跳發送成功!");
            }
        };
        // 必須進行手動提交
        threadPoolExecutor.submit(responseEntityCallable);

    }

DeferredResult

DeferredResult是一個結果的封裝類型,Spring MVC推薦使用DeferredResult、Callable、CompletableFuture的返回值來實現異步接口調用。下面是本節示例:

@RestController
@RequestMapping("/test")
public class DeferredResultController {

    /**
     * HTTP心跳發送接口
     *
     * @return
     */
    @RequestMapping("/deferred/heartBeat.do")
    public DeferredResult<StatusCode> heartBeat(XhyPosition position) {
        DeferredResult<StatusCode> deferredResult = new DeferredResult<>();
        deferredResult.setResult(StatusCode.SUCCESS);
        return deferredResult;
    }
}

WebAsyncTask

WebAsyncTask提供了多環節的回調操做,能夠知足大多數需求,但Spring並不推薦此種方式的異步。

/**
 * @Copyright: 2019-2021
 * @FileName: WebAsyncTaskController.java
 * @Author: PJL
 * @Date: 2020/4/14 10:04
 * @Description: 異步接口調用之WebAsyncTask[操做完善:但Spring不推薦使用,推薦使用:DeferredResultController、Callable、CompletableFuture]
 */
@Slf4j
@RestController
@RequestMapping("/test")
public class WebAsyncTaskController {

    /**
     * 異步線程超時設置
     */
    @Value("${system.asyncRequestTimeout:10}")
    private Integer asyncRequestTimeout;

    @Autowired
    MobileWebService mobileService;

    @Autowired
    AggregationRedisService aggregationRedisService;

    @Qualifier("asyncTaskExecutor")
    @Autowired
    AsyncTaskExecutor executor;

    /**
     * HTTP[異步請求]心跳發送接口
     * @return
     */
    @RequestMapping("/webAsyncTask/blank.do")
    public WebAsyncTask<StatusCode> blank(){
        WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> {
            return StatusCode.SUCCESS;
        });
        return webAsyncTask;
    }


    /**
     * HTTP[異步請求]心跳發送接口
     * @return
     */
    @RequestMapping("/webAsyncTask/heartBeat.do")
    public WebAsyncTask<StatusCode> heartBeat(XhyPosition position){
        WebAsyncTask webAsyncTask= new WebAsyncTask(asyncRequestTimeout*1000L,()-> {
            position.setT(System.currentTimeMillis());
            MobileServiceDataQueue.addToPositionQueue(position);
            return StatusCode.SUCCESS;
        });
        // 處理完成異步回調
        webAsyncTask.onCompletion(()->{
            log.debug("HTTP心跳發送........完成!");
            // aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,"HTTP心跳處理完成!");
        });
        // 超時處理
        webAsyncTask.onTimeout(()->{
            //log.debug("HTTP心跳發送........超時!");
            return StatusCode.TIMEOUT;
        });
        // 錯誤處理
        webAsyncTask.onError(()->{
           log.debug("HTTP心跳發送........錯誤!");
            return StatusCode.ERROR;
        });
        return webAsyncTask;
    }


    /**
     * HTTP[線程池]心跳發送接口
     * @return
     */
    @RequestMapping("/webAsyncTask/heartBeatWithThreadPool.do")
    public WebAsyncTask<StatusCode> heartBeatWithThreadPool(XhyPosition position){
        WebAsyncTask webAsyncTaskByPool= new WebAsyncTask(asyncRequestTimeout*1000L,executor,()-> {
            position.setT(System.currentTimeMillis());
            MobileServiceDataQueue.addToPositionQueue(position);
            return StatusCode.SUCCESS;
        });
        // 處理完成異步回調
        webAsyncTaskByPool.onCompletion(()->{
            log.debug("HTTP心跳發送....by pool....完成!");
            // aggregationRedisService.publish(Constants.MOBILE_POSITION_FINISH_UPLOAD,"HTTP心跳處理完成!");
        });
        // 超時處理
        webAsyncTaskByPool.onTimeout(()->{
            log.debug("HTTP心跳發送........超時!");
            return StatusCode.TIMEOUT;
        });
        // 錯誤處理
        webAsyncTaskByPool.onError(()->{
            log.debug("HTTP心跳發送........錯誤!");
            return StatusCode.ERROR;
        });
        return webAsyncTaskByPool;
    }

}

多線程異步併發異常

cannot dispatch without an AsyncContext:

[org.apache.catalina.core.AsyncListenerWrapper]
java.lang.IllegalArgumentException: Cannot dispatch without an AsyncContext
	at org.springframework.util.Assert.notNull(Assert.java:198)
	at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.dispatch(StandardServletAsyncWebRequest.java:131)
	at org.springframework.web.context.request.async.WebAsyncManager.setConcurrentResultAndDispatch(WebAsyncManager.java:391)
	at org.springframework.web.context.request.async.WebAsyncManager.lambda$startCallableProcessing$2(WebAsyncManager.java:315)
	at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.lambda$onError$0(StandardServletAsyncWebRequest.java:146)
	at java.util.ArrayList.forEach(ArrayList.java:1257)
	at org.springframework.web.context.request.async.StandardServletAsyncWebRequest.onError(StandardServletAsyncWebRequest.java:146)
	at org.apache.catalina.core.AsyncListenerWrapper.fireOnError(AsyncListenerWrapper.java:49)
	at org.apache.catalina.core.AsyncContextImpl.setErrorState(AsyncContextImpl.java:410)
	at org.apache.catalina.connector.CoyoteAdapter.asyncDispatch(CoyoteAdapter.java:239)
	at org.apache.coyote.AbstractProcessor.dispatch(AbstractProcessor.java:242)
	at org.apache.coyote.AbstractProcessorLight.process(AbstractProcessorLight.java:53)
	at org.apache.coyote.AbstractProtocol$ConnectionHandler.process(AbstractProtocol.java:861)
	at org.apache.tomcat.util.net.NioEndpoint$SocketProcessor.doRun(NioEndpoint.java:1579)
	at org.apache.tomcat.util.net.SocketProcessorBase.run(SocketProcessorBase.java:49)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at org.apache.tomcat.util.threads.TaskThread$WrappingRunnable.run(TaskThread.java:61)
	at java.lang.Thread.run(Thread.java:748)

由於HTTP底層包含了TCP的鏈接,客戶端發送Jmeter方法請求會分配系統臨時端口用於客戶端請求處理。Jmeter 接口請求參數keep_alive須要去掉勾選否則端口不會及時釋放。

這個問題能夠總結爲是:Spring容器異步分發工做機制客戶端與服務端在鏈接斷開時無法處理異步請求的目標servlet沒法分發所致。這個問題不是程序自己業務代碼報錯,能夠忽略。

模擬Servlet異步請求分發

參考:https://www.concretepage.com/java-ee/jsp-servlet/asynccontext-example-in-servlet-3

package com.concretepage.servlet;
import java.io.IOException;
import java.io.PrintWriter;
import javax.servlet.AsyncContext;
import javax.servlet.ServletRequest;
import javax.servlet.annotation.WebServlet;
import javax.servlet.http.HttpServlet;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;

@WebServlet(asyncSupported = true, value = "/AsyncContextExample", loadOnStartup = 1)
public class AsyncContextExample extends HttpServlet   {
	private static final long serialVersionUID = 1L;
	public void doPost(HttpServletRequest request, HttpServletResponse response) throws IOException{
		doGet(request,response);
	}
        public void doGet(HttpServletRequest request, HttpServletResponse response) throws IOException{
           response.setContentType("text/html");
           PrintWriter out = response.getWriter();
           AsyncContext asyncContext = request.startAsync();
           asyncContext.setTimeout(0);
           ServletRequest servReq = asyncContext.getRequest();
           boolean b = servReq.isAsyncStarted();
           out.println("isAsyncStarted : "+b);
           asyncContext.dispatch("/asynctest.jsp");
	   out.println("<br/>asynchronous task finished.");	
	}
}
相關文章
相關標籤/搜索