核心線程池的內部實現(讀書筆記)

     對於核心的幾個線程池,不管是newFixedThreadPool()方法,newSingleThreadExecutor()仍是newCachedThreadPool()方法,雖然看起來建立的線程有着徹底不一樣的功能特色,但其內部實現均使用了ThreadPoolExecutor實現,下面給出了三個線程池的實現方式.
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
  由以上線程池的實現代碼能夠看到,他們都是ThreadPoolExecutor類的封裝. 讓咱們看一下ThreadPoolExecutor最重要的構造器:
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) 
 
函數的參數以下:
  • corePoolSize 指定線程池中的線程數量
  • maximumPoolSize 指定了線程池中的最大線程數量
  • keepAliveTime 當前線程池數量超過corePoolSize時,多餘的空閒線程的存活時間,
  • uintkeepAliveTime的單位
  • workQueue 隊伍隊列,被提交但還沒有被執行的任務.
  • threadFactory:線程工廠 用於建立線程,通常用默認便可
  • handler 拒絕策略 當任務太多來不及處理,如何拒絕任務
 
     以上參數中,大多數都很簡單,只有workQueue和handler須要進行詳細說明.
參數workQueue指被提交但未執行的任務隊列,他是一個BlockingQueue接口的對象,用於存放Runable對象,根據隊列功能分類,子ThreadPoolExecutor的構造函數中使用一下幾種BlockIngQueue.
  • 直接提交的隊列,改功能由synchronousQueue對象提供,SynchronousQueue是一個特殊的BlockingQueue.這個隊列沒有容量,每個插入操做都要等待一個響應的刪除操做,反之,每個刪除操做都要等待對應的插入操做,若是使用SynchronousQueue,提交的任務不會真實的保存,而老是將新任務提交給線程執行, 若是沒有空閒的進程,則嘗試建立新的進程,若是進程數量已經達到最大值,則執行拒絕策略,使用SynchronousQueue隊列,一般要設置很大的maximumPoolSize值,不然很容易執行拒絕策略.
  • 有界的任務隊列,有界的任務隊列可使用ArrayBlockingQueue實現,ArrayBlockingQueue的構造函數必須帶一個容量參數,表示該隊列的最大容量,如寫所示:
public ArrayBlockingQueue(int capacity)
          當使用有界的任務隊列時,如有新的任務須要執行,若是線程池的實際線程數小於corePoolSize,則會優先建立新的新線程,若大於corePoolSize,則會將新任務加入等待隊列,若等待隊列已經滿,沒法加入,則在總線程不大於maximumPoolSize的前提下,建立新的線程執行任務,若大於maximumPoolSize,則執行拒絕策略,可見,有界隊列金當在任務隊列裝滿時,纔可能將線程數量提高到corePoolSize以上,換言之,除非系統很是繁忙.不然確保核心線程維持在corePoolSize.
  • 無界的任務隊列:無界的任務隊列能夠經過LinkedBlockingQueue類實現,與有界隊列相比,除非系統資源耗盡,不然無界的任務隊列不存在任務入隊失敗的狀況,當有新的任務到來,系統的線程數小於corePoolSize時,線程池會生成新的線程執行任務,但當系統的線程數達到corePoolSize後,就並不會繼續增長,若後續仍有席你的任務加入,而又沒有空閒的線程資源,責任務直接進入對列等待,若任務建立和處理的速度差別很大,無界隊列會保持快速增加,直到耗盡系統內存.
  • 優先任務隊列:優先任務隊列是帶有執行優先級的隊列,它經過PriorityBlockingQueue實現,能夠控制任務的執行順序,他是一個特殊的無界隊列,不管是有界隊列ArrayBlockingQueue,仍是未指定大小的無界隊列LinkedBlockingQueue都是按照先進先出的算法處理任務的,而PriorityBlockingQueue則能夠根據自身的優先級順序前後執行,確保系統性能的同時,也能有很好的質量保證.
回顧newFixedThreadPool()的方法實現,
public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
     咱們發現它用了corePoolSize和maximumPoolSize大小同樣,而且使用了LingkedBlockingQueue任務隊列的線程池.由於固定大小的線程池而言,不存在線程數量的動態變化,同時它使用無界隊列存放沒法當即執行的任務,當任務提交很是頻繁的時候,改隊列可能迅速膨脹.從而耗盡系統性能.
     newSingleThreadExecutor()返回的單線程線程池,是newFixedThreadPool()方法的一種退化,只是簡單的將線程池線程數量設置爲1
     newCachedThreadPool()方法的實現:
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>(),
                                  threadFactory);
}
 
這就意味着無任務時,線程池內無線程,而當任務提交時,該線程池會使用空閒的線程執行任務,若無空閒線程,則將任務加入SynchronousQueue隊列,而SynchronousQueue隊列是一種直接提交的隊列,他總會迫使線程池增長新的線程執行任務,.當任務執行完畢後,因爲corePoolSize爲0 所以空閒線程又會在指定的60s內回收.
     對於這個線程池,若是同時有大量任務被提交,而任務的執行又不那麼快,那麼系統便會開啓等量的線程處理,這樣作法可能會很快耗盡系統的資源,
這裏咱們看一看ThreadPoolExecutor線程池的核心調度代碼,這段代碼也充分體現了上述線程池的工做邏輯:
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    /*
     * Proceed in 3 steps:
     *
     * 1. If fewer than corePoolSize threads are running, try to
     * start a new thread with the given command as its first
     * task.  The call to addWorker atomically checks runState and
     * workerCount, and so prevents false alarms that would add
     * threads when it shouldn't, by returning false.
     *
     * 2. If a task can be successfully queued, then we still need
     * to double-check whether we should have added a thread
     * (because existing ones died since last checking) or that
     * the pool shut down since entry into this method. So we
     * recheck state and if necessary roll back the enqueuing if
     * stopped, or start a new thread if there are none.
     *
     * 3. If we cannot queue task, then we try to add a new
     * thread.  If it fails, we know we are shut down or saturated
     * and so reject the task.
     */
    int c = ctl.get();
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        if (! isRunning(recheck) && remove(command))
            reject(command);
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    else if (!addWorker(command, false))
        reject(command);
}
 
     workerCountOf()方法取得了當前線程池的線程總數,當線程總數小於corePoolSize核心線程數時,會將任務經過addWorker()方法直接調度執行,不然則在workQueue.offer()進入等待隊列,若是進入等待隊列失敗,則會執行將任務直接提交給線程池,若是當期已經達到maximumPoolSize,則提交失敗,執行拒絕策略.
  • 超負載了怎麼辦:拒絕策略
ThreadPoolExecutor的最後一個參數制定了拒絕策略,也就是當任務數量超過系統實際承載能力時,該如何處理呢?這時候就要用到拒絕策略了,,
JDK內置提供了四種拒絕策略.  
  • AbortPolicy策略:該策略會直接拋出異常,阻止系統正常工做
  • CallerRunsPolicy策略,只要線程池未關閉,該策略直接在調用者線程中,運行當前被丟棄的任務.顯然這樣作不會真的丟棄任務,可是任務提交線程的性能極有可能會急劇降低.
  • DiscardOledestPolicy策略: 改策略將丟棄最古老的一個請求,也就是即將被執行的一個任務,並嘗試再次提交當前任務.
  • DiscardPolicy策略,該策略默默的丟棄沒法處理的任務,不與任何處理,若是容許人物丟棄,我以爲這多是最好的一種方案了吧!
以上內置策略均實現了RejectedExecutionHandler接口 若以上策略仍沒法知足實際應用須要,徹底能夠本身拓展RejectedExecutionHandler接口 定義以下:
/**
 * A handler for tasks that cannot be executed by a {@link ThreadPoolExecutor}.
 *
 * @since 1.5
 * @author Doug Lea
 */
public interface RejectedExecutionHandler {

    /**
     * Method that may be invoked by a {@link ThreadPoolExecutor} when
     * {@link ThreadPoolExecutor#execute execute} cannot accept a
     * task.  This may occur when no more threads or queue slots are
     * available because their bounds would be exceeded, or upon
     * shutdown of the Executor.
     *
     * <p>In the absence of other alternatives, the method may throw
     * an unchecked {@link RejectedExecutionException}, which will be
     * propagated to the caller of {@code execute}.
     *
     * @param r the runnable task requested to be executed
     * @param executor the executor attempting to execute this task
     * @throws RejectedExecutionException if there is no remedy
     */
    void rejectedExecution(Runnable r, ThreadPoolExecutor executor);
}
其中r爲請求執行的任務,executor爲當前線程池.
 
咱們簡單的自定義線程池和拒絕策略的使用:
public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        /**
         * When an object implementing interface <code>Runnable</code> is used
         * to create a thread, starting the thread causes the object's
         * <code>run</code> method to be called in that separately executing
         * thread.
         * <p>
         * The general contract of the method <code>run</code> is that it may
         * take any action whatsoever.
         *
         * @see Thread#run()
         */
        @Override
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) throws InterruptedException {
            MyTask task = new MyTask();
            ExecutorService es = new ThreadPoolExecutor(5, 5,
                    0L, TimeUnit.MILLISECONDS,
                    new LinkedBlockingQueue<>(10),
                    Executors.defaultThreadFactory(),
                    (r, executor) -> System.out.println(r.toString() + " is discard"));
            for (int i = 0; i < Integer.MAX_VALUE; i++) {
                es.submit(task);
                Thread.sleep(10);
            }
        }
    }
}
     上訴代碼咱們自定義了一個線程池,該池子有5個常駐線程,而且最大的線程數也是5個,這和固定大小的線程池是同樣的,可是他卻擁有一個只有10個容量的等待隊列,由於使用無界隊列極可能不是最佳解決方案,若是任務量極大,極可能會吧內存呈爆,給一個合理的隊列大小,也合乎常理的選擇,同時,這裏定義了拒絕策略,.咱們不拋出異常,由於萬一在任務提交端沒有進行異常處理,則有可能使得整個系統都崩潰,這極有可能不是咱們但願遇到的,但做爲必要的信息記錄,咱們將任務丟棄的信息進行打印.固然這是比內置的DiscardPolicy策略高級那麼一點點,
     因爲上述代碼中,MyTask執行須要花費100毫秒,所以 必然致使大量的任務被直接丟棄,輸入以下:
     在實際的應用中,咱們能夠將更詳細的信息記錄到日誌中,來分析系統的負載和任務丟失的狀況
相關文章
相關標籤/搜索