深刻分析java線程池的理解

                           深刻分析java線程池的理解

1.概述

Java1.5後引入的Executor框架的最大優勢是把任務的提交和執行解耦,只需把Task描述清楚,而後提交便可。至於這個Task是怎麼被執行的,被誰執行的,何時執行的,就所有交給線程池管理。java

2.先上案例

MyTask.javaweb

package com.qu.webservice;

public class MyTask implements Runnable{
    private int taskNum;
    
    public MyTask(int taskNum){
    	this.taskNum = taskNum;
    }
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println("正在執行的任務"+taskNum);
		try {
			Thread.sleep(4000);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println("任務"+taskNum+"執行完畢!");
	}

}

main函數:數組

package com.qu.webservice;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class TestExecutor {

	public static void main(String[] args) {
		// TODO Auto-generated method stub
        ThreadPoolExecutor executor = new ThreadPoolExecutor(
        		5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5));
        for(int i= 0;i<5;i++){
        	MyTask task = new MyTask(i);
        	executor.execute(task);
        	System.out.println("線程池中線程數目:"+executor.getPoolSize()+",隊列中等待執行的任務數目:"+
                    executor.getQueue().size()+",已執行完畢的任務數目:"+executor.getCompletedTaskCount());
        }
        executor.shutdown();
	}

}

當咱們任務的個數爲5時,運行後效果以下:緩存

當咱們任務的個數爲10時,運行後效果以下:框架

 從執行結果能夠看出,當線程池中線程的數目大於5時,便將任務放入任務緩存隊列裏面,當任務緩存隊列滿了以後,便建立新的線程。若是上面程序中,將for循環中改爲執行15個任務,此時線程池中線程數爲10,已經達到了線程池容許的最大線程數,效果圖以下:ide

若是上面程序中,將for循環中改爲執行16個任務時,此時就會拋出拒絕異常,效果圖以下:函數

可能有人會問,爲何會出現這樣的狀況呢?今天我就帶着你們從源碼理解線程池的工做原理,來搞清楚咱們如今的疑問。工具

3.Executor框架成員

線程池實現框架中包含了一堆實現類,它們之間的關係以下,只有瞭解了各個類之間的關係,才能方便咱們更好的理解線程池的實現,來張圖給你們看看。ui

從圖中能夠看到Executor、ExecutorService、ScheduledExecutorService定義線程池接口,ThreadPoolExecutor和ScheduledThreadPoolExecutor是線程池的實現,前者是一個普通的線程池,後者一個按期調度的線程池,Executors是輔助工具,用以幫助咱們快速定義線程池。this

3.1首先看一下ThreadPoolExecutor這個類

 java.uitl.concurrent.ThreadPoolExecutor類是線程池中最核心的一個類,所以若是要透徹地瞭解Java中的線程池,必須先了解這個類。下面咱們來看一下ThreadPoolExecutor類的具體實現源碼。

 在ThreadPoolExecutor類中提供了四個構造方法:

public class ThreadPoolExecutor extends AbstractExecutorService {
    .....
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
            BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler);
 
    public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,
        BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler);
    ...
}

  從上面的代碼能夠得知,ThreadPoolExecutor繼承了AbstractExecutorService類,並提供了四個構造器,事實上,經過觀察每一個構造器的源碼具體實現,發現前面三個構造器都是調用的第四個構造器進行的初始化工做。

構造參數的選擇對於不一樣的場景顯得極爲重要,下面解釋下一下構造器中各個參數的含義:

  • corePoolSize:核心池的大小,這個參數跟後面講述的線程池的實現原理有很是大的關係。默認狀況下,線程池建立後,線程池中的個數是爲0,只有來了任務以後,纔會去建立線程來執行任務。若是當線程池中的線程數大於corePoolSize時,在提交到線程池的任務就會被存放到緩存任務隊列當中。除非調用了prestartAllCoreThreads()或者prestartCoreThread()方法,從這2個方法的名字就能夠看出,是預建立線程的意思,即在沒有任務到來以前就建立corePoolSize個線程或者一個線程。
  • maximumPoolSize:線程池 中最大線程數,這個參數也是一個很是重要的參數,它表示在線程池中容許最多能建立多少個線程。
  • keepAliveTime:表示線程沒有執行任務時可以存活的時間,只有當線程池中的線程數大於corePoolSize,keepAliveTime這個參數纔會起做用。也就是說當線程池中的線程數大於corePoolSize時,此時有一個空閒的線程空閒時間爲keepAliveTime,則這個線程就會被終止。直到線程池中的線程數不超過corePoolSize。可是若是調用了allowCoreThreadTimeOut(boolean)方法,在線程池中的線程數不大於corePoolSize時,keepAliveTime參數也會起做用,直到線程池中的線程數爲0;
  • unit:參數keepAliveTime的時間單位,有7種取值,在TimeUnit類中有7種靜態屬性:
TimeUnit.DAYS;               //天
TimeUnit.HOURS;             //小時
TimeUnit.MINUTES;           //分鐘
TimeUnit.SECONDS;           //秒
TimeUnit.MILLISECONDS;      //毫秒
TimeUnit.MICROSECONDS;      //微妙
TimeUnit.NANOSECONDS;       //納秒
  • workQueue:一個阻塞隊列,用來存取等待執行的任務,這個參數的選擇也極爲重要,阻塞隊列有以下幾種:
ArrayBlockingQueue:基於數組結構的有界阻塞隊列,按FIFO原則對元素進行排序。
LinkedBlockingQuene:基於鏈表結構的阻塞隊列,按FIFO排序元素,吞吐量一般要高於ArrayBlockingQuene。
SynchronousQuene:一個不存儲元素的阻塞隊列,每一個插入操做必須等到另外一個線程調用移除操做,不然插入操做一直處於阻塞狀態,吞吐量一般要高於LinkedBlockingQuene。
priorityBlockingQuene:具備優先級的無界阻塞隊列。

 ArrayBlockingQueue和PriorityBlockingQueue使用較少,通常使用LinkedBlockingQueue和Synchronous。線程池的排隊策略與BlockingQueue有關。

  • threadFactory:建立線程的工廠。能夠經過自定義線程工廠給每一個線程設置有意義的名稱。如guava提供的ThreadFactoryBuilder。
new ThreadFactoryBuilder().setNameFormat("XX-task-%d").build();
  • rejectedExecutionHandler:飽和策略,當阻塞隊列滿了且沒有空閒的工做線程,說明線程池處於飽和狀態,那麼必須採起一種策略處理提交的新任務。這個策略在默認狀況下是AbortPolicy,表示沒法處理新任務時拋出異常。不過,線程池提供了4種策略:
一、AbortPolicy:直接拋出異常。
二、CallerRunsPolicy:只用調用者所在的線程來運行任務。
三、DiscardOldestPolicy:丟棄阻塞隊列中最近的一個任務,並執行當前任務。
四、DiscardPolicy:直接丟棄。
固然,也能夠根據應用場景來實現RejectedExecutionHandler接口自定義飽和策略,如記錄日誌或持久化存儲不能處理的任務。

從上面給出的ThreadPoolExecutor類的代碼能夠知道,ThreadPoolExecutor繼承了AbstractExecutorService,咱們來看一下AbstractExecutorService的實現:

3.2AbstractExecutorService

咱們來看一下AbstractExecutorService這個抽象類的實現:

public abstract class AbstractExecutorService implements ExecutorService {
 
     
    protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) { };
    protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) { };
    public Future<?> submit(Runnable task) {};
    public <T> Future<T> submit(Runnable task, T result) { };
    public <T> Future<T> submit(Callable<T> task) { };
    private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
                            boolean timed, long nanos)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException {
    };
    public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                           long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException {
    };
    public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                         long timeout, TimeUnit unit)
        throws InterruptedException {
    };
}

 這是一個抽象類,主要定義了3個方法:invokeAll(), invokeAny(), submit()等方法,並無作其餘的。

AbstractExecutorService是一個抽象類,它實現了ExecutorService接口。

3.2ExecutorService

咱們來看一下ExecutorService這個接口的實現:

public interface ExecutorService extends Executor {
 
    void shutdown();
    boolean isShutdown();
    boolean isTerminated();
    boolean awaitTermination(long timeout, TimeUnit unit)
        throws InterruptedException;
    <T> Future<T> submit(Callable<T> task);
    <T> Future<T> submit(Runnable task, T result);
    Future<?> submit(Runnable task);
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
        throws InterruptedException;
    <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                  long timeout, TimeUnit unit)
        throws InterruptedException;
 
    <T> T invokeAny(Collection<? extends Callable<T>> tasks)
        throws InterruptedException, ExecutionException;
    <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                    long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

這個接口也是定義了一些方法,具體的方法在這兒就不依依介紹了。

而ExecutorService又是繼承了Executor接口,咱們看一下Executor接口的實現:

3.3Executor

public interface Executor {
    void execute(Runnable command);
}

 到這裏,你們應該明白了ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor幾個之間的關係了。

總結一下:

Executor是一個頂層接口,在它裏面只聲明瞭一個方法execute(Runnable),返回值爲void,參數爲Runnable類型,從字面意思能夠理解,就是用來執行傳進去的任務的;

而後ExecutorService接口繼承了Executor接口,並聲明瞭一些方法:submit、invokeAll、invokeAny以及shutDown等;

抽象類AbstractExecutorService實現了ExecutorService接口,基本實現了ExecutorService中聲明的全部方法;而後ThreadPoolExecutor繼承了類AbstractExecutorService。

在ThreadPoolExecutor類中有幾個很是重要的方法:

execute()
submit()
shutdown()
shutdownNow()

execute()方法其實是Executor中聲明的方法,在ThreadPoolExecutor進行了具體的實現,這個方法是ThreadPoolExecutor的核心方法,經過這個方法能夠向線程池提交一個任務,交由線程池去執行。

submit()方法是在ExecutorService中聲明的方法,在AbstractExecutorService就已經有了具體的實現,在ThreadPoolExecutor中並無對其進行重寫,這個方法也是用來向線程池提交任務的,可是它和execute()方法不一樣,它可以返回任務執行的結果,去看submit()方法的實現,會發現它實際上仍是調用的execute()方法,只不過它利用了Future來獲取任務執行結果(Future相關內容將在下一篇講述)。

shutdown()和shutdownNow()是用來關閉線程池的。

還有不少其餘的方法:好比:getQueue() 、getPoolSize() 、getActiveCount()、getCompletedTaskCount()等獲取與線程池相關屬性的方法

4.深刻剖析線程池實現的原理

下面我將從如下幾個方法具體的詳解線程池實現的原理:

1.線程池狀態

 在ThreadPoolExecutor中定義了一個volatile變量,另外定義了幾個static final變量表示線程池的各個狀態:

volatile int runState;
static final int RUNNING    = 0;
static final int SHUTDOWN   = 1;
static final int STOP       = 2;
static final int TERMINATED = 3;

runState表示當前線程池的狀態,它是一個volatile變量用來保證線程之間的可見性;

下面的幾個static final變量表示runState可能的幾個取值。

  • 當建立線程池後,初始時,線程池處於RUNNING狀態;
  • 若是調用了shutdown()方法,則線程池處於SHUTDOWN狀態,此時線程池不可以接受新的任務,它會等待全部任務執行完畢;
  • 若是調用了shutdownNow()方法,則線程池處於STOP狀態,此時線程池不能接受新的任務,而且會去嘗試終止正在執行的任務;
  • 當線程池處於SHUTDOWN或STOP狀態,而且全部工做線程已經銷燬,任務緩存隊列已經清空或執行結束後,線程池被設置爲TERMINATED狀態。

2.任務的執行

在瞭解將任務提交給線程池到任務執行完畢整個過程以前,咱們先來看一下ThreadPoolExecutor類中其餘的一些比較重要成員變量:

private final BlockingQueue<Runnable> workQueue;              //任務緩存隊列,用來存放等待執行的任務
private final ReentrantLock mainLock = new ReentrantLock();   //線程池的主要狀態鎖,對線程池狀態(好比線程池大小
                                                              //、runState等)的改變都要使用這個鎖
private final HashSet<Worker> workers = new HashSet<Worker>();  //用來存放工做集
 
private volatile long  keepAliveTime;    //線程存貨時間   
private volatile boolean allowCoreThreadTimeOut;   //是否容許爲核心線程設置存活時間
private volatile int   corePoolSize;     //核心池的大小(即線程池中的線程數目大於這個參數時,提交的任務會被放進任務緩存隊列)
private volatile int   maximumPoolSize;   //線程池最大能容忍的線程數
 
private volatile int   poolSize;       //線程池中當前的線程數
 
private volatile RejectedExecutionHandler handler; //任務拒絕策略
 
private volatile ThreadFactory threadFactory;   //線程工廠,用來建立線程
 
private int largestPoolSize;   //用來記錄線程池中曾經出現過的最大線程數
 
private long completedTaskCount;   //用來記錄已經執行完畢的任務個數

在ThreadPoolExecutor類中,最核心的任務提交方法是execute()方法,雖然經過submit也能夠提交任務,可是實際上submit方法裏面最終調用的仍是execute()方法,因此咱們只須要研究execute()方法的實現原理便可:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
        if (runState == RUNNING && workQueue.offer(command)) {
            if (runState != RUNNING || poolSize == 0)
                ensureQueuedTaskHandled(command);
        }
        else if (!addIfUnderMaximumPoolSize(command))
            reject(command); // is shutdown or saturated
    }
}

1.判斷提交的任務command是否爲null,如果null,則拋出空指針異常;接着是這句,這句要好好理解一下:

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))

因爲是或條件運算符,因此先計算前半部分的值,若是線程池中當前線程數不小於核心池大小,那麼就會直接進入下面的if語句塊了。若是線程池中當前線程數小於核心池大小,則接着執行後半部分,也就是執行

addIfUnderCorePoolSize(command)方法。

2.若是執行完addIfUnderCorePoolSize這個方法返回false,則繼續執行下面的if語句塊,不然整個方法就直接執行完畢了。若是執行完addIfUnderCorePoolSize這個方法返回false,而後接着判斷:

if (runState == RUNNING && workQueue.offer(command))

若是當前線程池處於RUNNING狀態,則將任務放入任務緩存隊列;若是當前線程池不處於RUNNING狀態或者任務放入緩存隊列失敗,則執行:addIfUnderMaximumPoolSize(command)

3.若是執行addIfUnderMaximumPoolSize方法失敗,則執行reject()方法進行任務拒絕處理。

回到前面:if (runState == RUNNING && workQueue.offer(command

這句的執行,若是說當前線程池處於RUNNING狀態且將任務放入任務緩存隊列成功,則繼續進行判斷:
if (runState != RUNNING || poolSize == 0)

這句判斷是爲了防止在將此任務添加進任務緩存隊列的同時其餘線程忽然調用shutdown或者shutdownNow方法關閉了線程池的一種應急措施。若是是這樣就執行:

ensureQueuedTaskHandled(command)進行應急處理,從名字能夠看出是保證 添加到任務緩存隊列中的任務獲得處理。

從上面分析源碼能夠看出主要是2個方法:addIfUnderCorePoolSize和addIfUnderMaximumPoolSize

咱們先看addIfUnderCorePoolSize()這個方法:

private boolean addIfUnderCorePoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < corePoolSize && runState == RUNNING)
            t = addThread(firstTask);        //建立線程去執行firstTask任務   
        } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

這個是addIfUnderCorePoolSize方法的具體實現,從名字能夠看出它的意圖就是當低於核心吃大小時執行的方法。下面看其具體實現,首先獲取到鎖,由於這地方涉及到線程池狀態的變化,先經過if語句判斷當前線程池中的線程數目是否小於核心池大小,有朋友也許會有疑問:前面在execute()方法中不是已經判斷過了嗎,只有線程池當前線程數目小於核心池大小纔會執行addIfUnderCorePoolSize方法的,爲什麼這地方還要繼續判斷?緣由很簡單,前面的判斷過程當中並無加鎖,所以可能在execute方法判斷的時候poolSize小於corePoolSize,而判斷完以後,在其餘線程中又向線程池提交了任務,就可能致使poolSize不小於corePoolSize了,因此須要在這個地方繼續判斷。而後接着判斷線程池的狀態是否爲RUNNING,緣由也很簡單,由於有可能在其餘線程中調用了shutdown或者shutdownNow方法。而後就是執行

t = addThread(firstTask);

這個方法也很是關鍵,傳進去的參數爲提交的任務,返回值爲Thread類型。而後接着在下面判斷t是否爲空,爲空則代表建立線程失敗(即poolSize>=corePoolSize或者runState不等於RUNNING),不然調用t.start()方法啓動線程。

咱們來看一下addThread方法的實現:

private Thread addThread(Runnable firstTask) {
    Worker w = new Worker(firstTask);
    Thread t = threadFactory.newThread(w);  //建立一個線程,執行任務   
    if (t != null) {
        w.thread = t;            //將建立的線程的引用賦值爲w的成員變量       
        workers.add(w);
        int nt = ++poolSize;     //當前線程數加1       
        if (nt > largestPoolSize)
            largestPoolSize = nt;
    }
    return t;
}

在addThread方法中,首先用提交的任務建立了一個Worker對象,而後調用線程工廠threadFactory建立了一個新的線程t,而後將線程t的引用賦值給了Worker對象的成員變量thread,接着經過workers.add(w)將Worker對象添加到工做集當中。

下面咱們看一下Worker類的實現:

private final class Worker implements Runnable {
    private final ReentrantLock runLock = new ReentrantLock();
    private Runnable firstTask;
    volatile long completedTasks;
    Thread thread;
    Worker(Runnable firstTask) {
        this.firstTask = firstTask;
    }
    boolean isActive() {
        return runLock.isLocked();
    }
    void interruptIfIdle() {
        final ReentrantLock runLock = this.runLock;
        if (runLock.tryLock()) {
            try {
        if (thread != Thread.currentThread())
        thread.interrupt();
            } finally {
                runLock.unlock();
            }
        }
    }
    void interruptNow() {
        thread.interrupt();
    }
 
    private void runTask(Runnable task) {
        final ReentrantLock runLock = this.runLock;
        runLock.lock();
        try {
            if (runState < STOP &&
                Thread.interrupted() &&
                runState >= STOP)
            boolean ran = false;
            beforeExecute(thread, task);   //beforeExecute方法是ThreadPoolExecutor類的一個方法,沒有具體實現,用戶能夠根據
            //本身須要重載這個方法和後面的afterExecute方法來進行一些統計信息,好比某個任務的執行時間等           
            try {
                task.run();
                ran = true;
                afterExecute(task, null);
                ++completedTasks;
            } catch (RuntimeException ex) {
                if (!ran)
                    afterExecute(task, ex);
                throw ex;
            }
        } finally {
            runLock.unlock();
        }
    }
 
    public void run() {
        try {
            Runnable task = firstTask;
            firstTask = null;
            while (task != null || (task = getTask()) != null) {
                runTask(task);
                task = null;
            }
        } finally {
            workerDone(this);   //當任務隊列中沒有任務時,進行清理工做       
        }
    }
}

 它實際上實現了Runnable接口,所以上面的Thread t = threadFactory.newThread(w);效果跟下面這句的效果基本同樣:

Thread t = new Thread(w);

至關於傳進去了一個Runnable任務,在線程t中執行這個Runnable。

既然Worker實現了Runnable接口,那麼天然最核心的方法即是run()方法了:

public void run() {
    try {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            runTask(task);
            task = null;
        }
    } finally {
        workerDone(this);
    }
}

 從run方法的實現能夠看出,它首先執行的是經過構造器傳進來的任務firstTask,在調用runTask()執行完firstTask以後,在while循環裏面不斷經過getTask()去取新的任務來執行,那麼去哪裏取呢?天然是從任務緩存隊列裏面去取,getTask是ThreadPoolExecutor類中的方法,並非Worker類中的方法,下面是getTask方法的實現:

Runnable getTask() {
    for (;;) {
        try {
            int state = runState;
            if (state > SHUTDOWN)
                return null;
            Runnable r;
            if (state == SHUTDOWN)  // Help drain queue
                r = workQueue.poll();
            else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //若是線程數大於核心池大小或者容許爲核心池線程設置空閒時間,
                //則經過poll取任務,若等待必定的時間取不到任務,則返回null
                r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
            else
                r = workQueue.take();
            if (r != null)
                return r;
            if (workerCanExit()) {    //若是沒取到任務,即r爲null,則判斷當前的worker是否能夠退出
                if (runState >= SHUTDOWN) // Wake up others
                    interruptIdleWorkers();   //中斷處於空閒狀態的worker
                return null;
            }
            // Else retry
        } catch (InterruptedException ie) {
            // On interruption, re-check runState
        }
    }
}

在getTask中,先判斷當前線程池狀態,若是runState大於SHUTDOWN(即爲STOP或者TERMINATED),則直接返回null。

若是runState爲SHUTDOWN或者RUNNING,則從任務緩存隊列取任務。

若是當前線程池的線程數大於核心池大小corePoolSize或者容許爲核心池中的線程設置空閒存活時間,則調用poll(time,timeUnit)來取任務,這個方法會等待必定的時間,若是取不到任務就返回null。

而後判斷取到的任務r是否爲null,爲null則經過調用workerCanExit()方法來判斷當前worker是否能夠退出,咱們看一下workerCanExit()的實現:

private boolean workerCanExit() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    boolean canExit;
    //若是runState大於等於STOP,或者任務緩存隊列爲空了
    //或者  容許爲核心池線程設置空閒存活時間而且線程池中的線程數目大於1
    try {
        canExit = runState >= STOP ||
            workQueue.isEmpty() ||
            (allowCoreThreadTimeOut &&
             poolSize > Math.max(1, corePoolSize));
    } finally {
        mainLock.unlock();
    }
    return canExit;
}

也就是說若是線程池處於STOP狀態、或者任務隊列已爲空或者容許爲核心池線程設置空閒存活時間而且線程數大於1時,容許worker退出。若是容許worker退出,則調用interruptIdleWorkers()中斷處於空閒狀態的worker,咱們看一下interruptIdleWorkers()的實現:

void interruptIdleWorkers() {
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        for (Worker w : workers)  //實際上調用的是worker的interruptIfIdle()方法
            w.interruptIfIdle();
    } finally {
        mainLock.unlock();
    }
}

 從實現能夠看出,它實際上調用的是worker的interruptIfIdle()方法,在worker的interruptIfIdle()方法中:

void interruptIfIdle() {
    final ReentrantLock runLock = this.runLock;
    if (runLock.tryLock()) {    //注意這裏,是調用tryLock()來獲取鎖的,由於若是當前worker正在執行任務,鎖已經被獲取了,是沒法獲取到鎖的
                                //若是成功獲取了鎖,說明當前worker處於空閒狀態
        try {
    if (thread != Thread.currentThread())  
    thread.interrupt();
        } finally {
            runLock.unlock();
        }
    }
}

這裏有一個很是巧妙的設計方式,假如咱們來設計線程池,可能會有一個任務分派線程,當發現有線程空閒時,就從任務緩存隊列中取一個任務交給空閒線程執行。可是在這裏,並無採用這樣的方式,由於這樣會要額外地對任務分派線程進行管理,無形地會增長難度和複雜度,這裏直接讓執行完任務的線程去任務緩存隊列裏面取任務來執行。

咱們再看addIfUnderMaximumPoolSize方法的實現,這個方法的實現思想和addIfUnderCorePoolSize方法的實現思想很是類似,惟一的區別在於addIfUnderMaximumPoolSize方法是在線程池中的線程數達到了核心池大小而且往任務隊列中添加任務失敗的狀況下執行的:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
    Thread t = null;
    final ReentrantLock mainLock = this.mainLock;
    mainLock.lock();
    try {
        if (poolSize < maximumPoolSize && runState == RUNNING)
            t = addThread(firstTask);
    } finally {
        mainLock.unlock();
    }
    if (t == null)
        return false;
    t.start();
    return true;
}

 看到沒有,其實它和addIfUnderCorePoolSize方法的實現基本如出一轍,只是if語句判斷條件中的poolSize < maximumPoolSize不一樣而已。到這裏,大部分朋友應該對任務提交給線程池以後到被執行的整個過程有了一個基本的瞭解,下面總結一下:

  • 若是當前線程池中的線程數目小於corePoolSize,則每來一個任務,就會建立一個線程去執行這個任務;
  • 若是當前線程池中的線程數目>=corePoolSize,則每來一個任務,會嘗試將其添加到任務緩存隊列當中,若添加成功,則該任務會等待空閒線程將其取出去執行;若添加失敗(通常來講是任務緩存隊列已滿),則會嘗試建立新的線程去執行這個任務;
  • 若是當前線程池中的線程數目達到maximumPoolSize,則會採起任務拒絕策略進行處理;
  • 若是線程池中的線程數量大於 corePoolSize時,若是某線程空閒時間超過keepAliveTime,線程將被終止,直至線程池中的線程數目不大於corePoolSize;若是容許爲核心池中的線程設置存活時間,那麼核心池中的線程空閒時間超過keepAliveTime,線程也會被終止。

3.線程池中的線程初始化

默認狀況下,建立線程池以後,線程池中是沒有線程的,須要提交任務以後纔會建立線程。

  在實際中若是須要線程池建立以後當即建立線程,能夠經過如下兩個方法辦到:

  • prestartCoreThread():初始化一個核心線程;
  • prestartAllCoreThreads():初始化全部核心線程

下面是這2個方法的實現:

public boolean prestartCoreThread() {
    return addIfUnderCorePoolSize(null); //注意傳進去的參數是null
}
 
public int prestartAllCoreThreads() {
    int n = 0;
    while (addIfUnderCorePoolSize(null))//注意傳進去的參數是null
        ++n;
    return n;
}

注意上面傳進去的參數是null,根據第2前面的分析可知若是傳進去的參數爲null,則最後執行線程會阻塞在getTask方法中的

r = workQueue.take();

即等待任務隊列中有任務。

4.任務緩存隊列及排隊策略

在前面咱們屢次提到了任務緩存隊列,即workQueue,它用來存放等待執行的任務。

  workQueue的類型爲BlockingQueue<Runnable>,一般能夠取下面三種類型:

  1)ArrayBlockingQueue:基於數組的先進先出隊列,此隊列建立時必須指定大小;

  2)LinkedBlockingQueue:基於鏈表的先進先出隊列,若是建立時沒有指定此隊列大小,則默認爲Integer.MAX_VALUE;

  3)synchronousQueue:這個隊列比較特殊,它不會保存提交的任務,而是將直接新建一個線程來執行新來的任務。

5.任務拒絕策略

當線程池的任務緩存隊列已滿而且線程池中的線程數目達到maximumPoolSize,若是還有任務到來就會採起任務拒絕策略,一般有如下四種策略:

ThreadPoolExecutor.AbortPolicy:丟棄任務並拋出RejectedExecutionException異常。
ThreadPoolExecutor.DiscardPolicy:也是丟棄任務,可是不拋出異常。
ThreadPoolExecutor.DiscardOldestPolicy:丟棄隊列最前面的任務,而後從新嘗試執行任務(重複此過程)
ThreadPoolExecutor.CallerRunsPolicy:由調用線程處理該任務

6.線程池的關閉

ThreadPoolExecutor提供了兩個方法,用於線程池的關閉,分別是shutdown()和shutdownNow(),其中:

  • shutdown():不會當即終止線程池,而是要等全部任務緩存隊列中的任務都執行完後才終止,但不再會接受新的任務
  • shutdownNow():當即終止線程池,並嘗試打斷正在執行的任務,而且清空任務緩存隊列,返回還沒有執行的任務

7.線程池容量的動態調整

ThreadPoolExecutor提供了動態調整線程池容量大小的方法:setCorePoolSize()和setMaximumPoolSize(),

  • setCorePoolSize:設置核心池大小
  • setMaximumPoolSize:設置線程池最大能建立的線程數目大小

  當上述參數從小變大時,ThreadPoolExecutor進行線程賦值,還可能當即建立新的線程來執行任務。

相關文章
相關標籤/搜索