今天來聊聊如何讓項目異步化的一些事。java
同步和異步,阻塞和非阻塞, 這個幾個詞已是老生常談,可是經常仍是有不少同窗分不清楚,覺得同步確定就是阻塞,異步確定就是非阻塞,其實他們不是一回事。git
同步和異步關注的是結果消息的通訊機制程序員
阻塞和非阻塞主要關注的是等待結果返回調用方的狀態github
能夠看見同步和異步,阻塞和非阻塞主要關注的點不一樣,有人會問同步還能非阻塞,異步還能阻塞?固然是能夠的,下面爲了更好的說明他們的組合之間的意思,用幾個簡單的例子說明: 1.同步阻塞:同步阻塞基本也是編程中最多見的模型,打個比方你去商店買衣服,你去了以後發現衣服賣完了,那你就在店裏面一直等,期間不作任何事(包括看手機),等着商家進貨,直到有貨爲止,這個效率很低。web
2.同步非阻塞:同步非阻塞在編程中能夠抽象爲一個輪詢模式,你去了商店以後,發現衣服賣完了,這個時候不須要傻傻的等着,你能夠去其餘地方好比奶茶店,買杯水,可是你仍是須要時不時的去商店問老闆新衣服到了嗎。算法
3.異步阻塞:異步阻塞這個編程裏面用的較少,有點相似你寫了個線程池,submit而後立刻future.get(),這樣線程其實仍是掛起的。有點像你去商店買衣服,這個時候發現衣服沒有了,這個時候你就給老闆留給電話,說衣服到了就給我打電話,而後你就守着這個電話,一直等着他響什麼事也不作。這樣感受的確有點傻,因此這個模式用得比較少。spring
4.異步非阻塞:異步非阻塞這也是如今高併發編程的一個核心,也是今天主要講的一個核心。比如你去商店買衣服,衣服沒了,你只須要給老闆說這是個人電話,衣服到了就打。而後你就爲所欲爲的去玩,也不用操心衣服何時到,衣服一到,電話一響就能夠去買衣服了。數據庫
上面已經看到了同步阻塞的效率是多麼的低,若是使用同步阻塞的方式去買衣服,你有可能一天只能買一件衣服,其餘什麼事都不能幹,若是用異步非阻塞的方式去買,買衣服只是你一天中進行的一個小事。apache
咱們把這個映射到咱們代碼中,當咱們的線程發生一次rpc調用或者http調用,又或者其餘的一些耗時的IO調用,發起以後,若是是同步阻塞,咱們的這個線程就會被阻塞掛起,直到結果返回,試想一下若是IO調用很頻繁那咱們的CPU使用率實際上是很低很低。正所謂是物盡其用,既然CPU的使用率被IO調用搞得很低,那咱們就可使用異步非阻塞,當發生IO調用時我並不立刻關心結果,我只須要把回調函數寫入此次IO調用,我這個時候線程能夠繼續處理新的請求,當IO調用結束結束時,會調用回調函數。而咱們的線程始終處於忙碌之中,這樣就能作更多的有意義的事了。編程
這裏首先要說明的是,異步化不是萬能,異步化並不能縮短你整個鏈路調用時間長的問題,可是他能極大的提高你的最大qps。通常咱們的業務中有兩處比較耗時:
上面說了異步化是用於解決IO阻塞的問題,而咱們通常項目中可使用異步化以下:
下面我會從上面幾個方面進行異步化的介紹.
對於Java開發程序員來講servlet並不陌生吧,在項目中不論你使用struts2,仍是使用的springmvc,本質上都是封裝的servlet。可是咱們的通常的開發,其實都是使用的同步阻塞模式以下:
上面的模式優勢在於編碼簡單,適合在項目啓動初期,訪問量較少,或者是CPU運算較多的項目
缺點在於,業務邏輯線程和servlet容器線程是同一個,通常的業務邏輯總得發生點IO,好比查詢數據庫,好比產生RPC調用,這個時候就會發生阻塞,而咱們的servlet容器線程確定是有限的,當servlet容器線程都被阻塞的時候咱們的服務這個時候就會發生拒絕訪問,線程否則我固然們能夠經過增長機器的一系列手段來解決這個問題,可是俗話說得好靠人不如靠本身,靠別人替我分擔請求,還不如我本身搞定。因此在servlet3.0以後支持了異步化,咱們採用異步化以後就會變成以下:
在這裏咱們採用新的線程處理業務邏輯,IO調用的阻塞就不會影響咱們的serlvet了,實現異步serlvet的代碼也比較簡單,以下:
@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true) public class WorkServlet extends HttpServlet{ private static final long serialVersionUID = 1L; @Override protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { this.doPost(req, resp); } @Override protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { //設置ContentType,關閉緩存 resp.setContentType("text/plain;charset=UTF-8"); resp.setHeader("Cache-Control","private"); resp.setHeader("Pragma","no-cache"); final PrintWriter writer= resp.getWriter(); writer.println("老師檢查做業了"); writer.flush(); List<String> zuoyes=new ArrayList<String>(); for (int i = 0; i < 10; i++) { zuoyes.add("zuoye"+i);; } //開啓異步請求 final AsyncContext ac=req.startAsync(); doZuoye(ac, zuoyes); writer.println("老師佈置做業"); writer.flush(); } private void doZuoye(final AsyncContext ac, final List<String> zuoyes) { ac.setTimeout(1*60*60*1000L); ac.start(new Runnable() { @Override public void run() { //經過response得到字符輸出流 try { PrintWriter writer=ac.getResponse().getWriter(); for (String zuoye:zuoyes) { writer.println("\""+zuoye+"\"請求處理中"); Thread.sleep(1*1000L); writer.flush(); } ac.complete(); } catch (Exception e) { e.printStackTrace(); } } }); } }
實現serlvet的關鍵在於http採起了長鏈接,也就是當請求打過來的時候就算有返回也不會關閉,由於可能還會有數據,直到返回關閉指令。 AsyncContext ac=req.startAsync(); 用於獲取異步上下文,後續咱們經過這個異步上下文進行回調返回數據,有點像咱們買衣服的時候,給老闆一個電話,而這個上下文也是一個電話,當有衣服到的時候,也就是當有數據準備好的時候就能夠打電話發送數據了。 ac.complete(); 用來進行長連接的關閉。
如今其實不多人來進行serlvet編程,都是直接採用現成的一些框架,好比struts2,springmvc。下面介紹下使用springmvc如何進行異步化:
<dependency> <groupId>javax.servlet</groupId> <artifactId>javax.servlet-api</artifactId> <version>3.1.0</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.springframework</groupId> <artifactId>spring-webmvc</artifactId> <version>4.2.3.RELEASE</version> </dependency>
<?xml version="1.0" encoding="UTF-8"?> <web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://java.sun.com/xml/ns/javaee http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"> <filter> <filter-name>testFilter</filter-name> <filter-class>com.TestFilter</filter-class> <async-supported>true</async-supported> </filter> <servlet> <servlet-name>mvc-dispatcher</servlet-name> <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> ......... <async-supported>true</async-supported> </servlet>
@RequestMapping(value="/asynctask", method = RequestMethod.GET) public DeferredResult<String> asyncTask() throws IOReactorException { IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); conManager.setMaxTotal(100); conManager.setDefaultMaxPerRoute(100); CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); //設置超時時間200ms final DeferredResult<String> deferredResult = new DeferredResult<String>(200L); deferredResult.onTimeout(new Runnable() { @Override public void run() { System.out.println("異步調用執行超時!thread id is : " + Thread.currentThread().getId()); deferredResult.setResult("超時了"); } }); System.out.println("/asynctask 調用!thread id is : " + Thread.currentThread().getId()); final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); return deferredResult; }
注意: 在serlvet異步化中有個問題是filter的後置結果處理,無法使用,對於咱們一些打點,結果統計直接使用serlvet異步是無法用的。在springmvc中就很好的解決了這個問題,springmvc採用了一個比較取巧的方式經過請求轉發,能讓請求再次過濾器。可是又引入了新的一個問題那就是過濾器會處理兩次,這裏能夠經過SpringMVC源碼中自身判斷的方法,咱們能夠在filter中使用下面這句話來進行判斷是否是屬於springmvc轉發過來的請求,從而不處理filter的前置事件,只處理後置事件:
Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); return asyncManagerAttr instanceof WebAsyncManager ;
上面咱們介紹了serlvet的異步化,相信細心的同窗都看出來彷佛並無解決根本的問題,個人IO阻塞依然存在,只是換了個位置而已,當IO調用頻繁一樣會讓業務線程池快速變滿,雖然serlvet容器線程不被阻塞,可是這個業務依然會變得不可用。
那麼怎麼才能解決上面的問題呢?答案就是全鏈路異步化,全鏈路異步追求的是沒有阻塞,打滿你的CPU,把機器的性能壓榨到極致模型圖以下:
具體的NIO client到底作了什麼事呢,具體以下面模型:
上面就是咱們全鏈路異步的圖了(部分線程池能夠優化)。全鏈路的核心在於只要咱們遇到IO調用的時候,咱們就可使用NIO,從而避免阻塞,也就解決了以前說的業務線程池被打滿獲得尷尬場景。
咱們通常遠程調用使用rpc或者http。對於rpc來講通常thrift,http,motan等支持都異步調用,其內部原理也都是採用事件驅動的NIO模型,對於http來講通常的apachehttpclient和okhttp也都提供了異步調用。 下面簡單介紹下Http異步化調用是怎麼作的: 首先來看一個例子:
public class HTTPAsyncClientDemo { public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException { //具體參數含義下文會講 //apache提供了ioReactor的參數配置,這裏咱們配置IO 線程爲1 IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); //根據這個配置建立一個ioReactor ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); //asyncHttpClient使用PoolingNHttpClientConnectionManager管理咱們客戶端鏈接 PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); //設置總共的鏈接的最大數量 conManager.setMaxTotal(100); //設置每一個路由的鏈接的最大數量 conManager.setDefaultMaxPerRoute(100); //建立一個Client CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); // Start the client httpclient.start(); // Execute request final HttpGet request1 = new HttpGet("http://www.apache.org/"); Future<HttpResponse> future = httpclient.execute(request1, null); // and wait until a response is received HttpResponse response1 = future.get(); System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine()); // One most likely would want to use a callback for operation result final HttpGet request2 = new HttpGet("http://www.apache.org/"); httpclient.execute(request2, new FutureCallback<HttpResponse>() { //Complete成功後會回調這個方法 public void completed(final HttpResponse response2) { System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); } public void failed(final Exception ex) { System.out.println(request2.getRequestLine() + "->" + ex); } public void cancelled() { System.out.println(request2.getRequestLine() + " cancelled"); } }); } }
下面給出httpAsync的整個類圖:
對於咱們的HTTPAysncClient 其實最後使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有個ConnectionManager,這個就是咱們管理鏈接的管理器,而在httpAsync中只有一個實現那就是PoolingNHttpClientConnectionManager,這個鏈接管理器中有兩個咱們比較關心的一個是Reactor,一個是Cpool。
Reactor :全部的Reactor這裏都是實現了IOReactor接口。在PoolingNHttpClientConnectionManager中會有擁有一個Reactor,那就是DefaultConnectingIOReactor,這個DefaultConnectingIOReactor,負責處理Acceptor。在DefaultConnectingIOReactor有個excutor方法,生成IOReactor也就是咱們圖中的BaseIOReactor,進行IO的操做。這個模型就是咱們上面的1.2.2的模型
CPool :在PoolingNHttpClientConnectionManager中有個CPool,主要是負責控制咱們鏈接,咱們上面所說的maxTotal和defaultMaxPerRoute,都是由其進行控制,若是每一個路由的滿了他會斷開最老的一個連接,若是總共的total滿了他會放入leased隊列,釋放空間的時候就會將其從新鏈接。
對於數據庫調用通常的框架並無提供異步化的方法,這裏推薦本身封裝或者使用網上開源的,這裏咱們公司有個開源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持異步化
異步化並非高併發的銀彈,可是有了異步化的確能提升你機器的qps,吞吐量等等。上述講的一些模型若是能合理的作一些優化,而後進行應用,相信能對你的服務有很大的幫助的。
最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:https://github.com/javagrowing/JGrowing 麻煩給個小星星喲。