線程是一個操做系統概念。操做系統負責這個線程的建立、掛起、運行、阻塞和終結操做。而操做系統建立線程、切換線程狀態、終結線程都要進行CPU調度——這是一個耗費時間和系統資源的事情。
另外一方面,大多數實際場景中是這樣的:處理某一次請求的時間是很是短暫的,可是請求數量是巨大的。這種技術背景下,若是咱們爲每個請求都單首創建一個線程,那麼物理機的全部資源基本上都被操做系統建立線程、切換線程狀態、銷燬線程這些操做所佔用,用於業務請求處理的資源反而減小了。因此最理想的處理方式是,將處理請求的線程數量控制在一個範圍,既保證後續的請求不會等待太長時間,又保證物理機將足夠的資源用於請求處理自己。
另外,一些操做系統是有最大線程數量限制的。當運行的線程數量逼近這個值的時候,操做系統會變得不穩定。這也是咱們要限制線程數量的緣由。前端
JAVA語言爲咱們提供了兩種基礎線程池的選擇:ScheduledThreadPoolExecutor和ThreadPoolExecutor。它們都實現了ExecutorService接口(注意,ExecutorService接口自己和「線程池」並無直接關係,它的定義更接近「執行器」,而「使用線程管理的方式進行實現」只是其中的一種實現方式)。這篇文章中,咱們主要圍繞ThreadPoolExecutor類進行講解。java
public class PoolThreadSimple { public static void main(String[] args) throws Throwable { /* * corePoolSize:核心大小,線程池初始化的時候,就會有這麼大 * * maximumPoolSize:線程池最大線程數 * * keepAliveTime:若是當前線程池中線程數大於corePoolSize。 * * 多餘的線程,在等待keepAliveTime時間後若是尚未新的線程任務指派給它,它就會被回收 * * unit:等待時間keepAliveTime的單位 * * workQueue:等待隊列。這個對象的設置是本文將重點介紹的內容 * */ ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new SynchronousQueue()); for(int index = 0 ; index < 10 ; index ++) { poolExecutor.submit(new PoolThreadSimple.TestRunnable(index)); } } private static class TestRunnable implements Runnable { /** * 這個就是測試用的線程 */ /** * 日誌 */ private static Log LOGGER = LogFactory.getLog(TestRunnable.class); /** * 記錄任務的惟一編號,這樣在日誌中好作識別 */ private Integer index; public TestRunnable(int index) { this.index = index; } /** * @return the index */ public Integer getIndex() { return index; } /* * 線程中,就只作一件事情: * 等待60秒鐘的事件,以便模擬業務操做過程 * */ @Override public void run() { Thread currentThread = Thread.currentThread(); TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")開始執行==="); synchronized (currentThread) { try { currentThread.wait(60000); } catch (InterruptedException e) { TestRunnable.LOGGER.error(e.getMessage(), e); } } TestRunnable.LOGGER.info("線程:" + currentThread.getId() + " 中的任務(" + this.getIndex() + ")執行完成"); } } }
下文中,將對線程池中的corePoolSize、maximumPoolSize、keepAliveTime、timeUnit、workQueue、threadFactory、handler參數和一些經常使用/不經常使用的設置項進行逐一講解。後端
在上面的代碼中,咱們建立線程池的時候使用了ThreadPoolExecutor中最簡單的一個構造函數:數組
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
構造函數中須要傳入的參數包括corePoolSize、maximumPoolSize、keepAliveTime、timeUnit和workQueue。要明確理解這些參數(和後續將要介紹的參數)的含義,就首先要搞清楚ThreadPoolExecutor線程池的邏輯結構。
必定要注意一個概念,即存在於線程池中容器的必定是Thread對象,而不是你要求運行的任務(因此叫線程池而不叫任務池也不叫對象池);你要求運行的任務將被線程池分配給某一個空閒的Thread運行。
從上圖中,咱們能夠看到構成線程池的幾個重要元素:
● 等待隊列:顧名思義,就是你調用線程池對象的submit()方法或者execute()方法,要求線程池運行的任務(這些任務必須實現Runnable接口或者Callable接口)。可是出於某些緣由線程池並無立刻運行這些任務,而是送入一個隊列等待執行。緩存
● 核心線程:線程池主要用於執行任務的是「核心線程」,「核心線程」的數量是你建立線程時所設置的corePoolSize參數決定的。若是不進行特別的設定,線程池中始終會保持corePoolSize數量的線程數(不包括建立階段)。安全
● 非核心線程:一旦任務數量過多(由等待隊列的特性決定),線程池將建立「非核心線程」臨時幫助運行任務。你設置的大於corePoolSize參數小於maximumPoolSize參數的部分,就是線程池能夠臨時建立的「非核心線程」的最大數量。這種狀況下若是某個線程沒有運行任何任務,在等待keepAliveTime時間後,這個線程將會被銷燬,直到線程池的線程數量從新達到corePoolSize。less
● maximumPoolSize參數也是當前線程池容許建立的最大線程數量。那麼若是設置的corePoolSize參數和設置的maximumPoolSize參數一致時,線程池在任何狀況下都不會回收空閒線程。keepAliveTime和timeUnit也就失去了意義。ide
● keepAliveTime參數和timeUnit參數也是配合使用的。keepAliveTime參數指明等待時間的量化值,timeUnit指明量化值單位。例如keepAliveTime=1,timeUnit爲TimeUnit.MINUTES,表明空閒線程的回收閥值爲1分鐘。函數
說完了線程池的邏輯結構,下面咱們討論一下線程池是怎樣處理某一個運行任務的。
一、首先能夠經過線程池提供的submit()方法或者execute()方法,要求線程池執行某個任務。線程池收到這個要求執行的任務後,會有幾種處理狀況:
1.一、若是當前線程池中運行的線程數量尚未達到corePoolSize大小時,線程池會建立一個新的線程運行你的任務,不管以前已經建立的線程是否處於空閒狀態。
1.二、若是當前線程池中運行的線程數量已經達到設置的corePoolSize大小,線程池會把你的這個任務加入到等待隊列中。直到某一個的線程空閒了,線程池會根據設置的等待隊列規則,從隊列中取出一個新的任務執行。
1.三、若是根據隊列規則,這個任務沒法加入等待隊列。這時線程池就會建立一個「非核心線程」直接運行這個任務。注意,若是這種狀況下任務執行成功,那麼當前線程池中的線程數量必定大於corePoolSize。
1.四、若是這個任務,沒法被「核心線程」直接執行,又沒法加入等待隊列,又沒法建立「非核心線程」直接執行,且你沒有爲線程池設置RejectedExecutionHandler。這時線程池會拋出RejectedExecutionException異常,即線程池拒絕接受這個任務。(實際上拋出RejectedExecutionException異常的操做,是ThreadPoolExecutor線程池中一個默認的RejectedExecutionHandler實現:AbortPolicy,這在後文會提到)
二、一旦線程池中某個線程完成了任務的執行,它就會試圖到任務等待隊列中拿去下一個等待任務(全部的等待任務都實現了BlockingQueue接口,按照接口字面上的理解,這是一個可阻塞的隊列接口),它會調用等待隊列的poll()方法,並停留在哪裏。
三、當線程池中的線程超過你設置的corePoolSize參數,說明當前線程池中有所謂的「非核心線程」。那麼當某個線程處理完任務後,若是等待keepAliveTime時間後仍然沒有新的任務分配給它,那麼這個線程將會被回收。線程池回收線程時,對所謂的「核心線程」和「非核心線程」是一視同仁的,直到線程池中線程的數量等於你設置的corePoolSize參數時,回收過程纔會中止。工具
在ThreadPoolExecutor線程池中,有一些不經常使用的甚至不須要的設置
線程池回收線程只會發生在當前線程池中線程數量大於corePoolSize參數的時候;當線程池中線程數量小於等於corePoolSize參數的時候,回收過程就會中止。
allowCoreThreadTimeOut設置項能夠要求線程池:將包括「核心線程」在內的,沒有任務分配的任何線程,在等待keepAliveTime時間後所有進行回收:
ThreadPoolExecutor poolExecutor = new ThreadPoolExecutor(5, 10, 1, TimeUnit.MINUTES, new ArrayBlockingQueue(1)); poolExecutor.allowCoreThreadTimeOut(true);
如下是設置前的效果:
如下是設置後的效果:
前文咱們還討論到,當線程池中的線程尚未達到你設置的corePoolSize參數值的時候,若是有新的任務到來,線程池將建立新的線程運行這個任務,不管以前已經建立的線程是否處於空閒狀態。這個描述能夠用下面的示意圖表示出來:
prestartAllCoreThreads設置項,能夠在線程池建立,但尚未接收到任何任務的狀況下,先行建立符合corePoolSize參數值的線程數:
ThreadPoolExecutor poolExecutor =new ThreadPoolExecutor(5,10,1, TimeUnit.MINUTES, new ArrayBlockingQueue<Runnable>(1)); poolExecutor.prestartAllCoreThreads();
咱們繼續討論ThreadPoolExecutor線程池。上面給出的最簡單的ThreadPoolExecutor線程池的使用方式中,咱們只採用了ThreadPoolExecutor最簡單的一個構造函數:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue)
實際上ThreadPoolExecutor線程池有不少種構造函數,其中最複雜的一種構造函數是:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler)
在上文中咱們尚未介紹的workQueue、threadFactory和handler參數,將是本文講解的重點。
線程池最主要的一項工做,就是在知足某些條件的狀況下建立線程。而在ThreadPoolExecutor線程池中,建立線程的工做交給ThreadFactory來完成。要使用線程池,就必需要指定ThreadFactory。
相似於上文中,若是咱們使用的構造函數時並無指定使用的ThreadFactory,這個時候ThreadPoolExecutor會使用一個默認的ThreadFactory:DefaultThreadFactory。(這個類在Executors工具類中)
固然,在某些特殊業務場景下,還可使用一個自定義的ThreadFactory線程工廠,以下代碼片斷:
import java.util.concurrent.ThreadFactory; /** * 測試自定義的一個線程工廠 */ public class TestThreadFactory implements ThreadFactory { @Override public Thread newThread(Runnable r) { return new Thread(r); } }
在使用ThreadPoolExecutor線程池的時候,須要指定一個實現了BlockingQueue接口的任務等待隊列。在ThreadPoolExecutor線程池的API文檔中,一共推薦了三種等待隊列,它們是:SynchronousQueue、LinkedBlockingQueue和ArrayBlockingQueue;
● 隊列:是一種特殊的線性結構,容許在線性結構的前端進行刪除/讀取操做;容許在線性結構的後端進行插入操做;這種線性結構具備「先進先出」的操做特色:
可是在實際應用中,隊列中的元素有可能不是以「進入的順序」爲排序依據的。例如咱們將要講到的PriorityBlockingQueue隊列。
● 棧:棧也是一種線性結構,可是棧和隊列相比只容許在線性結構的一端進行操做,入棧和出棧都是在一端完成。
● SynchronousQueue:
「是這樣 一種阻塞隊列,其中每一個 put 必須等待一個 take,反之亦然。同步隊列沒有任何內部容量。翻譯一下:這是一個內部沒有任何容量的阻塞隊列,任何一次插入操做的元素都要等待相對的刪除/讀取操做,不然進行插入操做的線程就要一直等待,反之亦然。
SynchronousQueue<Object> queue = new SynchronousQueue<Object>(); // 不要使用add,由於這個隊列內部沒有任何容量,因此會拋出異常「IllegalStateException」 //queue.add(new Object()); // 操做線程會在這裏被阻塞,直到有其餘操做線程取走這個對象 queue.put(new Object());
● ArrayBlockingQueue:
一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。
// 咱們建立了一個ArrayBlockingQueue,而且設置隊列空間爲2 ArrayBlockingQueue<Object> arrayQueue = new ArrayBlockingQueue<Object>(2); // 插入第一個對象 arrayQueue.put(new Object()); // 插入第二個對象 arrayQueue.put(new Object()); // 插入第三個對象時,這個操做線程就會被阻塞。 arrayQueue.put(new Object()); // 請不要使用add操做,和SynchronousQueue的add操做同樣,它們都使用了AbstractQueue中的add實現
● LinkedBlockingQueue:
LinkedBlockingQueue是咱們在ThreadPoolExecutor線程池中經常使用的等待隊列。它能夠指定容量也能夠不指定容量。因爲它具備「無限容量」的特性,因此我仍是將它納入了無限隊列的範疇(實際上任何無限容量的隊列/棧都是有容量的,這個容量就是Integer.MAX_VALUE)。
LinkedBlockingQueue的實現是基於鏈表結構,而不是相似ArrayBlockingQueue那樣的數組。但實際使用過程當中,不須要關心它的內部實現,若是指定了LinkedBlockingQueue的容量大小,那麼它反映出來的使用特性就和ArrayBlockingQueue相似了。
LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(2); linkedQueue.put(new Object()); // 插入第二個對象 linkedQueue.put(new Object()); // 插入第三個對象時,這個操做線程就會被阻塞。 linkedQueue.put(new Object());
=======================================
// 或者以下使用: LinkedBlockingQueue<Object> linkedQueue = new LinkedBlockingQueue<Object>(); linkedQueue.put(new Object()); // 插入第二個對象 linkedQueue.put(new Object()); // 插入第N個對象時,都不會阻塞 linkedQueue.put(new Object());
● LinkedBlockingDeque
LinkedBlockingDeque是一個基於鏈表的雙端隊列。LinkedBlockingQueue的內部結構決定了它只能從隊列尾部插入,從隊列頭部取出元素;可是LinkedBlockingDeque既能夠從尾部插入/取出元素,還能夠從頭部插入元素/取出元素。
LinkedBlockingDeque linkedDeque = new LinkedBlockingDeque(); // push ,能夠從隊列的頭部插入元素 linkedDeque.push(new TempObject(1)); linkedDeque.push(new TempObject(2)); linkedDeque.push(new TempObject(3)); // poll , 能夠從隊列的頭部取出元素 TempObject tempObject = linkedDeque.poll(); // 這裏會打印 tempObject.index = 3 System.out.println("tempObject.index = " + tempObject.getIndex()); // put , 能夠從隊列的尾部插入元素 linkedDeque.put(new TempObject(4)); linkedDeque.put(new TempObject(5)); // pollLast , 能夠從隊列尾部取出元素 tempObject = linkedDeque.pollLast(); // 這裏會打印 tempObject.index = 5 System.out.println("tempObject.index = " + tempObject.getIndex());
● PriorityBlockingQueue
PriorityBlockingQueue是一個按照優先級進行內部元素排序的無限隊列。存放在PriorityBlockingQueue中的元素必須實現Comparable接口,這樣才能經過實現compareTo()方法進行排序。優先級最高的元素將始終排在隊列的頭部;PriorityBlockingQueue不會保證優先級同樣的元素的排序,也不保證當前隊列中除了優先級最高的元素之外的元素,隨時處於正確排序的位置。
這是什麼意思呢?PriorityBlockingQueue並不保證除了隊列頭部之外的元素排序必定是正確的。請看下面的示例代碼:
PriorityBlockingQueue priorityQueue = new PriorityBlockingQueue(); priorityQueue.put(new TempObject(-5)); priorityQueue.put(new TempObject(5)); priorityQueue.put(new TempObject(-1)); priorityQueue.put(new TempObject(1)); // 第一個元素是5 // 實際上在尚未執行priorityQueue.poll()語句的時候,隊列中的第二個元素不必定是1 TempObject targetTempObject = priorityQueue.poll(); System.out.println("tempObject.index = " + targetTempObject.getIndex()); // 第二個元素是1 targetTempObject = priorityQueue.poll(); System.out.println("tempObject.index = " + targetTempObject.getIndex()); // 第三個元素是-1 targetTempObject = priorityQueue.poll(); System.out.println("tempObject.index = " + targetTempObject.getIndex()); // 第四個元素是-5 targetTempObject = priorityQueue.poll(); System.out.println("tempObject.index = " + targetTempObject.getIndex());
// 這個元素類,必須實現Comparable接口 private static class TempObject implements Comparable<TempObject> { private int index; public TempObject(int index) { this.index = index; } /** * @return the index */ public int getIndex() { return index; } /* (non-Javadoc) * @see java.lang.Comparable#compareTo(java.lang.Object) */ @Override public int compareTo(TempObject o) { return o.getIndex() - this.index; } }
● LinkedTransferQueue
LinkedTransferQueue也是一個無限隊列,它除了具備通常隊列的操做特性外(先進先出),還具備一個阻塞特性:LinkedTransferQueue能夠由一對生產者/消費者線程進行操做,當消費者將一個新的元素插入隊列後,消費者線程將會一直等待,直到某一個消費者線程將這個元素取走,反之亦然。
LinkedTransferQueue的操做特性能夠由下面這段代碼提現。在下面的代碼片斷中,有兩中類型的線程:生產者和消費者,這兩類線程互相等待對方的操做:
/** * 生產者線程 */ private static class ProducerRunnable implements Runnable { private LinkedTransferQueue linkedQueue; public ProducerRunnable(LinkedTransferQueue linkedQueue) { this.linkedQueue = linkedQueue; } @Override public void run() { for(int index = 1 ; ; index++) { try { // 向LinkedTransferQueue隊列插入一個新的元素 // 而後生產者線程就會等待,直到有一個消費者將這個元素從隊列中取走 this.linkedQueue.transfer(new TempObject(index)); } catch (InterruptedException e) { e.printStackTrace(System.out); } } } } /** * 消費者線程 */ private static class ConsumerRunnable implements Runnable { private LinkedTransferQueue linkedQueue; public ConsumerRunnable(LinkedTransferQueue linkedQueue) { this.linkedQueue = linkedQueue; } @Override public void run() { Thread currentThread = Thread.currentThread(); while(!currentThread.isInterrupted()) { try { // 等待,直到從LinkedTransferQueue隊列中獲得一個元素 TempObject targetObject = this.linkedQueue.take(); System.out.println("線程(" + currentThread.getId() + ")取得targetObject.index = " + targetObject.getIndex()); } catch (InterruptedException e) { e.printStackTrace(System.out); } } } }
=====如下是啓動代碼:
LinkedTransferQueue<TempObject> linkedQueue = new LinkedTransferQueue<TempObject>(); // 這是一個生產者線程 Thread producerThread = new Thread(new ProducerRunnable(linkedQueue)); // 這裏有兩個消費者線程 Thread consumerRunnable1 = new Thread(new ConsumerRunnable(linkedQueue)); Thread consumerRunnable2 = new Thread(new ConsumerRunnable(linkedQueue)); // 開始運行 producerThread.start(); consumerRunnable1.start(); consumerRunnable2.start(); // 這裏只是爲了main不退出,沒有任何演示含義 Thread currentThread = Thread.currentThread(); synchronized (currentThread) { currentThread.wait(); }
在ThreadPoolExecutor線程池中還有一個重要的接口:RejectedExecutionHandler。當提交給線程池的某一個新任務沒法直接被線程池中「核心線程」直接處理,又沒法加入等待隊列,也沒法建立新的線程執行;又或者線程池已經調用shutdown()方法中止了工做;又或者線程池不是處於正常的工做狀態;這時候ThreadPoolExecutor線程池會拒絕處理這個任務,觸發建立ThreadPoolExecutor線程池時定義的RejectedExecutionHandler接口的實現
在建立ThreadPoolExecutor線程池時,必定會指定RejectedExecutionHandler接口的實現。若是調用的是不須要指定RejectedExecutionHandler接口的構造函數,如:
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue); public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory);
那麼ThreadPoolExecutor線程池在建立時,會使用一個默認的RejectedExecutionHandler接口實現,源代碼片斷以下:
public class ThreadPoolExecutor extends AbstractExecutorService { ...... /** * The default rejected execution handler */ private static final RejectedExecutionHandler defaultHandler = new AbortPolicy(); ...... // 能夠看到,ThreadPoolExecutor中的兩個沒有指定RejectedExecutionHandler // 接口的構造函數,都是使用了一個RejectedExecutionHandler接口的默認實現: AbortPolicy public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } ...... public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue workQueue, ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, defaultHandler); } ...... }
實際上,在ThreadPoolExecutor中已經提供了四種能夠直接使用的RejectedExecutionHandler接口的實現:
● CallerRunsPolicy:
這個拒絕處理器,將直接運行這個任務的run方法。可是,請注意並非在ThreadPoolExecutor線程池中的線程中運行,而是直接調用這個任務實現的run方法。源代碼以下:
public static class CallerRunsPolicy implements RejectedExecutionHandler { /** * Creates a {@code CallerRunsPolicy}. */ public CallerRunsPolicy() { } /** Executes task r in the caller's thread, unless the executor * has been shut down, in which case the task is discarded. ** @param r the runnable task requested to be executed * @param e the executor attempting to execute this task **/ public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { r.run(); } } }
● AbortPolicy:
這個處理器,在任務被拒絕後會建立一個RejectedExecutionException異常並拋出。這個處理過程也是ThreadPoolExecutor線程池默認的RejectedExecutionHandler實現。
● DiscardPolicy:
DiscardPolicy處理器,將會默默丟棄這個被拒絕的任務,不會拋出異常,也不會經過其餘方式執行這個任務的任何一個方法,更不會出現任何的日誌提示。
● DiscardOldestPolicy:
這個處理器頗有意思。它會檢查當前ThreadPoolExecutor線程池的等待隊列。並調用隊列的poll()方法,將當前處於等待隊列列頭的等待任務強行取出,而後再試圖將當前被拒絕的任務提交到線程池執行:
public static class DiscardOldestPolicy implements RejectedExecutionHandler { ...... public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { e.getQueue().poll(); e.execute(r); } } ...... }
實際上查閱這四種ThreadPoolExecutor線程池自帶的拒絕處理器實現,您能夠發現CallerRunsPolicy、DiscardPolicy、DiscardOldestPolicy處理器針對被拒絕的任務並非一個很好的處理方式。 CallerRunsPolicy在非線程池之外直接調用任務的run方法,可能會形成線程安全上的問題;DiscardPolicy默默的忽略掉被拒絕任務,也沒有輸出日誌或者提示,開發人員不會知道線程池的處理過程出現了錯誤;DiscardOldestPolicy中e.getQueue().poll()的方式好像是科學的,可是若是等待隊列出現了容量問題,大多數狀況下就是這個線程池的代碼出現了BUG。最科學的的仍是AbortPolicy提供的處理方式:拋出異常,由開發人員進行處理。