小豹子帶你看源碼:Java 線程池(三)提交任務

承上啓下:上一篇文章小豹子講了線程池的實例化過程,粗略介紹了線程池的狀態轉換;這篇文章主要講了我運行線程池時遇到的小問題,以及 execute 方法的源碼理解。java

4 並不算疑難的 Bug

按照咱們的規劃,下一步就應該提交任務,探究線程池執行任務時的內部行爲,但首先,我要提交一個任務嘛。因而,接着上一篇文章的代碼,我提交了一個任務:git

@Test
public void submitTest() {
    // 建立線程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒絕服務");
        }
    });
    // 提交任務,該任務爲睡眠 1 秒後打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
}
複製代碼

而我並無看到任何輸出,程序也並無睡眠一秒,而是立刻結束了。哦對,我想起來,咱們建立的線程默認是守護線程,當全部用戶線程結束以後,程序就會結束了,並不會理會是否還有守護線程在運行。那麼咱們用一個簡單易行的辦法來解決這個問題 —— 不讓用戶線程結束,讓它多睡一會:github

@Test
public void submitTest() throws InterruptedException {
    // 建立線程池
    ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(5, 10, 60, TimeUnit.SECONDS, 
        new LinkedBlockingQueue<Runnable>(), 
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                return new Thread();
            }
        }, new RejectedExecutionHandler() {
            @Override
            public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                System.out.println("拒絕服務");
        }
    });
    // 提交任務,該任務爲睡眠 1 秒後打印 Hello
    threadPoolExecutor.submit(new Callable<String>() {
        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            System.out.println("Hello");
            return null;
        }
    });
    // 使主線程休眠 5 秒,防止守護線程意外退出
    Thread.sleep(5000L);
}
複製代碼

然而,程序等待 5 秒以後,依舊沒有輸出。個人第一個反應是,我對於線程池的用法不對。是否是還須要調用某個方法來「激活」或者「啓動」線程池?而不管在文檔中,仍是各博客的例子中,我都沒有找到相似的方法。咱們仔細思考一下這個 Bug,產生這樣問題的可能緣由有三:安全

  1. ThreadPoolExecutor 內部代碼有問題
  2. 我對 ThreadPoolExecutor 的使用方法不對
  3. 我設計的 ThreadFactoryRejectedExecutionHandler 有問題

緣由 1,可能性過小,幾乎沒有。那麼緣由二、3,咱們如今無法排除,因而我嘗試構建一個最小可重現錯誤,將 ThreadPoolExecutor 剝離出來,看 Bug 是否重現:bash

最小可重現(minimal reproducible)這個思想是我在翻譯《使用 Rust 開發一個簡單的 Web 應用,第 4 部分 —— CLI 選項解析》時,做者用到的思想。就是在咱們沒法定位 Bug 時,剝離出當前代碼中咱們認爲無關的部分,剝離後觀察 Bug 是否重現,一步步縮小 Bug 的範圍。通俗的說,就是排除法。併發

private class MyThreadFactory implements ThreadFactory{
    @Override
    public Thread newThread(Runnable r) {
        return new Thread();
    }
}

@Test
public void reproducibleTest() throws InterruptedException {
    new MyThreadFactory().newThread(new Runnable() {
        @Override
        public void run() {
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            System.out.println("Hello");
        }
    }).start();
    Thread.sleep(5000L);
}
複製代碼

仍是沒有任何輸出,不過這是一個好消息,這意味着咱們定位了問題所在:如今問題只可能出如今 MyThreadFactory 中,短短 6 行代碼會有什麼問題呢?哎呦(拍大腿),我沒有把 Runnable r 傳給 new Thread() 啊,我一直在執行一個空線程啊,怎麼可能有任何輸出!因而:return new Thread(r); 這樣一改就行了。ide

5 重構

上面的問題看似簡單,但能出現這麼低級的錯誤,值得我思考。我由於產生該錯誤的緣由有二:post

  1. 我不瞭解 ThreadPoolExecutor 的原理,從語法上看 ThreadFactory 的實現類只須要傳出一個 Thread 實例就好了,殊不知 Runnable r 不可或缺。
  2. 測試代碼結構凌亂不堪。即使是測試代碼,也不該該寫成麪條,本身看尚不能清楚明瞭,何談讀者?

因而,我決定對測試代碼進行重構。此次重構,一要使線程工廠產生非守護線程,防止由於主進程的退出致使線程池中線程所有意外退出;二要對每一個操做打日誌,咱們要能直觀的觀察到線程池在作什麼,值得一提的是,對於阻塞隊列的日誌操做,我使用了動態代理的方式對每個方法打日誌,不熟悉動態代理的童鞋能夠戳我以前寫的小豹子帶你看源碼:JDK 動態代理測試

// import...

public class ThreadPoolExecutorTest {
    /**
     * 記錄啓動時間
     */
    private final static long START_TIME = System.currentTimeMillis();

    /**
     * 自定義線程工廠,產生非守護線程,並打印日誌
     */
    private class MyThreadFactory implements ThreadFactory {
        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setDaemon(false);
            debug("建立線程 - %s", thread.getName());
            return thread;
        }
    }

    /**
     * 自定義拒絕服務異常處理器,打印拒絕服務信息
     */
    private class MyRejectedExecutionHandler implements RejectedExecutionHandler {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
            debug("拒絕請求,Runnable:%s,ThreadPoolExecutor:%s", r, executor);
        }
    }

    /**
     * 自定義任務,休眠 1 秒後打印當前線程名,並返回線程名
     */
    private class MyTask implements Callable<String> {

        @Override
        public String call() throws InterruptedException {
            Thread.sleep(1000L);
            String threadName = Thread.currentThread().getName();
            debug("MyTask - %s", threadName);
            return threadName;
        }
    }

    /**
     * 對 BlockingQueue 的動態代理,實現對 BlockingQueue 的全部方法調用打 Log
     */
    private class PrintInvocationHandler implements InvocationHandler {
        private final BlockingQueue<?> blockingQueue;

        private PrintInvocationHandler(BlockingQueue<?> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }

        @Override
        public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
            debug("BlockingQueue - %s,參數爲:%s", method.getName(), Arrays.toString(args));
            Object result = method.invoke(blockingQueue, args);
            debug("BlockingQueue - %s 執行完畢,返回值爲:%s", method.getName(), String.valueOf(result));
            return result;
        }
    }

    /**
     * 產生 BlockingQueue 代理類
     * @param blockingQueue 原 BlockingQueue
     * @param <E> 任意類型
     * @return 動態代理 BlockingQueue,執行任何方法時會打 Log
     */
    @SuppressWarnings("unchecked")
    private <E> BlockingQueue<E> debugQueue(BlockingQueue<E> blockingQueue) {
        return (BlockingQueue<E>) Proxy.newProxyInstance(this.getClass().getClassLoader(),
                new Class<?>[]{BlockingQueue.class},
                new PrintInvocationHandler(blockingQueue));
    }

    /**
     * 實例化一個 核心池爲 3,最大池爲 5,存活時間爲 20s,利用上述阻塞隊列、線程工廠、拒絕服務處理器的線程池實例
     * @return 返回 ThreadPoolExecutor 實例
     */
    private ThreadPoolExecutor newTestPoolInstance() {
        return new ThreadPoolExecutor(3, 5, 20,
                TimeUnit.SECONDS, debugQueue(new LinkedBlockingQueue<>()),
                new MyThreadFactory(), new MyRejectedExecutionHandler());
    }

    /**
     * 向控制檯打印日誌,自動輸出時間,線程等信息
     * @param info
     * @param arg
     */
    private void debug(String info, Object... arg) {
        long time = System.currentTimeMillis() - START_TIME;
        System.out.println(String.format(((double) time / 1000) + "-" + Thread.currentThread().getName() + "-" + info, arg));
    }

    /**
     * 測試實例化操做
     */
    private void newInstanceTest() {
        newTestPoolInstance();
    }

    /**
     * 測試提交操做,提交 10 次任務
     */
    private void submitTest() {
        ThreadPoolExecutor threadPool = newTestPoolInstance();
        for (int i = 0; i < 10; i++) {
            threadPool.submit(new MyTask());
        }
    }

    public static void main(String[] args) {
        ThreadPoolExecutorTest test = new ThreadPoolExecutorTest();
        test.submitTest();
    }
}
複製代碼

編譯,運行 =>ui

0.047-main-建立線程 - Thread-0
0.064-main-建立線程 - Thread-1
0.064-main-建立線程 - Thread-2
0.064-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@4d7e1886]
0.064-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.064-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@3cd1a2f1]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@2f0e140b]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@7440e464]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@49476842]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@78308db1]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
0.065-main-BlockingQueue - offer,參數爲:[java.util.concurrent.FutureTask@27c170f0]
0.065-main-BlockingQueue - offer 執行完畢,返回值爲:true
1.065-Thread-1-MyTask - Thread-1
1.065-Thread-0-MyTask - Thread-0
1.065-Thread-2-MyTask - Thread-2
1.065-Thread-1-BlockingQueue - take,參數爲:null
1.065-Thread-0-BlockingQueue - take,參數爲:null
1.065-Thread-2-BlockingQueue - take,參數爲:null
1.065-Thread-0-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@3cd1a2f1
1.065-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@2f0e140b
1.065-Thread-1-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@4d7e1886
2.065-Thread-1-MyTask - Thread-1
2.065-Thread-2-MyTask - Thread-2
2.065-Thread-0-MyTask - Thread-0
2.065-Thread-1-BlockingQueue - take,參數爲:null
2.065-Thread-2-BlockingQueue - take,參數爲:null
2.065-Thread-0-BlockingQueue - take,參數爲:null
2.065-Thread-1-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@7440e464
2.065-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@49476842
2.065-Thread-0-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@78308db1
3.066-Thread-1-MyTask - Thread-1
3.066-Thread-2-MyTask - Thread-2
3.066-Thread-0-MyTask - Thread-0
3.066-Thread-2-BlockingQueue - take,參數爲:null
3.066-Thread-1-BlockingQueue - take,參數爲:null
3.066-Thread-0-BlockingQueue - take,參數爲:null
3.066-Thread-2-BlockingQueue - take 執行完畢,返回值爲:java.util.concurrent.FutureTask@27c170f0
4.067-Thread-2-MyTask - Thread-2
4.067-Thread-2-BlockingQueue - take,參數爲:null
複製代碼

日誌的格式是:時間(秒)-線程名-信息

從日誌輸出中,咱們能夠獲知:

  • 當隊列爲空,線程數少於核心線程數時,提交任務會觸發建立線程,並當即執行任務
  • 當核心線程均忙,再提交的請求會被存儲至阻塞隊列,等待線程空閒後執行隊列中的任務
  • 除主線程外,始終只有三個工做線程
  • 當隊列爲空,工做線程還在運行的時候,工做線程會由於阻塞隊列的 take 方法阻塞(這一點由日誌後幾行能夠看出,只有調用日誌,沒有調用完成的日誌)

由此,我產生一個疑問:爲何始終只有三個線程?個人設置不是「核心池爲 3,最大池爲 5」嗎?爲何只有三個線程在工做呢?

6 submit 任務

終於開始看源碼了,咱們以 submit 爲切入點,探尋咱們提交任務時,線程池作了什麼,submit 方法自己很簡單,就是將傳入參數封裝爲 RunnableFuture 實例,而後調用 execute 方法,如下給出 submit 多個重載方法其中之一:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}
複製代碼

那麼,咱們繼續看 execute 的代碼:

public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
複製代碼

咱們首先解釋一下 addWorker 方法,暫時咱們只須要了解幾件事情就能夠理解 execute 代碼了:

  • 該方法用於新建一個工做線程
  • 該方法線程安全
  • 該方法第一個參數是新線程要執行的第一個任務,第二個參數是是否新建核心線程
  • 該方法若是新建線程成功,則返回 true,不然返回 false

那麼咱們回過頭來理解 execute 代碼:

爲了幫助理解,我根據代碼邏輯畫了一個流程圖:

execute 方法流程圖

如今我明白了,只有等待隊列插入失敗(如達到容量上限等)狀況下,纔會建立非核心線程來處理任務,也就是說,咱們使用的 LinkedBlockingQueue 隊列來做爲等待隊列,那是看不到非核心線程被建立的現象的。

有心的讀者可能注意到了,整個過程沒有加鎖啊,怎樣保證併發安全呢?咱們觀察這段代碼,其實不必所有加鎖,只須要保證 addWorkerremoveworkQueue.offer 三個方法的線程安全,該方法就不必加鎖。事實上,在 addWorker 中是有對線程池狀態的 recheck 的,若是建立失敗會返回 false。

系列文章

小豹子仍是一個大三的學生,小豹子但願你能「批判性的」閱讀本文,對本文內容中不正確、不穩當之處進行嚴厲的批評,小豹子感激涕零。

相關文章
相關標籤/搜索