以前建立線程的時候都是用的 newCachedThreadPoo
,newFixedThreadPool
,newScheduledThreadPool
,newSingleThreadExecutor
這四個方法。
固然 Executors
也是用不一樣的參數去 new ThreadPoolExecutor
實現的,本文先分析前四種線程建立方式,後在分析 new ThreadPoolExecutor
建立方式java
因爲使用了LinkedBlockingQueue
因此maximumPoolSize
沒用,當corePoolSize
滿了以後就加入到LinkedBlockingQueue
隊列中。 每當某個線程執行完成以後就從LinkedBlockingQueue
隊列中取一個。 因此這個是建立固定大小的線程池。git
源碼分析github
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(
nThreads,
nThreads,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
複製代碼
建立線程數爲1的線程池,因爲使用了LinkedBlockingQueue
因此maximumPoolSize
沒用,corePoolSize
爲1表示線程數大小爲1,滿了就放入隊列中,執行完了就從隊列取一個。緩存
源碼分析函數
public static ExecutorService newSingleThreadExecutor() {
return new Executors.FinalizableDelegatedExecutorService
(
new ThreadPoolExecutor(
1,
1,
0L,
TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>())
);
}
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼
建立可緩衝的線程池。沒有大小限制。因爲corePoolSize
爲0因此任務會放入SynchronousQueue
隊列中,SynchronousQueue
只能存放大小爲1,因此會馬上新起線程,因爲maxumumPoolSize
爲Integer.MAX_VALUE
因此能夠認爲大小爲2147483647
。受內存大小限制。oop
源碼分析源碼分析
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(
0,
Integer.MAX_VALUE,
60L,
TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
複製代碼
源碼分析 ,ThreadPoolExecutor
的構造函數測試
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
複製代碼
一、corePoolSize
核心線程數大小,當線程數 < corePoolSize ,會建立線程執行 runnableui
二、maximumPoolSize
最大線程數, 當線程數 >= corePoolSize的時候,會把 runnable 放入 workQueue中this
三、keepAliveTime
保持存活時間,當線程數大於corePoolSize的空閒線程能保持的最大時間。
四、unit
時間單位
五、workQueue
保存任務的阻塞隊列
六、threadFactory
建立線程的工廠
七、handler
拒絕策略
一、當線程數小於 corePoolSize
時,建立線程執行任務。
二、當線程數大於等於 corePoolSize
而且 workQueue
沒有滿時,放入workQueue
中
三、線程數大於等於 corePoolSize
而且當 workQueue
滿時,新任務新建線程運行,線程總數要小於 maximumPoolSize
四、當線程總數等於 maximumPoolSize
而且 workQueue
滿了的時候執行 handler
的 rejectedExecution
。也就是拒絕策略。
ThreadPoolExecutor默認有四個拒絕策略:
一、ThreadPoolExecutor.AbortPolicy()
直接拋出異常RejectedExecutionException
二、ThreadPoolExecutor.CallerRunsPolicy()
直接調用run方法而且阻塞執行
三、ThreadPoolExecutor.DiscardPolicy()
直接丟棄後來的任務
四、ThreadPoolExecutor.DiscardOldestPolicy()
丟棄在隊列中隊首的任務
固然能夠本身繼承RejectedExecutionHandler來寫拒絕策略.
package io.ymq.thread.TestThreadPoolExecutor;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:39 **/
public class TestThreadPoolExecutor {
public static void main(String[] args) {
long currentTimeMillis = System.currentTimeMillis();
// 構造一個線程池
ThreadPoolExecutor threadPool = new ThreadPoolExecutor(5, 6, 3,
TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(3)
);
for (int i = 1; i <= 10; i++) {
try {
String task = "task=" + i;
System.out.println("建立任務並提交到線程池中:" + task);
threadPool.execute(new ThreadPoolTask(task));
Thread.sleep(100);
} catch (Exception e) {
e.printStackTrace();
}
}
try {
//等待全部線程執行完畢當前任務。
threadPool.shutdown();
boolean loop = true;
do {
//等待全部線程執行完畢當前任務結束
loop = !threadPool.awaitTermination(2, TimeUnit.SECONDS);//等待2秒
} while (loop);
if (loop != true) {
System.out.println("全部線程執行完畢");
}
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
System.out.println("耗時:" + (System.currentTimeMillis() - currentTimeMillis));
}
}
}
複製代碼
package io.ymq.thread.TestThreadPoolExecutor;
import java.io.Serializable;
/** * 描述: * * @author yanpenglei * @create 2017-10-12 15:40 **/
public class ThreadPoolTask implements Runnable, Serializable {
private Object attachData;
ThreadPoolTask(Object tasks) {
this.attachData = tasks;
}
public void run() {
try {
System.out.println("開始執行任務:" + attachData + "任務,使用的線程池,線程名稱:" + Thread.currentThread().getName());
System.out.println();
} catch (Exception e) {
e.printStackTrace();
}
attachData = null;
}
}
複製代碼
遇到java.util.concurrent.RejectedExecutionException
第一
你的線程池 ThreadPoolExecutor
顯示的 shutdown()
以後,再向線程池提交任務的時候。 若是你配置的拒絕策略是 AbortPolicy
的話,這個異常就會拋出來。
第二
當你設置的任務緩存隊列太小的時候,或者說, 你的線程池裏面全部的線程都在幹活(線程數== maxPoolSize
),而且你的任務緩存隊列也已經充滿了等待的隊列, 這個時候,你再向它提交任務,則會拋出這個異常。
響應
能夠看到線程 pool-1-thread-1 到5 循環使用
建立任務並提交到線程池中:task=1
開始執行任務:task=1任務,使用的線程池,線程名稱:pool-1-thread-1
建立任務並提交到線程池中:task=2
開始執行任務:task=2任務,使用的線程池,線程名稱:pool-1-thread-2
建立任務並提交到線程池中:task=3
開始執行任務:task=3任務,使用的線程池,線程名稱:pool-1-thread-3
建立任務並提交到線程池中:task=4
開始執行任務:task=4任務,使用的線程池,線程名稱:pool-1-thread-4
建立任務並提交到線程池中:task=5
開始執行任務:task=5任務,使用的線程池,線程名稱:pool-1-thread-5
建立任務並提交到線程池中:task=6
開始執行任務:task=6任務,使用的線程池,線程名稱:pool-1-thread-1
建立任務並提交到線程池中:task=7
開始執行任務:task=7任務,使用的線程池,線程名稱:pool-1-thread-2
建立任務並提交到線程池中:task=8
開始執行任務:task=8任務,使用的線程池,線程名稱:pool-1-thread-3
建立任務並提交到線程池中:task=9
開始執行任務:task=9任務,使用的線程池,線程名稱:pool-1-thread-4
建立任務並提交到線程池中:task=10
開始執行任務:task=10任務,使用的線程池,線程名稱:pool-1-thread-5
全部線程執行完畢
耗時:1015
複製代碼
github github.com/souyunku/ym…