異步化,你的高併發大殺器

今天來聊聊如何讓項目異步化的一些事。java

1.同步和異步,阻塞和非阻塞

同步和異步,阻塞和非阻塞, 這個幾個詞已是老生常談,可是經常仍是有不少同窗分不清楚,覺得同步確定就是阻塞,異步確定就是非阻塞,其實他們不是一回事。git

同步和異步關注的是結果消息的通訊機制程序員

  • 同步:同步的意思就是調用方須要主動等待結果的返回
  • 異步:異步的意思就是不須要主動等待結果的返回,而是經過其餘手段好比,狀態通知,回調函數等。

阻塞和非阻塞主要關注的是等待結果返回調用方的狀態github

  • 阻塞:是指結果返回以前,當前線程被掛起,不作任何事
  • 非阻塞:是指結果在返回以前,線程能夠作一些其餘事,不會被掛起。

能夠看見同步和異步,阻塞和非阻塞主要關注的點不一樣,有人會問同步還能非阻塞,異步還能阻塞?固然是能夠的,下面爲了更好的說明他們的組合之間的意思,用幾個簡單的例子說明: 1.同步阻塞:同步阻塞基本也是編程中最多見的模型,打個比方你去商店買衣服,你去了以後發現衣服賣完了,那你就在店裏面一直等,期間不作任何事(包括看手機),等着商家進貨,直到有貨爲止,這個效率很低。web

2.同步非阻塞:同步非阻塞在編程中能夠抽象爲一個輪詢模式,你去了商店以後,發現衣服賣完了,這個時候不須要傻傻的等着,你能夠去其餘地方好比奶茶店,買杯水,可是你仍是須要時不時的去商店問老闆新衣服到了嗎。算法

3.異步阻塞:異步阻塞這個編程裏面用的較少,有點相似你寫了個線程池,submit而後立刻future.get(),這樣線程其實仍是掛起的。有點像你去商店買衣服,這個時候發現衣服沒有了,這個時候你就給老闆留給電話,說衣服到了就給我打電話,而後你就守着這個電話,一直等着他響什麼事也不作。這樣感受的確有點傻,因此這個模式用得比較少。spring

4.異步非阻塞:異步非阻塞這也是如今高併發編程的一個核心,也是今天主要講的一個核心。比如你去商店買衣服,衣服沒了,你只須要給老闆說這是個人電話,衣服到了就打。而後你就爲所欲爲的去玩,也不用操心衣服何時到,衣服一到,電話一響就能夠去買衣服了。數據庫

2.同步阻塞 PK 異步非阻塞

上面已經看到了同步阻塞的效率是多麼的低,若是使用同步阻塞的方式去買衣服,你有可能一天只能買一件衣服,其餘什麼事都不能幹,若是用異步非阻塞的方式去買,買衣服只是你一天中進行的一個小事。apache

咱們把這個映射到咱們代碼中,當咱們的線程發生一次rpc調用或者http調用,又或者其餘的一些耗時的IO調用,發起以後,若是是同步阻塞,咱們的這個線程就會被阻塞掛起,直到結果返回,試想一下若是IO調用很頻繁那咱們的CPU使用率實際上是很低很低。正所謂是物盡其用,既然CPU的使用率被IO調用搞得很低,那咱們就可使用異步非阻塞,當發生IO調用時我並不立刻關心結果,我只須要把回調函數寫入此次IO調用,我這個時候線程能夠繼續處理新的請求,當IO調用結束結束時,會調用回調函數。而咱們的線程始終處於忙碌之中,這樣就能作更多的有意義的事了。編程

這裏首先要說明的是,異步化不是萬能,異步化並不能縮短你整個鏈路調用時間長的問題,可是他能極大的提高你的最大qps。通常咱們的業務中有兩處比較耗時:

  • cpu: cpu耗時指的是咱們的通常的業務處理邏輯,好比一些數據的運算,對象的序列化。這些異步化是不能解決的,得須要靠一些算法的優化,或者一些高性能框架。
  • iowait: io耗時就像咱們上面說的,通常發生在網絡調用,文件傳輸中等等,這個時候線程通常會掛起阻塞。而咱們的異步化一般用於解決這部分的問題。

3.哪些能夠異步化?

上面說了異步化是用於解決IO阻塞的問題,而咱們通常項目中可使用異步化以下:

  • servlet異步化,springmvc異步化
  • rpc調用如(dubbo,thrift),http調用異步化
  • 數據庫調用,緩存調用異步化

下面我會從上面幾個方面進行異步化的介紹.

4.servlet異步化

對於Java開發程序員來講servlet並不陌生吧,在項目中不論你使用struts2,仍是使用的springmvc,本質上都是封裝的servlet。可是咱們的通常的開發,其實都是使用的同步阻塞模式以下: 這裏寫圖片描述

上面的模式優勢在於編碼簡單,適合在項目啓動初期,訪問量較少,或者是CPU運算較多的項目

缺點在於,業務邏輯線程和servlet容器線程是同一個,通常的業務邏輯總得發生點IO,好比查詢數據庫,好比產生RPC調用,這個時候就會發生阻塞,而咱們的servlet容器線程確定是有限的,當servlet容器線程都被阻塞的時候咱們的服務這個時候就會發生拒絕訪問,線程否則我固然們能夠經過增長機器的一系列手段來解決這個問題,可是俗話說得好靠人不如靠本身,靠別人替我分擔請求,還不如我本身搞定。因此在servlet3.0以後支持了異步化,咱們採用異步化以後就會變成以下: 這裏寫圖片描述

在這裏咱們採用新的線程處理業務邏輯,IO調用的阻塞就不會影響咱們的serlvet了,實現異步serlvet的代碼也比較簡單,以下:

@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true)
public class WorkServlet extends HttpServlet{
    private static final long serialVersionUID = 1L;
    @Override
    protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        this.doPost(req, resp);
    }

    @Override
    protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException {
        //設置ContentType,關閉緩存
        resp.setContentType("text/plain;charset=UTF-8");
        resp.setHeader("Cache-Control","private");
        resp.setHeader("Pragma","no-cache");
        final PrintWriter writer= resp.getWriter();
        writer.println("老師檢查做業了");
        writer.flush();
        List<String> zuoyes=new ArrayList<String>();
        for (int i = 0; i < 10; i++) {
            zuoyes.add("zuoye"+i);;
        }
        //開啓異步請求
        final AsyncContext ac=req.startAsync();
        doZuoye(ac, zuoyes);
        writer.println("老師佈置做業");
        writer.flush();
    }

    private void doZuoye(final AsyncContext ac, final List<String> zuoyes) {
        ac.setTimeout(1*60*60*1000L);
        ac.start(new Runnable() {
            @Override
            public void run() {
                //經過response得到字符輸出流
                try {
                    PrintWriter writer=ac.getResponse().getWriter();
                    for (String zuoye:zuoyes) {
                        writer.println("\""+zuoye+"\"請求處理中");
                        Thread.sleep(1*1000L);
                        writer.flush();
                    }
                    ac.complete();
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
    }
}

實現serlvet的關鍵在於http採起了長鏈接,也就是當請求打過來的時候就算有返回也不會關閉,由於可能還會有數據,直到返回關閉指令。 AsyncContext ac=req.startAsync(); 用於獲取異步上下文,後續咱們經過這個異步上下文進行回調返回數據,有點像咱們買衣服的時候,給老闆一個電話,而這個上下文也是一個電話,當有衣服到的時候,也就是當有數據準備好的時候就能夠打電話發送數據了。 ac.complete(); 用來進行長連接的關閉。

4.1springmvc異步化

如今其實不多人來進行serlvet編程,都是直接採用現成的一些框架,好比struts2,springmvc。下面介紹下使用springmvc如何進行異步化:

  • 首先確認你的項目中的Servlet是3.0以上的!!,其次springMVC4.0+
<dependency>
      <groupId>javax.servlet</groupId>
      <artifactId>javax.servlet-api</artifactId>
      <version>3.1.0</version>
      <scope>provided</scope>
    </dependency>
    <dependency>
      <groupId>org.springframework</groupId>
      <artifactId>spring-webmvc</artifactId>
      <version>4.2.3.RELEASE</version>
    </dependency>
  • web.xml頭部聲明,必需要3.0,filter和serverlet設置爲異步
<?xml version="1.0" encoding="UTF-8"?>
<web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
    http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd">
    <filter>
        <filter-name>testFilter</filter-name>
        <filter-class>com.TestFilter</filter-class>
        <async-supported>true</async-supported>
    </filter>
   
    <servlet>
        <servlet-name>mvc-dispatcher</servlet-name>
        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>
        .........
        <async-supported>true</async-supported>
    </servlet>
  • 使用springmvc封裝了servlet的AsyncContext,使用起來比較簡單。之前咱們同步的模式的Controller是返回額ModelAndView,而異步模式直接生成一個defrredResult(支持咱們超時擴展)便可保存上下文,下面給出如何和咱們HttpClient搭配的簡單demo
@RequestMapping(value="/asynctask", method = RequestMethod.GET)
    public DeferredResult<String> asyncTask() throws IOReactorException {
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
        PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
        conManager.setMaxTotal(100);
        conManager.setDefaultMaxPerRoute(100);
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
        // Start the client
        httpclient.start();
        //設置超時時間200ms
        final DeferredResult<String> deferredResult = new DeferredResult<String>(200L);
        deferredResult.onTimeout(new Runnable() {
            @Override
            public void run() {
                System.out.println("異步調用執行超時!thread id is : " + Thread.currentThread().getId());
                deferredResult.setResult("超時了");
            }
        });
        System.out.println("/asynctask 調用!thread id is : " + Thread.currentThread().getId());
        final HttpGet request2 = new HttpGet("http://www.apache.org/");
        httpclient.execute(request2, new FutureCallback<HttpResponse>() {
​
            public void completed(final HttpResponse response2) {
                System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
                deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine());
            }
​
            public void failed(final Exception ex) {
                System.out.println(request2.getRequestLine() + "->" + ex);
            }
​
            public void cancelled() {
                System.out.println(request2.getRequestLine() + " cancelled");
            }
​
        });
        return deferredResult;
    }
​

注意: 在serlvet異步化中有個問題是filter的後置結果處理,無法使用,對於咱們一些打點,結果統計直接使用serlvet異步是無法用的。在springmvc中就很好的解決了這個問題,springmvc採用了一個比較取巧的方式經過請求轉發,能讓請求再次過濾器。可是又引入了新的一個問題那就是過濾器會處理兩次,這裏能夠經過SpringMVC源碼中自身判斷的方法,咱們能夠在filter中使用下面這句話來進行判斷是否是屬於springmvc轉發過來的請求,從而不處理filter的前置事件,只處理後置事件:

Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE);
return asyncManagerAttr instanceof WebAsyncManager ;

5.全鏈路異步化

上面咱們介紹了serlvet的異步化,相信細心的同窗都看出來彷佛並無解決根本的問題,個人IO阻塞依然存在,只是換了個位置而已,當IO調用頻繁一樣會讓業務線程池快速變滿,雖然serlvet容器線程不被阻塞,可是這個業務依然會變得不可用。

那麼怎麼才能解決上面的問題呢?答案就是全鏈路異步化,全鏈路異步追求的是沒有阻塞,打滿你的CPU,把機器的性能壓榨到極致模型圖以下:

具體的NIO client到底作了什麼事呢,具體以下面模型:

上面就是咱們全鏈路異步的圖了(部分線程池能夠優化)。全鏈路的核心在於只要咱們遇到IO調用的時候,咱們就可使用NIO,從而避免阻塞,也就解決了以前說的業務線程池被打滿獲得尷尬場景。

5.1遠程調用異步化

咱們通常遠程調用使用rpc或者http。對於rpc來講通常thrift,http,motan等支持都異步調用,其內部原理也都是採用事件驅動的NIO模型,對於http來講通常的apachehttpclient和okhttp也都提供了異步調用。 下面簡單介紹下Http異步化調用是怎麼作的: 首先來看一個例子:

public class HTTPAsyncClientDemo {
    public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException {
      //具體參數含義下文會講
       //apache提供了ioReactor的參數配置,這裏咱們配置IO 線程爲1
        IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build();
      //根據這個配置建立一個ioReactor
        ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig);
      //asyncHttpClient使用PoolingNHttpClientConnectionManager管理咱們客戶端鏈接
        PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor);
      //設置總共的鏈接的最大數量
        conManager.setMaxTotal(100);
      //設置每一個路由的鏈接的最大數量
        conManager.setDefaultMaxPerRoute(100);
      //建立一個Client
        CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build();
        // Start the client
        httpclient.start();

        // Execute request
        final HttpGet request1 = new HttpGet("http://www.apache.org/");
        Future<HttpResponse> future = httpclient.execute(request1, null);
        // and wait until a response is received
        HttpResponse response1 = future.get();
        System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine());

        // One most likely would want to use a callback for operation result
        final HttpGet request2 = new HttpGet("http://www.apache.org/");
        httpclient.execute(request2, new FutureCallback<HttpResponse>() {
						//Complete成功後會回調這個方法
            public void completed(final HttpResponse response2) {
                System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine());
            }

            public void failed(final Exception ex) {
                System.out.println(request2.getRequestLine() + "->" + ex);
            }

            public void cancelled() {
                System.out.println(request2.getRequestLine() + " cancelled");
            }

        });
    }
}

下面給出httpAsync的整個類圖:

對於咱們的HTTPAysncClient 其實最後使用的是InternalHttpAsyncClient,在InternalHttpAsyncClient中有個ConnectionManager,這個就是咱們管理鏈接的管理器,而在httpAsync中只有一個實現那就是PoolingNHttpClientConnectionManager,這個鏈接管理器中有兩個咱們比較關心的一個是Reactor,一個是Cpool。

  • Reactor :全部的Reactor這裏都是實現了IOReactor接口。在PoolingNHttpClientConnectionManager中會有擁有一個Reactor,那就是DefaultConnectingIOReactor,這個DefaultConnectingIOReactor,負責處理Acceptor。在DefaultConnectingIOReactor有個excutor方法,生成IOReactor也就是咱們圖中的BaseIOReactor,進行IO的操做。這個模型就是咱們上面的1.2.2的模型

  • CPool :在PoolingNHttpClientConnectionManager中有個CPool,主要是負責控制咱們鏈接,咱們上面所說的maxTotal和defaultMaxPerRoute,都是由其進行控制,若是每一個路由的滿了他會斷開最老的一個連接,若是總共的total滿了他會放入leased隊列,釋放空間的時候就會將其從新鏈接。

5.2數據庫調用異步化

對於數據庫調用通常的框架並無提供異步化的方法,這裏推薦本身封裝或者使用網上開源的,這裏咱們公司有個開源的 https://github.com/ainilife/zebra-dao/blob/master/README_ZH.md 能很好的支持異步化

6.最後

異步化並非高併發的銀彈,可是有了異步化的確能提升你機器的qps,吞吐量等等。上述講的一些模型若是能合理的作一些優化,而後進行應用,相信能對你的服務有很大的幫助的。

想要獲取更多信息請關注個人公衆號吧

最後這篇文章被我收錄於JGrowing,一個全面,優秀,由社區一塊兒共建的Java學習路線,若是您想參與開源項目的維護,能夠一塊兒共建,github地址爲:https://github.com/javagrowing/JGrowing 麻煩給個小星星喲。

相關文章
相關標籤/搜索