Basic Of Concurrency(十九: 線程池)

線程池是併發編程比較典型的知識點了.當你想在應用中限制同一時間並行執行線程數量的時候可使用線程池.對比經過建立新線程的方式線程池有顯著的性能提高且每個線程都能得到必定大小的內存空間用做線程棧存儲.html

能夠將任務經過傳入線程池來代替建立新線程的方式讓任務並行執行.當線程池有閒置線程時,任務將會被分配給這部分線程執行.在線程池內部任務經過插入阻塞隊列的方式來讓閒置線程取出執行.當有新任務到達時,將會由一個閒置線程成功取出後執行.其餘閒置線程會在線程池中阻塞待命,直到有新的任務到達.java

線程池經常使用在多線程服務器上.每個連接經過網絡到達服務器時都會被看成一個任務進入到線程池裏.線程池中的線程將會並行處理連接中的請求.編程

Java5的java.util.concurrent包中已經有現成的實現,所以咱們不須要本身手動實現線程池.但咱們仍然頗有必要知道它的底層實現細節.服務器

這裏提供了一個簡單的線程池實現.你會發現咱們使用了前文阻塞隊列中的實現.然而在正式環境中,咱們可使用Java的內建阻塞隊列來替代.網絡

public class ThreadPool {
    private BlockingQueue<Runnable> taskQueue;
    private List<PoolThread> threads = new ArrayList<>();
    private boolean isStopped = false;

    public ThreadPool(int initThreads, int maxTasks) {
        taskQueue = new BlockingQueue<>(maxTasks);
        IntStream.range(0, initThreads)
                .mapToObj(i -> new PoolThread(taskQueue))
                .forEach(threads::add);
        threads.forEach(PoolThread::start);
    }

    public synchronized void execute(Runnable task) {
        if (isStopped) throw new IllegalStateException("ThreadPool is stopped");
        taskQueue.enqueue(task);
    }

    public synchronized void stop() {
        isStopped = true;
        threads.forEach(PoolThread::doStop);
    }

    private class PoolThread extends Thread {
        private BlockingQueue<Runnable> taskQueue;
        private boolean isStopped = false;

        public PoolThread(BlockingQueue<Runnable> taskQueue) {
            this.taskQueue = taskQueue;
        }

        @Override
        public void run() {
            while (!isStopped()) {
                try {
                    Runnable runnable = taskQueue.dequeue();
                    runnable.run();
                }catch (Exception e){
                    e.printStackTrace();
                }
            }
        }

        public synchronized void doStop() {
            isStopped = true;
            Thread.currentThread().interrupt(); // break pool thread of dequeue() call.
        }

        public synchronized boolean isStopped() {
            return isStopped;
        }
    }
}
複製代碼

一個線程池的實現有兩部分.一個是ThreadPool.class來提供對外的入口.而PoolThread則是繼承了Thread用於建立線程池中的線程來調度任務.多線程

一個任務能夠經過ThreadPool實例的execute(Runnable task)來傳入.execute()接受一個Runnable做爲參數.而Runnable則是任務的載體,任務的執行代碼能夠寫在Runnable的run方法裏.Runnable在線程池內部會被添加到阻塞隊列中,等待調度.併發

阻塞隊列中的Runnable會被閒置的PoolThead取出和執行.你能夠注意到PoolThread實例中的run()方法.隊列爲空時線程會阻塞在dequeue()調用上,直到有新的任務進入隊列線程才被喚醒,從而取出任務執行,這個過程會持續到線程中止爲止.ide

中止線程池能夠經過ThreadPool實例的stop()方法.咱們會注意到stop()方法內部將isStopped變量置換爲true且依次調用線程池中的每個線程的doStop()方法.當調用stop()完成後,再次調用execute()方法將會拋出IllegalStateException異常.post

線程會在並行執行完任務後中止.咱們注意到PoolThread實例doStop()方法中的Thread.currentThread().interrupt()調用.這可以讓阻塞在taskQueue.dequeue()中wait()調用上的線程退出wait()調用,從而退出dequeue()方法拋出一個InterruptedException異常.這個異常會在PoolThread實例的run()方法中捕獲,從而從新進入while(!isStopped)檢查,從而退出run()方法結束線程.性能

該系列博文爲筆者複習基礎所著譯文或理解後的產物,複習原文來自Jakob Jenkov所著Java Concurrency and Multithreading Tutorial

上一篇: 阻塞隊列
下一篇: Compare And Swap

相關文章
相關標籤/搜索