經過我以前的Tomcat系列文章,相信看我博客的同窗對Tomcat應該有一個比較清晰的瞭解了,在前幾篇博客咱們討論了Tomcat在SpringBoot框架中是如何啓動的,討論了Tomcat的內部組件是如何設計以及請求是如何流轉的,那麼咱們這篇博客聊聊Tomcat的異步Servlet,Tomcat是如何實現異步Servlet的以及異步Servlet的使用場景。web
咱們直接藉助SpringBoot框架來實現一個Servlet,這裏只展現Servlet代碼:apache
@WebServlet(urlPatterns = "/async",asyncSupported = true)
@Slf4j
public class AsyncServlet extends HttpServlet {
ExecutorService executorService =Executors.newSingleThreadExecutor();
@Override
protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
//開啓異步,獲取異步上下文
final AsyncContext ctx = req.startAsync();
// 提交線程池異步執行
executorService.execute(new Runnable() {
@Override
public void run() {
try {
log.info("async Service 準備執行了");
//模擬耗時任務
Thread.sleep(10000L);
ctx.getResponse().getWriter().print("async servlet");
log.info("async Service 執行了");
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
//最後執行完成後完成回調。
ctx.complete();
}
});
}
複製代碼
上面的代碼實現了一個異步的Servlet,實現了doGet
方法注意在SpringBoot中使用須要再啓動類加上@ServletComponentScan
註解來掃描Servlet。既然代碼寫好了,咱們來看看實際運行效果。編程
咱們發送一個請求後,看到頁面有響應,同時,看到請求時間花費了10.05s,那麼咱們這個Servlet算是能正常運行啦。有同窗確定會問,這不是異步servlet嗎?你的響應時間並無加快,有什麼用呢?對,咱們的響應時間並不能加快,仍是會取決於咱們的業務邏輯,可是咱們的異步servlet請求後,依賴於業務的異步執行,咱們能夠當即返回,也就是說,Tomcat的線程能夠當即回收,默認狀況下,Tomcat的核心線程是10,最大線程數是200,咱們能及時回收線程,也就意味着咱們能處理更多的請求,可以增長咱們的吞吐量,這也是異步Servlet的主要做用。瀏覽器
瞭解完異步Servlet的做用後,咱們來看看,Tomcat是如何是先異步Servlet的。其實上面的代碼,主要核心邏輯就兩部分,final AsyncContext ctx = req.startAsync()
和ctx.complete()
那咱們來看看他們究竟作了什麼?tomcat
public AsyncContext startAsync(ServletRequest request,
ServletResponse response) {
if (!isAsyncSupported()) {
IllegalStateException ise =
new IllegalStateException(sm.getString("request.asyncNotSupported"));
log.warn(sm.getString("coyoteRequest.noAsync",
StringUtils.join(getNonAsyncClassNames())), ise);
throw ise;
}
if (asyncContext == null) {
asyncContext = new AsyncContextImpl(this);
}
asyncContext.setStarted(getContext(), request, response,
request==getRequest() && response==getResponse().getResponse());
asyncContext.setTimeout(getConnector().getAsyncTimeout());
return asyncContext;
}
複製代碼
咱們發現req.startAsync()
只是保存了一個異步上下文,同時設置一些基礎信息,好比Timeout
,順便提一下,這裏設置的默認超時時間是30S,若是你的異步處理邏輯超過30S,此時執行ctx.complete()
就會拋出IllegalStateException 異常。bash
咱們來看看ctx.complete()
的邏輯多線程
public void complete() {
if (log.isDebugEnabled()) {
logDebug("complete ");
}
check();
request.getCoyoteRequest().action(ActionCode.ASYNC_COMPLETE, null);
}
//類:AbstractProcessor
public final void action(ActionCode actionCode, Object param) {
case ASYNC_COMPLETE: {
clearDispatches();
if (asyncStateMachine.asyncComplete()) {
processSocketEvent(SocketEvent.OPEN_READ, true);
}
break;
}
}
//類:AbstractProcessor
protected void processSocketEvent(SocketEvent event, boolean dispatch) {
SocketWrapperBase<?> socketWrapper = getSocketWrapper();
if (socketWrapper != null) {
socketWrapper.processSocket(event, dispatch);
}
}
//類:AbstractEndpoint
public boolean processSocket(SocketWrapperBase<S> socketWrapper,
SocketEvent event, boolean dispatch) {
//省略部分代碼
SocketProcessorBase<S> sc = null;
if (processorCache != null) {
sc = processorCache.pop();
}
if (sc == null) {
sc = createSocketProcessor(socketWrapper, event);
} else {
sc.reset(socketWrapper, event);
}
Executor executor = getExecutor();
if (dispatch && executor != null) {
executor.execute(sc);
} else {
sc.run();
}
return true;
}
複製代碼
因此,這裏最終會調用AbstractEndpoint
的processSocket
方法,以前看過我前面博客的同窗應該有印象,EndPoint
是用來接受和處理請求的,接下來就會交給Processor
去進行協議處理。併發
類:AbstractProcessorLight
public SocketState process(SocketWrapperBase<?> socketWrapper, SocketEvent status)
throws IOException {
//省略部分diam
SocketState state = SocketState.CLOSED;
Iterator<DispatchType> dispatches = null;
do {
if (dispatches != null) {
DispatchType nextDispatch = dispatches.next();
state = dispatch(nextDispatch.getSocketStatus());
} else if (status == SocketEvent.DISCONNECT) {
} else if (isAsync() || isUpgrade() || state == SocketState.ASYNC_END) {
state = dispatch(status);
if (state == SocketState.OPEN) {
state = service(socketWrapper);
}
} else if (status == SocketEvent.OPEN_WRITE) {
state = SocketState.LONG;
} else if (status == SocketEvent.OPEN_READ){
state = service(socketWrapper);
} else {
state = SocketState.CLOSED;
}
} while (state == SocketState.ASYNC_END ||
dispatches != null && state != SocketState.CLOSED);
return state;
}
複製代碼
這部分是重點,AbstractProcessorLight
會根據SocketEvent
的狀態來判斷是否是要去調用service(socketWrapper)
,該方法最終會去調用到容器,從而完成業務邏輯的調用,咱們這個請求是執行完成後調用的,確定不能進容器了,否則就是死循環了,這裏經過isAsync()
判斷,就會進入dispatch(status)
,最終會調用CoyoteAdapter
的asyncDispatch
方法app
public boolean asyncDispatch(org.apache.coyote.Request req, org.apache.coyote.Response res,
SocketEvent status) throws Exception {
//省略部分代碼
Request request = (Request) req.getNote(ADAPTER_NOTES);
Response response = (Response) res.getNote(ADAPTER_NOTES);
boolean success = true;
AsyncContextImpl asyncConImpl = request.getAsyncContextInternal();
try {
if (!request.isAsync()) {
response.setSuspended(false);
}
if (status==SocketEvent.TIMEOUT) {
if (!asyncConImpl.timeout()) {
asyncConImpl.setErrorState(null, false);
}
} else if (status==SocketEvent.ERROR) {
}
if (!request.isAsyncDispatching() && request.isAsync()) {
WriteListener writeListener = res.getWriteListener();
ReadListener readListener = req.getReadListener();
if (writeListener != null && status == SocketEvent.OPEN_WRITE) {
ClassLoader oldCL = null;
try {
oldCL = request.getContext().bind(false, null);
res.onWritePossible();//這裏執行瀏覽器響應,寫入數據
if (request.isFinished() && req.sendAllDataReadEvent() &&
readListener != null) {
readListener.onAllDataRead();
}
} catch (Throwable t) {
} finally {
request.getContext().unbind(false, oldCL);
}
}
}
}
//這裏判斷異步正在進行,說明這不是一個完成方法的回調,是一個正常異步請求,繼續調用容器。
if (request.isAsyncDispatching()) {
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
Throwable t = (Throwable) request.getAttribute(RequestDispatcher.ERROR_EXCEPTION);
if (t != null) {
asyncConImpl.setErrorState(t, true);
}
}
//注意,這裏,若是超時或者出錯,request.isAsync()會返回false,這裏是爲了儘快的輸出錯誤給客戶端。
if (!request.isAsync()) {
//這裏也是輸出邏輯
request.finishRequest();
response.finishResponse();
}
//銷燬request和response
if (!success || !request.isAsync()) {
updateWrapperErrorCount(request, response);
request.recycle();
response.recycle();
}
}
return success;
}
複製代碼
上面的代碼就是ctx.complete()
執行最終的方法了(固然省略了不少細節),完成了數據的輸出,最終輸出到瀏覽器。框架
這裏有同窗可能會說,我知道異步執行完後,調用ctx.complete()
會輸出到瀏覽器,可是,第一次doGet請求執行完成後,Tomcat是怎麼知道不用返回到客戶端的呢?關鍵代碼在CoyoteAdapter
中的service
方法,部分代碼以下:
postParseSuccess = postParseRequest(req, request, res, response);
//省略部分代碼
if (postParseSuccess) {
request.setAsyncSupported(
connector.getService().getContainer().getPipeline().isAsyncSupported());
connector.getService().getContainer().getPipeline().getFirst().invoke(
request, response);
}
if (request.isAsync()) {
async = true;
} else {
//輸出數據到客戶端
request.finishRequest();
response.finishResponse();
if (!async) {
updateWrapperErrorCount(request, response);
//銷燬request和response
request.recycle();
response.recycle();
}
複製代碼
這部分代碼在調用完Servlet
後,會經過request.isAsync()
來判斷是不是異步請求,若是是異步請求,就設置async = true
。若是是非異步請求就執行輸出數據到客戶端邏輯,同時銷燬request
和response
。這裏就完成了請求結束後不響應客戶端的操做。
由於以前準備寫本篇文章的時候就查詢過不少資料,發現不少資料寫SpringBoot異步編程都是依賴於@EnableAsync
註解,而後在Controller
用多線程來完成業務邏輯,最後彙總結果,完成返回輸出。這裏拿一個掘金大佬的文章來舉例《新手也能看懂的 SpringBoot 異步編程指南》,這篇文章寫得很通俗易懂,很是不錯,從業務層面來講,確實是異步編程,可是有一個問題,拋開業務的並行處理來講,針對整個請求來講,並非異步的,也就是說不能當即釋放Tomcat的線程,從而不能達到異步Servlet的效果。這裏我參考上文也寫了一個demo,咱們來驗證下,爲何它不是異步的。
@RestController
@Slf4j
public class TestController {
@Autowired
private TestService service;
@GetMapping("/hello")
public String test() {
try {
log.info("testAsynch Start");
CompletableFuture<String> test1 = service.test1();
CompletableFuture<String> test2 = service.test2();
CompletableFuture<String> test3 = service.test3();
CompletableFuture.allOf(test1, test2, test3);
log.info("test1=====" + test1.get());
log.info("test2=====" + test2.get());
log.info("test3=====" + test3.get());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
return "hello";
}
@Service
public class TestService {
@Async("asyncExecutor")
public CompletableFuture<String> test1() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test1");
}
@Async("asyncExecutor")
public CompletableFuture<String> test2() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test2");
}
@Async("asyncExecutor")
public CompletableFuture<String> test3() throws InterruptedException {
Thread.sleep(3000L);
return CompletableFuture.completedFuture("test3");
}
}
@SpringBootApplication
@EnableAsync
public class TomcatdebugApplication {
public static void main(String[] args) {
SpringApplication.run(TomcatdebugApplication.class, args);
}
@Bean(name = "asyncExecutor")
public Executor asyncExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(3);
executor.setMaxPoolSize(3);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("AsynchThread-");
executor.initialize();
return executor;
}
複製代碼
這裏我運行下,看看效果
這裏我請求以後,在調用容器執行業務邏輯以前打了一個斷點,而後在返回以後的一樣打了一個斷點,在Controller
執行完以後,請求才回到了CoyoteAdapter
中,而且判斷request.isAsync()
,根據圖中看到,是爲false
,那麼接下來就會執行request.finishRequest()
和response.finishResponse()
來執行響應的結束,並銷燬請求和響應體。頗有趣的事情是,我實驗的時候發現,在執行request.isAsync()
以前,瀏覽器的頁面上已經出現了響應體,這是SpringBoot框架已經經過StringHttpMessageConverter
類中的writeInternal
方法已經進行輸出了。
以上分析的核心邏輯就是,Tomcat的線程執行CoyoteAdapter
調用容器後,必需要等到請求返回,而後再判斷是不是異步請求,再處理請求,而後執行完畢後,線程才能進行回收。而我一最開始的異步Servlet例子,執行完doGet方法後,就會當即返回,也就是會直接到request.isAsync()
的邏輯,而後整個線程的邏輯執行完畢,線程被回收。
分析了這麼多,那麼異步Servlet的使用場景有哪些呢?其實咱們只要抓住一點就能夠分析了,就是異步Servlet提升了系統的吞吐量,能夠接受更多的請求。假設web系統中Tomcat的線程不夠用了,大量請求在等待,而此時Web系統應用層面的優化已經不能再優化了,也就是沒法縮短業務邏輯的響應時間了,這個時候,若是想讓減小用戶的等待時間,提升吞吐量,能夠嘗試下使用異步Servlet。
舉一個實際的例子:好比作一個短信系統,短信系統對實時性要求很高,因此要求等待時間儘量短,而發送功能咱們其實是委託運營商去發送的,也就是說咱們要調用接口,假設併發量很高,那麼這個時候業務系統調用咱們的發送短信功能,就有可能把咱們的Tomcat線程池用完,剩下的請求就會在隊列中等待,那這個時候,短信的延時就上去了,爲了解決這個問題,咱們能夠引入異步Servlet,接受更多的短信發送請求,從而減小短信的延時。
這篇文章我從手寫一個異步Servlet來開始,分析了異步Servlet的做用,以及Tomcat內部是如何實現異步Servlet的,而後我也根據互聯網上流行的SpringBoot異步編程來進行說明,其在Tomcat內部並非一個異步的Servlet。最後,我談到了異步Servlet的使用場景,分析了什麼狀況下能夠嘗試異步Servlet。