零.引子數組
線程是程序運行中一個很是重要的概念。一般狀況下,程序從靜態代碼,到解析爲機器碼被加載入內存開始動態運行,就轉變爲一個進程。也能夠說,程序是一個靜態概念,程序運行起來後就變成了一個進程,進程是計算機分配CPU、內存等各類資源的基本單位。緩存
咱們平時在電腦中開啓一些程序時,好比開啓eclipse,idea等開發工具時,會發現程序啓動較慢,這是由於進程運行所依賴的資源較多,故開啓一個進程耗費的資源和時間也較多。一個進程中每每有多個子任務,從而輪番獲取CPU的執行權,這些須要獲取CPU執行權的子任務就稱爲線程,線程是計算機分配CPU執行權的最小單位。併發
相對於進程而言,線程的運行僅須要CPU執行權便可,耗費資源與時間大大減小。然而,如今的計算機操做系統基本上都承擔着處理高併發任務的職責,若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,那麼頻繁建立線程就會大大下降系統的效率,這是由於,頻繁建立線程和銷燬線程也須要時間。eclipse
這就不禁讓咱們考慮一個問題:有沒有一種辦法使得線程能夠複用,從而避免線程的頻繁建立?也就是說,當線程執行完一個任務時,並不被銷燬,而是能夠繼續執行其餘的任務?ide
答案是能夠的。在Java中,咱們能夠經過線程池來達到線程複用的效果。咱們接下來詳細講解Java的線程池,爲此,咱們先從最核心的ThreadPoolExecutor類中的方法講起,而後再講述其實現原理,接着給出一個使用示例,最後討論如何合理配置線程池的大小。高併發
一.工具
爲了對線程池有個比較全面的理解,咱們先來看類ThreadPoolExecutor的繼承體系:開發工具
經過上圖可知,接口Executor是線程池體系的頂級接口。上述線程池繼承體系的主要API以下:this
1 public interface Executor { 2 void execute(Runnable command); 3 } 4 public interface ExecutorService extends Executor { 5 void shutdown(); 6 boolean isShutdown(); 7 boolean isTerminated(); 8 boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; 9 <T> Future<T> submit(Callable<T> task); 10 <T> Future<T> submit(Runnable task, T result); 11 Future<?> submit(Runnable task); 12 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; 13 <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, throws InterruptedException; 14 <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; 15 <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 16 throws InterruptedException, ExecutionException, TimeoutException; 17 } 18 public abstract class AbstractExecutorService implements ExecutorService { 19 protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; 20 protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; 21 public Future<?> submit(Runnable task) {}; 22 public <T> Future<T> submit(Runnable task, T result) { }; 23 public <T> Future<T> submit(Callable<T> task) { }; 24 private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,boolean timed, long nanos) 25 throws InterruptedException, ExecutionException, TimeoutException { }; 26 public <T> T invokeAny(Collection<? extends Callable<T>> tasks) 27 throws InterruptedException, ExecutionException { }; 28 public <T> T invokeAny(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 29 throws InterruptedException, ExecutionException, TimeoutException { }; 30 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) 31 throws InterruptedException { }; 32 public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,long timeout, TimeUnit unit) 33 throws InterruptedException { 34 }; 35 } 36 public class ThreadPoolExecutor extends AbstractExecutorService { 37 ..... 38 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 39 BlockingQueue<Runnable> workQueue); 40 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 41 BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory); 42 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 43 BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler); 44 public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit, 45 BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler); 46 ... 47 }
在類ThreadPoolExecutor類中,提供了4個構造器,經過對其源代碼觀察,咱們發現前三個構造器都是調用了第四個構造器進行了初始化工做。在此,咱們看下第四個構造器中各個參數的含義。idea
corePoolSize:核心池的大小。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中。若是調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字也能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。
maximumPoolSize:線程池最大線程數,它表示在線程池中最多能建立多少個線程。
keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用。也就是說,當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0。
unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
1 TimeUnit.DAYS; //天 2 TimeUnit.HOURS; //小時 3 TimeUnit.MINUTES; //分鐘 4 TimeUnit.SECONDS; //秒 5 TimeUnit.MILLISECONDS; //毫秒 6 TimeUnit.MICROSECONDS; //微妙 7 TimeUnit.NANOSECONDS; //納秒
workQueue:一個阻塞隊列,用來存儲等待執行的任務,會對線程池的運行過程產生重大影響。通常來講,這裏的阻塞隊列有如下幾種選擇:
1 ArrayBlockingQueue; 2 PriorityBlockingQueue; 3 //上面兩種使用較少,咱們一般使用下面兩種 4 LinkedBlockingQueue; 5 SynchronousQueue; 6 //線程池的排隊策略與BlockingQueue有關
threadFactory:線程工廠,主要用來建立線程。
handler:表示當拒絕處理任務時的策略,有如下四種取值:
1 ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 2 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 3 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 4 ThreadPoolExecutor.CallerRunsPolicy:調用其餘線程處理該任務
在介紹了類ThreadPoolExecutor的構造器裏的參數的含義後,咱們接下來看該類中幾個很是重要的方法:
1 execute() 2 submit() 3 shutdown() 4 shutdownNow()
execute()方法其實是接口Executor中聲明的方法,在類ThreadPoolExecutor中進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。
submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務的執行結果。
shutdown()和shutdownNow()是用來關閉線程池的。
另外,類ThreadPoolExecutor中還定義了一些其餘的方法,如getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關的屬性。
二.
咱們在上面從宏觀層面介紹了類ThreadPoolExecutor,咱們接下來從幾個方面深刻了解線程池的內部實現原理。
1.線程池狀態
2.任務的執行
3.線程池中的線程初始化
4.任務緩存隊列及排隊策略
5.任務拒絕策略
6.線程池的關閉
7.線程池容量的動態調整
在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:
1 volatile int runState; 2 static final int RUNNING = 0; 3 static final int SHUTDOWN = 1; 4 static final int STOP = 2; 5 static final int TERMINATED = 3;
runState表示當前線程池的狀態,它用一個volatile變量來保證線程之間的可見性。runState下面的幾個static final變量則表示runState可能的幾個取值:
當建立線程池後,初始時,線程池處於RUNNING狀態;
若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;
若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。
2.任務的執行
咱們先來看一下類ThreadPoolExecutor中定義的一些比較重要的成員變量:
1 private final BlockingQueue<Runnable> workQueue; //任務緩存隊列,用來存放等待執行的任務 2 private final ReentrantLock mainLock = new ReentrantLock(); //線程池的主要狀態鎖,對線程池狀態(好比線程池大小,runState等)的改變都要使用這個鎖 3 private final HashSet<Worker> workers = new HashSet<Worker>(); //用來存放工做集 4 private volatile long keepAliveTime; //線程存活時間 5 private volatile boolean allowCoreThreadTimeOut; //是否容許爲核心線程設置存活時間 6 private volatile int corePoolSize; //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列) 7 private volatile int maximumPoolSize; //線程池最大能容忍的線程數 8 private volatile int poolSize; //線程池中當前的線程數 9 private volatile RejectedExecutionHandler handler; //任務拒絕策略 10 private volatile ThreadFactory threadFactory; //線程工廠,用來建立線程 11 private int largestPoolSize; //用來記錄線程池中曾經出現過的最大線程數 12 private long completedTaskCount; //用來記錄已經執行完畢的任務個數
總的來講,任務提交給線程池以後的處理策略,主要有4個方面:
- 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
- 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
- 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
- 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。
3.線程池中的線程初始化
默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。
在實際中,若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法實現:
- prestartCoreThread():初始化一個核心線程;
- prestartAllCoreThreads():初始化全部核心線程
4.任務緩存隊列及排隊策略
咱們在前面提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。
workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:
1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。
5.任務拒絕策略
當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:
1 ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 2 ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 3 ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 4 ThreadPoolExecutor.CallerRunsPolicy:調用其餘線程處理該任務
6.線程池的關閉
ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:
- shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
- shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務
7.線程池容量的動態調整
ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),
- setCorePoolSize:設置核心池大小
- setMaximumPoolSize:設置線程池最大能建立的線程數目大小
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。
三.
線程池的使用示例。
1 public class Test6 { 2 public static void main(String[] args) { 3 ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); 4 for(int i=0;i<15;i++){ 5 MyTask myTask=new MyTask(i); 6 executor.execute(myTask); 7 System.out.println("線程池中線程數目:"+executor.getPoolSize()+ 8 ",隊列中等待的任務數目:"+executor.getQueue().size()+ 9 ",已執行完其餘的任務數目:"+executor.getCompletedTaskCount()); 10 } 11 executor.shutdown(); 12 } 13 static class MyTask implements Runnable{ 14 private int taskNum; 15 public MyTask(int taskNum){ 16 this.taskNum=taskNum; 17 } 18 @Override 19 public void run() { 20 System.out.println("正在執行task:"+taskNum); 21 try { 22 Thread.currentThread().sleep(1000); 23 } catch (InterruptedException e) { 24 e.printStackTrace(); 25 } 26 System.out.println("task--"+taskNum+"執行完畢"); 27 } 28 } 29 }
上述代碼執行結果以下:
正在執行task:0 線程池中線程數目:1,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 線程池中線程數目:2,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:1 線程池中線程數目:3,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:2 線程池中線程數目:4,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 正在執行task:3 線程池中線程數目:5,隊列中等待的任務數目:0,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:1,已執行完其餘的任務數目:0 正在執行task:4 線程池中線程數目:5,隊列中等待的任務數目:2,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:3,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:4,已執行完其餘的任務數目:0 線程池中線程數目:5,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 線程池中線程數目:6,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:10 線程池中線程數目:7,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:11 線程池中線程數目:8,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:12 線程池中線程數目:9,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:13 線程池中線程數目:10,隊列中等待的任務數目:5,已執行完其餘的任務數目:0 正在執行task:14 task--0執行完畢 正在執行task:5 task--12執行完畢 task--11執行完畢 正在執行task:6 task--3執行完畢 正在執行task:8 task--4執行完畢 正在執行task:9 task--13執行完畢 task--2執行完畢 正在執行task:7 task--14執行完畢 task--10執行完畢 task--1執行完畢 task--5執行完畢 task--9執行完畢 task--7執行完畢 task--8執行完畢 task--6執行完畢
從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。若是上面程序中,將for循環中改爲執行20個任務,就會拋出任務拒絕異常了。
在Java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池,具體以下:
1 Executors.newCachedThreadPool(); //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE 2 Executors.newSingleThreadExecutor(); //建立容量爲1的緩衝池 3 Executors.newFixedThreadPool(int); //建立固定容量大小的緩衝池
上述三個靜態方法的實現以下:
1 public static ExecutorService newFixedThreadPool(int nThreads) { 2 return new ThreadPoolExecutor(nThreads, nThreads, 3 0L, TimeUnit.MILLISECONDS, 4 new LinkedBlockingQueue<Runnable>()); 5 } 6 public static ExecutorService newSingleThreadExecutor() { 7 return new FinalizableDelegatedExecutorService 8 (new ThreadPoolExecutor(1, 1, 9 0L, TimeUnit.MILLISECONDS, 10 new LinkedBlockingQueue<Runnable>())); 11 } 12 public static ExecutorService newCachedThreadPool() { 13 return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 14 60L, TimeUnit.SECONDS, 15 new SynchronousQueue<Runnable>()); 16 }
從上述三個靜態方法的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過都已配置好了參數。
newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;
newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;
newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。
實際使用中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。
另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類並重寫裏面的方法。
最後,咱們再討論下如何合理配置線程池的大小?
通常狀況下,咱們須要根據任務的類型來配置線程池大小:
若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1
若是是IO密集型任務,參考值能夠設置爲2*NCPU
固然,上述建議只是參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。