### 寫在前面的話
最近一直都在研究Java的線程池ThreadPoolExecutor,可是雖然它那麼好,可是在實際的用途中怎麼去用,對於我來講就不知道如何下手了,還好有開源社區咱們能夠了解不少項目中所運用到的線程池,好比最熟悉的就是Apache Tomcat了,相信都對它不默生,一個Apache軟件基金下的一個開源Web容器,因此今天就來聊一下Tomcat的線程池實現。java
### 準備工做
首先去Apache Tomcat的官網下載Tomcat的源代碼,這裏給出<a href="http://mirrors.tuna.tsinghua.edu.cn/apache/tomcat/tomcat-7/v7.0.72/src/apache-tomcat-7.0.72-src.zip">Tomcat源碼連接</a>,下載下來以後,它是一個zip文件,須要把它進行解壓到相應的文件夾下面,以便咱們能方便的查看其源代碼。分析源碼最行之有效的方法就是知道這個類有哪些方法,哪些字段,繼承了哪些類,實現了哪些接口,因此咱們這裏推薦一款UML工具, *astah*-*professional*,可自行下載安裝,這是一個收費軟件,可是它有50天的試用期,因此咱們能夠以使用的身份使用該軟件。準備工做作好以後就能夠進行下一步的操做了。apache
### 初探Tomcat線程池
Tomcat的線程池的類文件在../apache-tomcat-7.0.72-src\java\org\apache\catalina\core包下面,定位到這個文件夾下面能夠看到StandardThreadExecutor.java就是咱們找尋的類了,用文本工具打開就能夠查看其源碼了。這裏源碼以下:
StandardThreadExecutor.java
``` java
public class StandardThreadExecutor extends LifecycleMBeanBase
implements Executor, ResizableExecutor {
//默認線程的優先級
protected int threadPriority = Thread.NORM_PRIORITY;
//守護線程
protected boolean daemon = true;
//線程名稱的前綴
protected String namePrefix = "tomcat-exec-";
//最大線程數默認200個
protected int maxThreads = 200;
//最小空閒線程25個
protected int minSpareThreads = 25;
//超時時間爲6000
protected int maxIdleTime = 60000;
//線程池容器
protected ThreadPoolExecutor executor = null;
//線程池的名稱
protected String name;
//是否提早啓動線程
protected boolean prestartminSpareThreads = false;
//隊列最大大小
protected int maxQueueSize = Integer.MAX_VALUE;
//爲了不在上下文中止以後,全部的線程在同一時間段被更新,因此進行線程的延遲操做
protected long threadRenewalDelay = 1000L;
//任務隊列
private TaskQueue taskqueue = null;tomcat
//容器啓動時進行,具體可參考org.apache.catalina.util.LifecycleBase#startInternal()
@Override
protected void startInternal() throws LifecycleException {
//實例化任務隊列
taskqueue = new TaskQueue(maxQueueSize);
//自定義的線程工廠類,實現了JDK的ThreadFactory接口
TaskThreadFactory tf = new TaskThreadFactory(namePrefix,daemon,getThreadPriority());
//這裏的ThreadPoolExecutor是tomcat自定義的,不是JDK的ThreadPoolExecutor
executor = new ThreadPoolExecutor(getMinSpareThreads(), getMaxThreads(), maxIdleTime, TimeUnit.MILLISECONDS,taskqueue, tf);
executor.setThreadRenewalDelay(threadRenewalDelay);
//是否提早啓動線程,若是爲true,則提早初始化minSpareThreads個的線程,放入線程池內
if (prestartminSpareThreads) {
executor.prestartAllCoreThreads();
}
//設置任務容器的父級線程池對象
taskqueue.setParent(executor);
//設置容器啓動狀態
setState(LifecycleState.STARTING);
}
//容器中止時的生命週期方法,進行關閉線程池和資源清理
@Override
protected void stopInternal() throws LifecycleException {多線程
setState(LifecycleState.STOPPING);
if ( executor != null ) executor.shutdownNow();
executor = null;
taskqueue = null;
}
//這個執行線程方法有超時的操做,參考org.apache.catalina.Executor接口
@Override
public void execute(Runnable command, long timeout, TimeUnit unit) {
if ( executor != null ) {
executor.execute(command,timeout,unit);
} else {
throw new IllegalStateException("StandardThreadExecutor not started.");
}
}app
//JDK默認操做線程的方法,參考java.util.concurrent.Executor接口
@Override
public void execute(Runnable command) {
if ( executor != null ) {
try {
executor.execute(command);
} catch (RejectedExecutionException rx) {
//there could have been contention around the queue
if ( !( (TaskQueue) executor.getQueue()).force(command) ) throw new RejectedExecutionException("Work queue full.");
}
} else throw new IllegalStateException("StandardThreadPool not started.");
}less
//因爲繼承了org.apache.tomcat.util.threads.ResizableExecutor接口,因此能夠從新定義線程池的大小
@Override
public boolean resizePool(int corePoolSize, int maximumPoolSize) {
if (executor == null)
return false;eclipse
executor.setCorePoolSize(corePoolSize);
executor.setMaximumPoolSize(maximumPoolSize);
return true;
}
}
```
看完了上面的源碼以後,不知此刻的你是一面茫然仍是認爲小菜一碟呢,無論怎樣,咱們先來看下UML類圖吧,瞭解一下具體的繼承關係,你就明白了,廢話很少說,能用圖片解決的東西儘可能少用文字。ide
![StandardThreadExecutor類繼承關係](http://upload-images.jianshu.io/upload_images/1421261-5d16128f41235a67.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)工具
接下來,咱們來看一下ResizableExecutor這個接口:
``` java
import java.util.concurrent.Executor;優化
public interface ResizableExecutor extends Executor {
/**
* Returns the current number of threads in the pool.
*
* @return the number of threads
*/
public int getPoolSize();
public int getMaxThreads();
/**
* Returns the approximate number of threads that are actively executing
* tasks.
*
* @return the number of threads
*/
public int getActiveCount();
public boolean resizePool(int corePoolSize, int maximumPoolSize);
public boolean resizeQueue(int capacity);
}
```
實現這個接口以後,就能動態改變線程池的大小和任務隊列的大小了,它是繼承自JDK的Executor接口的,其它的接口再也不多說,可自行查看源碼。
### Tomcat線程池的實現
Tomcat的線程池的名字也叫做ThreadPoolExecutor,剛開始看源代碼的時候還覺得是使用了JDK的ThreadPoolExecutor了呢,後面仔細查看才知道是Tomcat本身實現的一個ThreadPoolExecutor,不過基本上都差很少,都是在JDK之上封裝了一些本身的東西,上源碼:
``` java
public class ThreadPoolExecutor extends java.util.concurrent.ThreadPoolExecutor {
protected static final StringManager sm = StringManager
.getManager("org.apache.tomcat.util.threads.res");
/**
* 已經提交但還沒有完成的任務數量。
* 這包括已經在隊列中的任務和已經交給工做線程的任務但還未開始執行的任務
* 這個數字老是大於getActiveCount()的
**/
private final AtomicInteger submittedCount = new AtomicInteger(0);
private final AtomicLong lastContextStoppedTime = new AtomicLong(0L);
/**
* 最近的時間在ms時,一個線程決定殺死本身來避免
* 潛在的內存泄漏。 用於調節線程的更新速率。
*/
private final AtomicLong lastTimeThreadKilledItself = new AtomicLong(0L);
//延遲2個線程之間的延遲。 若是爲負,不更新線程。
private long threadRenewalDelay = 1000L;
//4個構造方法 ... 省略
public long getThreadRenewalDelay() {
return threadRenewalDelay;
}
public void setThreadRenewalDelay(long threadRenewalDelay) {
this.threadRenewalDelay = threadRenewalDelay;
}
/**
* 方法在完成給定Runnable的執行時調用。
* 此方法由執行任務的線程調用。 若是
* 非null,Throwable是未捕獲的{@code RuntimeException}
* 或{@code Error},致使執行忽然終止。...
* @param r 已完成的任務
* @param t 引發終止的異常,若是執行正常完成則爲null
**/
@Override
protected void afterExecute(Runnable r, Throwable t) {
submittedCount.decrementAndGet();
if (t == null) {
stopCurrentThreadIfNeeded();
}
}
//若是當前線程在上一次上下文中止以前啓動,則拋出異常,以便中止當前線程。
protected void stopCurrentThreadIfNeeded() {
if (currentThreadShouldBeStopped()) {
long lastTime = lastTimeThreadKilledItself.longValue();
if (lastTime + threadRenewalDelay < System.currentTimeMillis()) {
if (lastTimeThreadKilledItself.compareAndSet(lastTime,
System.currentTimeMillis() + 1)) {
// OK, it's really time to dispose of this thread
final String msg = sm.getString(
"threadPoolExecutor.threadStoppedToAvoidPotentialLeak",
Thread.currentThread().getName());
throw new StopPooledThreadException(msg);
}
}
}
}
//當前線程是否須要被終止
protected boolean currentThreadShouldBeStopped() {
if (threadRenewalDelay >= 0
&& Thread.currentThread() instanceof TaskThread) {
TaskThread currentTaskThread = (TaskThread) Thread.currentThread();
//線程建立的時間<上下文中止的時間,則能夠中止該線程
if (currentTaskThread.getCreationTime() <
this.lastContextStoppedTime.longValue()) {
return true;
}
}
return false;
}
public int getSubmittedCount() {
return submittedCount.get();
}
@Override
public void execute(Runnable command) {
execute(command,0,TimeUnit.MILLISECONDS);
}
public void execute(Runnable command, long timeout, TimeUnit unit) {
submittedCount.incrementAndGet();
try {
super.execute(command);
} catch (RejectedExecutionException rx) {
if (super.getQueue() instanceof TaskQueue) {
final TaskQueue queue = (TaskQueue)super.getQueue();
try {
if (!queue.force(command, timeout, unit)) {
submittedCount.decrementAndGet();
throw new RejectedExecutionException("Queue capacity is full.");
}
} catch (InterruptedException x) {
submittedCount.decrementAndGet();
Thread.interrupted();
throw new RejectedExecutionException(x);
}
} else {
submittedCount.decrementAndGet();
throw rx;
}
}
}
public void contextStopping() {
this.lastContextStoppedTime.set(System.currentTimeMillis());
int savedCorePoolSize = this.getCorePoolSize();
TaskQueue taskQueue =
getQueue() instanceof TaskQueue ? (TaskQueue) getQueue() : null;
if (taskQueue != null) {
taskQueue.setForcedRemainingCapacity(Integer.valueOf(0));
}
// setCorePoolSize(0) wakes idle threads
this.setCorePoolSize(0);
if (taskQueue != null) {
// ok, restore the state of the queue and pool
taskQueue.setForcedRemainingCapacity(null);
}
this.setCorePoolSize(savedCorePoolSize);
}
}
```
Tomcat的線程池根據文檔來講:和java.util.concurrent同樣,可是它實現了一個高效的方法getSubmittedCount()方法用來處理工做隊列。具體能夠查看上面的註釋和源碼就知道了。把UML圖獻上。
![ThreadPoolExecutor ](http://upload-images.jianshu.io/upload_images/1421261-071de0ab2e91ae14.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)
### Tomcat線程工廠
想要自定義線程工廠類,只須要實現JDK的ThreadFactory接口就能夠了,咱們來看看Tomcat是如何實現的吧:
``` java
public class TaskThreadFactory implements ThreadFactory {
//線程組
private final ThreadGroup group;
//線程增加因子
private final AtomicInteger threadNumber = new AtomicInteger(1);
//名稱前綴
private final String namePrefix;
//是不是守護線程
private final boolean daemon;
//線程優先級
private final int threadPriority;
public TaskThreadFactory(String namePrefix, boolean daemon, int priority) {
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.namePrefix = namePrefix;
this.daemon = daemon;
this.threadPriority = priority;
}
@Override
public Thread newThread(Runnable r) {
TaskThread t = new TaskThread(group, r, namePrefix + threadNumber.getAndIncrement());
t.setDaemon(daemon);
t.setPriority(threadPriority);
return t;
}
}
```
Tomcat的線程工廠類和JDK實現的線程工廠類相差無幾,具體能夠參考一下JDK線程工廠Executors.DefaultThreadFactory工廠類的實現。
### Tomcat的線程類
Tomcat本身定義了TaskThread用於線程的執行,裏面增長了creationTime字段用於定義線程建立的開始時間,以便後面線程池獲取這個時間來進行優化。
``` java
/**
* 一個實現建立時間紀錄的線程類
*/
public class TaskThread extends Thread {
private static final Log log = LogFactory.getLog(TaskThread.class);
private final long creationTime;
public TaskThread(ThreadGroup group, Runnable target, String name) {
super(group, new WrappingRunnable(target), name);
this.creationTime = System.currentTimeMillis();
}
public TaskThread(ThreadGroup group, Runnable target, String name,
long stackSize) {
super(group, new WrappingRunnable(target), name, stackSize);
this.creationTime = System.currentTimeMillis();
}
public final long getCreationTime() {
return creationTime;
}
/**
* 封裝{@link Runnable}以接受任何{@link StopPooledThreadException},而不是讓它走,並可能在調試器中觸發中斷。
*/
private static class WrappingRunnable implements Runnable {
private Runnable wrappedRunnable;
WrappingRunnable(Runnable wrappedRunnable) {
this.wrappedRunnable = wrappedRunnable;
}
@Override
public void run() {
try {
wrappedRunnable.run();
} catch(StopPooledThreadException exc) {
//expected : we just swallow the exception to avoid disturbing
//debuggers like eclipse's
log.debug("Thread exiting on purpose", exc);
}
}
}
}
```
按照Tomcat的註解可知,它就是一個普通的線程類而後增長一個紀錄線程建立的時間紀錄而已,後面還使用動態內部類封裝了一個Runnable,用於調試出發中斷。
### Tomcat任務隊列
Tomcat的線程隊列由org.apache.tomcat.util.threads.TaskQueue來處理,它集成自LinkedBlockingQueue(一個阻塞的鏈表隊列),來看下源代碼吧。
``` java
public class TaskQueue extends LinkedBlockingQueue<Runnable> {
private static final long serialVersionUID = 1L;
private ThreadPoolExecutor parent = null;
// no need to be volatile, the one times when we change and read it occur in
// a single thread (the one that did stop a context and fired listeners)
private Integer forcedRemainingCapacity = null;
public TaskQueue() {
super();
}
public TaskQueue(int capacity) {
super(capacity);
}
public TaskQueue(Collection<? extends Runnable> c) {
super(c);
}
public void setParent(ThreadPoolExecutor tp) {
parent = tp;
}
public boolean force(Runnable o) {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o); //forces the item onto the queue, to be used if the task is rejected
}
public boolean force(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
if ( parent.isShutdown() ) throw new RejectedExecutionException("Executor not running, can't force a command into the queue");
return super.offer(o,timeout,unit); //forces the item onto the queue, to be used if the task is rejected
}
@Override
public boolean offer(Runnable o) {
//we can't do any checks
if (parent==null) return super.offer(o);
//we are maxed out on threads, simply queue the object
if (parent.getPoolSize() == parent.getMaximumPoolSize()) return super.offer(o);
//we have idle threads, just add it to the queue
if (parent.getSubmittedCount()<(parent.getPoolSize())) return super.offer(o);
//if we have less threads than maximum force creation of a new thread
if (parent.getPoolSize()<parent.getMaximumPoolSize()) return false;
//if we reached here, we need to add it to the queue
return super.offer(o);
}
@Override
public Runnable poll(long timeout, TimeUnit unit)
throws InterruptedException {
Runnable runnable = super.poll(timeout, unit);
if (runnable == null && parent != null) {
// the poll timed out, it gives an opportunity to stop the current
// thread if needed to avoid memory leaks.
parent.stopCurrentThreadIfNeeded();
}
return runnable;
}
@Override
public Runnable take() throws InterruptedException {
if (parent != null && parent.currentThreadShouldBeStopped()) {
return poll(parent.getKeepAliveTime(TimeUnit.MILLISECONDS),
TimeUnit.MILLISECONDS);
// yes, this may return null (in case of timeout) which normally
// does not occur with take()
// but the ThreadPoolExecutor implementation allows this
}
return super.take();
}
@Override
public int remainingCapacity() {
if (forcedRemainingCapacity != null) {
return forcedRemainingCapacity.intValue();
}
return super.remainingCapacity();
}
public void setForcedRemainingCapacity(Integer forcedRemainingCapacity) {
this.forcedRemainingCapacity = forcedRemainingCapacity;
}
}
```
TaskQueue這個任務隊列是專門爲線程池而設計的。優化任務隊列以適當地利用線程池執行器內的線程。
若是你使用一個普通的隊列,執行器將產生線程,當有空閒線程,你不能強制項目到隊列自己。
### 總結 從0到1分析一下Apache Tomcat的線程池,感受心好累啊,不過有收穫,至少多線程池這一塊又增強了,首先是定位到了StandardThreadExecutor這個類,而後由此展開,ResizableExecutor(動態大小的線程池接口) 、ThreadPoolExecutor (Tomcat線程池具體實現對象)、TaskThreadFactory(Tomcat線程工廠)、TaskThread(Tomcat線程類-一個紀錄建立時間的線程類)、TaskQueue(Tomcat的任務隊列-一個專門爲線程池而設計優化的任務隊列),喝口水,壓壓驚。