線程池ThreadPoolExecutor

爲何要使用線程池?

這裏寫圖片描述

線程是一個操做系統概念。操做系統負責這個線程的建立、掛起、運行、阻塞和終結操做。而操做系統建立線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情。 
另外一方面,大多數實際場景中是這樣的:處理某一次請求的時間是很是短暫的,可是請求數量是巨大的。這種技術背景下,若是咱們爲每個請求都單首創建一個線程,那麼物理機的全部資源基本上都被操做系統建立線程、切換線程狀態、銷燬線程這些操做所佔用,用於業務請求處理的資源反而減小了。因此最理想的處理方式是,將處理請求的線程數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理自己。 
另外,一些操做系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操做系統會變得不穩定。這也是咱們要限制線程數量的緣由。前端

線程池的基本使用方式

JAVA語言爲咱們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口自己和「線程池」並無直接關係,它的定義更接近「執行器」,而「使用線程管理的方式進行實現」只是其中的一種實現方式)。這篇文章中,咱們主要圍繞ThreadPoolExecutor類進行講解。java

ThreadPoolExecutor類的使用方式:

public class PoolThreadSimple {     
    public static void main(String[] args) throws Throwable { 
        /* * corePoolSize:核心大小,線程池初始化的時候,就會有這麼大 
         * * maximumPoolSize:線程池最大線程數 
         * * keepAliveTime:若是當前線程池中線程數大於corePoolSize。
         * * 多餘的線程,在等待keepAliveTime時間後若是尚未新的線程任務指派給它,它就會被回收 
         * * unit:等待時間keepAliveTime的單位 
         * * workQueue:等待隊列。這個對象的設置是本文將重點介紹的內容 
         * */ 
        ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue()); 
        for(int index = 0 ; index < 10 ; index ++) { 
            poolExecutor.submit(new PoolThreadSimple.TestRunnable(index)); 
        } 
    } 
    private static class TestRunnable implements Runnable { /** * 這個就是測試用的線程 */ 
        /** * 日誌 */ 
        private static Log LOGGER = LogFactory.getLog(TestRunnable.class); 
        /** * 記錄任務的惟一編號,這樣在日誌中好作識別 */ private Integer index; 
        public TestRunnable(int index) { 
            this.index = index; 
        } 
        /** * @return the index */ 
        public Integer getIndex() { 
            return index; 
        } 
        /* * 線程中,就只作一件事情: * 等待60秒鐘的事件,以便模擬業務操做過程 * */
        @Override 
        public void run() {  
            Thread currentThread = Thread.currentThread(); 
            TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行==="); 
            synchronized (currentThread) { 
                try { 
                    currentThread.wait(60000); 
                } 
                catch (InterruptedException e) { 
                    TestRunnable.LOGGER.error(e.getMessage(), e); 
                } 
            } 
            TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成"); 
        } 
    } 
}

下文中,將對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些經常使用/不經常使用的設置項進行逐一講解。後端

ThreadPoolExecutor邏輯結構和工做方式

在上面的代碼中,咱們建立線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:數組

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)

構造函數中須要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些參數(和後續將要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。 
[圖片] 
必定要注意一個概念,即存在於線程池中容器的必定是Thread對象,而不是你要求運行的任務(因此叫線程池而不叫任務池也不叫對象池);你要求運行的任務將被線程池分配給某一個空閒的Thread運行。 
從上圖中,咱們能夠看到構成線程池的幾個重要元素: 
● 等待隊列:顧名思義,就是你調用線程池對象的submit()方法或者execute()方法,要求線程池運行的任務(這些任務必須實現Runnable接口或者Callable接口)。可是出於某些緣由線程池並無立刻運行這些任務,而是送入一個隊列等待執行。緩存

● 核心線程:線程池主要用於執行任務的是「核心線程」,「核心線程」的數量是你建立線程時所設置的corePoolSize參數決定的。若是不進行特別的設定,線程池中始終會保持corePoolSize數量的線程數(不包括建立階段)。安全

● 非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池將建立「非核心線程」臨時幫助運行任務。你設置的大於corePoolSize參數小於maximumPoolSize參數的部分,就是線程池能夠臨時建立的「非核心線程」的最大數量。這種狀況下若是某個線程沒有運行任何任務,在等待keepAliveTime時間後,這個線程將會被銷燬,直到線程池的線程數量從新達到corePoolSize。less

● maximumPoolSize參數也是當前線程池容許建立的最大線程數量。那麼若是設置的corePoolSize參數和設置的maximumPoolSize參數一致時,線程池在任何狀況下都不會回收空閒線程。keepAliveTime和timeUnit也就失去了意義。ide

● keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit爲TimeUnit.MINUTES,表明空閒線程的回收閥值爲1分鐘。函數

說完了線程池的邏輯結構,下面咱們討論一下線程池是怎樣處理某一個運行任務的。 
一、首先能夠經過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務後,會有幾種處理狀況: 
1.一、若是當前線程池中運行的線程數量尚未達到corePoolSize大小時,線程池會建立一個新的線程運行你的任務,不管以前已經建立的線程是否處於空閒狀態。 
1.二、若是當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把你的這個任務加入到等待隊列中。直到某一個的線程空閒了,線程池會根據設置的等待隊列規則,從隊列中取出一個新的任務執行。 
1.三、若是根據隊列規則,這個任務沒法加入等待隊列。這時線程池就會建立一個「非核心線程」直接運行這個任務。注意,若是這種狀況下任務執行成功,那麼當前線程池中的線程數量必定大於corePoolSize。 
1.四、若是這個任務,沒法被「核心線程」直接執行,又沒法加入等待隊列,又沒法建立「非核心線程」直接執行,且你沒有爲線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操做,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy,這在後文會提到) 
二、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(全部的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,並停留在哪裏。 
三、當線程池中的線程超過你設置的corePoolSize參數,說明當前線程池中有所謂的「非核心線程」。那麼當某個線程處理完任務後,若是等待keepAliveTime時間後仍然沒有新的任務分配給它,那麼這個線程將會被回收。線程池回收線程時,對所謂的「核心線程」和「非核心線程」是一視同仁的,直到線程池中線程的數量等於你設置的corePoolSize參數時,回收過程纔會中止。工具

不經常使用的設置

在ThreadPoolExecutor線程池中,有一些不經常使用的甚至不須要的設置

allowCoreThreadTimeOut:

線程池回收線程只會發生在當前線程池中線程數量大於corePoolSize參數的時候;當線程池中線程數量小於等於corePoolSize參數的時候,回收過程就會中止。 
allowCoreThreadTimeOut設置項能夠要求線程池:將包括「核心線程」在內的,沒有任務分配的任何線程,在等待keepAliveTime時間後所有進行回收:

ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(1)); 
poolExecutor.allowCoreThreadTimeOut(true);

如下是設置前的效果: 
[圖片] 
如下是設置後的效果: 
[圖片]

prestartAllCoreThreads

前文咱們還討論到,當線程池中的線程尚未達到你設置的corePoolSize參數值的時候,若是有新的任務到來,線程池將建立新的線程運行這個任務,不管以前已經建立的線程是否處於空閒狀態。這個描述能夠用下面的示意圖表示出來: 
[圖片] 
prestartAllCoreThreads設置項,能夠在線程池建立,但尚未接收到任何任務的狀況下,先行建立符合corePoolSize參數值的線程數:

ThreadPoolExecutor poolExecutor =new ThreadPoolExecutor(5,10,1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1));
poolExecutor.prestartAllCoreThreads();

咱們繼續討論ThreadPoolExecutor線程池。上面給出的最簡單的ThreadPoolExecutor線程池的使用方式中,咱們只採用了ThreadPoolExecutor最簡單的一個構造函數:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)

實際上ThreadPoolExecutor線程池有不少種構造函數,其中最複雜的一種構造函數是:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)

在上文中咱們尚未介紹的workQueue、threadFactory和handler參數,將是本文講解的重點。

一:ThreadFactory的使用

線程池最主要的一項工做,就是在知足某些條件的狀況下建立線程。而在ThreadPoolExecutor線程池中,建立線程的工做交給ThreadFactory來完成。要使用線程池,就必需要指定ThreadFactory。 
相似於上文中,若是咱們使用的構造函數時並無指定使用的ThreadFactory,這個時候ThreadPoolExecutor會使用一個默認的ThreadFactory:DefaultThreadFactory。(這個類在Executors工具類中)

固然,在某些特殊業務場景下,還可使用一個自定義的ThreadFactory線程工廠,以下代碼片斷:

import java.util.concurrent.ThreadFactory; 
/** * 測試自定義的一個線程工廠 */ 
public class TestThreadFactory implements ThreadFactory { 
    @Override 
    public Thread newThread(Runnable r) { 
        return new Thread(r); 
    } 
}

二:線程池的等待隊列

在使用ThreadPoolExecutor線程池的時候,須要指定一個實現了BlockingQueue接口的任務等待隊列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;

隊列和棧

● 隊列:是一種特殊的線性結構,容許在線性結構的前端進行刪除/讀取操做;容許在線性結構的後端進行插入操做;這種線性結構具備「先進先出」的操做特色: 
[圖片] 
可是在實際應用中,隊列中的元素有可能不是以「進入的順序」爲排序依據的。例如咱們將要講到的PriorityBlockingQueue隊列。 
● 棧:棧也是一種線性結構,可是棧和隊列相比只容許在線性結構的一端進行操做,入棧和出棧都是在一端完成。 
[圖片]

2.1有限隊列

● SynchronousQueue:

「是這樣 一種阻塞隊列,其中每一個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內部容量。翻譯一下:這是一個內部沒有任何容量的阻塞隊列,任何一次插入操做的元素都要等待相對的刪除/讀取操做,不然進行插入操做的線程就要一直等待,反之亦然。

SynchronousQueue<Object> queue = new SynchronousQueue<Object>(); 
// 不要使用add,由於這個隊列內部沒有任何容量,因此會拋出異常「IllegalStateException」 
//queue.add(new Object()); 
// 操做線程會在這裏被阻塞,直到有其餘操做線程取走這個對象 
queue.put(new Object());

● ArrayBlockingQueue:

一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。

// 咱們建立了一個ArrayBlockingQueue,而且設置隊列空間爲2 
ArrayBlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<Object>(2); 
// 插入第一個對象 
arrayQueue.put(new Object()); 
// 插入第二個對象 
arrayQueue.put(new Object()); 
// 插入第三個對象時,這個操做線程就會被阻塞。 
arrayQueue.put(new Object()); 
// 請不要使用add操做,和SynchronousQueue的add操做同樣,它們都使用了AbstractQueue中的add實現

2.2無限隊列

● LinkedBlockingQueue:

LinkedBlockingQueue是咱們在ThreadPoolExecutor線程池中經常使用的等待隊列。它能夠指定容量也能夠不指定容量。因爲它具備「無限容量」的特性,因此我仍是將它納入了無限隊列的範疇(實際上任何無限容量的隊列/棧都是有容量的,這個容量就是Integer.MAX_VALUE)。 
LinkedBlockingQueue的實現是基於鏈表結構,而不是相似ArrayBlockingQueue那樣的數組。但實際使用過程當中,不須要關心它的內部實現,若是指定了LinkedBlockingQueue的容量大小,那麼它反映出來的使用特性就和ArrayBlockingQueue相似了。

LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(2); 
linkedQueue.put(new Object()); 
// 插入第二個對象 
linkedQueue.put(new Object()); 
// 插入第三個對象時,這個操做線程就會被阻塞。 
linkedQueue.put(new Object());

=======================================

// 或者以下使用: 
LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(); 
linkedQueue.put(new Object()); 
// 插入第二個對象 
linkedQueue.put(new Object()); 
// 插入第N個對象時,都不會阻塞 
linkedQueue.put(new Object());

● LinkedBlockingDeque

LinkedBlockingDeque是一個基於鏈表的雙端隊列。LinkedBlockingQueue的內部結構決定了它只能從隊列尾部插入,從隊列頭部取出元素;可是LinkedBlockingDeque既能夠從尾部插入/取出元素,還能夠從頭部插入元素/取出元素。

LinkedBlockingDeque linkedDeque = new LinkedBlockingDeque(); 
// push ,能夠從隊列的頭部插入元素 
linkedDeque.push(new TempObject(1)); 
linkedDeque.push(new TempObject(2)); 
linkedDeque.push(new TempObject(3)); 
// poll , 能夠從隊列的頭部取出元素 
TempObject tempObject = linkedDeque.poll(); 
// 這裏會打印 
tempObject.index = 3 
System.out.println("tempObject.index = " + tempObject.getIndex()); 
// put , 能夠從隊列的尾部插入元素 
linkedDeque.put(new TempObject(4)); 
linkedDeque.put(new TempObject(5)); 
// pollLast , 能夠從隊列尾部取出元素 
tempObject = linkedDeque.pollLast(); 
// 這裏會打印 
tempObject.index = 5 
System.out.println("tempObject.index = " + tempObject.getIndex());

● PriorityBlockingQueue

PriorityBlockingQueue是一個按照優先級進行內部元素排序的無限隊列。存放在PriorityBlockingQueue中的元素必須實現Comparable接口,這樣才能經過實現compareTo()方法進行排序。優先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue不會保證優先級同樣的元素的排序,也不保證當前隊列中除了優先級最高的元素之外的元素,隨時處於正確排序的位置。 
這是什麼意思呢?PriorityBlockingQueue並不保證除了隊列頭部之外的元素排序必定是正確的。請看下面的示例代碼:

PriorityBlockingQueue priorityQueue = new PriorityBlockingQueue(); 
priorityQueue.put(new TempObject(-5)); 
priorityQueue.put(new TempObject(5));
priorityQueue.put(new TempObject(-1)); 
priorityQueue.put(new TempObject(1)); 
// 第一個元素是5 
// 實際上在尚未執行priorityQueue.poll()語句的時候,隊列中的第二個元素不必定是1 
TempObject targetTempObject = priorityQueue.poll(); 
System.out.println("tempObject.index = " + targetTempObject.getIndex()); 
// 第二個元素是1 
targetTempObject = priorityQueue.poll(); 
System.out.println("tempObject.index = " + targetTempObject.getIndex()); 
// 第三個元素是-1 
targetTempObject = priorityQueue.poll(); 
System.out.println("tempObject.index = " + targetTempObject.getIndex()); 
// 第四個元素是-5 
targetTempObject = priorityQueue.poll(); 
System.out.println("tempObject.index = " + targetTempObject.getIndex());
// 這個元素類,必須實現Comparable接口 
private static class TempObject implements Comparable<TempObject> { 
    private int index; public TempObject(int index) { 
        this.index = index;
    } 
    /** * @return the index */ 
    public int getIndex() { 
        return index; 
    } 
    /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ 
    @Override 
    public int compareTo(TempObject o) { 
        return o.getIndex() - this.index; 
    } 
}

● LinkedTransferQueue

LinkedTransferQueue也是一個無限隊列,它除了具備通常隊列的操做特性外(先進先出),還具備一個阻塞特性:LinkedTransferQueue能夠由一對生產者/消費者線程進行操做,當消費者將一個新的元素插入隊列後,消費者線程將會一直等待,直到某一個消費者線程將這個元素取走,反之亦然。 
LinkedTransferQueue的操做特性能夠由下面這段代碼提現。在下面的代碼片斷中,有兩中類型的線程:生產者和消費者,這兩類線程互相等待對方的操做:

/** * 生產者線程 */ 
private static class ProducerRunnable implements Runnable { 
    private LinkedTransferQueue linkedQueue; 
    public ProducerRunnable(LinkedTransferQueue linkedQueue) { 
        this.linkedQueue = linkedQueue; 
        } 
    @Override 
    public void run() { 
        for(int index = 1 ; ; index++) { 
            try { 
                // 向LinkedTransferQueue隊列插入一個新的元素 
                // 而後生產者線程就會等待,直到有一個消費者將這個元素從隊列中取走 
                this.linkedQueue.transfer(new TempObject(index)); 
            } catch (InterruptedException e) { 
                e.printStackTrace(System.out); 
            } 
        } 
    } 
} 
/** * 消費者線程 */ 
private static class ConsumerRunnable implements Runnable { 
    private LinkedTransferQueue linkedQueue; 
    public ConsumerRunnable(LinkedTransferQueue linkedQueue) { 
        this.linkedQueue = linkedQueue; 
    } 
    @Override 
    public void run() { 
        Thread currentThread = Thread.currentThread(); 
        while(!currentThread.isInterrupted()) { 
            try { 
                // 等待,直到從LinkedTransferQueue隊列中獲得一個元素 
                TempObject targetObject = this.linkedQueue.take(); 
                System.out.println("線程(" + currentThread.getId() + ")取得targetObject.index = " + targetObject.getIndex()); 
            } catch (InterruptedException e) { 
                e.printStackTrace(System.out); 
            } 
        } 
    } 
}

=====如下是啓動代碼:

LinkedTransferQueue<TempObject> linkedQueue = new LinkedTransferQueue<TempObject>(); 
// 這是一個生產者線程 
Thread producerThread = new Thread(new ProducerRunnable(linkedQueue)); 
// 這裏有兩個消費者線程 
Thread consumerRunnable1 = new Thread(new ConsumerRunnable(linkedQueue)); 
Thread consumerRunnable2 = new Thread(new ConsumerRunnable(linkedQueue)); 
// 開始運行 
producerThread.start(); 
consumerRunnable1.start(); 
consumerRunnable2.start(); 
// 這裏只是爲了main不退出,沒有任何演示含義 
Thread currentThread = Thread.currentThread(); 
synchronized (currentThread) { 
    currentThread.wait(); 
}

三:拒絕任務(handler)

在ThreadPoolExecutor線程池中還有一個重要的接口:RejectedExecutionHandler。當提交給線程池的某一個新任務沒法直接被線程池中「核心線程」直接處理,又沒法加入等待隊列,也沒法建立新的線程執行;又或者線程池已經調用shutdown()方法中止了工做;又或者線程池不是處於正常的工做狀態;這時候ThreadPoolExecutor線程池會拒絕處理這個任務,觸發建立ThreadPoolExecutor線程池時定義的RejectedExecutionHandler接口的實現

在建立ThreadPoolExecutor線程池時,必定會指定RejectedExecutionHandler接口的實現。若是調用的是不須要指定RejectedExecutionHandler接口的構造函數,如:

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue); 
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory);

那麼ThreadPoolExecutor線程池在建立時,會使用一個默認的RejectedExecutionHandler接口實現,源代碼片斷以下:

public class ThreadPoolExecutor extends AbstractExecutorService { 
    ...... 
    /** * The default rejected execution handler */ 
    private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); 
    ...... 
    // 能夠看到,ThreadPoolExecutor中的兩個沒有指定RejectedExecutionHandler 
    // 接口的構造函數,都是使用了一個RejectedExecutionHandler接口的默認實現:
    AbortPolicy public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { 
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); 
    } 
    ...... 
    public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { 
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); 
    } 
    ...... 
}

實際上,在ThreadPoolExecutor中已經提供了四種能夠直接使用的RejectedExecutionHandler接口的實現:

● CallerRunsPolicy: 
這個拒絕處理器,將直接運行這個任務的run方法。可是,請注意並非在ThreadPoolExecutor線程池中的線程中運行,而是直接調用這個任務實現的run方法。源代碼以下:

public static class CallerRunsPolicy implements RejectedExecutionHandler { 
    /** * Creates a {@code CallerRunsPolicy}. */ 
    public CallerRunsPolicy() { } 
    /** Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. 
     ** @param r the runnable task requested to be executed * @param e the executor attempting to execute this task 
     **/ 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
        if (!e.isShutdown()) { 
            r.run(); 
        } 
    } 
}

● AbortPolicy:

這個處理器,在任務被拒絕後會建立一個RejectedExecutionException異常並拋出。這個處理過程也是ThreadPoolExecutor線程池默認的RejectedExecutionHandler實現。

● DiscardPolicy: 
DiscardPolicy處理器,將會默默丟棄這個被拒絕的任務,不會拋出異常,也不會經過其餘方式執行這個任務的任何一個方法,更不會出現任何的日誌提示。

● DiscardOldestPolicy: 
這個處理器頗有意思。它會檢查當前ThreadPoolExecutor線程池的等待隊列。並調用隊列的poll()方法,將當前處於等待隊列列頭的等待任務強行取出,而後再試圖將當前被拒絕的任務提交到線程池執行:

public static class DiscardOldestPolicy implements RejectedExecutionHandler { 
    ...... 
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { 
        if (!e.isShutdown()) { 
            e.getQueue().poll(); 
            e.execute(r); 
        } 
    } 
    ...... 
}

實際上查閱這四種ThreadPoolExecutor線程池自帶的拒絕處理器實現,您能夠發現CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy處理器針對被拒絕的任務並非一個很好的處理方式。  CallerRunsPolicy在非線程池之外直接調用任務的run方法,可能會形成線程安全上的問題;DiscardPolicy默默的忽略掉被拒絕任務,也沒有輸出日誌或者提示,開發人員不會知道線程池的處理過程出現了錯誤;DiscardOldestPolicy中e.getQueue().poll()的方式好像是科學的,可是若是等待隊列出現了容量問題,大多數狀況下就是這個線程池的代碼出現了BUG。最科學的的仍是AbortPolicy提供的處理方式:拋出異常,由開發人員進行處理。

相關文章
相關標籤/搜索