Java併發編程:線程池ThreadPoolExecutor

  多線程的程序的確能發揮多核處理器的性能。雖然與進程相比,線程輕量化了不少,可是其建立和關閉一樣須要花費時間。並且線程多了之後,也會搶佔內存資源。若是不對線程加以管理的話,是一個很是大的隱患。而線程池的目的就是管理線程。當你須要一個線程時,你就能夠拿一個空閒線程去執行任務,當任務執行完後,線程又會歸還到線程池。這樣就有效的避免了重複建立、關閉線程和線程數量過多帶來的問題。java

Java併發包提供的線程池

 

注:摘自《實戰Java高併發程序設計》多線程

  如圖是Java併發包下提供的線程池功能。其中ExecutorService接口提供一些操做線程池的方法。而Executors至關於一個線程池工廠類,它裏面有幾種現成的具有某種特定功能的線程池工廠方法。看到這些應該不陌生,舉個咱們平時最常使用的例子:併發

//建立一個大小爲10的固定線程池
ExecutorService threadpool= Executors.newScheduledThreadPool(10);

  下面簡單介紹一下這些工廠方法:ide

  newFixedThreadPool()方法:固定線程數量線程池。傳入的數字就是線程的數量,若是有空閒線程就去執行任務,若是沒有空閒線程就會把任務放到一個任務隊列,等到有線程空閒時便去處理隊列中的任務。函數

  newSingleThreadExecutor()方法:只有一個線程的線程池。一樣,超出的任務會被放到任務隊列,等這個線程空閒時就會去按順序處理。高併發

  newCachedThreadPool()方法:能夠根據實際狀況拓展的線程池。當沒有空閒線程去執行新任務時,就會再建立新的線程去執行任務,執行完後新建的線程也會返回線程池進行復用。oop

  newSingleThreadScheduledExecutor()方法:返回的是ScheduledExecutorService對象。ScheduledExecutorService是繼承於ExecutorService的,有一些拓展方法,如指定執行時間。這個線程池大小爲1,在指定時間執行任務。關於指定時間的幾個方法:schedule()是在指定時間後執行一次任務。scheduleAtFixedRate()和方法scheduleWithFixedDelay()方法,二者都是週期性的執行任務,可是前者是以上一次任務開始爲週期起點,後者是以上一次任務結束爲週期起點。具體的參數你們能夠在IDE裏面查看。性能

  newScheduledThreadPool()方法:和上面一個方法同樣,可是能夠指定線程池大小,其實上面那個方法也是調用這個方法的,只是傳入的參數是1。this

線程池核心類

  上面簡單的對Java併發包下線程池的結構和API進行簡單的介紹,下面開始深刻了解一下線程池。若是你們在IDE上追蹤一下上面幾個工廠方法就會發現,其中最後都會調用一個方法,經過上圖其實也能夠發現。那就是ThreadPoolExecutor的構造方法,工廠方法只是幫咱們傳入不一樣的參數,從而實現不一樣的效果,因此若是你想更自由的控制本身的線程池,推薦直接使用ThreadPoolExecutor建立線程池。下面給出這個構造函數的參數列表:spa

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler)

  參數從上到下,做用依次爲:

  1.指定線程池種線程的數量。

  2.線程池種最大的線程數量,也就是最大能拓展到多少。

  3.當線程數量超過corePoolSize,多餘的空閒線程多久會被銷燬。

  4.keepAliveTime的單位。

  5.任務隊列,當空閒線程不夠,也不能再新建線程時,新提交的任務就會被放到任務隊列種。

  6.線程工廠,用於建立線程,默認的便可。

  7.拒絕策略。當任務太多,達到最大線程數量、任務隊列也滿了,該如何拒絕新提交的任務。

任務隊列

  任務隊列是一個BlockingQueue接口,在ThreadPoolExecutor一共有以下幾種實現類實現了BlockingQueue接口。

  SynchronousQueue:直接提交隊列。這種隊列其實不會真正的去保存任務,每提交一個任務就直接讓空閒線程執行,若是沒有空閒線程就去新建,當達到最大線程數時,就會執行拒絕策略。因此使用這種任務隊列時,通常會設置很大的maximumPoolSize,否則很容易就執行了拒絕策略。newCachedThreadPool線程池的corePoolSize爲0,maximumPoolSize無限大,它用的就是直接提交隊列。

  ArrayBlockingQueue:有界任務隊列,其構造函數必須帶一個容量參數,表示任務隊列的大小。當線程數量小於corePoolSize時,有任務進來優先建立線程。當線程數等於corePoolSize時,新任務就會進入任務隊列,當任務隊列滿了,纔會建立新線程,線程數達到maximumPoolSize時執行拒絕策略。

  LinkedBlockingQueue:無界任務隊列,經過它的名字也應該知道了,它是個鏈表,除非沒有空間了,否則不會出現任務隊列滿了的狀況,可是很是耗費系統資源。和有界任務隊列同樣,線程數若小於corePoolSize,新任務進來時沒有空閒線程的話就會建立新線程,當達到corePoolSize時,就會進入任務隊列。會發現沒有maximumPoolSize什麼事,newFixedThreadPool固定大小線程池就是用的這個任務隊列,它的corePoolSize和maximumPoolSize相等。

  PriorityBlockingQueue:優先任務隊列,它是一個特殊的無界隊列,由於它總能保證高優先級的任務先執行。

拒絕策略

  JDK提供了四種拒絕策略。

  AbortPolicy:直接拋出異常,阻止系統正常工做。

  CallerRunsPolicy:若是線程池未關閉,則在調用者線程裏面執行被丟棄的任務,這個策略不是真正的拒絕任務。好比咱們在T1線程中提交的任務,那麼該拒絕策略就會把多餘的任務放到T1線程執行,會影響到提交者線程的性能。

  DiscardOldestPolicy:該策略會丟棄一個最老的任務,也就是即將被執行的任務,而後再次嘗試提交該任務。

  DiscardPolicy:直接丟棄多餘的任務,不作任何處理,若是容許丟棄任務,這個策略是最好的。

  以上內置的拒絕策略都實現了RejectedExecutionHandler接口,因此上面的拒絕策略沒法知足你的要求,能夠自定義一個:繼承RejectedExecutionHandler並實現rejectedExecution方法。

線程工廠

線程池中的線程是由ThreadFactory負責建立的,通常狀況下默認就行,若是有一些其餘的需求,好比自定義線程的名稱、優先級等,咱們也能夠利用ThreadFactory接口來自定義本身的線程工廠:繼承ThreadFactory並實現newThread方法。

線程池的拓展

  在ThreadPoolExecutor中有三個擴展方法:分別會在任務執行前beforeExecute、執行完成afterExecute、線程池退出時執行terminated。

  這幾個方法在哪調用的?在ThreadPoolExecutor中有一個內部類:Worker,每一個線程的任務其實都是由這個類裏面的run方法執行的,貼一下這個類的源碼:

 

private final class Worker
    extends AbstractQueuedSynchronizer
    implements Runnable
{
    /**
     * This class will never be serialized, but we provide a
     * serialVersionUID to suppress a javac warning.
     */
    private static final long serialVersionUID = 6138294804551838833L;

    /** Thread this worker is running in.  Null if factory fails. */
    final Thread thread;
    /** Initial task to run.  Possibly null. */
    Runnable firstTask;
    /** Per-thread task counter */
    volatile long completedTasks;
    //....省略
    
    /** Delegates main run loop to outer runWorker  */
    public void run() {
        runWorker(this);
    }
    //....省略
}

 

  接着進入這個runWorker方法:

final void runWorker(Worker w) {
    //...省略
            try {
                //任務執行前
                beforeExecute(wt, task);
                Throwable thrown = null;
                try {
                    task.run();
                } catch (RuntimeException x) {
                    thrown = x; throw x;
                } catch (Error x) {
                    thrown = x; throw x;
                } catch (Throwable x) {
                    thrown = x; throw new Error(x);
                } finally {
                    //任務執行完
                    afterExecute(task, thrown);
                }
            } 
    //....省略
}

  還有一個線程池退出時執行的方法是在何處執行的?這個方法被調用的地方就不止一處了,像線程池的shutdown方法就會調用

public void shutdown() {
    //....省略。這個方法裏面就會調用terminated
    tryTerminate();
}

  ThreadPoolExecutor中這三個方法默認是沒有任何內容的,因此咱們要自定義它也很簡單,直接重寫它們就好了:

ExecutorService threadpool= new ThreadPoolExecutor(5,5,0L,TimeUnit.SECONDS,new LinkedBlockingDeque<>()){
    @Override
    protected void beforeExecute(Thread t, Runnable r) {
        //執行任務前
    }
    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        //執行任務後
    }
    @Override
    protected void terminated() {
        //線程退出
    }
};
相關文章
相關標籤/搜索