[Java併發-22-併發設計模式] Thread-Per-Message 與 Worker Thread 模式

咱們曾經把併發編程領域的問題總結爲三個核心問題:分工、同步和互斥。其中,同步和互斥相關問題更多地源自微觀,而分工問題則是源自宏觀。咱們解決問題,每每都是從宏觀入手,一樣,解決併發編程問題,首要問題也是解決宏觀的分工問題編程

併發編程領域裏,解決分工問題也有一系列的設計模式,比較經常使用的主要有 Thread-Per-Message 模式、Worker Thread 模式、生產者 - 消費者模式等等。今天咱們重點介紹 Thread-Per-Message 模式。設計模式

如何理解 Thread-Per-Message 模式

好比寫一個 HTTP Server,很顯然只能在主線程中接收請求,而不能處理 HTTP 請求,由於若是在主線程中處理 HTTP 請求的話,那同一時間只能處理一個請求,太慢了!怎麼辦呢?能夠利用代辦的思路,建立一個子線程,委託子線程去處理 HTTP 請求。網絡

這種委託他人辦理的方式,在併發編程領域被總結爲一種設計模式,叫作Thread-Per-Message 模式,簡言之就是爲每一個任務分配一個獨立的線程。這是一種最簡單的分工方法,實現起來也很是簡單。多線程

用 Thread 實現 Thread-Per-Message 模式

Thread-Per-Message 模式的一個最經典的應用場景是網絡編程裏服務端的實現,服務端爲每一個客戶端請求建立一個獨立的線程,當線程處理完請求後,自動銷燬,這是一種最簡單的併發處理網絡請求的方法。併發

下面咱們就以 echo 程序的服務端爲例,介紹如何實現 Thread-Per-Message 模式。編程語言

final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 處理請求    
try {
  while (true) {
    // 接收請求
    SocketChannel sc = ssc.accept();
    // 每一個請求都建立一個線程
    new Thread(()->{
      try {
        // 讀 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模擬處理請求
        Thread.sleep(2000);
        // 寫 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 關閉 Socket
        sc.close();
      }catch(Exception e){
        throw new UncheckedIOException(e);
      }
    }).start();
  }
} finally {
  ssc.close();
}

若是你熟悉網絡編程,相信你必定會提出一個很尖銳的問題:上面這個 echo 服務的實現方案是不具有可行性的。緣由在於 Java 中的線程是一個重量級的對象,建立成本很高,一方面建立線程比較耗時,另外一方面線程佔用的內存也比較大。因此,爲每一個請求建立一個新的線程並不適合高併發場景。高併發

因而,你開始質疑 Thread-Per-Message 模式,並且開始從新思索解決方案,這時候極可能你會想到 Java 提供的線程池。工具

Thread-Per-Message 模式雖然做爲一種最簡單的分工方案,Java 語言支持不了,顯然是 Java 語言自己的問題。性能

Java 語言裏,Java 線程是和操做系統線程一一對應的,這種作法本質上是將 Java 線程的調度權徹底委託給操做系統,而操做系統在這方面很是成熟,因此這種作法的好處是穩定、可靠,可是也繼承了操做系統線程的缺點:建立成本高。爲了解決這個缺點,Java 併發包裏提供了線程池等工具類。這個思路在很長一段時間裏都是很穩妥的方案,可是這個方案並非惟一的方案。學習

業界還有另一種方案,叫作輕量級線程。這個方案在 Java 領域知名度並不高,可是在其餘編程語言裏卻叫得很響,例如 Go 語言、Lua 語言裏的協程,本質上就是一種輕量級的線程。輕量級的線程,建立的成本很低,基本上和建立一個普通對象的成本類似;而且建立的速度和內存佔用相比操做系統線程至少有一個數量級的提高,因此基於輕量級線程實現 Thread-Per-Message 模式就徹底沒有問題了。

Worker Thread 模式及其實現

Thread-Per-Message 模式,對應到現實世界,其實就是委託代辦。這種分工模式若是用 Java Thread 實現,頻繁地建立、銷燬線程很是影響性能,同時無限制地建立線程還可能致使 OOM,因此在 Java 領域使用場景就受限了。

要想有效避免線程的頻繁建立、銷燬以及 OOM 問題。也是 Java 領域使用最多的 Worker Thread 模式。

Worker Thread 模式能夠類比現實世界裏車間的工做模式:車間裏的工人,有活兒了,你們一塊兒幹,沒活兒了就聊聊天等着。你能夠參考下面的示意圖來理解,Worker Thread 模式中Worker Thread 對應到現實世界裏,其實指的就是車間裏的工人。不過這裏須要注意的是,車間裏的工人數量每每是肯定的。

車間工做示意圖###

那在編程領域該如何模擬車間的這種工做模式呢?或者說如何去實現 Worker Thread 模式呢?經過上面的圖,你很容易就能想到用阻塞隊列作任務池,而後建立固定數量的線程消費阻塞隊列中的任務。其實你仔細想會發現,這個方案就是 Java 語言提供的線程池。

線程池有不少優勢,例如可以避免重複建立、銷燬線程,同時可以限制建立線程的上限等等。學習完上一篇文章後你已經知道,用 Java 的 Thread 實現 Thread-Per-Message 模式難以應對高併發場景,緣由就在於頻繁建立、銷燬 Java 線程的成本有點高,並且無限制地建立線程還可能致使應用 OOM。線程池,則剛好能解決這些問題。

下面的示例代碼是用線程池實現的 echo 服務端,相比於 Thread-Per-Message 模式的實現,改動很是少,僅僅是建立了一個最多線程數爲 500 的線程池 es,而後經過 es.execute() 方法將請求處理的任務提交給線程池處理。

ExecutorService es = Executors
  .newFixedThreadPool(500);
final ServerSocketChannel ssc = 
  ServerSocketChannel.open().bind(
    new InetSocketAddress(8080));
// 處理請求    
try {
  while (true) {
    // 接收請求
    SocketChannel sc = ssc.accept();
    // 將請求處理任務提交給線程池
    es.execute(()->{
      try {
        // 讀 Socket
        ByteBuffer rb = ByteBuffer
          .allocateDirect(1024);
        sc.read(rb);
        // 模擬處理請求
        Thread.sleep(2000);
        // 寫 Socket
        ByteBuffer wb = 
          (ByteBuffer)rb.flip();
        sc.write(wb);
        // 關閉 Socket
        sc.close();
      }catch(Exception e){
        throw new UncheckedIOException(e);
      }
    });
  }
} finally {
  ssc.close();
  es.shutdown();
}

正確地建立線程池

Java 的線程池既可以避免無限制地建立線程致使 OOM,也能避免無限制地接收任務致使 OOM。只不事後者常常容易被咱們忽略,例如在上面的實現中,就被咱們忽略了。因此強烈建議你用建立有界的隊列來接收任務

當請求量大於有界隊列的容量時,就須要合理地拒絕請求。如何合理地拒絕呢?這須要你結合具體的業務場景來制定,建議你在建立線程池時,清晰地指明拒絕策略

同時,爲了便於調試和診斷問題,我也強烈建議你在實際工做中給線程賦予一個業務相關的名字.

ExecutorService es = new ThreadPoolExecutor(
  50, 500,
  60L, TimeUnit.SECONDS,
  // 注意要建立有界隊列
  new LinkedBlockingQueue<Runnable>(2000),
  // 建議根據業務需求實現 ThreadFactory
  r->{
    return new Thread(r, "echo-"+ r.hashCode());
  },
  // 建議根據業務需求實現 RejectedExecutionHandler
  new ThreadPoolExecutor.CallerRunsPolicy());

避免線程死鎖

使用線程池過程當中,還要注意一種線程死鎖的場景。若是提交到相同線程池的任務不是相互獨立的,而是有依賴關係的,那麼就有可能致使線程死鎖。具體現象是應用每運行一段時間偶爾就會處於無響應的狀態,監控數據看上去一切都正常,可是實際上已經不能正常工做了

咱們能夠用下面的示例代碼來模擬該應用,若是你執行下面的這段代碼,會發現它永遠執行不到最後一行。執行過程當中沒有任何異常,可是應用已經中止響應了。

//L一、L2 階段共用的線程池
ExecutorService es = Executors.
  newFixedThreadPool(2);
//L1 階段的閉鎖    
CountDownLatch l1=new CountDownLatch(2);
for (int i=0; i<2; i++){
  System.out.println("L1");
  // 執行 L1 階段任務
  es.execute(()->{
    //L2 階段的閉鎖 
    CountDownLatch l2=new CountDownLatch(2);
    // 執行 L2 階段子任務
    for (int j=0; j<2; j++){
      es.execute(()->{
        System.out.println("L2");
        l2.countDown();
      });
    }
    // 等待 L2 階段任務執行完
    l2.await();
    l1.countDown();
  });
}
// 等着 L1 階段任務執行完
l1.await();
System.out.println("end");

當應用出現相似問題時,首選的診斷方法是查看線程棧。你會發現線程池中的兩個線程所有都阻塞在 l2.await()這裏

緣由找到了,那如何解決就簡單了,最簡單粗暴的辦法就是將線程池的最大線程數調大,若是可以肯定任務的數量不是很是多的話,這個辦法也是可行的,不然這個辦法就行不通了。其實這種問題通用的解決方案是爲不一樣的任務建立不一樣的線程池

提交到相同線程池中的任務必定是相互獨立的,不然就必定要慎重

總結

解決併發編程裏的分工問題,最好的辦法是和現實世界作對比。對比現實世界構建編程領域的模型,可以讓模型更容易理解。 Thread-Per-Message 模式,相似於現實世界裏的委託他人辦理。 Worker Thread 模式則相似於車間裏工人的工做模式。

Worker Thread 模式和 Thread-Per-Message 模式的區別有哪些呢?從現實世界的角度看,你委託代辦人作事,每每是和代辦人直接溝通的;對應到編程領域,其實現也是主線程直接建立了一個子線程,主子線程之間是能夠直接通訊的。而車間工人的工做方式則是徹底圍繞任務展開的,一個具體的任務被哪一個工人執行,預先是沒法知道的;對應到編程領域,則是主線程提交任務到線程池,但主線程並不關心任務被哪一個線程執行。

Worker Thread 模式能避免線程頻繁建立、銷燬的問題,並且可以限制線程的最大數量。Java 語言裏能夠直接使用線程池來實現 Worker Thread 模式,線程池是一個很是基礎和優秀的工具類,甚至有些大廠的編碼規範都不容許用 new Thread() 來建立線程的,必須使用線程池。

使用線程池仍是須要格外謹慎的,如何正確建立線程池、如何避免線程死鎖問題,還須要注意前面咱們曾經提到的 ThreadLocal 內存泄露問題。同時對於提交到線程池的任務,還要作好異常處理,避免異常的任務從眼前溜走,有時沒有發現異常的任務後果每每都很嚴重。

相關文章
相關標籤/搜索