Java 阻塞隊列--BlockingQueue

1. 什麼是阻塞隊列?

阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做是:在隊列爲空時,獲取元素的線程會等待隊列變爲非空。當隊列滿時,存儲元素的線程會等待隊列可用。阻塞隊列經常使用於生產者和消費者的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素。java

阻塞隊列提供了四種處理方法:數組

方法\處理方式 拋出異常 返回特殊值 一直阻塞 超時退出
插入方法 add(e) offer(e) put(e) offer(e,time,unit)
移除方法 remove() poll() take() poll(time,unit)
檢查方法 element() peek() 不可用 不可用

異常:是指當阻塞隊列滿時候,再往隊列裏插入元素,會拋出IllegalStateException("Queue full")異常。當隊列爲空時,從隊列裏獲取元素時會拋出NoSuchElementException異常 。緩存


  • 返回特殊值:插入方法會返回是否成功,成功則返回true。移除方法,則是從隊列裏拿出一個元素,若是沒有則返回null
  • 一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put元素,隊列會一直阻塞生產者線程,直到拿到數據,或者響應中斷退出。當隊列空時,消費者線程試圖從隊列裏take元素,隊列也會阻塞消費者線程,直到隊列可用。
  • 超時退出:當阻塞隊列滿時,隊列會阻塞生產者線程一段時間,若是超過必定的時間,生產者線程就會退出。

    詳細介紹BlockingQueue,如下是涉及的主要內容:安全

  • BlockingQueue的核心方法
  • 阻塞隊列的成員的概要介紹
  • 詳細介紹DelayQueue、ArrayBlockingQueue、LinkedBlockingQueue的原理
  • 線程池與BlockingQueue
一、初識阻塞隊列

在新增的Concurrent包中,BlockingQueue很好的解決了多線程中,如何高效安全「傳輸」數據的問題。經過這些高效而且線程安全的隊列類,爲咱們快速搭建高質量的多線程程序帶來極大的便利。本文詳細介紹了BlockingQueue家庭中的全部成員,包括他們各自的功能以及常見使用場景。網絡

BlockingQueue的核心方法:數據結構

public interface BlockingQueue<E> extends Queue<E> { //將給定元素設置到隊列中,若是設置成功返回true, 不然拋出異常。若是是往限定了長度的隊列中設置值,推薦使用offer()方法。 boolean add(E e); //將給定的元素設置到隊列中,若是設置成功返回true, 不然返回false. e的值不能爲空,不然拋出空指針異常。 boolean offer(E e); //將元素設置到隊列中,若是隊列中沒有多餘的空間,該方法會一直阻塞,直到隊列中有多餘的空間。 void put(E e) throws InterruptedException; //將給定元素在給定的時間內設置到隊列中,若是設置成功返回true, 不然返回false. boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException; //從隊列中獲取值,若是隊列中沒有值,線程會一直阻塞,直到隊列中有值,而且該方法取得了該值。 E take() throws InterruptedException; //在給定的時間裏,從隊列中獲取值,若是沒有取到會拋出異常。 E poll(long timeout, TimeUnit unit) throws InterruptedException; //獲取隊列中剩餘的空間。 int remainingCapacity(); //從隊列中移除指定的值。 boolean remove(Object o); //判斷隊列中是否擁有該值。 public boolean contains(Object o); //將隊列中值,所有移除,併發設置到給定的集合中。 int drainTo(Collection<? super E> c); //指定最多數量限制將隊列中值,所有移除,併發設置到給定的集合中。 int drainTo(Collection<? super E> c, int maxElements); } 

在深刻以前先了解下下ReentrantLock 和 Condition:
重入鎖ReentrantLock:
ReentrantLock鎖在同一個時間點只能被一個線程鎖持有;而可重入的意思是,ReentrantLock鎖,能夠被單個線程屢次獲取。
ReentrantLock分爲「公平鎖」和「非公平鎖」。它們的區別體如今獲取鎖的機制上是否公平。「鎖」是爲了保護競爭資源,防止多個線程同時操做線程而出錯,ReentrantLock在同一個時間點只能被一個線程獲取(當某線程獲取到「鎖」時,其它線程就必須等待);ReentraantLock是經過一個FIFO的等待隊列來管理獲取該鎖全部線程的。在「公平鎖」的機制下,線程依次排隊獲取鎖;而「非公平鎖」在鎖是可獲取狀態時,無論本身是否是在隊列的開頭都會獲取鎖。
主要方法:多線程

  • lock()得到鎖
  • lockInterruptibly()得到鎖,但優先響應中斷
  • tryLock()嘗試得到鎖,成功返回true,不然false,該方法不等待,當即返回
  • tryLock(long time,TimeUnit unit)在給定時間內嘗試得到鎖
  • unlock()釋放鎖

Condition:await()、signal()方法分別對應以前的Object的wait()和notify()併發

  • 和重入鎖一塊兒使用
  • await()是當前線程等待同時釋放鎖
  • awaitUninterruptibly()不會在等待過程當中響應中斷
  • signal()用於喚醒一個在等待的線程,還有對應的singalAll()方法
二、阻塞隊列的成員
隊列 有界性 數據結構
ArrayBlockingQueue bounded(有界) 加鎖 arrayList
LinkedBlockingQueue optionally-bounded 加鎖 linkedList
PriorityBlockingQueue unbounded 加鎖 heap
DelayQueue unbounded 加鎖 heap
SynchronousQueue bounded 加鎖
LinkedTransferQueue unbounded 加鎖 heap
LinkedBlockingDeque unbounded 無鎖 heap

下面分別簡單介紹一下:函數

  • ArrayBlockingQueue:是一個用數組實現的有界阻塞隊列,此隊列按照先進先出(FIFO)的原則對元素進行排序。支持公平鎖和非公平鎖。【注:每個線程在獲取鎖的時候可能都會排隊等待,若是在等待時間上,先獲取鎖的線程的請求必定先被知足,那麼這個鎖就是公平的。反之,這個鎖就是不公平的。公平的獲取鎖,也就是當前等待時間最長的線程先獲取鎖】優化

  • LinkedBlockingQueue:一個由鏈表結構組成的有界隊列,此隊列的長度爲Integer.MAX_VALUE。此隊列按照先進先出的順序進行排序。
  • PriorityBlockingQueue: 一個支持線程優先級排序的無界隊列,默認天然序進行排序,也能夠自定義實現compareTo()方法來指定元素排序規則,不能保證同優先級元素的順序。
  • DelayQueue: 一個實現PriorityBlockingQueue實現延遲獲取的無界隊列,在建立元素時,能夠指定多久才能從隊列中獲取當前元素。只有延時期滿後才能從隊列中獲取元素。(DelayQueue能夠運用在如下應用場景:1.緩存系統的設計:能夠用DelayQueue保存緩存元素的有效期,使用一個線程循環查詢DelayQueue,一旦能從DelayQueue中獲取元素時,表示緩存有效期到了。2.定時任務調度。使用DelayQueue保存當天將會執行的任務和執行時間,一旦從DelayQueue中獲取到任務就開始執行,從好比TimerQueue就是使用DelayQueue實現的。)
  • SynchronousQueue: 一個不存儲元素的阻塞隊列,每個put操做必須等待take操做,不然不能添加元素。支持公平鎖和非公平鎖。SynchronousQueue的一個使用場景是在線程池裏。Executors.newCachedThreadPool()就使用了SynchronousQueue,這個線程池根據須要(新任務到來時)建立新的線程,若是有空閒線程則會重複使用,線程空閒了60秒後會被回收。
  • LinkedTransferQueue: 一個由鏈表結構組成的無界阻塞隊列,至關於其它隊列,LinkedTransferQueue隊列多了transfer和tryTransfer方法。
  • LinkedBlockingDeque: 一個由鏈表結構組成的雙向阻塞隊列。隊列頭部和尾部均可以添加和移除元素,多線程併發時,能夠將鎖的競爭最多降到一半。

接下來重點介紹下:ArrayBlockingQueue、LinkedBlockingQueue以及DelayQueue

三、阻塞隊列原理以及使用

(1)DelayQueue

DelayQueue的泛型參數須要實現Delayed接口,Delayed接口繼承了Comparable接口,DelayQueue內部使用非線程安全的優先隊列(PriorityQueue),並使用Leader/Followers模式,最小化沒必要要的等待時間。DelayQueue不容許包含null元素。

Leader/Followers模式:

  1. 有若干個線程(通常組成線程池)用來處理大量的事件
  2. 有一個線程做爲領導者,等待事件的發生;其餘的線程做爲追隨者,僅僅是睡眠。
  3. 假若有事件須要處理,領導者會從追隨者中指定一個新的領導者,本身去處理事件。
  4. 喚醒的追隨者做爲新的領導者等待事件的發生。
  5. 處理事件的線程處理完畢之後,就會成爲追隨者的一員,直到被喚醒成爲領導者。
  6. 假如須要處理的事件太多,而線程數量不夠(可以動態建立線程處理另當別論),則有的事件可能會得不處處理。

全部線程會有三種身份中的一種:leader和follower,以及一個幹活中的狀態:proccesser。它的基本原則就是,永遠最多隻有一個leader。而全部follower都在等待成爲leader。線程池啓動時會自動產生一個Leader負責等待網絡IO事件,當有一個事件產生時,Leader線程首先通知一個Follower線程將其提拔爲新的Leader,而後本身就去幹活了,去處理這個網絡事件,處理完畢後加入Follower線程等待隊列,等待下次成爲Leader。這種方法能夠加強CPU高速緩存類似性,及消除動態內存分配和線程間的數據交換。
參數以及構造函數:

// 可重入鎖 private final transient ReentrantLock lock = new ReentrantLock(); // 存儲隊列元素的隊列——優先隊列 private final PriorityQueue<E> q = new PriorityQueue<E>(); //用於優化阻塞通知的線程元素leader,Leader/Followers模式 private Thread leader = null; //用於實現阻塞和通知的Condition對象 private final Condition available = lock.newCondition(); public DelayQueue() {} public DelayQueue(Collection<? extends E> c) { this.addAll(c); } 

先看offer()方法:

public boolean offer(E e) { final ReentrantLock lock = this.lock; lock.lock(); try { q.offer(e); // 若是原來隊列爲空,重置leader線程,通知available條件 if (q.peek() == e) { leader = null; available.signal(); } return true; } finally { lock.unlock(); } } //由於DelayQueue不限制長度,所以添加元素的時候不會由於隊列已滿產生阻塞,所以帶有超時的offer方法的超時設置是不起做用的 public boolean offer(E e, long timeout, TimeUnit unit) { // 和不帶timeout的offer方法同樣 return offer(e); }

普通的poll()方法:若是延遲時間沒有耗盡的話,直接返回null

public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { E first = q.peek(); if (first == null || first.getDelay(TimeUnit.NANOSECONDS) > 0) return null; else return q.poll(); } finally { lock.unlock(); } }

再看看take()方法:

public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { // 若是隊列爲空,須要等待available條件被通知 E first = q.peek(); if (first == null) available.await(); else { long delay = first.getDelay(TimeUnit.NANOSECONDS); // 若是延遲時間已到,直接返回第一個元素 if (delay <= 0) return q.poll(); // leader線程存在表示有其餘線程在等待,那麼當前線程確定須要等待 else if (leader != null) available.await(); else { Thread thisThread = Thread.currentThread(); leader = thisThread; // 若是沒有leader線程,設置當前線程爲leader線程 // 嘗試等待直到延遲時間耗盡(可能提早返回,那麼下次 // 循環會繼續處理) try { available.awaitNanos(delay); } finally { // 若是leader線程仍是當前線程,重置它用於下一次循環。 // 等待available條件時,鎖可能被其餘線程佔用從而致使 // leader線程被改變,因此要檢查 if (leader == thisThread) leader = null; } } } } } finally { // 若是沒有其餘線程在等待,而且隊列不爲空,通知available條件 if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

最後看看帶有timeout的poll方法:

public E poll(long timeout, TimeUnit unit) throws InterruptedException { long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { for (;;) { E first = q.peek(); if (first == null) { if (nanos <= 0) return null; else // 嘗試等待available條件,記錄剩餘的時間 nanos = available.awaitNanos(nanos); } else { long delay = first.getDelay(TimeUnit.NANOSECONDS); if (delay <= 0) return q.poll(); if (nanos <= 0) return null; // 當leader線程不爲空時(此時delay>=nanos),等待的時間 // 彷佛delay更合理,可是nanos也能夠,由於排在當前線程前面的 // 其餘線程返回時會喚醒available條件從而返回, if (nanos < delay || leader != null) nanos = available.awaitNanos(nanos); else { Thread thisThread = Thread.currentThread(); leader = thisThread; try { long timeLeft = available.awaitNanos(delay); // nanos須要更新 nanos -= delay - timeLeft; } finally { if (leader == thisThread) leader = null; } } } } } finally { if (leader == null && q.peek() != null) available.signal(); lock.unlock(); } }

(2)ArrayBlockingQueue

參數以及構造函數:

// 存儲隊列元素的數組 final Object[] items; // 拿數據的索引,用於take,poll,peek,remove方法 int takeIndex; // 放數據的索引,用於put,offer,add方法 int putIndex; // 元素個數 int count; // 可重入鎖 final ReentrantLock lock; // notEmpty條件對象,由lock建立 private final Condition notEmpty; // notFull條件對象,由lock建立 private final Condition notFull; public ArrayBlockingQueue(int capacity) { this(capacity, false);//默認構造非公平鎖的阻塞隊列 } public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //初始化ReentrantLock重入鎖,出隊入隊擁有這同一個鎖 lock = new ReentrantLock(fair); //初始化非空等待隊列 notEmpty = lock.newCondition(); //初始化非滿等待隊列 notFull = lock.newCondition(); } public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; //將集合添加進數組構成的隊列中 try { for (E e : c) { checkNotNull(e); items[i++] = e; } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } }

添加的實現原理:

這裏的add方法和offer方法最終調用的是enqueue(E x)方法,其方法內部經過putIndex索引直接將元素添加到數組items中,這裏可能會疑惑的是當putIndex索引大小等於數組長度時,須要將putIndex從新設置爲0,這是由於當前隊列執行元素獲取時老是從隊列頭部獲取,而添加元素從中從隊列尾部獲取因此當隊列索引(從0開始)與數組長度相等時,下次咱們就須要從數組頭部開始添加了,以下圖演示

//入隊操做 private void enqueue(E x) { final Object[] items = this.items; //經過putIndex索引對數組進行賦值 items[putIndex] = x; //索引自增,若是已經是最後一個位置,從新設置 putIndex = 0; if (++putIndex == items.length) putIndex = 0; count++; notEmpty.signal(); }

接着看put方法:
put方法是一個阻塞的方法,若是隊列元素已滿,那麼當前線程將會被notFull條件對象掛起加到等待隊列中,直到隊列有空檔纔會喚醒執行添加操做。但若是隊列沒有滿,那麼就直接調用enqueue(e)方法將元素加入到數組隊列中。到此咱們對三個添加方法即put,offer,add都分析完畢,其中offer,add在正常狀況下都是無阻塞的添加,而put方法是阻塞添加。這就是阻塞隊列的添加過程。說白了就是當隊列滿時經過條件對象Condtion來阻塞當前調用put方法的線程,直到線程又再次被喚醒執行。總得來講添加線程的執行存在如下兩種狀況,一是,隊列已滿,那麼新到來的put線程將添加到notFull的條件隊列中等待,二是,有移除線程執行移除操做,移除成功同時喚醒put線程,以下圖所示

public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { //當隊列元素個數與數組長度相等時,沒法添加元素 while (count == items.length) //將當前調用線程掛起,添加到notFull條件隊列中等待喚醒 notFull.await(); enqueue(e); } finally { lock.unlock(); } }
移除實現原理:

poll方法,該方法獲取並移除此隊列的頭元素,若隊列爲空,則返回 null

public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { //判斷隊列是否爲null,不爲null執行dequeue()方法,不然返回null return (count == 0) ? null : dequeue(); } finally { lock.unlock(); } } //刪除隊列頭元素並返回 private E dequeue() { //拿到當前數組的數據 final Object[] items = this.items; @SuppressWarnings("unchecked") //獲取要刪除的對象 E x = (E) items[takeIndex]; 將數組中takeIndex索引位置設置爲null items[takeIndex] = null; //takeIndex索引加1並判斷是否與數組長度相等, //若是相等說明已到盡頭,恢復爲0 if (++takeIndex == items.length) takeIndex = 0; count--;//隊列個數減1 if (itrs != null) itrs.elementDequeued();//同時更新迭代器中的元素數據 //刪除了元素說明隊列有空位,喚醒notFull條件對象添加線程,執行添加操做 notFull.signal(); return x; }

接着看remove(Object o)方法

public boolean remove(Object o) { if (o == null) return false; //獲取數組數據 final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock();//加鎖 try { //若是此時隊列不爲null,這裏是爲了防止併發狀況 if (count > 0) { //獲取下一個要添加元素時的索引 final int putIndex = this.putIndex; //獲取當前要被刪除元素的索引 int i = takeIndex; //執行循環查找要刪除的元素 do { //找到要刪除的元素 if (o.equals(items[i])) { removeAt(i);//執行刪除 return true;//刪除成功返回true } //當前刪除索引執行加1後判斷是否與數組長度相等 //若爲true,說明索引已到數組盡頭,將i設置爲0 if (++i == items.length) i = 0; } while (i != putIndex);//繼承查找 } return false; } finally { lock.unlock(); } } //根據索引刪除元素,其實是把刪除索引以後的元素往前移動一個位置 void removeAt(final int removeIndex) { final Object[] items = this.items; //先判斷要刪除的元素是否爲當前隊列頭元素 if (removeIndex == takeIndex) { //若是是直接刪除 items[takeIndex] = null; //當前隊列頭元素加1並判斷是否與數組長度相等,若爲true設置爲0 if (++takeIndex == items.length) takeIndex = 0; count--;//隊列元素減1 if (itrs != null) itrs.elementDequeued();//更新迭代器中的數據 } else { //若是要刪除的元素不在隊列頭部, //那麼只需循環迭代把刪除元素後面的全部元素往前移動一個位置 //獲取下一個要被添加的元素的索引,做爲循環判斷結束條件 final int putIndex = this.putIndex; //執行循環 for (int i = removeIndex;;) { //獲取要刪除節點索引的下一個索引 int next = i + 1; //判斷是否已爲數組長度,若是是從數組頭部(索引爲0)開始找 if (next == items.length) next = 0; //若是查找的索引不等於要添加元素的索引,說明元素能夠再移動 if (next != putIndex) { items[i] = items[next];//把後一個元素前移覆蓋要刪除的元 i = next; } else { //在removeIndex索引以後的元素都往前移動完畢後清空最後一個元素 items[i] = null; this.putIndex = i; break;//結束循環 } } count--;//隊列元素減1 if (itrs != null) itrs.removedAt(removeIndex);//更新迭代器數據 } notFull.signal();//喚醒添加線程 }

remove(Object o)方法的刪除過程相對複雜些,由於該方法並非直接從隊列頭部刪除元素。首先線程先獲取鎖,再一步判斷隊列count>0,這點是保證併發狀況下刪除操做安全執行。接着獲取下一個要添加源的索引putIndex以及takeIndex索引 ,做爲後續循環的結束判斷,由於只要putIndex與takeIndex不相等就說明隊列沒有結束。而後經過while循環找到要刪除的元素索引,執行removeAt(i)方法刪除,在removeAt(i)方法中實際上作了兩件事,一是首先判斷隊列頭部元素是否爲刪除元素,若是是直接刪除,並喚醒添加線程,二是若是要刪除的元素並非隊列頭元素,那麼執行循環操做,從要刪除元素的索引removeIndex以後的元素都往前移動一個位置,那麼要刪除的元素就被removeIndex以後的元素替換,從而也就完成了刪除操做。

接着看take()方法
take方法其實很簡單,有就刪除沒有就阻塞,注意這個阻塞是能夠中斷的,若是隊列沒有數據那麼就加入notEmpty條件隊列等待(有數據就直接取走,方法結束),若是有新的put線程添加了數據,那麼put操做將會喚醒take線程,執行take操做。圖示以下

//從隊列頭部刪除,隊列沒有元素就阻塞,可中斷 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//中斷 try { //若是隊列沒有元素 while (count == 0) //執行阻塞操做 notEmpty.await(); return dequeue();//若是隊列有元素執行刪除操做 } finally { lock.unlock(); } }

最後看看peek()方法,比較簡單,直接返回當前隊列的頭元素但不刪除任何元素。

public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { //直接返回當前隊列的頭元素,但不刪除 return itemAt(takeIndex); // null when queue is empty } finally { lock.unlock(); } } final E itemAt(int i) { return (E) items[i]; }

(3)LinkedBlockingQueue

參數以及構造函數:

//節點類,用於存儲數據 static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } } // 容量大小 private final int capacity; // 元素個數,由於有2個鎖,存在競態條件,使用AtomicInteger private final AtomicInteger count = new AtomicInteger(0); // 頭結點 private transient Node<E> head; // 尾節點 private transient Node<E> last; // 獲取並移除元素時使用的鎖,如take, poll, etc private final ReentrantLock takeLock = new ReentrantLock(); // notEmpty條件對象,當隊列沒有數據時用於掛起執行刪除的線程 private final Condition notEmpty = takeLock.newCondition(); // 添加元素時使用的鎖如 put, offer, etc private final ReentrantLock putLock = new ReentrantLock(); // notFull條件對象,當隊列數據已滿時用於掛起執行添加的線程 private final Condition notFull = putLock.newCondition(); public LinkedBlockingQueue() { this(Integer.MAX_VALUE); } public LinkedBlockingQueue(int capacity) { if (capacity <= 0) throw new IllegalArgumentException(); this.capacity = capacity; last = head = new Node<E>(null); } public LinkedBlockingQueue(Collection<? extends E> c) { this(Integer.MAX_VALUE); final ReentrantLock putLock = this.putLock; putLock.lock(); // Never contended, but necessary for visibility try { int n = 0; for (E e : c) { if (e == null) throw new NullPointerException(); if (n == capacity) throw new IllegalStateException("Queue full"); enqueue(new Node<E>(e)); ++n; } count.set(n); } finally { putLock.unlock(); } } 
四、線程池中的BlockingQueue

首先看下構造函數

public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler){...}

TimeUnit:時間單位;BlockingQueue:等待的線程存放隊列;keepAliveTime:非核心線程的閒置超時時間,超過這個時間就會被回收;RejectedExecutionHandler:線程池對拒絕任務的處理策略。
自定義線程池:這個構造方法對於隊列是什麼類型比較關鍵。

  • 在使用有界隊列時,如有新的任務須要執行,若是線程池實際線程數小於corePoolSize,則優先建立線程,
  • 若大於corePoolSize,則會將任務加入隊列,
  • 若隊列已滿,則在總線程數不大於maximumPoolSize的前提下,建立新的線程,
  • 若隊列已經滿了且線程數大於maximumPoolSize,則執行拒絕策略。或其餘自定義方式。

接下來看下源碼:

public void execute(Runnable command) { if (command == null) //不能是空任務 throw new NullPointerException(); //若是尚未達到corePoolSize,則添加新線程來執行任務 if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //若是已經達到corePoolSize,則不斷的向工做隊列中添加任務 if (runState == RUNNING && workQueue.offer(command)) { //線程池已經沒有任務 if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } //若是線程池不處於運行中或者工做隊列已經滿了,可是當前的線程數量還小於容許最大的maximumPoolSize線程數量,則繼續建立線程來執行任務 else if (!addIfUnderMaximumPoolSize(command)) //已達到最大線程數量,任務隊列也已經滿了,則調用飽和策略執行處理器 reject(command); // is shutdown or saturated } } private boolean addIfUnderCorePoolSize(Runnable firstTask) { Thread t = null; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); //更改幾個重要的控制字段須要加鎖 try { //池裏線程數量小於核心線程數量,而且還須要是運行時 if (poolSize < corePoolSize && runState == RUNNING) t = addThread(firstTask); } finally { mainLock.unlock(); } if (t == null) return false; t.start(); //建立後,當即執行該任務 return true; } private Thread addThread(Runnable firstTask) { Worker w = new Worker(firstTask); Thread t = threadFactory.newThread(w); //委託線程工廠來建立,具備相同的組、優先級、都是非後臺線程 if (t != null) { w.thread = t; workers.add(w); //加入到工做者線程集合裏 int nt = ++poolSize; if (nt > largestPoolSize) largestPoolSize = nt; } return t; } 
相關文章
相關標籤/搜索