java多線程詳解(7)-線程池的使用

在前面的文章中,咱們使用線程的時候就去建立一個線程,這樣實現起來很是簡便,可是就會有一個問題:html

若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,java

這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要時間。數組

這個是時候咱們須要使用線程池技術建立多線程。緩存

本文目錄大綱:多線程

一.Java中的ThreadPoolExecutor類併發

二.深刻剖析線程池實現原理ide

三.使用示例oop

四.如何合理配置線程池的大小ui

 

一.Java中的ThreadPoolExecutor類

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的類,所以若是要深刻理解Java中的線程池,this

必須深刻理解這個類。咱們來看一下ThreadPoolExecutor類的源碼。

ThreadPoolExecutor類中提供了四個構造方法:

 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } 
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } 
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), handler); } 
 public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

從上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,

而且前面三個構造器都是調用的第四個構造器進行的初始化工做。

 下面解釋下一下構造器中各個參數的含義:

corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,

線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()

或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,

即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;

maximumPoolSize:線程池最大線程數,這個參數也是一個很是重要的參數,它表示在線程池中最多能建立多少個線程;

keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,

keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,

若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。

可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,

keepAliveTime參數也會起做用,直到線程池中的線程數爲0;

unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:

TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒

workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇

ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

threadFactory:線程工廠,主要用來建立線程;

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務 

具體參數的配置與線程池的關係將在下一節講述。

從上面給出的ThreadPoolExecutor類的代碼能夠知道,ThreadPoolExecutor繼承了AbstractExecutorService,看一下AbstractExecutorService的實現:

handler:表示當拒絕處理任務時的策略,有如下四種取值:

public abstract class AbstractExecutorService implements ExecutorService { protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { }; protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { }; public Future<?> submit(Runnable task) {}; public <T> Future<T> submit(Runnable task, T result) { }; public <T> Future<T> submit(Callable<T> task) { }; private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks, boolean timed, long nanos) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException { }; public <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException { }; public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException { }; }

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

咱們看ExecutorService接口的實現:

public interface ExecutorService extends Executor { void shutdown(); boolean isShutdown(); boolean isTerminated(); boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException; <T> Future<T> submit(Callable<T> task); <T> Future<T> submit(Runnable task, T result); Future<?> submit(Runnable task); <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) throws InterruptedException; <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException; <T> T invokeAny(Collection<? extends Callable<T>> tasks) throws InterruptedException, ExecutionException; <T> T invokeAny(Collection<? extends Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException; }

而ExecutorService又是繼承了Executor接口,咱們看一下Executor接口的實現:

public interface Executor { void execute(Runnable command); }

咱們知道 ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。

Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;

而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法;

而後ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個很是重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,

經過這個方法能夠向線程池提交一個任務,交由線程池去執行。

submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,

在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,

它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,

只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

shutdown()和shutdownNow()是用來關閉線程池的。

二.深刻剖析線程池實現原理

在上一節咱們從宏觀上介紹了ThreadPoolExecutor,下面咱們來深刻解析一下線程池的具體實現原理,將從下面幾個方面講解:

1.線程池狀態

2.任務的執行

3.線程池中的線程初始化

4.任務緩存隊列及排隊策略

5.任務拒絕策略

6.線程池的關閉

7.線程池容量的動態調整

 

1.線程池狀態

在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

 

volatile int runState; static final int RUNNING    = 0; static final int SHUTDOWN   = 1; static final int STOP       = 2; static final int TERMINATED = 3;

runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

下面的幾個static final變量表示runState可能的幾個取值。

當建立線程池後,初始時,線程池處於RUNNING狀態;

若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;

若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;

當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。

 

2.任務的執行

在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor類中其餘的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小 //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集
 
private volatile long  keepAliveTime;    //線程存貨時間 
private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
 
private volatile int   poolSize;       //線程池中當前的線程數
 
private volatile RejectedExecutionHandler handler; //任務拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程
 
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

每一個變量的做用都已經標明出來了,這裏要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

corePoolSize在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小

3.線程池中的線程初始化

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

prestartCoreThread():初始化一個核心線程;

prestartAllCoreThreads():初始化全部核心線程

下面是這2個方法的實現:

public boolean prestartCoreThread() { return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
} public int prestartAllCoreThreads() { int n = 0; while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n; return n; }

注意上面傳進去的參數是null,根據第2小節的分析可知若是傳進去的參數爲null,則最後執行線程會阻塞在getTask方法中的

r = workQueue.take();

即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

在前面咱們屢次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

(1).ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;

(2).LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;

(3).synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務

shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

7.線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

setCorePoolSize:設置核心池大小

setMaximumPoolSize:設置線程池最大能建立的線程數目大小

當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

三.使用實例

public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+ executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在執行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"執行完畢"); } }

從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。

若是上面程序中,將for循環中改爲執行20個任務,就會拋出任務拒絕異常了。

不過在java中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池:

Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量爲1的緩衝池
Executors.newFixedThreadPool(int);    //建立固定容量大小的緩衝池

下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); }

代碼以下:

/**
 * 經過線程池實現多線程
 * 
 * @author cary
 * @version 1.0.0
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
        ExecutorService threadPool1 = Executors.newFixedThreadPool(5);
        ExecutorService threadPool2 = Executors.newCachedThreadPool();
        ExecutorService threadPool = Executors.newSingleThreadExecutor();
        for (int i = 0; i < 10; i++) {
            final int task = i;
            threadPool.execute(new Runnable() {

                @Override
                public void run() {
                    for (int j = 1; j <= 10; j++) {
                        try {
                            Thread.sleep(20);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                        System.out.println(Thread.currentThread().getName()
                                + " is looping of " + j + " for  task of "
                                + task);
                    }

                }
            });
        }

        System.out.println("all of 10 tasks have committed! ");
        // threadPool.shutdownNow();

        Executors.newScheduledThreadPool(3).scheduleAtFixedRate(new Runnable() {
            public void run() {
                System.out.println("begining!");

            }
        }, 6, 2, TimeUnit.SECONDS);
    }

從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;

newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,

也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。

實際中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,

要根據實際任務的類型和數量來進行配置。

另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類進行重寫。

四.如何合理配置線程池的大小

如何合理配置線程池大小,僅供參考。

通常須要根據任務的類型來配置線程池大小:

若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1

若是是IO密集型任務,參考值能夠設置爲2*NCPU

固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,

再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。

參考資料:

http://ifeve.com/java-threadpool/

http://blog.163.com/among_1985/blog/static/275005232012618849266/

http://developer.51cto.com/art/201203/321885.htm

http://blog.csdn.net/java2000_wl/article/details/22097059

http://blog.csdn.net/cutesource/article/details/6061229

http://blog.csdn.net/xieyuooo/article/details/8718741

http://www.cnblogs.com/dolphin0520/p/3932921.html

相關文章
相關標籤/搜索