論如何優雅的自定義ThreadPoolExecutor線程池

更好的markDown閱讀體驗可直接訪問個人CSDN博客:https://blog.csdn.net/u012881584/article/details/85221635java

前言

線程池想必你們也都用過,JDK的Executors 也自帶一些線程池。可是不知道你們有沒有想過,如何纔是最優雅的方式去使用過線程池嗎? 生產環境要怎麼去配置本身的線程池纔是合理的呢?
今天週末,恰好有時間來總結一下本身所認爲的'優雅', 若有問題歡迎你們指正。面試

線程池使用規則

要使用好線程池,那麼必定要遵循幾個規則:數據庫

  1. 線程個數大小的設置
  2. 線程池相關參數配置
  3. 利用Hook嵌入你的行爲
  4. 線程池的關閉

線程池配置相關

線程池大小的設置

這實際上是一個面試的考點,不少面試官會問你線程池coreSize 的大小來考察你對於線程池的理解。
首先針對於這個問題,咱們必需要明確咱們的需求是計算密集型仍是IO密集型,只有瞭解了這一點,咱們才能更好的去設置線程池的數量進行限制。編程

一、計算密集型:
顧名思義就是應用須要很是多的CPU計算資源,在多核CPU時代,咱們要讓每個CPU核心都參與計算,將CPU的性能充分利用起來,這樣纔算是沒有浪費服務器配置,若是在很是好的服務器配置上還運行着單線程程序那將是多麼重大的浪費。對於計算密集型的應用,徹底是靠CPU的核數來工做,因此爲了讓它的優點徹底發揮出來,避免過多的線程上下文切換,比較理想方案是:緩存

線程數 = CPU核數+1,也能夠設置成CPU核數*2,但還要看JDK的版本以及CPU配置(服務器的CPU有超線程)。服務器

通常設置CPU * 2便可。網絡

二、IO密集型
咱們如今作的開發大部分都是WEB應用,涉及到大量的網絡傳輸,不只如此,與數據庫,與緩存間的交互也涉及到IO,一旦發生IO,線程就會處於等待狀態,當IO結束,數據準備好後,線程纔會繼續執行。所以從這裏能夠發現,對於IO密集型的應用,咱們能夠多設置一些線程池中線程的數量,這樣就能讓在等待IO的這段時間內,線程能夠去作其它事,提升併發處理效率。那麼這個線程池的數據量是否是能夠隨便設置呢?固然不是的,請必定要記得,線程上下文切換是有代價的。目前總結了一套公式,對於IO密集型應用:
線程數 = CPU核心數/(1-阻塞係數) 這個阻塞係數通常爲0.8~0.9之間,也能夠取0.8或者0.9。
套用公式,對於雙核CPU來講,它比較理想的線程數就是20,固然這都不是絕對的,須要根據實際狀況以及實際業務來調整:final int poolSize = (int)(cpuCore/(1-0.9))併發

針對於阻塞係數,《Programming Concurrency on the JVM Mastering》即《Java 虛擬機併發編程》中有提到一句話:app

對於阻塞係數,咱們能夠先試着猜想,抑或採用一些性能分析工具或java.lang.management API 來肯定線程花在系統/IO操做上的時間與CPU密集任務所耗的時間比值。ide

線程池相關參數配置

說到這一點,咱們只須要謹記一點,必定不要選擇沒有上限限制的配置項
這也是爲何不建議使用Executors 中建立線程的方法。
好比,Executors.newCachedThreadPool的設置與無界隊列的設置由於某些不可預期的狀況,線程池會出現系統異常,致使線程暴增的狀況或者任務隊列不斷膨脹,內存耗盡致使系統崩潰和異常。 咱們推薦使用自定義線程池來避免該問題,這也是在使用線程池規範的首要原則! 當心無大錯,千萬別過分自信!
能夠看下Executors中四個建立線程池的方法:

//使用無界隊列
public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

//線程池數量是無限的
public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

其餘的就再也不列舉了,你們能夠自行查閱源碼。

第二,合理設置線程數量、和線程空閒回收時間,根據具體的任務執行週期和時間去設定,避免頻繁的回收和建立,雖然咱們使用線程池的目的是爲了提高系統性能和吞吐量,可是也要考慮下系統的穩定性,否則出現不可預期問題會很麻煩!
第三,根據實際場景,選擇適用於本身的拒絕策略。進行補償,不要亂用JDK支持的自動補償機制!儘可能採用自定義的拒絕策略去進行兜底!
第四,線程池拒絕策略,自定義拒絕策略能夠實現RejectedExecutionHandler接口。
JDK自帶的拒絕策略以下:
AbortPolicy:直接拋出異常阻止系統正常工做。
CallerRunsPolicy:只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務。
DiscardOldestPolicy:丟棄最老的一個請求,嘗試再次提交當前任務。
DiscardPolicy:丟棄沒法處理的任務,不給予任何處理。

利用Hook

利用Hook,留下線程池執行軌跡:
ThreadPoolExecutor提供了protected類型能夠被覆蓋的鉤子方法,容許用戶在任務執行以前會執行以後作一些事情。咱們能夠經過它來實現好比初始化ThreadLocal、收集統計信息、如記錄日誌等操做。這類Hook如beforeExecute和afterExecute。另外還有一個Hook能夠用來在任務被執行完的時候讓用戶插入邏輯,如rerminated 。
若是hook方法執行失敗,則內部的工做線程的執行將會失敗或被中斷。

咱們可使用beforeExecute和afterExecute來記錄線程以前前和後的一些運行狀況,也能夠直接把運行完成後的狀態記錄到ELK等日誌系統。

關閉線程池

內容當線程池不在被引用而且工做線程數爲0的時候,線程池將被終止。咱們也能夠調用shutdown來手動終止線程池。若是咱們忘記調用shutdown,爲了讓線程資源被釋放,咱們還可使用keepAliveTime和allowCoreThreadTimeOut來達到目的!
固然,穩妥的方式是使用虛擬機Runtime.getRuntime().addShutdownHook方法,手工去調用線程池的關閉方法!

線程池使用實例

線程池核心代碼:

public class AsyncProcessQueue {

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * Task 包裝類<br>
  * 此類型的意義是記錄可能會被 Executor 吃掉的異常<br>
  */
 public static class TaskWrapper implements Runnable {
  private static final Logger _LOGGER = LoggerFactory.getLogger(TaskWrapper.class);

  private final Runnable gift;

  public TaskWrapper(final Runnable target) {
   this.gift = target;
  }

  @Override
  public void run() {

   // 捕獲異常,避免在 Executor 裏面被吞掉了
   if (gift != null) {

    try {
     gift.run();
    } catch (Exception e) {
     _LOGGER.error("Wrapped target execute exception.", e);
    }
   }
  }
 }

 // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

 /**
  * 執行指定的任務
  * 
  * @param task
  * @return
  */
 public static boolean execute(final Runnable task) {
  return AsyncProcessor.executeTask(new TaskWrapper(task));
 }
}
public class AsyncProcessor {
 static final Logger LOGGER = LoggerFactory.getLogger(AsyncProcessor.class);

 /**
  * 默認最大併發數<br>
  */
 private static final int DEFAULT_MAX_CONCURRENT = Runtime.getRuntime().availableProcessors() * 2;

 /**
  * 線程池名稱格式
  */
 private static final String THREAD_POOL_NAME = "ExternalConvertProcessPool-%d";

 /**
  * 線程工廠名稱
  */
 private static final ThreadFactory FACTORY = new BasicThreadFactory.Builder().namingPattern(THREAD_POOL_NAME)
   .daemon(true).build();

 /**
  * 默認隊列大小
  */
 private static final int DEFAULT_SIZE = 500;

 /**
  * 默認線程存活時間
  */
 private static final long DEFAULT_KEEP_ALIVE = 60L;

 /**NewEntryServiceImpl.java:689
  * Executor
  */
 private static ExecutorService executor;

 /**
  * 執行隊列
  */
 private static BlockingQueue<Runnable> executeQueue = new ArrayBlockingQueue<>(DEFAULT_SIZE);

 static {
  // 建立 Executor
  // 此處默認最大值改成處理器數量的 4 倍
  try {
   executor = new ThreadPoolExecutor(DEFAULT_MAX_CONCURRENT, DEFAULT_MAX_CONCURRENT * 4, DEFAULT_KEEP_ALIVE,
     TimeUnit.SECONDS, executeQueue, FACTORY);

   // 關閉事件的掛鉤
   Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {

    @Override
    public void run() {
     AsyncProcessor.LOGGER.info("AsyncProcessor shutting down.");

     executor.shutdown();

     try {

      // 等待1秒執行關閉
      if (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
       AsyncProcessor.LOGGER.error("AsyncProcessor shutdown immediately due to wait timeout.");
       executor.shutdownNow();
      }
     } catch (InterruptedException e) {
      AsyncProcessor.LOGGER.error("AsyncProcessor shutdown interrupted.");
      executor.shutdownNow();
     }

     AsyncProcessor.LOGGER.info("AsyncProcessor shutdown complete.");
    }
   }));
  } catch (Exception e) {
   LOGGER.error("AsyncProcessor init error.", e);
   throw new ExceptionInInitializerError(e);
  }
 }

 /**
  * 此類型沒法實例化
  */
 private AsyncProcessor() {
 }

 /**
  * 執行任務,不論是否成功<br>
  * 其實也就是包裝之後的 {@link Executer} 方法
  * 
  * @param task
  * @return
  */
 public static boolean executeTask(Runnable task) {

  try {
   executor.execute(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   return false;
  }

  return true;
 }

 /**
  * 提交任務,並能夠在稍後獲取其執行狀況<br>
  * 當提交失敗時,會拋出 {@link }
  * 
  * @param task
  * @return
  */
 public static <T> Future<T> submitTask(Callable<T> task) {

  try {
   return executor.submit(task);
  } catch (RejectedExecutionException e) {
   LOGGER.error("Task executing was rejected.", e);
   throw new UnsupportedOperationException("Unable to submit the task, rejected.", e);
  }
 }
}

使用方式:

AsyncProcessQueue.execute(new Runnable() {
          @Override
         public void run() {
               //do something
        }
});

能夠根據本身的使用場景靈活變動,我這裏並無用到beforeExecute和afterExecute以及拒絕策略。

相關文章
相關標籤/搜索