深刻理解Java線程池

  零.引子數組

  線程是程序運行中一個很是重要的概念。一般狀況下,程序從靜態代碼,到解析爲機器碼被加載入內存開始動態運行,就轉變爲一個進程。也能夠說,程序是一個靜態概念,程序運行起來後就變成了一個進程,進程是計算機分配CPU、內存等各類資源的基本單位。緩存

  咱們平時在電腦中開啓一些程序時,好比開啓eclipse,idea等開發工具時,會發現程序啓動較慢,這是由於進程運行所依賴的資源較多,故開啓一個進程耗費的資源和時間也較多。一個進程中每每有多個子任務,從而輪番獲取CPU的執行權,這些須要獲取CPU執行權的子任務就稱爲線程,線程是計算機分配CPU執行權的最小單位。併發

  相對於進程而言,線程的運行僅須要CPU執行權便可,耗費資源與時間大大減小。然而,如今的計算機操做系統基本上都承擔着處理高併發任務的職責,若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,那麼頻繁建立線程就會大大下降系統的效率,這是由於,頻繁建立線程和銷燬線程也須要時間。eclipse

  這就不禁讓咱們考慮一個問題:有沒有一種辦法使得線程能夠複用,從而避免線程的頻繁建立?也就是說,當線程執行完一個任務時,並不被銷燬,而是能夠繼續執行其餘的任務?ide

  答案是能夠的。在Java中,咱們能夠經過線程池來達到線程複用的效果。咱們接下來詳細講解Java的線程池,爲此,咱們先從最核心的ThreadPoolExecutor類中的方法講起,而後再講述其實現原理,接着給出一個使用示例,最後討論如何合理配置線程池的大小。高併發

  一.工具

  爲了對線程池有個比較全面的理解,咱們先來看類ThreadPoolExecutor的繼承體系:開發工具

  經過上圖可知,接口Executor是線程池體系的頂級接口。上述線程池繼承體系的主要API以下:this

 1 public interface Executor {  2     void execute(Runnable command);  3 }  4 public interface ExecutorService extends Executor {  5     void shutdown();  6     boolean isShutdown();  7     boolean isTerminated();  8     boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;  9     <T> Future<T> submit(Callable<T> task); 10     <T> Future<T> submit(Runnable task, T result); 11     Future<?> submit(Runnable task); 12     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 13     <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, throws InterruptedException; 14     <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; 15     <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 16         throws InterruptedException, ExecutionException, TimeoutException; 17 } 18 public abstract class AbstractExecutorService implements ExecutorService { 19     protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; 20     protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; 21     public Future<?> submit(Runnable task) {}; 22     public <T> Future<T> submit(Runnable task, T result) { }; 23     public <T> Future<T> submit(Callable<T> task) { }; 24     private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) 25         throws InterruptedException, ExecutionException, TimeoutException { }; 26     public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 27         throws InterruptedException, ExecutionException { }; 28     public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 29         throws InterruptedException, ExecutionException, TimeoutException { }; 30     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 31         throws InterruptedException { }; 32     public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 33         throws InterruptedException { 34  }; 35 } 36 public class ThreadPoolExecutor extends AbstractExecutorService { 37  ..... 38     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 39             BlockingQueue<Runnable> workQueue); 40     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 41             BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); 42     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 43             BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); 44     public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 45         BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); 46  ... 47 }

  在類ThreadPoolExecutor類中,提供了4個構造器,經過對其源代碼觀察,咱們發現前三個構造器都是調用了第四個構造器進行了初始化工做。在此,咱們看下第四個構造器中各個參數的含義。idea

  corePoolSize:核心池的大小。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中。若是調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字也能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。

  maximumPoolSize:線程池最大線程數,它表示在線程池中最多能建立多少個線程。

  keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用。也就是說,當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0。

  unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

1 TimeUnit.DAYS;               //
2 TimeUnit.HOURS;             //小時
3 TimeUnit.MINUTES;           //分鐘
4 TimeUnit.SECONDS;           //
5 TimeUnit.MILLISECONDS;      //毫秒
6 TimeUnit.MICROSECONDS;      //微妙
7 TimeUnit.NANOSECONDS;       //納秒

  workQueue:一個阻塞隊列,用來存儲等待執行的任務,會對線程池的運行過程產生重大影響。通常來講,這裏的阻塞隊列有如下幾種選擇:

1 ArrayBlockingQueue; 2 PriorityBlockingQueue; 3 //上面兩種使用較少,咱們一般使用下面兩種
4 LinkedBlockingQueue; 5 SynchronousQueue; 6 //線程池的排隊策略與BlockingQueue有關

  threadFactory:線程工廠,主要用來建立線程。

  handler:表示當拒絕處理任務時的策略,有如下四種取值:

1 ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 2 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 3 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 4 ThreadPoolExecutor.CallerRunsPolicy:調用其餘線程處理該任務

  在介紹了類ThreadPoolExecutor的構造器裏的參數的含義後,咱們接下來看該類中幾個很是重要的方法:

1 execute() 2 submit() 3 shutdown() 4 shutdownNow()

  execute()方法其實是接口Executor中聲明的方法,在類ThreadPoolExecutor中進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務的執行結果。

  shutdown()和shutdownNow()是用來關閉線程池的。

  另外,類ThreadPoolExecutor中還定義了一些其餘的方法,如getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關的屬性。

  二.

  咱們在上面從宏觀層面介紹了類ThreadPoolExecutor,咱們接下來從幾個方面深刻了解線程池的內部實現原理。

  1.線程池狀態

  2.任務的執行

  3.線程池中的線程初始化

  4.任務緩存隊列及排隊策略

  5.任務拒絕策略

  6.線程池的關閉

  7.線程池容量的動態調整

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:  

1 volatile int runState; 2 static final int RUNNING    = 0; 3 static final int SHUTDOWN   = 1; 4 static final int STOP       = 2; 5 static final int TERMINATED = 3;

  runState表示當前線程池的狀態,它用一個volatile變量來保證線程之間的可見性。runState下面的幾個static final變量則表示runState可能的幾個取值:

  當建立線程池後,初始時,線程池處於RUNNING狀態;

  若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;

  若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;

  當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。

  2.任務的執行

  咱們先來看一下類ThreadPoolExecutor中定義的一些比較重要的成員變量:

 1 private final BlockingQueue<Runnable> workQueue;    //任務緩存隊列,用來存放等待執行的任務
 2 private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小,runState等)的改變都要使用這個鎖
 3 private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集 
 4 private volatile long  keepAliveTime;    //線程存活時間 
 5 private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
 6 private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
 7 private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數 
 8 private volatile int   poolSize;       //線程池中當前的線程數 
 9 private volatile RejectedExecutionHandler handler; //任務拒絕策略
10 private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程 
11 private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數 
12 private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

  總的來講,任務提交給線程池以後的處理策略,主要有4個方面:  

  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

  3.線程池中的線程初始化  

  默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

  在實際中,若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法實現:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化全部核心線程

  4.任務緩存隊列及排隊策略

  咱們在前面提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。  

  workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

  1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;

  2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

  5.任務拒絕策略

  當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:  

1 ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 2 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 3 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 4 ThreadPoolExecutor.CallerRunsPolicy:調用其餘線程處理該任務

  6.線程池的關閉  

  ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務  

  7.線程池容量的動態調整

  ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

  三.

  線程池的使用示例。

  

 1 public class Test6 {  2     public static void main(String[] args) {  3         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));  4         for(int i=0;i<15;i++){  5             MyTask myTask=new MyTask(i);  6  executor.execute(myTask);  7             System.out.println("線程池中線程數目:"+executor.getPoolSize()+
 8                     ",隊列中等待的任務數目:"+executor.getQueue().size()+
 9                     ",已執行完其餘的任務數目:"+executor.getCompletedTaskCount()); 10  } 11  executor.shutdown(); 12  } 13     static class MyTask implements Runnable{ 14         private int taskNum; 15         public MyTask(int taskNum){ 16             this.taskNum=taskNum; 17  } 18  @Override 19         public void run() { 20             System.out.println("正在執行task:"+taskNum); 21             try { 22                 Thread.currentThread().sleep(1000); 23             } catch (InterruptedException e) { 24  e.printStackTrace(); 25  } 26             System.out.println("task--"+taskNum+"執行完畢"); 27  } 28  } 29 }

  上述代碼執行結果以下:

正在執行task:0 線程池中線程數目:1,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 線程池中線程數目:2,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:1 線程池中線程數目:3,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:2 線程池中線程數目:4,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:3 線程池中線程數目:5,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:1,已執行完其餘的任務數目:0 正在執行task:4 線程池中線程數目:5,隊列中等待的任務數目:2,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:3,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:4,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 線程池中線程數目:6,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:10 線程池中線程數目:7,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:11 線程池中線程數目:8,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:12 線程池中線程數目:9,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:13 線程池中線程數目:10,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:14 task--0執行完畢 正在執行task:5 task--12執行完畢 task--11執行完畢 正在執行task:6 task--3執行完畢 正在執行task:8 task--4執行完畢 正在執行task:9 task--13執行完畢 task--2執行完畢 正在執行task:7 task--14執行完畢 task--10執行完畢 task--1執行完畢 task--5執行完畢 task--9執行完畢 task--7執行完畢 task--8執行完畢 task--6執行完畢

  從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。若是上面程序中,將for循環中改爲執行20個任務,就會拋出任務拒絕異常了。

  在Java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池,具體以下:  

1 Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
2 Executors.newSingleThreadExecutor();   //建立容量爲1的緩衝池
3 Executors.newFixedThreadPool(int);    //建立固定容量大小的緩衝池

  上述三個靜態方法的實現以下:  

 1 public static ExecutorService newFixedThreadPool(int nThreads) {  2     return new ThreadPoolExecutor(nThreads, nThreads,  3                                   0L, TimeUnit.MILLISECONDS,  4                                   new LinkedBlockingQueue<Runnable>());  5 }  6 public static ExecutorService newSingleThreadExecutor() {  7     return new FinalizableDelegatedExecutorService  8         (new ThreadPoolExecutor(1, 1,  9                                 0L, TimeUnit.MILLISECONDS, 10                                 new LinkedBlockingQueue<Runnable>())); 11 } 12 public static ExecutorService newCachedThreadPool() { 13     return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 14                                   60L, TimeUnit.SECONDS, 15                                   new SynchronousQueue<Runnable>()); 16 }

  從上述三個靜態方法的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過都已配置好了參數。  

  newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。  

  實際使用中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類並重寫裏面的方法。

  最後,咱們再討論下如何合理配置線程池的大小?  

  通常狀況下,咱們須要根據任務的類型來配置線程池大小:

  若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1

  若是是IO密集型任務,參考值能夠設置爲2*NCPU

  固然,上述建議只是參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。

相關文章
相關標籤/搜索