承上啓下:上一篇文章小豹子講了線程池的實例化過程,粗略介紹了線程池的狀態轉換;這篇文章主要講了我運行線程池時遇到的小問題,以及
execute
方法的源碼理解。java
按照咱們的規劃,下一步就應該提交任務,探究線程池執行任務時的內部行爲,但首先,我要提交一個任務嘛。因而,接着上一篇文章的代碼,我提交了一個任務:git
@Test
public void submitTest() {
// 建立線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒絕服務");
}
});
// 提交任務,該任務爲睡眠 1 秒後打印 Hello
threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
System.out.println("Hello");
return null;
}
});
}
複製代碼
而我並無看到任何輸出,程序也並無睡眠一秒,而是立刻結束了。哦對,我想起來,咱們建立的線程默認是守護線程,當全部用戶線程結束以後,程序就會結束了,並不會理會是否還有守護線程在運行。那麼咱們用一個簡單易行的辦法來解決這個問題 —— 不讓用戶線程結束,讓它多睡一會:github
@Test
public void submitTest() throws InterruptedException {
// 建立線程池
ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS,
new LinkedBlockingQueue<Runnable>(),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}, new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
System.out.println("拒絕服務");
}
});
// 提交任務,該任務爲睡眠 1 秒後打印 Hello
threadPoolExecutor.submit(new Callable<String>() {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
System.out.println("Hello");
return null;
}
});
// 使主線程休眠 5 秒,防止守護線程意外退出
Thread.sleep(5000L);
}
複製代碼
然而,程序等待 5 秒以後,依舊沒有輸出。個人第一個反應是,我對於線程池的用法不對。是否是還須要調用某個方法來「激活」或者「啓動」線程池?而不管在文檔中,仍是各博客的例子中,我都沒有找到相似的方法。咱們仔細思考一下這個 Bug,產生這樣問題的可能緣由有三:安全
ThreadPoolExecutor
內部代碼有問題ThreadPoolExecutor
的使用方法不對ThreadFactory
或 RejectedExecutionHandler
有問題緣由 1,可能性過小,幾乎沒有。那麼緣由二、3,咱們如今無法排除,因而我嘗試構建一個最小可重現錯誤,將 ThreadPoolExecutor
剝離出來,看 Bug 是否重現:bash
最小可重現(minimal reproducible)這個思想是我在翻譯《使用 Rust 開發一個簡單的 Web 應用,第 4 部分 —— CLI 選項解析》時,做者用到的思想。就是在咱們沒法定位 Bug 時,剝離出當前代碼中咱們認爲無關的部分,剝離後觀察 Bug 是否重現,一步步縮小 Bug 的範圍。通俗的說,就是排除法。併發
private class MyThreadFactory implements ThreadFactory{
@Override
public Thread newThread(Runnable r) {
return new Thread();
}
}
@Test
public void reproducibleTest() throws InterruptedException {
new MyThreadFactory().newThread(new Runnable() {
@Override
public void run() {
try {
Thread.sleep(1000L);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("Hello");
}
}).start();
Thread.sleep(5000L);
}
複製代碼
仍是沒有任何輸出,不過這是一個好消息,這意味着咱們定位了問題所在:如今問題只可能出如今 MyThreadFactory
中,短短 6 行代碼會有什麼問題呢?哎呦(拍大腿),我沒有把 Runnable r
傳給 new Thread()
啊,我一直在執行一個空線程啊,怎麼可能有任何輸出!因而:return new Thread(r);
這樣一改就行了。ide
上面的問題看似簡單,但能出現這麼低級的錯誤,值得我思考。我由於產生該錯誤的緣由有二:post
ThreadPoolExecutor
的原理,從語法上看 ThreadFactory
的實現類只須要傳出一個 Thread
實例就好了,殊不知 Runnable r
不可或缺。因而,我決定對測試代碼進行重構。此次重構,一要使線程工廠產生非守護線程,防止由於主進程的退出致使線程池中線程所有意外退出;二要對每一個操做打日誌,咱們要能直觀的觀察到線程池在作什麼,值得一提的是,對於阻塞隊列的日誌操做,我使用了動態代理的方式對每個方法打日誌,不熟悉動態代理的童鞋能夠戳我以前寫的小豹子帶你看源碼:JDK 動態代理。測試
// import...
public class ThreadPoolExecutorTest {
/**
* 記錄啓動時間
*/
private final static long START_TIME = System.currentTimeMillis();
/**
* 自定義線程工廠,產生非守護線程,並打印日誌
*/
private class MyThreadFactory implements ThreadFactory {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setDaemon(false);
debug("建立線程 - %s", thread.getName());
return thread;
}
}
/**
* 自定義拒絕服務異常處理器,打印拒絕服務信息
*/
private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
debug("拒絕請求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);
}
}
/**
* 自定義任務,休眠 1 秒後打印當前線程名,並返回線程名
*/
private class MyTask implements Callable<String> {
@Override
public String call() throws InterruptedException {
Thread.sleep(1000L);
String threadName = Thread.currentThread().getName();
debug("MyTask - %s", threadName);
return threadName;
}
}
/**
* 對 BlockingQueue 的動態代理,實現對 BlockingQueue 的全部方法調用打 Log
*/
private class PrintInvocationHandler implements InvocationHandler {
private final BlockingQueue<?> blockingQueue;
private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
this.blockingQueue = blockingQueue;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
debug("BlockingQueue - %s,參數爲:%s", method.getName(), Arrays.toString(args));
Object result = method.invoke(blockingQueue, args);
debug("BlockingQueue - %s 執行完畢,返回值爲:%s", method.getName(), String.valueOf(result));
return result;
}
}
/**
* 產生 BlockingQueue 代理類
* @param blockingQueue 原 BlockingQueue
* @param <E> 任意類型
* @return 動態代理 BlockingQueue,執行任何方法時會打 Log
*/
@SuppressWarnings("unchecked")
private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
new Class<?>[]{BlockingQueue.class},
new PrintInvocationHandler(blockingQueue));
}
/**
* 實例化一個 核心池爲 3,最大池爲 5,存活時間爲 20s,利用上述阻塞隊列、線程工廠、拒絕服務處理器的線程池實例
* @return 返回 ThreadPoolExecutor 實例
*/
private ThreadPoolExecutor newTestPoolInstance() {
return new ThreadPoolExecutor(3, 5, 20,
TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
new MyThreadFactory(), new MyRejectedExecutionHandler());
}
/**
* 向控制檯打印日誌,自動輸出時間,線程等信息
* @param info
* @param arg
*/
private void debug(String info, Object... arg) {
long time = System.currentTimeMillis() - START_TIME;
System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
}
/**
* 測試實例化操做
*/
private void newInstanceTest() {
newTestPoolInstance();
}
/**
* 測試提交操做,提交 10 次任務
*/
private void submitTest() {
ThreadPoolExecutor threadPool = newTestPoolInstance();
for (int i = 0; i < 10; i++) {
threadPool.submit(new MyTask());
}
}
public static void main(String[] args) {
ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
test.submitTest();
}
}
複製代碼
編譯,運行 =>ui
0.047-main-建立線程 - Thread-0
0.064-main-建立線程 - Thread-1
0.064-main-建立線程 - Thread-2
0.064-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.064-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,參數爲:null
1.065-Thread-0-BlockingQueue - take,參數爲:null
1.065-Thread-2-BlockingQueue - take,參數爲:null
1.065-Thread-0-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,參數爲:null
2.065-Thread-2-BlockingQueue - take,參數爲:null
2.065-Thread-0-BlockingQueue - take,參數爲:null
2.065-Thread-1-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,參數爲:null
3.066-Thread-1-BlockingQueue - take,參數爲:null
3.066-Thread-0-BlockingQueue - take,參數爲:null
3.066-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,參數爲:null
複製代碼
日誌的格式是:時間(秒)-線程名-信息
從日誌輸出中,咱們能夠獲知:
take
方法阻塞(這一點由日誌後幾行能夠看出,只有調用日誌,沒有調用完成的日誌)由此,我產生一個疑問:爲何始終只有三個線程?個人設置不是「核心池爲 3,最大池爲 5」嗎?爲何只有三個線程在工做呢?
終於開始看源碼了,咱們以 submit
爲切入點,探尋咱們提交任務時,線程池作了什麼,submit
方法自己很簡單,就是將傳入參數封裝爲 RunnableFuture
實例,而後調用 execute
方法,如下給出 submit
多個重載方法其中之一:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
複製代碼
那麼,咱們繼續看 execute
的代碼:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
複製代碼
咱們首先解釋一下 addWorker
方法,暫時咱們只須要了解幾件事情就能夠理解 execute
代碼了:
true
,不然返回 false
那麼咱們回過頭來理解 execute
代碼:
爲了幫助理解,我根據代碼邏輯畫了一個流程圖:
如今我明白了,只有等待隊列插入失敗(如達到容量上限等)狀況下,纔會建立非核心線程來處理任務,也就是說,咱們使用的 LinkedBlockingQueue
隊列來做爲等待隊列,那是看不到非核心線程被建立的現象的。
有心的讀者可能注意到了,整個過程沒有加鎖啊,怎樣保證併發安全呢?咱們觀察這段代碼,其實不必所有加鎖,只須要保證 addWorker
、remove
和 workQueue.offer
三個方法的線程安全,該方法就不必加鎖。事實上,在 addWorker
中是有對線程池狀態的 recheck 的,若是建立失敗會返回 false。
小豹子仍是一個大三的學生,小豹子但願你能「批判性的」閱讀本文,對本文內容中不正確、不穩當之處進行嚴厲的批評,小豹子感激涕零。