線程池不容許使用Executors去建立,而是經過ThreadPoolExecutor的方式,這樣的處理方式讓寫的同窗更加明確線程池的運行規則,規避資源耗盡的風險。

前言:jdk1.7中java.util.concurrent.Executor線程池體系介紹

java.util.concurrent.Executor : 負責線程的使用與調度的根接口 
|–ExecutorService:Executor的子接口,線程池的主要接口 
   |–ThreadPoolExecutor:ExecutorService的實現類 
 |–ScheduledExecutorService:ExecutorService的子接口,負責線程的調度 
        |–ScheduledThreadPoolExecutor:繼承了ThreadPoolExecutor實現了ScheduledExecutorServicejava

1、Executors的四種線程池

  • newCachedThreadPool 
    建立一個可緩存線程池,若是線程池長度超過處理須要,可靈活回收空閒線程,若無可回收,則新建線程。線程池爲無限大,當執行第二個任務時第一個任務已經完成,會複用執行第一個任務的線程,而不用每次新建線程。 
    建立方式: Executors.newCachedThreadPool();
  • newFixedThreadPool 
    建立一個定長線程池,可控制線程最大併發數,超出的線程會在隊列中等待。定長線程池的大小最好根據系統資源進行設置,如Runtime.getRuntime().availableProcessors()。 
    建立方式: Executors.newFixedThreadPool();
  • newScheduledThreadPool 
    建立一個定長線程池,支持定時及週期性任務執行。 
    建立方式: Executors.newScheduledThreadPool ();
  • newSingleThreadExecutor 
    建立一個單線程化的線程池,它只會用惟一的工做線程來執行任務,保證全部任務按照指定順序(FIFO, LIFO, 優先級)執行。 
    建立方式: Executors.newSingleThreadExecutor ();

newScheduledThreadPool示例:

其餘的建立完線程池後,使用 threadPool.execute(new Runnable())方式執行任務。數組

public static void main(String[] args) {  
  ScheduledExecutorService scheduledThreadPool = Executors.newScheduledThreadPool(5);  
  // 表示延遲3秒執行
  scheduledThreadPool.schedule(new Runnable() {  
   public void run() {  
    System.out.println("delay 3 seconds");  
   }  
  }, 3, TimeUnit.SECONDS);  
 }  
 // 表示延遲1秒後每3秒執行一次
 scheduledThreadPool.scheduleAtFixedRate(new Runnable() {  
   public void run() {  
    System.out.println("delay 1 seconds, and excute every 3 seconds");  
   }  
  }, 1, 3, TimeUnit.SECONDS);  
 }

Executors源碼:

查看Executors源碼咱們知道,Executors 類提供了使用了 ThreadPoolExecutor 的簡單的 ExecutorService 實現,也就是上面所說的四種Executors線程池,可是 ThreadPoolExecutor 提供的功能遠不止於此。 
不過在java doc中,並不提倡咱們直接使用ThreadPoolExecutor,而是使用Executors類中提供的幾個靜態方法來建立線程池 
咱們能夠在建立 ThreadPoolExecutor 實例時指定活動線程的數量,咱們也能夠限制線程池的大小而且建立咱們本身的 RejectedExecutionHandler 實現來處理不能適應工做隊列的工做。 
下面咱們就先了解一下ThreadPoolExecutor,而後在看個示例代碼。緩存

Executors 源碼:併發

public class Executors {

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

    public static ExecutorService newWorkStealingPool(int parallelism) {
        return new ForkJoinPool
            (parallelism,
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

    public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>(),
                                      threadFactory);
    }

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
    public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>(),
                                    threadFactory));
    }

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

    public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>(),
                                      threadFactory);
    }
    public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

    public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1, threadFactory));
    }

    public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
        return new ScheduledThreadPoolExecutor(corePoolSize);
    }

    public static ScheduledExecutorService newScheduledThreadPool(
            int corePoolSize, ThreadFactory threadFactory) {
        return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
    }
}

ThreadPoolExecutor類詳解:

java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼。ide

在ThreadPoolExecutor類中提供了四個構造方法:ui

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);

    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。this

構造器中各個參數的含義:atom

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有很是大的關係。在建立了線程池後,默認狀況下,線程池中並無任何線程,而是等待有任務到來才建立線程去執行任務,除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。默認狀況下,在建立了線程池後,線程池中的線程數爲0,當有任務來以後,就會建立一個線程去執行任務,當線程池中的線程數目達到corePoolSize後,就會把到達的任務放到緩存隊列當中;
  • maximumPoolSize:線程池最大線程數,這個參數也是一個很是重要的參數,它表示在線程池中最多能建立多少個線程;
  • keepAliveTime:表示線程沒有任務執行時最多保持多久時間會終止。默認狀況下,只有當線程池中的線程數大於corePoolSize時,keepAliveTime纔會起做用,直到線程池中的線程數不大於corePoolSize,即當線程池中的線程數大於corePoolSize時,若是一個線程空閒的時間達到keepAliveTime,則會終止,直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;
  • unit:參數keepAliveTime的時間單位,有7種取值。TimeUnit.DAYS、TimeUnit.HOURS、TimeUnit.MINUTES、TimeUnit.SECONDS、TimeUnit.MILLISECONDS、TimeUnit.MICROSECONDS、TimeUnit.NANOSECONDS
  • workQueue:一個阻塞隊列,用來存儲等待執行的任務,這個參數的選擇也很重要,會對線程池的運行過程產生重大影響,通常來講,這裏的阻塞隊列有如下幾種選擇:ArrayBlockingQueue、LinkedBlockingQueue、SynchronousQueue。 
    ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。
  • threadFactory:線程工廠,主要用來建立線程;
  • handler:表示當拒絕處理任務時的策略,有如下四種取值: 
    ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
    ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
    ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 
    ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

源碼總結: 

一、Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的; 
二、而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等; 
三、抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法; 
四、而後ThreadPoolExecutor繼承了類AbstractExecutorService。spa

ThreadPoolExecutor類中幾個重要方法: 

一、execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。 
二、submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。 
三、shutdown()和shutdownNow()是用來關閉線程池的。 
四、還有一大波get的方法, 能夠獲取與線程池相關屬性的方法。線程

2、剖析線程池實現原理

一、線程池狀態

volatile int runState;  // 前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性
static final int RUNNING    = 0; //  當建立線程池後,初始時,線程池處於RUNNING狀態
static final int SHUTDOWN   = 1; 若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢
static final int STOP       = 2; // 若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
static final int TERMINATED = 3; // 當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。

二、任務的執行

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集
private volatile long  keepAliveTime;    //線程存貨時間   
private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數, 當線程數大於corePoolSize時,建立新的先線程,可是建立新的線程數 + corePoolSize不能大於maximumPoolSize
private volatile int   poolSize;       //線程池中當前的線程數
private volatile RejectedExecutionHandler handler; //任務拒絕策略
private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        else if (!addWorker(command, false))
            reject(command);
    }

從代碼註釋,咱們知道:

  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

addWorker() 添加任務, 建立Worker, Worker 繼承 AbstractQueuedSynchronizer 實現 Runnable 
addWorker()幾個關鍵步驟:

w = new Worker(firstTask);
 final Thread t = w.thread; // 從worker取得線程
 if (workerAdded) {
    t.start(); // worker添加成功,執行任務
     workerStarted = true;
 }

三、線程池中的線程初始化 

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。 
在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

// 初始化一個核心線程;
public boolean prestartCoreThread() {
     return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
}

// 初始化全部核心線程
public int prestartAllCoreThreads() {
    int n = 0;
    while (addWorker(null, true))
        ++n;
   return n;
}

四、任務緩存隊列及排隊策略 

workQueue的類型爲BlockingQueue,一般能夠取下面三種類型: 
1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小; 
2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE; 
3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

五、任務拒絕策略 

前面已經講過, 當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略: 
ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。 
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。 
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程) 
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

六、線程池的關閉 

shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務 
shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

七、線程池容量的動態調整 

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(), 
setCorePoolSize:設置核心池大小 
setMaximumPoolSize:設置線程池最大能建立的線程數目大小 
當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

3、使用示例

咱們能夠在建立 ThreadPoolExecutor 實例時指定活動線程的數量,咱們也能夠限制線程池的大小而且建立咱們本身的 RejectedExecutionHandler 實現來處理不能適應工做隊列的工做。

public class RejectedExecutionHandlerImpl implements RejectedExecutionHandler {  

    @Override  
    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {  
        System.out.println(r.toString() + " is rejected");  
    }  

}

hreadPoolExecutor 提供了一些方法,咱們可使用這些方法來查詢 executor 的當前狀態,線程池大小,活動線程數量以及任務數量。所以我是用來一個監控線程在特定的時間間隔內打印 executor 信息。 
MyMonitorThread.java

public class MyMonitorThread implements Runnable  
{  
    private ThreadPoolExecutor executor;  

    private int seconds;  

    private boolean run=true;  

    public MyMonitorThread(ThreadPoolExecutor executor, int delay)  
    {  
        this.executor = executor;  
        this.seconds=delay;  
    }  

    public void shutdown(){  
        this.run=false;  
    }  

    @Override  
    public void run()  
    {  
        while(run){  
                System.out.println(  
                    String.format("[monitor] [%d/%d] Active: %d, Completed: %d, Task: %d, isShutdown: %s, isTerminated: %s",  
                        this.executor.getPoolSize(),  
                        this.executor.getCorePoolSize(),  
                        this.executor.getActiveCount(),  
                        this.executor.getCompletedTaskCount(),  
                        this.executor.getTaskCount(),  
                        this.executor.isShutdown(),  
                        this.executor.isTerminated()));  
                try {  
                    Thread.sleep(seconds*1000);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
        }  

    }  
}

這裏是使用 ThreadPoolExecutor 的線程池實現例子。 
WorkerPool.java

public class WorkerPool {  

    public static void main(String args[]) throws InterruptedException{  
        //RejectedExecutionHandler implementation  
        RejectedExecutionHandlerImpl rejectionHandler = new RejectedExecutionHandlerImpl();  
        //Get the ThreadFactory implementation to use  
        ThreadFactory threadFactory = Executors.defaultThreadFactory();  
        //creating the ThreadPoolExecutor  
        ThreadPoolExecutor executorPool = new ThreadPoolExecutor(2, 4, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(2), threadFactory, rejectionHandler);  
        //start the monitoring thread  
        MyMonitorThread monitor = new MyMonitorThread(executorPool, 3);  
        Thread monitorThread = new Thread(monitor);  
        monitorThread.start();  
        //submit work to the thread pool  
        for(int i=0; i<10; i++){  
            executorPool.execute(new WorkerThread("cmd"+i));  
        }  

        Thread.sleep(30000);  
        //shut down the pool  
        executorPool.shutdown();  
        //shut down the monitor thread  
        Thread.sleep(5000);  
        monitor.shutdown();  

    }  
}

意在初始化 ThreadPoolExecutor 時,咱們保持初始池大小爲 2,最大池大小爲 4 而工做隊列大小爲 2。所以若是已經有四個正在執行的任務而此時分配來更多任務的話,工做隊列將僅僅保留他們(新任務)中的兩個,其餘的將會被 RejectedExecutionHandlerImpl 處理。

4、合理配置線程池的大小

遵循兩原則:  一、若是是CPU密集型任務,就須要儘可能壓榨CPU,參考值能夠設爲 NCPU+1  二、若是是IO密集型任務,參考值能夠設置爲2*NCPU  固然,這只是一個參考值,具體的設置還須要根據實際狀況進行調整,好比能夠先將線程池大小設置爲參考值,再觀察任務運行狀況和系統負載、資源利用率來進行適當調整。

相關文章
相關標籤/搜索