Java線程池(ThreadPool)詳解

線程五個狀態(生命週期):

線程運行時間

    假設一個服務器完成一項任務所需時間爲:T1 建立線程時間,T2 在線程中執行任務的時間,T3 銷燬線程時間。

       若是:T1 + T3 遠大於 T2,則能夠採用線程池,以提升服務器性能。
html

線程池技術

  一個線程池包括如下四個基本組成部分java

       一、線程池管理器(ThreadPool):用於建立並管理線程池,包括 建立線程池,銷燬線程池,添加新任務;編程

 

                二、工做線程(PoolWorker):線程池中線程,在沒有任務時處於等待狀態,能夠循環的執行任務;數組

                三、任務接口(Task):每一個任務必須實現的接口,以供工做線程調度任務的執行,它主要規定了任務的入口,任務執行完後的收尾工做,任務的執行狀態等;緩存

                四、任務隊列(taskQueue):用於存放沒有處理的任務。提供一種緩衝機制。安全

   線程池技術正是關注如何縮短或調整T1,T3時間的技術,從而提升服務器程序性能的。它把T1,T3分別安排在服務器程序的啓動和結束的時間段或者一些空閒的時間段,這樣在服務器程序處理客戶請求時,不會有T1,T3的開銷了。
  線程池不只調整T1,T3產生的時間段,並且它還顯著減小了建立線程的數目,看一個例子:
  假設一個服務器一天要處理50000個請求,而且每一個請求須要一個單獨的線程完成。在線程池中,線程數通常是固定的,因此產生線程總數不會超過線程池中線程的數目,而若是服務器不利用線程池來處理這些請求則線程總數爲50000。通常線程池大小是遠小於50000。因此利用線程池的服務器程序不會爲了建立50000而在處理請求時浪費時間,從而提升效率。服務器

 

java併發編程:線程池的使用併發

    咱們使用線程的時候就去建立一個線程,這樣實現起來很是簡便,可是就會有一個問題:ide

  若是併發的線程數量不少,而且每一個線程都是執行一個時間很短的任務就結束了,這樣頻繁建立線程就會大大下降系統的效率,由於頻繁建立線程和銷燬線程須要時間。post

  那麼有沒有一種辦法使得線程能夠複用,就是執行完一個任務,並不被銷燬,而是能夠繼續執行其餘的任務?

  在Java中能夠經過線程池來達到這樣的效果。下面咱們就來詳細講解一下Java的線程池,首先咱們從最核心的ThreadPoolExecutor類中的方法講起,而後再講述它的實現原理,接着給出了它的使用示例,最後討論了一下如何合理配置線程池的大小。

  本文的目錄大綱:

  一.Java中的ThreadPoolExecutor類

  二.深刻剖析線程池實現原理

  三.使用示例

  四.如何合理配置線程池的大小 

  如有不正之處請多多諒解,並歡迎批評指正。 

一.Java中的ThreadPoolExecutor類

  java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼。

  在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);

   從上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。

   下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數與後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中
  • maximumPoolSize:線程池最大線程數,它表示在線程池中最多能建立多少個線程
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize:即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize;可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性,以及TimeUtil的源碼展現
TimeUnit.DAYS;               //
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
public enum TimeUnit {
    NANOSECONDS {
        public long toNanos(long d)   { return d; }
        public long toMicros(long d)  { return d/(C1/C0); }
        public long toMillis(long d)  { return d/(C2/C0); }
        public long toSeconds(long d) { return d/(C3/C0); }
        public long toMinutes(long d) { return d/(C4/C0); }
        public long toHours(long d)   { return d/(C5/C0); }
        public long toDays(long d)    { return d/(C6/C0); }
        public long convert(long d, TimeUnit u) { return u.toNanos(d); }
        int excessNanos(long d, long m) { return (int)(d - (m*C2)); }
    },
    MICROSECONDS {
        public long toNanos(long d)   { return x(d, C1/C0, MAX/(C1/C0)); }
        public long toMicros(long d)  { return d; }
        public long toMillis(long d)  { return d/(C2/C1); }
        public long toSeconds(long d) { return d/(C3/C1); }
        public long toMinutes(long d) { return d/(C4/C1); }
        public long toHours(long d)   { return d/(C5/C1); }
        public long toDays(long d)    { return d/(C6/C1); }
        public long convert(long d, TimeUnit u) { return u.toMicros(d); }
        int excessNanos(long d, long m) { return (int)((d*C1) - (m*C2)); }
    },
    MILLISECONDS {
        public long toNanos(long d)   { return x(d, C2/C0, MAX/(C2/C0)); }
        public long toMicros(long d)  { return x(d, C2/C1, MAX/(C2/C1)); }
        public long toMillis(long d)  { return d; }
        public long toSeconds(long d) { return d/(C3/C2); }
        public long toMinutes(long d) { return d/(C4/C2); }
        public long toHours(long d)   { return d/(C5/C2); }
        public long toDays(long d)    { return d/(C6/C2); }
        public long convert(long d, TimeUnit u) { return u.toMillis(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    SECONDS {
        public long toNanos(long d)   { return x(d, C3/C0, MAX/(C3/C0)); }
        public long toMicros(long d)  { return x(d, C3/C1, MAX/(C3/C1)); }
        public long toMillis(long d)  { return x(d, C3/C2, MAX/(C3/C2)); }
        public long toSeconds(long d) { return d; }
        public long toMinutes(long d) { return d/(C4/C3); }
        public long toHours(long d)   { return d/(C5/C3); }
        public long toDays(long d)    { return d/(C6/C3); }
        public long convert(long d, TimeUnit u) { return u.toSeconds(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    MINUTES {
        public long toNanos(long d)   { return x(d, C4/C0, MAX/(C4/C0)); }
        public long toMicros(long d)  { return x(d, C4/C1, MAX/(C4/C1)); }
        public long toMillis(long d)  { return x(d, C4/C2, MAX/(C4/C2)); }
        public long toSeconds(long d) { return x(d, C4/C3, MAX/(C4/C3)); }
        public long toMinutes(long d) { return d; }
        public long toHours(long d)   { return d/(C5/C4); }
        public long toDays(long d)    { return d/(C6/C4); }
        public long convert(long d, TimeUnit u) { return u.toMinutes(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    HOURS {
        public long toNanos(long d)   { return x(d, C5/C0, MAX/(C5/C0)); }
        public long toMicros(long d)  { return x(d, C5/C1, MAX/(C5/C1)); }
        public long toMillis(long d)  { return x(d, C5/C2, MAX/(C5/C2)); }
        public long toSeconds(long d) { return x(d, C5/C3, MAX/(C5/C3)); }
        public long toMinutes(long d) { return x(d, C5/C4, MAX/(C5/C4)); }
        public long toHours(long d)   { return d; }
        public long toDays(long d)    { return d/(C6/C5); }
        public long convert(long d, TimeUnit u) { return u.toHours(d); }
        int excessNanos(long d, long m) { return 0; }
    },
    DAYS {
        public long toNanos(long d)   { return x(d, C6/C0, MAX/(C6/C0)); }
        public long toMicros(long d)  { return x(d, C6/C1, MAX/(C6/C1)); }
        public long toMillis(long d)  { return x(d, C6/C2, MAX/(C6/C2)); }
        public long toSeconds(long d) { return x(d, C6/C3, MAX/(C6/C3)); }
        public long toMinutes(long d) { return x(d, C6/C4, MAX/(C6/C4)); }
        public long toHours(long d)   { return x(d, C6/C5, MAX/(C6/C5)); }
        public long toDays(long d)    { return d; }
        public long convert(long d, TimeUnit u) { return u.toDays(d); }
        int excessNanos(long d, long m) { return 0; }
    };

    static final long C0 = 1L;
    static final long C1 = C0 * 1000L;
    static final long C2 = C1 * 1000L;
    static final long C3 = C2 * 1000L;
    static final long C4 = C3 * 60L;
    static final long C5 = C4 * 60L;
    static final long C6 = C5 * 24L;

    static final long MAX = Long.MAX_VALUE;

    static long x(long d, long m, long over) {
        if (d >  over) return Long.MAX_VALUE;
        if (d < -over) return Long.MIN_VALUE;
        return d * m;
    }
    public long convert(long sourceDuration, TimeUnit sourceUnit) {
        throw new AbstractMethodError();
    }
    public long toNanos(long duration) {
        throw new AbstractMethodError();
    }
    public long toMicros(long duration) {
        throw new AbstractMethodError();
    }
    public long toMillis(long duration) {
        throw new AbstractMethodError();
    }
    public long toSeconds(long duration) {
        throw new AbstractMethodError();
    }
    public long toMinutes(long duration) {
        throw new AbstractMethodError();
    }
    public long toHours(long duration) {
        throw new AbstractMethodError();
    }
    public long toDays(long duration) {
        throw new AbstractMethodError();
    }
    public void timedWait(Object obj, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            obj.wait(ms, ns);
        }
    }
    public void timedJoin(Thread thread, long timeout)
            throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            thread.join(ms, ns);
        }
    }
    public void sleep(long timeout) throws InterruptedException {
        if (timeout > 0) {
            long ms = toMillis(timeout);
            int ns = excessNanos(timeout, ms);
            Thread.sleep(ms, ns);
        }
    }

}
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇:
ArrayBlockingQueue;
LinkedBlockingQueue;
SynchronousQueue;

  ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:線程工廠,主要用來建立線程;
  • handler:表示當拒絕處理任務時的策略,有如下四種取值:
ThreadPoolExecutor.AbortPolicy;//丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy;//也是丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy;//丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy;//由調用線程處理該任務 

   具體參數的配置與線程池的關係將在下一節講述。

public class ThreadPoolExecutor extends AbstractExecutorService {
.......
public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default rejected execution handler.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters and default thread factory.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }
    /**
     * Creates a new {@code ThreadPoolExecutor} with the given initial parameters.*/
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }
.......
}

  從ThreadPoolExecutor類的代碼能夠知道,ThreadPoolExecutor繼承了AbstractExecutorService,咱們來看一下AbstractExecutorService的源碼:

public abstract class AbstractExecutorService implements ExecutorService {
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

   AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

  咱們接着看看ExecutorService接口的源代碼:

public interface ExecutorService extends Executor {
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

   而ExecutorService又是繼承了Executor接口,咱們看一下Executor接口的實現:

public interface Executor {
    void execute(Runnable command);
}

   到這裏,咱們就應該明白了ThreadPoolExecutor類、AbstractExecutorService類、ExecutorService接口和Executor接口幾個之間的關係了。

  Executor是最頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;

  ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

  AbstractExecutorService抽象類實現了ExecutorService接口,基本實現了ExecutorService接口中聲明的全部方法;

  ThreadPoolExecutor類繼承了抽象類AbstractExecutorService

  在ThreadPoolExecutor類中有幾個很是重要的方法:   

    public void execute(Runnable command) {...........}
    public void shutdown() {........}
    public List<Runnable> shutdownNow() {..............}

  繼承AbstractExecutorService抽象類中的方法

public Future<?> submit(Runnable task) {
  if (task == null) throw new NullPointerException();
    RunnableFuture<Void> ftask = newTaskFor(task, null);
    execute(ftask);
    return ftask;
}

  execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。

  submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果。

  shutdown()和shutdownNow()是用來關閉線程池的。

  還有不少其餘的方法:

  好比:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法,有興趣的朋友能夠自行查閱API。

二.剖析線程池實現原理

  以上內容,咱們從宏觀上介紹了ThreadPoolExecutor,下面咱們來深刻分析一下線程池的具體實現原理,將從下面幾個方面講解:

  1.線程池狀態     2.任務的執行     3.線程池中的線程初始化  4.任務緩存隊列及排隊策略  

  5.任務拒絕策略  6.線程池的關閉  7.線程池容量的動態調整

1.線程池狀態

  在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

// runState is stored in the high-order bits
volatile
int runState;
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;

  runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

  下面的幾個static final變量表示runState可能的幾個取值。

  當建立線程池後,初始時,線程池處於RUNNING狀態;

  若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;

  若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;

  當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。

2.任務的執行

  在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor類中其餘的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小
                                                              //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集
 
private volatile long  keepAliveTime;    //線程存貨時間   
private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
 
private volatile int   poolSize;       //線程池中當前的線程數
 
private volatile RejectedExecutionHandler handler; //任務拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程
 
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

   每一個變量都有其各自的做用,這裏要重點解釋一下corePoolSize、maximumPoolSize、largestPoolSize三個變量。

  corePoolSize在不少地方被翻譯成核心池大小,其實個人理解這個就是線程池的大小。舉個簡單的例子:

  假若有一個工廠,工廠裏面有10個工人,每一個工人同時只能作一件任務。

  所以只要當10個工人中有工人是空閒的,來了任務就分配給空閒的工人作;

  當10個工人都有任務在作時,若是還來了任務,就把任務進行排隊等待;

  若是說新任務數目增加的速度遠遠大於工人作任務的速度,那麼此時工廠主管可能會想補救措施,好比從新招4個臨時工人進來;

  而後就將任務也分配給這4個臨時工人作;

  若是說着14個工人作任務的速度仍是不夠,此時工廠主管可能就要考慮再也不接收新的任務或者拋棄前面的一些任務了。

  當這14個工人當中有人空閒時,而新任務增加的速度又比較緩慢,工廠主管可能就考慮辭掉4個臨時工了,只保持原來的10個工人,畢竟請額外的工人是要花錢的。

 

  這個例子中的corePoolSize就是10,而maximumPoolSize就是14(10+4)。

  也就是說corePoolSize就是線程池大小,maximumPoolSize在我看來是線程池的一種補救措施,即任務量忽然過大時的一種補救措施。

  不過爲了方便理解,在本文後面仍是將corePoolSize翻譯成核心池大小。

  largestPoolSize只是一個用來起記錄做用的變量,用來記錄線程池中曾經有過的最大線程數目,跟線程池的容量沒有任何關係

 

  下面咱們看一下任務從提交到最終執行完畢經歷了哪些過程。

  JDK1.5  execute()方法實現原理

  在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然經過submit也能夠提交任務,可是實際上submit方法裏面最終調用的仍是execute()方法,因此咱們只須要研究execute()方法的實現原理便可:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

   上面的代碼可能看起來不是那麼容易理解,下面咱們一句一句解釋:

  首先,判斷提交的任務command是否爲null,如果null,則拋出空指針異常;

  接着是這句,這句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

   因爲是或條件運算符,因此先計算前半部分的值,若是線程池中當前線程數不小於核心池大小,那麼就會直接進入下面的if語句塊了。

  若是線程池中當前線程數小於核心池大小,則接着執行後半部分,也就是執行

addIfUnderCorePoolSize(command)

  若是執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,不然整個方法就直接執行完畢了。

  若是執行完addIfUnderCorePoolSize這個方法返回false,而後接着判斷:

if (runState == RUNNING && workQueue.offer(command))

   若是當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;若是當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:

addIfUnderMaximumPoolSize(command)

  若是執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

  回到前面:

if (runState == RUNNING && workQueue.offer(command))

   這句的執行,若是說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:

if (runState != RUNNING || poolSize == 0)

   這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。若是是這樣就執行:

ensureQueuedTaskHandled(command)

   進行應急處理,從名字能夠看出是保證 添加到任務緩存隊列中的任務獲得處理。

咱們接着看2個關鍵方法的實現:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //建立線程去執行firstTask任務   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

  這個是addIfUnderCorePoolSize方法的具體實現,從名字能夠看出它的意圖就是當低於核心池大小時執行的方法。下面看其具體實現,首先獲取到鎖,由於這地方涉及到線程池狀態的變化,先經過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小纔會執行addIfUnderCorePoolSize方法的,爲什麼這地方還要繼續判斷?緣由很簡單,前面的判斷過程當中並無加鎖,所以可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完以後,在其餘線程中又向線程池提交了任務,就可能致使poolSize不小於corePoolSize了,因此須要在這個地方繼續判斷。而後接着判斷線程池的狀態是否爲RUNNING,緣由也很簡單,由於有可能在其餘線程中調用了shutdown或者shutdownNow方法。而後就是執行

t = addThread(firstTask);

   這個方法也很是關鍵,傳進去的參數爲提交的任務,返回值爲Thread類型。而後接着在下面判斷t是否爲空,爲空則代表建立線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),不然調用t.start()方法啓動線程。

  咱們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //建立一個線程,執行任務   
    if (t != null) {
        w.thread = t;            //將建立的線程的引用賦值爲w的成員變量       
        workers.add(w);
        int nt = ++poolSize;     //當前線程數加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

   在addThread方法中,首先用提交的任務建立了一個Worker對象,而後調用線程工廠threadFactory建立了一個新的線程t,而後將線程t的引用賦值給了Worker對象的成員變量thread,接着經過workers.add(w)將Worker對象添加到工做集當中。

 

JDK1.7版本execute()方法實現略有不一樣

 public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

  下面咱們一句一句理解:

  首先,判斷提交的任務command是否爲null,如果null,則拋出空指針異常;

  接着是這句,這句要好好理解一下:

if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
  }

   若是線程池中當前線程數<核心池大小,就麼就不進入執行如下代碼,若是小於核心池大小,線程正在運行,嘗試將給定的命令做爲首要任務啓動一個新線程。通知addworker自動檢查runstate和workercount,因此添加線程時,防止誤報,它不該該,經過返回false。

官方這麼解釋

/*If fewer than corePoolSize threads are running, try to start a new thread with the given command as its first task.  The call to addWorker atomic  ally checks runState and workerCount, and so prevents false alarms that would add threads when it shouldn't, by returning false.
*/

  

if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
    }

若是線程池中當前線程數>=核心池大小,就麼就不進入執行以上代碼,官方解釋:

/* If a task can be successfully queued, then we still need
  * to double-check whether we should have added a thread
  * (because existing ones died since last checking) or that
  * the pool shut down since entry into this method. So we
  * recheck state and if necessary roll back the enqueuing if
  * stopped, or start a new thread if there are none.
  */

 

else if (!addWorker(command, false))
            reject(command);

官方解釋

/*If we cannot queue task, then we try to add a new
    * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task
    */

 

下面咱們看一下Worker類的源碼,在1.5JDK中Worker類只實現了Runnable接口:

public class ThreadPoolExecutor extends AbstractExecutorService {
.........省略部分代碼.......
private final class Worker implements Runnable { private final ReentrantLock runLock = new ReentrantLock(); private Runnable firstTask; volatile long completedTasks; Thread thread; Worker(Runnable firstTask) { this.firstTask = firstTask; } boolean isActive() { return runLock.isLocked(); } void interruptIfIdle() { final ReentrantLock runLock = this.runLock; if (runLock.tryLock()) { try { if (thread != Thread.currentThread()) thread.interrupt(); } finally { runLock.unlock(); } } } void interruptNow() { thread.interrupt(); } private void runTask(Runnable task) { final ReentrantLock runLock = this.runLock; runLock.lock(); try { if (runState < STOP && Thread.interrupted() && runState >= STOP) boolean ran = false; beforeExecute(thread, task); //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶能夠根據 //本身須要重載這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等 try { task.run(); ran = true; afterExecute(task, null); ++completedTasks; } catch (RuntimeException ex) { if (!ran) afterExecute(task, ex); throw ex; } } finally { runLock.unlock(); } } public void run() { try { Runnable task = firstTask; firstTask = null; while (task != null || (task = getTask()) != null) { runTask(task); task = null; } } finally { workerDone(this); //當任務隊列中沒有任務時,進行清理工做 } } }
..........省略部分代碼........ }

 

   它實際上實現了Runnable接口,所以上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本同樣:

Thread t = new Thread(w);

   至關於傳進去了一個Runnable任務,在線程t中執行這個Runnable。

  既然Worker實現了Runnable接口,那麼天然最核心的方法即是run()方法了:

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

   從run方法的實現能夠看出,它首先執行的是經過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask以後,在while循環裏面不斷經過getTask()去取新的任務來執行,那麼去哪裏取呢?天然是從任務緩存隊列裏面去取,getTask是ThreadPoolExecutor類中的方法,並非Worker類中的方法,下面是getTask方法的實現:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間,
                //則經過poll取任務,若等待必定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

   在getTask中,先判斷當前線程池狀態,若是runState大於SHUTDOWN(即爲STOP或者TERMINATED),則直接返回null。

  若是runState爲SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

  若是當前線程池的線程數大於核心池大小corePoolSize或者容許爲核心池中的線程設置空閒存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待必定的時間,若是取不到任務就返回null。

  而後判斷取到的任務r是否爲null,爲null則經過調用workerCanExit()方法來判斷當前worker是否能夠退出,咱們看一下workerCanExit()的實現:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //若是runState大於等於STOP,或者任務緩存隊列爲空了
    //或者  容許爲核心池線程設置空閒存活時間而且線程池中的線程數目大於1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

   也就是說若是線程池處於STOP狀態、或者任務隊列已爲空或者容許爲核心池線程設置空閒存活時間而且線程數大於1時,容許worker退出。若是容許worker退出,則調用interruptIdleWorkers()中斷處於空閒狀態的worker,咱們看一下interruptIdleWorkers()的實現:

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //實際上調用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

   從實現能夠看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意這裏,是調用tryLock()來獲取鎖的,由於若是當前worker正在執行任務,鎖已經被獲取了,是沒法獲取到鎖的
                                //若是成功獲取了鎖,說明當前worker處於空閒狀態
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

    這裏有一個很是巧妙的設計方式,假如咱們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給空閒線程執行。可是在這裏,並無採用這樣的方式,由於這樣會要額外地對任務分派線程進行管理,無形地會增長難度和複雜度,這裏直接讓執行完任務的線程去任務緩存隊列裏面取任務來執行。

   咱們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想很是類似,惟一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小而且往任務隊列中添加任務失敗的狀況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

   看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本如出一轍,只是if語句判斷條件中的poolSize < maximumPoolSize不一樣而已。

  到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:

  1)首先,要清楚corePoolSize和maximumPoolSize的含義;

  2)其次,要知道Worker是用來起到什麼做用的;

  3)要知道任務提交給線程池以後的處理策略,這裏總結一下主要有4點:

  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

JDK1.7 Worker做爲ThreadPoolExecutor在繼承AbstractQueuedSynchronizer類也實現了Runnable接口

 

public class ThreadPoolExecutor extends AbstractExecutorService {
........................
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; /** Thread this worker is running in. Null if factory fails. */ final Thread thread; /** Initial task to run. Possibly null. */ Runnable firstTask; /** Per-thread task counter */ volatile long completedTasks;
Worker(Runnable firstTask) { setState(-1); // inhibit interrupts until runWorker this.firstTask = firstTask; this.thread = getThreadFactory().newThread(this); } public void run() { runWorker(this); } protected boolean isHeldExclusively() { return getState() != 0; } protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() { acquire(1); } public boolean tryLock() { return tryAcquire(1); } public void unlock() { release(1); } public boolean isLocked() { return isHeldExclusively(); } void interruptIfStarted() { Thread t; if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { t.interrupt(); } catch (SecurityException ignore) { } } } }
...................... }

3.線程池中的線程初始化

  默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

  在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化全部核心線程

  下面是這2個方法的實現:

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n;
    return n;
}

   注意上面傳進去的參數是null,根據第2小節的分析可知若是傳進去的參數爲null,則最後執行線程會阻塞在getTask方法中的

r = workQueue.take();

   即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

  在前面咱們屢次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

  • ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;
  • LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;
  • synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

  當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:

ThreadPoolExecutor.AbortPolicy;//丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy;//也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy;//丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy;//由調用線程處理該任務

6.線程池的關閉

  ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

7.線程池容量的動態調整

  ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

三.使用示例

  前面咱們討論了關於線程池的實現原理,這一節咱們來看一下它的具體使用:

public class Test {
     public static void main(String[] args) {   
         ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                 new ArrayBlockingQueue<Runnable>(5));
          
         for(int i=0;i<15;i++){
             MyTask myTask = new MyTask(i);
             executor.execute(myTask);
             System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
             executor.getQueue().size()+",已執行玩別的任務數目:"+executor.getCompletedTaskCount());
         }
         executor.shutdown();
     }
}
 
 
class MyTask implements Runnable {
    private int taskNum;
     
    public MyTask(int num) {
        this.taskNum = num;
    }
     
    @Override
    public void run() {
        System.out.println("正在執行task "+taskNum);
        try {
            Thread.currentThread().sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println("task "+taskNum+"執行完畢");
    }
}

  執行結果:

正在執行task 0
線程池中線程數目:1,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
線程池中線程數目:2,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 1
線程池中線程數目:3,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 2
線程池中線程數目:4,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 3
線程池中線程數目:5,隊列中等待執行的任務數目:0,已執行玩別的任務數目:0
正在執行task 4
線程池中線程數目:5,隊列中等待執行的任務數目:1,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:2,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:3,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:4,已執行玩別的任務數目:0
線程池中線程數目:5,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:6,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 10
線程池中線程數目:7,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 11
線程池中線程數目:8,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 12
線程池中線程數目:9,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
線程池中線程數目:10,隊列中等待執行的任務數目:5,已執行玩別的任務數目:0
正在執行task 14
正在執行task 13
task 0執行完畢
正在執行task 5
task 1執行完畢
正在執行task 6
task 4執行完畢
正在執行task 7
task 2執行完畢
正在執行task 8
task 10執行完畢
正在執行task 9
task 3執行完畢
task 12執行完畢
task 11執行完畢
task 13執行完畢
task 14執行完畢
task 5執行完畢
task 6執行完畢
task 8執行完畢
task 7執行完畢
task 9執行完畢

 

  從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。若是上面程序中,將for循環中改爲執行20個任務,就會拋出任務拒絕異常了。

  不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池:

Executors.newCachedThreadPool();        //建立一個緩衝池,緩衝池容量大小爲Integer.MAX_VALUE
Executors.newSingleThreadExecutor();   //建立容量爲1的緩衝池
Executors.newFixedThreadPool(int);    //建立固定容量大小的緩衝池

   下面是這三個靜態方法的具體實現;

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

  從它們的具體實現來看,它們實際上也是調用了ThreadPoolExecutor,只不過參數都已配置好了。

  newFixedThreadPool建立的線程池corePoolSize和maximumPoolSize值是相等的,它使用的LinkedBlockingQueue;

  newSingleThreadExecutor將corePoolSize和maximumPoolSize都設置爲1,也使用的LinkedBlockingQueue;

  newCachedThreadPool將corePoolSize設置爲0,將maximumPoolSize設置爲Integer.MAX_VALUE,使用的SynchronousQueue,也就是說來了任務就建立線程運行,當線程空閒超過60秒,就銷燬線程。

  實際中,若是Executors提供的三個靜態方法能知足要求,就儘可能使用它提供的三個方法,由於本身去手動配置ThreadPoolExecutor的參數有點麻煩,要根據實際任務的類型和數量來進行配置。

  另外,若是ThreadPoolExecutor達不到要求,能夠本身繼承ThreadPoolExecutor類進行重寫。

四.如何合理配置線程池的大小

  本節來討論一個比較重要的話題:如何合理配置線程池大小,僅供參考。

  通常須要根據任務的類型來配置線程池大小:

  若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1

  若是是IO密集型任務,參考值能夠設置爲2*NCPU

  固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。

五.線程池的使用舉例

代碼實現中並無實現任務接口,而是把Runnable對象加入到線程池管理器(ThreadPool),而後剩下的事情就由線程池管理器(ThreadPool)來完成了

package mine.util.thread;  
  
import java.util.LinkedList;  
import java.util.List;  
  
/** 
 * 線程池類,線程管理器:建立線程,執行任務,銷燬線程,獲取線程基本信息 
 */  
public final class ThreadPool {  
    // 線程池中默認線程的個數爲5  
    private static int worker_num = 5;  
    // 工做線程  
    private WorkThread[] workThrads;  
    // 未處理的任務  
    private static volatile int finished_task = 0;  
    // 任務隊列,做爲一個緩衝,List線程不安全  
    private List<Runnable> taskQueue = new LinkedList<Runnable>();  
    private static ThreadPool threadPool;  
  
    // 建立具備默認線程個數的線程池  
    private ThreadPool() {  
        this(5);  
    }  
  
    // 建立線程池,worker_num爲線程池中工做線程的個數  
    private ThreadPool(int worker_num) {  
        ThreadPool.worker_num = worker_num;  
        workThrads = new WorkThread[worker_num];  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i] = new WorkThread();  
            workThrads[i].start();// 開啓線程池中的線程  
        }  
    }  
  
    // 單態模式,得到一個默認線程個數的線程池  
    public static ThreadPool getThreadPool() {  
        return getThreadPool(ThreadPool.worker_num);  
    }  
  
    // 單態模式,得到一個指定線程個數的線程池,worker_num(>0)爲線程池中工做線程的個數  
    // worker_num<=0建立默認的工做線程個數  
    public static ThreadPool getThreadPool(int worker_num1) {  
        if (worker_num1 <= 0)  
            worker_num1 = ThreadPool.worker_num;  
        if (threadPool == null)  
            threadPool = new ThreadPool(worker_num1);  
        return threadPool;  
    }  
  
    // 執行任務,其實只是把任務加入任務隊列,何時執行有線程池管理器覺定  
    public void execute(Runnable task) {  
        synchronized (taskQueue) {  
            taskQueue.add(task);  
            taskQueue.notify();  
        }  
    }  
  
    // 批量執行任務,其實只是把任務加入任務隊列,何時執行有線程池管理器覺定  
    public void execute(Runnable[] task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  
  
    // 批量執行任務,其實只是把任務加入任務隊列,何時執行有線程池管理器覺定  
    public void execute(List<Runnable> task) {  
        synchronized (taskQueue) {  
            for (Runnable t : task)  
                taskQueue.add(t);  
            taskQueue.notify();  
        }  
    }  
  
    // 銷燬線程池,該方法保證在全部任務都完成的狀況下才銷燬全部線程,不然等待任務完成才銷燬  
    public void destroy() {  
        while (!taskQueue.isEmpty()) {// 若是還有任務沒執行完成,就先睡會吧  
            try {  
                Thread.sleep(10);  
            } catch (InterruptedException e) {  
                e.printStackTrace();  
            }  
        }  
        // 工做線程中止工做,且置爲null  
        for (int i = 0; i < worker_num; i++) {  
            workThrads[i].stopWorker();  
            workThrads[i] = null;  
        }  
        threadPool=null;  
        taskQueue.clear();// 清空任務隊列  
    }  
  
    // 返回工做線程的個數  
    public int getWorkThreadNumber() {  
        return worker_num;  
    }  
  
    // 返回已完成任務的個數,這裏的已完成是隻出了任務隊列的任務個數,可能該任務並無實際執行完成  
    public int getFinishedTasknumber() {  
        return finished_task;  
    }  
  
    // 返回任務隊列的長度,即還沒處理的任務個數  
    public int getWaitTasknumber() {  
        return taskQueue.size();  
    }  
  
    // 覆蓋toString方法,返回線程池信息:工做線程個數和已完成任務個數  
    @Override  
    public String toString() {  
        return "WorkThread number:" + worker_num + "  finished task number:"  
                + finished_task + "  wait task number:" + getWaitTasknumber();  
    }  
  
    /** 
     * 內部類,工做線程 
     */  
    private class WorkThread extends Thread {  
        // 該工做線程是否有效,用於結束該工做線程  
        private boolean isRunning = true;  
  
        /* 
         * 關鍵所在啊,若是任務隊列不空,則取出任務執行,若任務隊列空,則等待 
         */  
        @Override  
        public void run() {  
            Runnable r = null;  
            while (isRunning) {// 注意,若線程無效則天然結束run方法,該線程就沒用了  
                synchronized (taskQueue) {  
                    while (isRunning && taskQueue.isEmpty()) {// 隊列爲空  
                        try {  
                            taskQueue.wait(20);  
                        } catch (InterruptedException e) {  
                            e.printStackTrace();  
                        }  
                    }  
                    if (!taskQueue.isEmpty())  
                        r = taskQueue.remove(0);// 取出任務  
                }  
                if (r != null) {  
                    r.run();// 執行任務  
                }  
                finished_task++;  
                r = null;  
            }  
        }  
  
        // 中止工做,讓該線程天然執行完run方法,天然結束  
        public void stopWorker() {  
            isRunning = false;  
        }  
    }  
}  

測試代碼:

package mine.util.thread;  
  
//測試線程池  
public class TestThreadPool {  
    public static void main(String[] args) {  
        // 建立3個線程的線程池  
        ThreadPool t = ThreadPool.getThreadPool(3);  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        t.execute(new Runnable[] { new Task(), new Task(), new Task() });  
        System.out.println(t);  
        t.destroy();// 全部線程都執行完成才destory  
        System.out.println(t);  
    }  
  
    // 任務類  
    static class Task implements Runnable {  
        private static volatile int i = 1;  
  
        @Override  
        public void run() {// 執行任務  
            System.out.println("任務 " + (i++) + " 完成");  
        }  
    }  
}  

運行結果

WorkThread number:3  finished task number:0  wait task number:6
任務 1 完成
任務 2 完成
任務 3 完成
任務 4 完成
任務 5 完成
任務 6 完成
WorkThread number:3  finished task number:6  wait task number:0

分析:因爲並無任務接口,傳入的能夠是自定義的任何任務,因此線程池並不能準確的判斷該任務是否真正的已經完成(真正完成該任務是這個任務的run方法執行完畢),只能知道該任務已經出了任務隊列,正在執行或者已經完成。

JAVA類庫中提供的線程池簡介:

     java提供的線程池更增強大,相信理解線程池的工做原理,看類庫中的線程池就不會感到陌生了。

executors類源碼

   public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }
    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }
    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }
    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }
    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
.......還有一些內部類.............

executorService接口繼承了executor接口

excutors接口定義了返回executorService對象的靜態方法

此文章感謝做者--海子:http://www.cnblogs.com/dolphin0520/p/3932921.html

        --Hsuxu:http://blog.csdn.net/hsuxu/article/details/8985931

相關文章
相關標籤/搜索