高併發的「大殺器」:異步化、並行化

高併發的大殺器:異步化java

同步和異步,阻塞和非阻塞程序員

同步和異步,阻塞和非阻塞,這幾個詞已是老生常談,可是仍是有不少同窗分不清楚,覺得同步確定就是阻塞,異步確定就是非阻塞,其實他們並非一回事。web

同步和異步關注的是結果消息的通訊機制:算法

  • 同步:調用方須要主動等待結果的返回。
  • 異步:不須要主動等待結果的返回,而是經過其餘手段,好比狀態通知,回調函數等。

阻塞和非阻塞主要關注的是等待結果返回調用方的狀態:spring

  • 阻塞:是指結果返回以前,當前線程被掛起,不作任何事。
  • 非阻塞:是指結果在返回以前,線程能夠作一些其餘事,不會被掛起。

能夠看見同步和異步,阻塞和非阻塞主要關注的點不一樣,有人會問同步還能非阻塞,異步還能阻塞?sql

固然是能夠的,下面爲了更好的說明它們的組合之間的意思,用幾個簡單的例子說明:數據庫

同步阻塞:同步阻塞基本也是編程中最多見的模型,打個比方你去商店買衣服,你去了以後發現衣服賣完了,那你就在店裏面一直等,期間不作任何事(包括看手機),等着商家進貨,直到有貨爲止,這個效率很低。apache

  • 同步非阻塞:同步非阻塞在編程中能夠抽象爲一個輪詢模式,你去了商店以後,發現衣服賣完了。

這個時候不須要傻傻的等着,你能夠去其餘地方好比奶茶店,買杯水,可是你仍是須要時不時的去商店問老闆新衣服到了嗎。編程

  • 異步阻塞:異步阻塞這個編程裏面用的較少,有點相似你寫了個線程池,submit 而後立刻 future.get(),這樣線程其實仍是掛起的。

有點像你去商店買衣服,這個時候發現衣服沒有了,這個時候你就給老闆留個電話,說衣服到了就給我打電話,而後你就守着這個電話,一直等着它響什麼事也不作。這樣感受的確有點傻,因此這個模式用得比較少。api

  • 異步非阻塞:這也是如今高併發編程的一個核心,也是今天主要講的一個核心。

比如你去商店買衣服,衣服沒了,你只須要給老闆說這是個人電話,衣服到了就打。而後你就爲所欲爲的去玩,也不用操心衣服何時到,衣服一到,電話一響就能夠去買衣服了。

同步阻塞 PK 異步非阻塞

上面已經看到了同步阻塞的效率是多麼的低,若是使用同步阻塞的方式去買衣服,你有可能一天只能買一件衣服,其餘什麼事都不能幹;若是用異步非阻塞的方式去買,買衣服只是你一天中進行的一個小事。

咱們把這個映射到咱們代碼中,當咱們的線程發生一次 RPC 調用或者 HTTP 調用,又或者其餘的一些耗時的 IO 調用。

發起以後,若是是同步阻塞,咱們的這個線程就會被阻塞掛起,直到結果返回,試想一下,若是 IO 調用很頻繁那咱們的 CPU 使用率會很低很低。

正所謂是物盡其用,既然 CPU 的使用率被 IO 調用搞得很低,那咱們就可使用異步非阻塞。

當發生 IO 調用時我並不立刻關心結果,我只須要把回調函數寫入此次 IO 調用,這個時候線程能夠繼續處理新的請求,當 IO 調用結束時,會調用回調函數。

而咱們的線程始終處於忙碌之中,這樣就能作更多的有意義的事了。這裏首先要說明的是,異步化不是萬能,異步化並不能縮短你整個鏈路調用時間長的問題,可是它能極大的提高你的最大 QPS。

通常咱們的業務中有兩處比較耗時:

  • CPU:CPU 耗時指的是咱們的通常的業務處理邏輯,好比一些數據的運算,對象的序列化。這些異步化是不能解決的,得須要靠一些算法的優化,或者一些高性能框架。
  • IO Wait:IO 耗時就像咱們上面說的,通常發生在網絡調用,文件傳輸中等等,這個時候線程通常會掛起阻塞。而咱們的異步化一般用於解決這部分的問題。

哪些能夠異步化

上面說了異步化是用於解決 IO 阻塞的問題,而咱們通常項目中可使用異步化的狀況以下:

  • Servlet 異步化
  • Spring MVC 異步化
  • RPC 調用如(Dubbo,Thrift),HTTP 調用異步化
  • 數據庫調用,緩存調用異步化

下面我會從上面幾個方面進行異步化的介紹。

Servlet 異步化

對於 Java 開發程序員來講 Servlet 並不陌生,在項目中不論你使用 Struts2,仍是使用的 Spring MVC,本質上都是封裝的 Servlet。

可是咱們通常的開發都是使用的同步阻塞,模式以下:

 

 

 

 

 

上面的模式優勢在於編碼簡單,適合在項目啓動初期,訪問量較少,或者是 CPU 運算較多的項目。

缺點在於,業務邏輯線程和 Servlet 容器線程是同一個,通常的業務邏輯總得發生點 IO,好比查詢數據庫,好比產生 RPC 調用,這個時候就會發生阻塞。

而咱們的 Servlet 容器線程確定是有限的,當 Servlet 容器線程都被阻塞的時候咱們的服務這個時候就會發生拒絕訪問,線程不夠我固然能夠經過增長機器的一系列手段來解決這個問題。

可是俗話說得好靠人不如靠本身,靠別人替我分擔請求,還不如我本身搞定。

因此在 Servlet 3.0 以後支持了異步化,咱們採用異步化以後,模式變成以下:

 

 

 

 

 

在這裏咱們採用新的線程處理業務邏輯,IO 調用的阻塞就不會影響咱們的 Serlvet 了,實現異步 Serlvet 的代碼也比較簡單,以下:

@WebServlet(name = "WorkServlet",urlPatterns = "/work",asyncSupported =true) 
public class WorkServlet extends HttpServlet{ 
   private static final long serialVersionUID = 1L; 
   @Override 
   protected void doGet(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
       this.doPost(req, resp); 
   } 
 
   @Override 
   protected void doPost(HttpServletRequest req, HttpServletResponse resp) throws ServletException, IOException { 
       //設置ContentType,關閉緩存 
       resp.setContentType("text/plain;charset=UTF-8"); 
       resp.setHeader("Cache-Control","private"); 
       resp.setHeader("Pragma","no-cache"); 
       final PrintWriter writer= resp.getWriter(); 
       writer.println("老師檢查做業了"); 
       writer.flush(); 
       List<String> zuoyes=new ArrayList<String>(); 
       for (int i = 0; i < 10; i++) { 
           zuoyes.add("zuoye"+i);; 
       } 
       //開啓異步請求 
       final AsyncContext ac=req.startAsync(); 
       doZuoye(ac, zuoyes); 
       writer.println("老師佈置做業"); 
       writer.flush(); 
   } 
 
   private void doZuoye(final AsyncContext ac, final List<String> zuoyes) { 
       ac.setTimeout(1*60*60*1000L); 
       ac.start(new Runnable() { 
           @Override 
           public void run() { 
               //經過response得到字符輸出流 
               try { 
                   PrintWriter writer=ac.getResponse().getWriter(); 
                   for (String zuoye:zuoyes) { 
                       writer.println("\""+zuoye+"\"請求處理中"); 
                       Thread.sleep(1*1000L); 
                       writer.flush(); 
                   } 
                   ac.complete(); 
               } catch (Exception e) { 
                   e.printStackTrace(); 
               } 
           } 
       }); 
   } 
} 

實現 Serlvet 的關鍵在於 HTTP 採起了長鏈接,也就是當請求打過來的時候就算有返回也不會關閉,由於可能還會有數據,直到返回關閉指令。

AsyncContext ac=req.startAsync();用於獲取異步上下文,後續咱們經過這個異步上下文進行回調返回數據,有點像咱們買衣服的時候,留給老闆一個電話。

而這個上下文也是一個電話,當有衣服到的時候,也就是當有數據準備好的時候就能夠打電話發送數據了。ac.complete();用來進行長連接的關閉。

Spring MVC 異步化

如今其實不多人來進行 Serlvet 編程,都是直接採用現成的一些框架,好比 Struts2,Spring MVC。下面介紹下使用 Spring MVC 如何進行異步化:

首先確認你的項目中的 Servlet 是 3.0 以上,其次 Spring MVC 4.0+:

<dependency> 
     <groupId>javax.servlet</groupId> 
     <artifactId>javax.servlet-api</artifactId> 
     <version>3.1.0</version> 
     <scope>provided</scope> 
   </dependency> 
   <dependency> 
     <groupId>org.springframework</groupId> 
     <artifactId>spring-webmvc</artifactId> 
     <version>4.2.3.RELEASE</version> 
   </dependency> 

web.xml 頭部聲明,必需要 3.0,Filter 和 Serverlet 設置爲異步:

<?xml version="1.0" encoding="UTF-8"?> 
<web-app version="3.0" xmlns="http://java.sun.com/xml/ns/javaee" 
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
   xsi:schemaLocation="http://java.sun.com/xml/ns/javaee 
   http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"> 
   <filter> 
       <filter-name>testFilter</filter-name> 
       <filter-class>com.TestFilter</filter-class> 
       <async-supported>true</async-supported> 
   </filter> 
 
   <servlet> 
       <servlet-name>mvc-dispatcher</servlet-name> 
       <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class> 
       ......... 
       <async-supported>true</async-supported> 
   </servlet> 

使用 Spring MVC 封裝了 Servlet 的 AsyncContext,使用起來比較簡單。之前咱們同步的模式的 Controller 是返回 ModelAndView。

而異步模式直接生成一個 DeferredResult(支持咱們超時擴展)便可保存上下文,下面給出如何和咱們 HttpClient 搭配的簡單 demo:

@RequestMapping(value="/asynctask", method = RequestMethod.GET) 
   public DeferredResult<String> asyncTask() throws IOReactorException { 
       IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); 
       ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); 
       PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); 
       conManager.setMaxTotal(100); 
       conManager.setDefaultMaxPerRoute(100); 
       CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); 
       // Start the client 
       httpclient.start(); 
       //設置超時時間200ms 
       final DeferredResult<String> deferredResult = new DeferredResult<String>(200L); 
       deferredResult.onTimeout(new Runnable() { 
           @Override 
           public void run() { 
               System.out.println("異步調用執行超時!thread id is : " + Thread.currentThread().getId()); 
               deferredResult.setResult("超時了"); 
           } 
       }); 
       System.out.println("/asynctask 調用!thread id is : " + Thread.currentThread().getId()); 
       final HttpGet request2 = new HttpGet("http://www.apache.org/"); 
       httpclient.execute(request2, new FutureCallback<HttpResponse>() { 
 
           public void completed(final HttpResponse response2) { 
               System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); 
               deferredResult.setResult(request2.getRequestLine() + "->" + response2.getStatusLine()); 
           } 
 
           public void failed(final Exception ex) { 
               System.out.println(request2.getRequestLine() + "->" + ex); 
           } 
 
           public void cancelled() { 
               System.out.println(request2.getRequestLine() + " cancelled"); 
           } 
 
       }); 
       return deferredResult; 
   } 

注意:在 Serlvet 異步化中有個問題是 Filter 的後置結果處理,無法使用,對於咱們一些打點,結果統計直接使用 Serlvet 異步是無法用的。

在 Spring MVC 中就很好的解決了這個問題,Spring MVC 採用了一個比較取巧的方式經過請求轉發,能讓請求再次經過過濾器。

可是又引入了新的一個問題那就是過濾器會處理兩次,這裏能夠經過 Spring MVC 源碼中自身判斷的方法。

咱們能夠在 Filter 中使用下面這句話來進行判斷是否是屬於 Spring MVC 轉發過來的請求,從而不處理 Filter 的前置事件,只處理後置事件:

Object asyncManagerAttr = servletRequest.getAttribute(WEB_ASYNC_MANAGER_ATTRIBUTE); 
return asyncManagerAttr instanceof WebAsyncManager ; 

全鏈路異步化

上面咱們介紹了 Serlvet 的異步化,相信細心的同窗都看出來彷佛並無解決根本的問題,個人 IO 阻塞依然存在,只是換了個位置而已。

當 IO 調用頻繁一樣會讓業務線程池快速變滿,雖然 Serlvet 容器線程不被阻塞,可是這個業務依然會變得不可用。

 

 

 

 

 

那麼怎麼才能解決上面的問題呢?答案就是全鏈路異步化,全鏈路異步追求的是沒有阻塞,打滿你的 CPU,把機器的性能壓榨到極致。模型圖以下:

 

 

 

 

 

具體的 NIO Client 到底作了什麼事呢,具體以下面模型:

 

 

 

 

 

上面就是咱們全鏈路異步的圖了(部分線程池能夠優化)。全鏈路的核心在於只要咱們遇到 IO 調用的時候,咱們就可使用 NIO,從而避免阻塞,也就解決了以前說的業務線程池被打滿的尷尬場景。

遠程調用異步化

咱們通常遠程調用使用 RPC 或者 HTTP:

  • 對於 RPC 來講,通常 Thrift,HTTP,Motan 等支持都異步調用,其內部原理也都是採用事件驅動的 NIO 模型。
  • 對於 HTTP 來講,通常的 Apache HTTP Client 和 Okhttp 也都提供了異步調用。

下面簡單介紹下 HTTP 異步化調用是怎麼作的。首先來看一個例子:

public class HTTPAsyncClientDemo { 
   public static void main(String[] args) throws ExecutionException, InterruptedException, IOReactorException { 
     //具體參數含義下文會講 
      //apache提供了ioReactor的參數配置,這裏咱們配置IO 線程爲1 
       IOReactorConfig ioReactorConfig = IOReactorConfig.custom().setIoThreadCount(1).build(); 
     //根據這個配置建立一個ioReactor 
       ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor(ioReactorConfig); 
     //asyncHttpClient使用PoolingNHttpClientConnectionManager管理咱們客戶端鏈接 
       PoolingNHttpClientConnectionManager conManager = new PoolingNHttpClientConnectionManager(ioReactor); 
     //設置總共的鏈接的最大數量 
       conManager.setMaxTotal(100); 
     //設置每一個路由的鏈接的最大數量 
       conManager.setDefaultMaxPerRoute(100); 
     //建立一個Client 
       CloseableHttpAsyncClient httpclient = HttpAsyncClients.custom().setConnectionManager(conManager).build(); 
       // Start the client 
       httpclient.start(); 
 
       // Execute request 
       final HttpGet request1 = new HttpGet("http://www.apache.org/"); 
       Future<HttpResponse> future = httpclient.execute(request1, null); 
       // and wait until a response is received 
       HttpResponse response1 = future.get(); 
       System.out.println(request1.getRequestLine() + "->" + response1.getStatusLine()); 
 
       // One most likely would want to use a callback for operation result 
       final HttpGet request2 = new HttpGet("http://www.apache.org/"); 
       httpclient.execute(request2, new FutureCallback<HttpResponse>() { 
                       //Complete成功後會回調這個方法 
           public void completed(final HttpResponse response2) { 
               System.out.println(request2.getRequestLine() + "->" + response2.getStatusLine()); 
           } 
 
           public void failed(final Exception ex) { 
               System.out.println(request2.getRequestLine() + "->" + ex); 
           } 
 
           public void cancelled() { 
               System.out.println(request2.getRequestLine() + " cancelled"); 
           } 
 
       }); 
   } 
} 

下面給出 httpAsync 的整個類圖:

 

 

 

 

 

對於咱們的 HTTPAysncClient 最後使用的是 InternalHttpAsyncClient,在 InternalHttpAsyncClient 中有個 ConnectionManager,這個就是咱們管理鏈接的管理器。

而在 httpAsync 中只有一個實現那就是 PoolingNHttpClientConnectionManager。

這個鏈接管理器中有兩個咱們比較關心的,一個是 Reactor,一個是 Cpool:

  • Reactor:全部的 Reactor 這裏都是實現了 IOReactor 接口。在 PoolingNHttpClientConnectionManager 中會有擁有一個 Reactor,那就是 DefaultConnectingIOReactor,這個 DefaultConnectingIOReactor,負責處理 Acceptor。

在 DefaultConnectingIOReactor 有個 excutor 方法,生成 IOReactor 也就是咱們圖中的 BaseIOReactor,進行 IO 的操做。這個模型就是咱們上面的 1.2.2 的模型。

  • CPool:在 PoolingNHttpClientConnectionManager 中有個 CPool,主要是負責控制咱們鏈接,咱們上面所說的 maxTotal 和 defaultMaxPerRoute,都是由其進行控制。

若是每一個路由有滿了,它會斷開最老的一個連接;若是總共的 total 滿了,它會放入 leased 隊列,釋放空間的時候就會將其從新鏈接。

數據庫調用異步化

對於數據庫調用通常的框架並無提供異步化的方法,這裏推薦本身封裝或者使用網上開源的。

異步化並非高併發的銀彈,可是有了異步化的確能提升你機器的 QPS,吞吐量等等。

上述講的一些模型若是能合理的作一些優化,而後進行應用,相信能對你的服務有很大的幫助。

高併發大殺器:並行化

想必熱愛遊戲的同窗小時候都幻想過要是本身會分身之術,就能一邊打遊戲一邊上課了。

惋惜現實中並無這個技術,你要麼只有老老實實的上課,要麼就只有逃課去打遊戲了。

雖然在現實中咱們沒法實現分身這樣的技術,可是咱們能夠在計算機世界中實現這樣的願望。

計算機中的分身術

計算機中的分身術不是天生就有了。在 1971 年,英特爾推出的全球第一顆通用型微處理器 4004,由 2300 個晶體管構成。

當時,公司的聯合創始人之一戈登摩爾就提出大名鼎鼎的「摩爾定律」——每過 18 個月,芯片上能夠集成的晶體管數目將增長一倍。

最初的主頻 740KHz(每秒運行 74 萬次),如今過了快 50 年了,你們去買電腦的時候會發現如今的主頻都能達到 4.0GHZ了(每秒 40 億次)。

可是主頻越高帶來的收益倒是愈來愈小:

  • 據測算,主頻每增長 1G,功耗將上升 25 瓦,而在芯片功耗超過 150 瓦後,現有的風冷散熱系統將沒法知足散熱的須要。有部分 CPU 均可以用來煎雞蛋了。
  • 流水線過長,使得單位頻率效能低下,越大的主頻其實總體性能反而不如小的主頻。
  • 戈登摩爾認爲摩爾定律將來 10-20 年會失效。

在單核主頻遇到瓶頸的狀況下,多核 CPU 應運而生,不只提高了性能,而且下降了功耗。

因此多核 CPU 逐漸成爲如今市場的主流,這樣讓咱們的多線程編程也更加的容易。

說到了多核 CPU 就必定要說 GPU,你們可能對這個比較陌生,可是一說到顯卡就確定不陌生,筆者搞過一段時間的 CUDA 編程,我才意識到這個纔是真正的並行計算。

你們都知道圖片像素點吧,好比 1920*1080 的圖片有 210 萬個像素點,若是想要把一張圖片的每一個像素點都進行轉換一下,那在咱們 Java 裏面可能就要循環遍歷 210 萬次。

就算咱們用多線程 8 核 CPU,那也得循環幾十萬次。可是若是使用 Cuda,最多能夠 365535*512 = 100661760(一億)個線程並行執行,就這種級別的圖片那也是立刻處理完成。

可是 Cuda 通常適合於圖片這種,有大量的像素點須要同時處理,可是指令集不多因此邏輯不能太複雜。

應用中的並行

一提及讓你的服務高性能的手段,那麼異步化,並行化這些確定會第一時間在你腦海中顯現出來,並行化能夠用來配合異步化,也能夠用來單獨作優化。

咱們能夠想一想有這麼一個需求,在你下外賣訂單的時候,這筆訂單可能還須要查用戶信息,折扣信息,商家信息,菜品信息等。

用同步的方式調用,以下圖所示:

 

 

 

 

 

設想一下這 5 個查詢服務,平均每次消耗 50ms,那麼本次調用至少是 250ms,咱們細想一下,這五個服務其實並無任何的依賴,誰先獲取誰後獲取均可以。

那麼咱們能夠想一想,是否能夠用多重影分身之術,同時獲取這五個服務的信息呢?

優化以下:

 

 

 

 

 

將這五個查詢服務並行查詢,在理想狀況下能夠優化至 50ms。固然提及來簡單,咱們真正如何落地呢?

CountDownLatch/Phaser

CountDownLatch 和 Phaser 是 JDK 提供的同步工具類。Phaser 是 1.7 版本以後提供的工具類。而 CountDownLatch 是 1.5 版本以後提供的工具類。

這裏簡單介紹一下 CountDownLatch,能夠將其當作是一個計數器,await()方法能夠阻塞至超時或者計數器減至 0,其餘線程當完成本身目標的時候能夠減小 1,利用這個機制咱們能夠用來作併發。

能夠用以下的代碼實現咱們上面的下訂單的需求:

public class CountDownTask { 
   private static final int CORE_POOL_SIZE = 4; 
   private static final int MAX_POOL_SIZE = 12; 
   private static final long KEEP_ALIVE_TIME = 5L; 
   private final static int QUEUE_SIZE = 1600; 
 
   protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 
           KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); 
   public static void main(String[] args) throws InterruptedException { 
       // 新建一個爲5的計數器 
       CountDownLatch countDownLatch = new CountDownLatch(5); 
       OrderInfo orderInfo = new OrderInfo(); 
       THREAD_POOL.execute(() -> { 
           System.out.println("當前任務Customer,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setCustomerInfo(new CustomerInfo()); 
           countDownLatch.countDown(); 
       }); 
       THREAD_POOL.execute(() -> { 
           System.out.println("當前任務Discount,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setDiscountInfo(new DiscountInfo()); 
           countDownLatch.countDown(); 
       }); 
       THREAD_POOL.execute(() -> { 
           System.out.println("當前任務Food,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setFoodListInfo(new FoodListInfo()); 
           countDownLatch.countDown(); 
       }); 
       THREAD_POOL.execute(() -> { 
           System.out.println("當前任務Tenant,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setTenantInfo(new TenantInfo()); 
           countDownLatch.countDown(); 
       }); 
       THREAD_POOL.execute(() -> { 
           System.out.println("當前任務OtherInfo,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setOtherInfo(new OtherInfo()); 
           countDownLatch.countDown(); 
       }); 
       countDownLatch.await(1, TimeUnit.SECONDS); 
       System.out.println("主線程:"+ Thread.currentThread().getName()); 
   } 
} 

創建一個線程池(具體配置根據具體業務,具體機器配置),進行併發的執行咱們的任務(生成用戶信息,菜品信息等),最後利用 await 方法阻塞等待結果成功返回。

CompletableFuture

相信各位同窗已經發現,CountDownLatch 雖然能實現咱們須要知足的功能可是其仍然有個問題是,咱們的業務代碼須要耦合 CountDownLatch 的代碼。

好比在咱們獲取用戶信息以後,咱們會執行 countDownLatch.countDown(),很明顯咱們的業務代碼顯然不該該關心這一部分邏輯,而且在開發的過程當中萬一寫漏了,那咱們的 await 方法將只會被各類異常喚醒。

因此在 JDK 1.8 中提供了一個類 CompletableFuture,它是一個多功能的非阻塞的 Future。(什麼是 Future:用來表明異步結果,而且提供了檢查計算完成,等待完成,檢索結果完成等方法。)

咱們將每一個任務的計算完成的結果都用 CompletableFuture 來表示,利用 CompletableFuture.allOf 匯聚成一個大的 CompletableFuture,那麼利用 get()方法就能夠阻塞。

public class CompletableFutureParallel { 
   private static final int CORE_POOL_SIZE = 4; 
   private static final int MAX_POOL_SIZE = 12; 
   private static final long KEEP_ALIVE_TIME = 5L; 
   private final static int QUEUE_SIZE = 1600; 
 
   protected final static ExecutorService THREAD_POOL = new ThreadPoolExecutor(CORE_POOL_SIZE, MAX_POOL_SIZE, 
           KEEP_ALIVE_TIME, TimeUnit.SECONDS, new LinkedBlockingQueue<>(QUEUE_SIZE)); 
   public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException { 
       OrderInfo orderInfo = new OrderInfo(); 
       //CompletableFuture 的List 
       List<CompletableFuture> futures = new ArrayList<>(); 
       futures.add(CompletableFuture.runAsync(() -> { 
           System.out.println("當前任務Customer,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setCustomerInfo(new CustomerInfo()); 
       }, THREAD_POOL)); 
       futures.add(CompletableFuture.runAsync(() -> { 
           System.out.println("當前任務Discount,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setDiscountInfo(new DiscountInfo()); 
       }, THREAD_POOL)); 
       futures.add( CompletableFuture.runAsync(() -> { 
           System.out.println("當前任務Food,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setFoodListInfo(new FoodListInfo()); 
       }, THREAD_POOL)); 
       futures.add(CompletableFuture.runAsync(() -> { 
           System.out.println("當前任務Other,線程名字爲:" + Thread.currentThread().getName()); 
           orderInfo.setOtherInfo(new OtherInfo()); 
       }, THREAD_POOL)); 
       CompletableFuture allDoneFuture = CompletableFuture.allOf(futures.toArray(new CompletableFuture[futures.size()])); 
       allDoneFuture.get(10, TimeUnit.SECONDS); 
       System.out.println(orderInfo); 
   } 
} 

能夠看見咱們使用 CompletableFuture 能很快的完成需求,固然這還不夠。

Fork/Join

咱們上面用 CompletableFuture 完成了對多組任務並行執行,可是它依然是依賴咱們的線程池。

在咱們的線程池中使用的是阻塞隊列,也就是當咱們某個線程執行完任務的時候須要經過這個阻塞隊列進行,那麼確定會發生競爭,因此在 JDK 1.7 中提供了 ForkJoinTask 和 ForkJoinPool。

 

 

 

 

 

ForkJoinPool 中每一個線程都有本身的工做隊列,而且採用 Work-Steal 算法防止線程飢餓。

Worker 線程用 LIFO 的方法取出任務,可是會用 FIFO 的方法去偷取別人隊列的任務,這樣就減小了鎖的衝突。

 

 

 

 

 

網上這個框架的例子不少,咱們看看如何使用代碼完成咱們上面的下訂單需求:

public class OrderTask extends RecursiveTask<OrderInfo> { 
   @Override 
   protected OrderInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       // 定義其餘五種並行TasK 
       CustomerTask customerTask = new CustomerTask(); 
       TenantTask tenantTask = new TenantTask(); 
       DiscountTask discountTask = new DiscountTask(); 
       FoodTask foodTask = new FoodTask(); 
       OtherTask otherTask = new OtherTask(); 
       invokeAll(customerTask, tenantTask, discountTask, foodTask, otherTask); 
       OrderInfo orderInfo = new OrderInfo(customerTask.join(), tenantTask.join(), discountTask.join(), foodTask.join(), otherTask.join()); 
       return orderInfo; 
   } 
   public static void main(String[] args) { 
       ForkJoinPool forkJoinPool = new ForkJoinPool(Runtime.getRuntime().availableProcessors() -1 ); 
       System.out.println(forkJoinPool.invoke(new OrderTask())); 
   } 
} 
class CustomerTask extends RecursiveTask<CustomerInfo>{ 
 
   @Override 
   protected CustomerInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       return new CustomerInfo(); 
   } 
} 
class TenantTask extends RecursiveTask<TenantInfo>{ 
 
   @Override 
   protected TenantInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       return new TenantInfo(); 
   } 
} 
class DiscountTask extends RecursiveTask<DiscountInfo>{ 
 
   @Override 
   protected DiscountInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       return new DiscountInfo(); 
   } 
} 
class FoodTask extends RecursiveTask<FoodListInfo>{ 
 
   @Override 
   protected FoodListInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       return new FoodListInfo(); 
   } 
} 
class OtherTask extends RecursiveTask<OtherInfo>{ 
 
   @Override 
   protected OtherInfo compute() { 
       System.out.println("執行"+ this.getClass().getSimpleName() + "線程名字爲:" + Thread.currentThread().getName()); 
       return new OtherInfo(); 
   } 
} 

咱們定義一個 Order Task 而且定義五個獲取信息的任務,在 Compute 中分別 Fork 執行這五個任務,最後在將這五個任務的結果經過 Join 得到,最後完成咱們的並行化的需求。

parallelStream

在 JDK 1.8 中提供了並行流的 API,當咱們使用集合的時候能很好的進行並行處理。

下面舉了一個簡單的例子從 1 加到 100:

public class ParallelStream { 
   public static void main(String[] args) { 
       ArrayList<Integer> list = new ArrayList<Integer>(); 
       for (int i = 1; i <= 100; i++) { 
           list.add(i); 
       } 
       LongAdder sum = new LongAdder(); 
       list.parallelStream().forEach(integer -> { 
//            System.out.println("當前線程" + Thread.currentThread().getName()); 
           sum.add(integer); 
       }); 
       System.out.println(sum); 
   } 
} 

parallelStream 中底層使用的那一套也是 Fork/Join 的那一套,默認的併發程度是可用 CPU 數 -1。

分片

能夠想象有這麼一個需求,天天定時對 ID 在某個範圍之間的用戶發券,好比這個範圍之間的用戶有幾百萬,若是給一臺機器發的話,可能所有發完須要好久的時間。

因此分佈式調度框架好比:elastic-job 都提供了分片的功能,好比你用 50 臺機器,那麼 id%50 = 0 的在第 0 臺機器上;=1 的在第 1 臺機器上發券,那麼咱們的執行時間其實就分攤到了不一樣的機器上了。

並行化注意事項

線程安全:在 parallelStream 中咱們列舉的代碼中使用的是 LongAdder,並無直接使用咱們的 Integer 和 Long,這個是由於在多線程環境下 Integer 和 Long 線程不安全。因此線程安全咱們須要特別注意。

合理參數配置:能夠看見咱們須要配置的參數比較多,好比咱們的線程池的大小,等待隊列大小,並行度大小以及咱們的等待超時時間等等。

咱們都須要根據本身的業務不斷的調優防止出現隊列不夠用或者超時時間不合理等等。

上面介紹了什麼是並行化,並行化的各類歷史,在 Java 中如何實現並行化,以及並行化的注意事項。但願你們對並行化有個比較全面的認識。

歡迎工做一到五年的Java工程師朋友們加入Java架構開發: 855835163 羣內提供免費的Java架構學習資料(裏面有高可用、高併發、高性能及分佈式、Jvm性能調優、Spring源碼,MyBatis,Netty,Redis,Kafka,Mysql,Zookeeper,Tomcat,Docker,Dubbo,Nginx等多個知識點的架構資料)合理利用本身每一分每一秒的時間來學習提高本身,不要再用"沒有時間「來掩飾本身思想上的懶惰!趁年輕,使勁拼,給將來的本身一個交代!

相關文章
相關標籤/搜索