Tomcat是如何實現異步Servlet的

tomcat

前言

經過我以前的Tomcat系列文章,相信看我博客的同窗對Tomcat應該有一個比較清晰的瞭解了,在前幾篇博客咱們討論了Tomcat在SpringBoot框架中是如何啓動的,討論了Tomcat的內部組件是如何設計以及請求是如何流轉的,那麼咱們這篇博客聊聊Tomcat的異步Servlet,Tomcat是如何實現異步Servlet的以及異步Servlet的使用場景。web

手擼一個異步的Servlet

咱們直接藉助SpringBoot框架來實現一個Servlet,這裏只展現Servlet代碼:apache

@WebServlet(urlPatterns = "/async",asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {

    ExecutorService executorService =Executors.newSingleThreadExecutor();

    @Override
     protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        //開啓異步,獲取異步上下文
        final AsyncContext ctx = req.startAsync();
        // 提交線程池異步執行
        executorService.execute(new Runnable() {


            @Override
            public void run() {
                try {
                    log.info("async Service 準備執行了");
                    //模擬耗時任務
                    Thread.sleep(10000L);
                    ctx.getResponse().getWriter().print("async servlet");
                    log.info("async Service 執行了");
                } catch (IOException e) {
                    e.printStackTrace();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                //最後執行完成後完成回調。
                ctx.complete();
            }
        });
    }

複製代碼

上面的代碼實現了一個異步的Servlet,實現了doGet方法注意在SpringBoot中使用須要再啓動類加上@ServletComponentScan註解來掃描Servlet。既然代碼寫好了,咱們來看看實際運行效果。編程

咱們發送一個請求後,看到頁面有響應,同時,看到請求時間花費了10.05s,那麼咱們這個Servlet算是能正常運行啦。有同窗確定會問,這不是異步servlet嗎?你的響應時間並無加快,有什麼用呢?對,咱們的響應時間並不能加快,仍是會取決於咱們的業務邏輯,可是咱們的異步servlet請求後,依賴於業務的異步執行,咱們能夠當即返回,也就是說,Tomcat的線程能夠當即回收,默認狀況下,Tomcat的核心線程是10,最大線程數是200,咱們能及時回收線程,也就意味着咱們能處理更多的請求,可以增長咱們的吞吐量,這也是異步Servlet的主要做用。瀏覽器

異步Servlet的內部原理

瞭解完異步Servlet的做用後,咱們來看看,Tomcat是如何是先異步Servlet的。其實上面的代碼,主要核心邏輯就兩部分,final AsyncContext ctx = req.startAsync()ctx.complete()那咱們來看看他們究竟作了什麼?tomcat

public AsyncContext startAsync(ServletRequest request,
            ServletResponse response) {
        if (!isAsyncSupported()) {
            IllegalStateException ise =
                    new IllegalStateException(sm.getString("request.asyncNotSupported"));
            log.warn(sm.getString("coyoteRequest.noAsync",
                    StringUtils.join(getNonAsyncClassNames())), ise);
            throw ise;
        }

        if (asyncContext == null) {
            asyncContext = new AsyncContextImpl(this);
        }

        asyncContext.setStarted(getContext(), request, response,
                request==getRequest() && response==getResponse().getResponse());
        asyncContext.setTimeout(getConnector().getAsyncTimeout());

        return asyncContext;
    }
複製代碼

咱們發現req.startAsync()只是保存了一個異步上下文,同時設置一些基礎信息,好比Timeout,順便提一下,這裏設置的默認超時時間是30S,若是你的異步處理邏輯超過30S,此時執行ctx.complete()就會拋出IllegalStateException 異常。bash

咱們來看看ctx.complete()的邏輯多線程

public void complete() {
        if (log.isDebugEnabled()) {
            logDebug("complete ");
        }
        check();
        request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
    }
//類:AbstractProcessor 
 public final void action(ActionCode actionCode, Object param) {
    case ASYNC_COMPLETE: {
            clearDispatches();
            if (asyncStateMachine.asyncComplete()) {
                processSocketEvent(SocketEvent.OPEN_READ, true);
            }
            break;
        } 
    }
    //類:AbstractProcessor 
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
        SocketWrapperBase<?> socketWrapper = getSocketWrapper();
        if (socketWrapper != null) {
            socketWrapper.processSocket(event, dispatch);
        }
    }
    //類:AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
            SocketEvent event, boolean dispatch) {
        //省略部分代碼
            SocketProcessorBase<S> sc = null;
            if (processorCache != null) {
                sc = processorCache.pop();
            }
            if (sc == null) {
                sc = createSocketProcessor(socketWrapper, event);
            } else {
                sc.reset(socketWrapper, event);
            }
            Executor executor = getExecutor();
            if (dispatch && executor != null) {
                executor.execute(sc);
            } else {
                sc.run();
            }
   
        return true;
    }
複製代碼

因此,這裏最終會調用AbstractEndpointprocessSocket方法,以前看過我前面博客的同窗應該有印象,EndPoint是用來接受和處理請求的,接下來就會交給Processor去進行協議處理。併發

類:AbstractProcessorLight
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
            throws IOException {
        //省略部分diam
        SocketState state = SocketState.CLOSED;
        Iterator<DispatchType> dispatches = null;
        do {
            if (dispatches != null) {
                DispatchType nextDispatch = dispatches.next();
                state = dispatch(nextDispatch.getSocketStatus());
            } else if (status == SocketEvent.DISCONNECT) {
            
            } else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
                state = dispatch(status);
                if (state == SocketState.OPEN) {
                    state = service(socketWrapper);
                }
            } else if (status == SocketEvent.OPEN_WRITE) {
                state = SocketState.LONG;
            } else if (status == SocketEvent.OPEN_READ){
                state = service(socketWrapper);
            } else {
                state = SocketState.CLOSED;
            }

        } while (state == SocketState.ASYNC_END ||
                dispatches != null && state != SocketState.CLOSED);

        return state;
    }
複製代碼

這部分是重點,AbstractProcessorLight會根據SocketEvent的狀態來判斷是否是要去調用service(socketWrapper),該方法最終會去調用到容器,從而完成業務邏輯的調用,咱們這個請求是執行完成後調用的,確定不能進容器了,否則就是死循環了,這裏經過isAsync()判斷,就會進入dispatch(status),最終會調用CoyoteAdapterasyncDispatch方法app

public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
            SocketEvent status) throws Exception {
        //省略部分代碼
        Request request = (Request) req.getNote(ADAPTER_NOTES);
        Response response = (Response) res.getNote(ADAPTER_NOTES);
        boolean success = true;
        AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
        try {
            if (!request.isAsync()) {
                response.setSuspended(false);
            }

            if (status==SocketEvent.TIMEOUT) {
                if (!asyncConImpl.timeout()) {
                    asyncConImpl.setErrorState(null, false);
                }
            } else if (status==SocketEvent.ERROR) {
                
            }

            if (!request.isAsyncDispatching() && request.isAsync()) {
                WriteListener writeListener = res.getWriteListener();
                ReadListener readListener = req.getReadListener();
                if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
                    ClassLoader oldCL = null;
                    try {
                        oldCL = request.getContext().bind(false, null);
                        res.onWritePossible();//這裏執行瀏覽器響應,寫入數據
                        if (request.isFinished() && req.sendAllDataReadEvent() &&
                                readListener != null) {
                            readListener.onAllDataRead();
                        }
                    } catch (Throwable t) {
                       
                    } finally {
                        request.getContext().unbind(false, oldCL);
                    }
                } 
                }
            }
            //這裏判斷異步正在進行,說明這不是一個完成方法的回調,是一個正常異步請求,繼續調用容器。
            if (request.isAsyncDispatching()) {
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
                Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
                if (t != null) {
                    asyncConImpl.setErrorState(t, true);
                }
            }
            //注意,這裏,若是超時或者出錯,request.isAsync()會返回false,這裏是爲了儘快的輸出錯誤給客戶端。
            if (!request.isAsync()) {
                //這裏也是輸出邏輯
                request.finishRequest();
                response.finishResponse();
            }
            //銷燬request和response
            if (!success || !request.isAsync()) {
                updateWrapperErrorCount(request, response);
                request.recycle();
                response.recycle();
            }
        }
        return success;
    }
複製代碼

上面的代碼就是ctx.complete()執行最終的方法了(固然省略了不少細節),完成了數據的輸出,最終輸出到瀏覽器。框架

這裏有同窗可能會說,我知道異步執行完後,調用ctx.complete()會輸出到瀏覽器,可是,第一次doGet請求執行完成後,Tomcat是怎麼知道不用返回到客戶端的呢?關鍵代碼在CoyoteAdapter中的service方法,部分代碼以下:

postParseSuccess = postParseRequest(req, request, res, response);
            //省略部分代碼
            if (postParseSuccess) {
                request.setAsyncSupported(
                        connector.getService().getContainer().getPipeline().isAsyncSupported());
                connector.getService().getContainer().getPipeline().getFirst().invoke(
                        request, response);
            }
            if (request.isAsync()) {
                async = true;
               } else {
               //輸出數據到客戶端
                request.finishRequest();
                response.finishResponse();
            if (!async) {
                updateWrapperErrorCount(request, response);
                //銷燬request和response
                request.recycle();
                response.recycle();
            }

複製代碼

這部分代碼在調用完Servlet後,會經過request.isAsync()來判斷是不是異步請求,若是是異步請求,就設置async = true。若是是非異步請求就執行輸出數據到客戶端邏輯,同時銷燬requestresponse。這裏就完成了請求結束後不響應客戶端的操做。

爲何說Spring Boot的@EnableAsync註解不是異步Servlet

由於以前準備寫本篇文章的時候就查詢過不少資料,發現不少資料寫SpringBoot異步編程都是依賴於@EnableAsync註解,而後在Controller用多線程來完成業務邏輯,最後彙總結果,完成返回輸出。這裏拿一個掘金大佬的文章來舉例《新手也能看懂的 SpringBoot 異步編程指南》,這篇文章寫得很通俗易懂,很是不錯,從業務層面來講,確實是異步編程,可是有一個問題,拋開業務的並行處理來講,針對整個請求來講,並非異步的,也就是說不能當即釋放Tomcat的線程,從而不能達到異步Servlet的效果。這裏我參考上文也寫了一個demo,咱們來驗證下,爲何它不是異步的。

@RestController
@Slf4j
public class TestController {
    @Autowired
    private TestService service;

    @GetMapping("/hello")
    public String test() {
        try {
            log.info("testAsynch Start");
            CompletableFuture<String> test1 = service.test1();
            CompletableFuture<String> test2 = service.test2();
            CompletableFuture<String> test3 = service.test3();
            CompletableFuture.allOf(test1, test2, test3);
            log.info("test1=====" + test1.get());
            log.info("test2=====" + test2.get());
            log.info("test3=====" + test3.get());
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (ExecutionException e) {
            e.printStackTrace();
        }
        return "hello";
    }
@Service
public class TestService {
    @Async("asyncExecutor")
    public CompletableFuture<String> test1() throws InterruptedException {
        Thread.sleep(3000L);
        return CompletableFuture.completedFuture("test1");
    }

    @Async("asyncExecutor")
    public CompletableFuture<String> test2() throws InterruptedException {
        Thread.sleep(3000L);
        return CompletableFuture.completedFuture("test2");
    }

    @Async("asyncExecutor")
    public CompletableFuture<String> test3() throws InterruptedException {
        Thread.sleep(3000L);
        return CompletableFuture.completedFuture("test3");
    }
}
@SpringBootApplication
@EnableAsync
public class TomcatdebugApplication {

    public static void main(String[] args) {
        SpringApplication.run(TomcatdebugApplication.class, args);
    }

    @Bean(name = "asyncExecutor")
    public Executor asyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(3);
        executor.setMaxPoolSize(3);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("AsynchThread-");
        executor.initialize();
        return executor;
    }

複製代碼

這裏我運行下,看看效果

這裏我請求以後,在調用容器執行業務邏輯以前打了一個斷點,而後在返回以後的一樣打了一個斷點,在Controller執行完以後,請求才回到了CoyoteAdapter中,而且判斷request.isAsync(),根據圖中看到,是爲false,那麼接下來就會執行request.finishRequest()response.finishResponse() 來執行響應的結束,並銷燬請求和響應體。頗有趣的事情是,我實驗的時候發現,在執行request.isAsync()以前,瀏覽器的頁面上已經出現了響應體,這是SpringBoot框架已經經過StringHttpMessageConverter類中的writeInternal方法已經進行輸出了。

以上分析的核心邏輯就是,Tomcat的線程執行CoyoteAdapter調用容器後,必需要等到請求返回,而後再判斷是不是異步請求,再處理請求,而後執行完畢後,線程才能進行回收。而我一最開始的異步Servlet例子,執行完doGet方法後,就會當即返回,也就是會直接到request.isAsync()的邏輯,而後整個線程的邏輯執行完畢,線程被回收。

聊聊異步Servlet的使用場景

分析了這麼多,那麼異步Servlet的使用場景有哪些呢?其實咱們只要抓住一點就能夠分析了,就是異步Servlet提升了系統的吞吐量,能夠接受更多的請求。假設web系統中Tomcat的線程不夠用了,大量請求在等待,而此時Web系統應用層面的優化已經不能再優化了,也就是沒法縮短業務邏輯的響應時間了,這個時候,若是想讓減小用戶的等待時間,提升吞吐量,能夠嘗試下使用異步Servlet。

舉一個實際的例子:好比作一個短信系統,短信系統對實時性要求很高,因此要求等待時間儘量短,而發送功能咱們其實是委託運營商去發送的,也就是說咱們要調用接口,假設併發量很高,那麼這個時候業務系統調用咱們的發送短信功能,就有可能把咱們的Tomcat線程池用完,剩下的請求就會在隊列中等待,那這個時候,短信的延時就上去了,爲了解決這個問題,咱們能夠引入異步Servlet,接受更多的短信發送請求,從而減小短信的延時。

總結

這篇文章我從手寫一個異步Servlet來開始,分析了異步Servlet的做用,以及Tomcat內部是如何實現異步Servlet的,而後我也根據互聯網上流行的SpringBoot異步編程來進行說明,其在Tomcat內部並非一個異步的Servlet。最後,我談到了異步Servlet的使用場景,分析了什麼狀況下能夠嘗試異步Servlet。

相關文章
相關標籤/搜索