SpringBoot的四種異步處理,學到了

本篇文章咱們以SpringBoot中異步的使用(包括:異步調用和異步方法兩個維度)來進行講解。css

異步請求與同步請求

咱們先經過一張圖來區分一下異步請求和同步請求的區別:前端

SpringBoot的四種異步處理,學到了


在上圖中有三個角色:客戶端、Web容器和業務處理線程。java

兩個流程中客戶端對Web容器的請求,都是同步的。由於它們在請求客戶端時都處於阻塞等待狀態,並無進行異步處理。web

在Web容器部分,第一個流程採用同步請求,第二個流程採用異步回調的形式。spring

經過異步處理,能夠先釋放容器分配給請求的線程與相關資源,減輕系統負擔,從而增長了服務器對客戶端請求的吞吐量。但併發請求量較大時,一般會經過負載均衡的方案來解決,而不是異步。typescript

Servlet3.0中的異步

Servlet 3.0以前,Servlet採用Thread-Per-Request的方式處理請求,即每一次Http請求都由一個線程從頭至尾處理。當涉及到耗時操做時,性能問題便比較明顯。瀏覽器

Servlet 3.0中提供了異步處理請求。能夠先釋放容器分配給請求的線程與相關資源,減輕系統負擔,從而增長服務的吞吐量。服務器

Servlet 3.0的異步是經過AsyncContext對象來完成的,它能夠從當前線程傳給另外一個線程,並歸還初始線程。新的線程處理完業務能夠直接返回結果給客戶端。網絡

AsyncContext對象能夠從HttpServletRequest中獲取:併發

@RequestMapping("/async")public void async(HttpServletRequest request) {
    AsyncContext asyncContext = request.getAsyncContext();
}

在AsyncContext中提供了獲取ServletRequest、ServletResponse和添加監聽(addListener)等功能:

public interface AsyncContext {

    ServletRequest getRequest();

    ServletResponse getResponse();

    void addListener(AsyncListener var1);
    
    void setTimeout(long var1);

    // 省略其餘方法}

不只能夠經過AsyncContext獲取Request和Response等信息,還能夠設置異步處理超時時間。一般,超時時間(單位毫秒)是須要設置的,否則無限等下去不就與同步處理同樣了。

經過AsyncContext的addListener還能夠添加監聽事件,用來處理異步線程的開始、完成、異常、超時等事件回調。

addListener方法的參數AsyncListener的源碼以下:

public interface AsyncListener extends EventListener {
    // 異步執行完畢時調用    void onComplete(AsyncEvent var1) throws IOException;
    // 異步線程執行超時調用    void onTimeout(AsyncEvent var1) throws IOException;
    // 異步線程出錯時調用    void onError(AsyncEvent var1) throws IOException;
    // 異步線程開始時調用    void onStartAsync(AsyncEvent var1) throws IOException;
}

一般,異常或超時時返回調用方錯誤信息,而異常時會處理一些清理和關閉操做或記錄異常日誌等。

基於Servlet方式實現異步請求

下面直接看一個基於Servlet方式的異步請求示例:

@GetMapping(value = "/email/send")public void servletReq(HttpServletRequest request) {
    AsyncContext asyncContext = request.startAsync();
    // 設置監聽器:可設置其開始、完成、異常、超時等事件的回調處理    asyncContext.addListener(new AsyncListener() {
        @Override
        public void onTimeout(AsyncEvent event) {
            System.out.println("處理超時了...");
        }

        @Override
        public void onStartAsync(AsyncEvent event) {
            System.out.println("線程開始執行");
        }

        @Override
        public void onError(AsyncEvent event) {
            System.out.println("執行過程當中發生錯誤:" + event.getThrowable().getMessage());
        }

        @Override
        public void onComplete(AsyncEvent event) {
            System.out.println("執行完成,釋放資源");
        }
    });
    //設置超時時間    asyncContext.setTimeout(6000);
    asyncContext.start(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(5000);
                System.out.println("內部線程:" + Thread.currentThread().getName());
                asyncContext.getResponse().getWriter().println("async processing");
            } catch (Exception e) {
                System.out.println("異步處理髮生異常:" + e.getMessage());
            }
            // 異步請求完成通知,整個請求完成            asyncContext.complete();
        }
    });
    //此時request的線程鏈接已經釋放了    System.out.println("主線程:" + Thread.currentThread().getName());
}

啓動項目,訪問對應的URL,打印日誌以下:

主線程:http-nio-8080-exec-4內部線程:http-nio-8080-exec-5執行完成,釋放資源

能夠看出,上述代碼先執行完了主線程,也就是程序的最後一行代碼的日誌打印,而後纔是內部線程的執行。內部線程執行完成,AsyncContext的onComplete方法被調用。

若是經過瀏覽器訪問對應的URL,還能夠看到該方法的返回值「async processing」。說明內部線程的結果一樣正常的返回到客戶端了。

基於Spring實現異步請求

基於Spring能夠經過Callable、DeferredResult或者WebAsyncTask等方式實現異步請求。

基於Callable實現

對於一次請求(/email),基於Callable的處理流程以下:

一、Spring MVC開啓副線程處理業務(將Callable提交到TaskExecutor);

二、DispatcherServlet和全部的Filter退出Web容器的線程,可是response保持打開狀態;

三、Callable返回結果,SpringMVC將原始請求從新派發給容器(再從新請求一次/email),恢復以前的處理;

四、DispatcherServlet從新被調用,將結果返回給用戶;

代碼實現示例以下:

@GetMapping("/email")public Callable<String> order() {
    System.out.println("主線程開始:" + Thread.currentThread().getName());
    Callable<String> result = () -> {
        System.out.println("副線程開始:" + Thread.currentThread().getName());
        Thread.sleep(1000);
        System.out.println("副線程返回:" + Thread.currentThread().getName());
        return "success";
    };

    System.out.println("主線程返回:" + Thread.currentThread().getName());
    return result;
}

訪問對應URL,控制檯輸入日誌以下:

主線程開始:http-nio-8080-exec-1主線程返回:http-nio-8080-exec-1副線程開始:task-1副線程返回:task-1

經過日誌能夠看出,主線程已經完成了,副線程才進行執行。同時,URL返回結果「success」。這也說明一個問題,服務器端的異步處理對客戶端來講是不可見的。

Callable默認使用SimpleAsyncTaskExecutor類來執行,這個類很是簡單並且沒有重用線程。在實踐中,須要使用AsyncTaskExecutor類來對線程進行配置。

這裏經過實現WebMvcConfigurer接口來完成線程池的配置。

@Configurationpublic class WebConfig implements WebMvcConfigurer {

    @Resource    private ThreadPoolTaskExecutor myThreadPoolTaskExecutor;

    /**
     * 配置線程池
     */    @Bean(name = "asyncPoolTaskExecutor")
    public ThreadPoolTaskExecutor getAsyncThreadPoolTaskExecutor() {
        ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor();
        taskExecutor.setCorePoolSize(2);
        taskExecutor.setMaxPoolSize(10);
        taskExecutor.setQueueCapacity(25);
        taskExecutor.setKeepAliveSeconds(200);
        taskExecutor.setThreadNamePrefix("thread-pool-");
        // 線程池對拒絕任務(無線程可用)的處理策略,目前只支持AbortPolicy、CallerRunsPolicy;默認爲後者        taskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        taskExecutor.initialize();
        return taskExecutor;
    }

    @Override    public void configureAsyncSupport(final AsyncSupportConfigurer configurer) {
        // 處理callable超時        configurer.setDefaultTimeout(60 * 1000);
        configurer.setTaskExecutor(myThreadPoolTaskExecutor);
        configurer.registerCallableInterceptors(timeoutCallableProcessingInterceptor());
    }

    @Bean    public TimeoutCallableProcessingInterceptor timeoutCallableProcessingInterceptor() {
        return new TimeoutCallableProcessingInterceptor();
    }
}

爲了驗證打印的線程,咱們將實例代碼中的System.out.println替換成日誌輸出,會發如今使用線程池以前,打印日誌以下:

2021-02-21 09:45:37.144  INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主線程開始:http-nio-8080-exec-12021-02-21 09:45:37.144  INFO 8312 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主線程返回:http-nio-8080-exec-12021-02-21 09:45:37.148  INFO 8312 --- [         task-1] c.s.learn.controller.AsynController      : 副線程開始:task-12021-02-21 09:45:38.153  INFO 8312 --- [         task-1] c.s.learn.controller.AsynController      : 副線程返回:task-1

線程名稱爲「task-1」。讓線程池生效以後,打印日誌以下:

2021-02-21 09:50:28.950  INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主線程開始:http-nio-8080-exec-12021-02-21 09:50:28.951  INFO 8339 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 主線程返回:http-nio-8080-exec-12021-02-21 09:50:28.955  INFO 8339 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 副線程開始:thread-pool-12021-02-21 09:50:29.956  INFO 8339 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 副線程返回:thread-pool-1

線程名稱爲「thread-pool-1」,其中前面的「thread-pool」正是咱們配置的線程池前綴。

除了線程池的配置,還能夠配置統一異常處理,這裏就再也不演示了。

基於WebAsyncTask實現

Spring提供的WebAsyncTask是對Callable的包裝,提供了更強大的功能,好比:處理超時回調、錯誤回調、完成回調等。

@GetMapping("/webAsyncTask")public WebAsyncTask<String> webAsyncTask() {
    log.info("外部線程:" + Thread.currentThread().getName());
    WebAsyncTask<String> result = new WebAsyncTask<>(60 * 1000L, new Callable<String>() {
        @Override        public String call() {
            log.info("內部線程:" + Thread.currentThread().getName());
            return "success";
        }
    });
    result.onTimeout(new Callable<String>() {
        @Override        public String call() {
            log.info("timeout callback");
            return "timeout callback";
        }
    });
    result.onCompletion(new Runnable() {
        @Override        public void run() {
            log.info("finish callback");
        }
    });
    return result;
}

訪問對應請求,打印日誌:

2021-02-21 10:22:33.028  INFO 8547 --- [nio-8080-exec-1] c.s.learn.controller.AsynController      : 外部線程:http-nio-8080-exec-12021-02-21 10:22:33.033  INFO 8547 --- [  thread-pool-1] c.s.learn.controller.AsynController      : 內部線程:thread-pool-12021-02-21 10:22:33.055  INFO 8547 --- [nio-8080-exec-2] c.s.learn.controller.AsynController      : finish callback

基於DeferredResult實現

DeferredResult使用方式與Callable相似,但在返回結果時不同,它返回的時實際結果可能沒有生成,實際的結果可能會在另外的線程裏面設置到DeferredResult中去。

DeferredResult的這個特性對實現服務端推技術、訂單過時時間處理、長輪詢、模擬MQ的功能等高級應用很是重要。

關於DeferredResult的使用先來看一下官方的例子和說明:

@RequestMapping("/quotes")@ResponseBodypublic DeferredResult<String> quotes() {
  DeferredResult<String> deferredResult = new DeferredResult<String>();
  // Save the deferredResult in in-memory queue ...  return deferredResult;
}// In some other thread...deferredResult.setResult(data);

上述示例中咱們能夠發現DeferredResult的調用並不必定在Spring MVC當中,它能夠是別的線程。官方的解釋也是如此:

In this case the return value will also be produced from a separate thread. However, that thread is not known to Spring MVC. For example the result may be produced in response to some external event such as a JMS message, a scheduled task, etc.

也就是說,DeferredResult返回的結果也多是由MQ、定時任務或其餘線程觸發。來個實例:

@Controller@RequestMapping("/async/controller")public class AsyncHelloController {

    private List<DeferredResult<String>> deferredResultList = new ArrayList<>();

    @ResponseBody    @GetMapping("/hello")
    public DeferredResult<String> helloGet() throws Exception {
        DeferredResult<String> deferredResult = new DeferredResult<>();

        //先存起來,等待觸發        deferredResultList.add(deferredResult);
        return deferredResult;
    }

    @ResponseBody    @GetMapping("/setHelloToAll")
    public void helloSet() throws Exception {
        // 讓全部hold住的請求給與響應        deferredResultList.forEach(d -> d.setResult("say hello to all"));
    }
}

第一個請求/hello,會先將deferredResult存起來,前端頁面是一直等待(轉圈)狀態。直到發第二個請求:setHelloToAll,全部的相關頁面纔會有響應。

整個執行流程以下:

  • controller返回一個DeferredResult,把它保存到內存裏或者List裏面(供後續訪問);
  • Spring MVC調用request.startAsync(),開啓異步處理;與此同時將DispatcherServlet裏的攔截器、Filter等等都立刻退出主線程,可是response仍然保持打開的狀態;
  • 應用經過另一個線程(多是MQ消息、定時任務等)給DeferredResult#setResult值。而後SpringMVC會把這個請求再次派發給servlet容器;
  • DispatcherServlet再次被調用,而後處理後續的標準流程;

經過上述流程能夠發現:利用DeferredResult可實現一些長鏈接的功能,好比當某個操做是異步時,能夠先保存對應的DeferredResult對象,當異步通知回來時,再找到這個DeferredResult對象,在setResult處理結果便可。從而提升性能。

SpringBoot中的異步實現

在SpringBoot中將一個方法聲明爲異步方法很是簡單,只需兩個註解便可@EnableAsync和@Async。其中@EnableAsync用於開啓SpringBoot支持異步的功能,用在SpringBoot的啓動類上。@Async用於方法上,標記該方法爲異步處理方法。

須要注意的是@Async並不支持用於被@Configuration註解的類的方法上。同一個類中,一個方法調用另一個有@Async的方法,註解也是不會生效的。

@EnableAsync的使用示例:

@SpringBootApplication@EnableAsyncpublic class App {

    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}

@Async的使用示例:

@Servicepublic class SyncService {
    
    @Async    public void asyncEvent() {
        // 業務處理    }
}

@Async註解的使用與Callable有相似之處,在默認狀況下使用的都是SimpleAsyncTaskExecutor線程池,可參考Callable中的方式來自定義線程池。

下面經過一個實例來驗證一下,啓動類上使用@EnableAsync,而後定義Controller類:

@RestControllerpublic class IndexController {
    
    @Resource    private UserService userService;
    
    @RequestMapping("/async")    public String async(){
        System.out.println("--IndexController--1");
        userService.sendSms();
        System.out.println("--IndexController--4");
        return "success";
    }
}

定義Service及異步方法:

@Servicepublic class UserService {

    @Async
    public void sendSms(){
        System.out.println("--sendSms--2");
        IntStream.range(0, 5).forEach(d -> {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
        System.out.println("--sendSms--3");
    }
}

若是先註釋掉@EnableAsync和@Async註解,即正常狀況下的業務請求,打印日誌爲:

--IndexController--1--sendSms--2--sendSms--3--IndexController--4

使用@EnableAsync和@Async註解時,打印日誌以下:

--IndexController--1--IndexController--4--sendSms--2--sendSms--3

經過日誌的對比咱們能夠看出,使用了@Async的方法,會被當成一個子線程。因此,整個sendSms方法會在主線程執行完了以後執行。

這樣的效果是否是跟咱們上面使用的其餘形式的異步殊途同歸?因此在文章最開始已經說到,網絡上所謂的「異步調用與異步請求的區別」是並不存儲在的,本質上都是一回事,只不過實現形式不一樣而已。這裏所提到異步方法,也就是將方法進行異步處理而已。

@Async、WebAsyncTask、Callable、DeferredResult的區別

所在的包不一樣:

  • @Async:org.springframework.scheduling.annotation;
  • WebAsyncTask:org.springframework.web.context.request.async;
  • Callable:java.util.concurrent;
  • DeferredResult:org.springframework.web.context.request.async;

經過所在的包,咱們應該隱隱約約感到一些區別,好比@Async是位於scheduling包中,而WebAsyncTask和DeferredResult是用於Web(Spring MVC)的,而Callable是用於concurrent(併發)處理的。

對於Callable,一般用於Controller方法的異步請求,固然也能夠用於替換Runable的方式。在方法的返回上與正常的方法有所區別:

// 普通方法public String aMethod(){
}// 對照Callable方法public Callable<String>  aMethod(){
}

而WebAsyncTask是對Callable的封裝,提供了一些事件回調的處理,本質上區別不大。

DeferredResult使用方式與Callable相似,重點在於跨線程之間的通訊。

@Async也是替換Runable的一種方式,能夠代替咱們本身建立線程。並且適用的範圍更廣,並不侷限於Controller層,而能夠是任何層的方法上。

固然,你們也能夠從返回結果,異常處理等角度來分析一下,這裏就再也不展開了。

小結

通過上述的一步步分析,你們應該對於Servlet3.0及Spring中對異步處理有所瞭解。當了解了這些基礎理論,實戰實例,使用方法及注意事項以後,想必更可以對網絡上的相關知識可以進一步的去僞存真。

相關文章
相關標籤/搜索