剛一聽到阻塞隊列,就以爲它很是地高大上,很是地難!其實否則!爲何?由於當你有一點基本的數據結構基礎再看阻塞隊列的定義以後你就會發現就那麼回事。好了,言歸正傳,隊列?無非就是一種具備先進先出(FIFO)
特性的數據結構嘛!其最基本的操做是入隊
和出隊
。java
那上面是阻塞隊列呢?咱們來看下關於它的一番定義:算法
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的一種特殊隊列。這兩個附加的操做是:編程
阻塞隊列常常用於生產者和消費者
的場景,生產者是往隊列裏添加元素的線程,消費者是從隊列裏拿元素的線程。阻塞隊列就是生產者存放元素的容器,而消費者也只從容器裏拿元素,所謂容器,就是咱們以前文章講到的臨界區,是爲了將生產者和消費者進行解耦而加入的。數組
那麼咱們就要開始問了,他的基本操做是怎樣的呢?怎麼實現隊列的阻塞呢?下面是Java中阻塞隊列支持的相關操做:安全
方法 | 拋出異常 | 返回特殊值 | 一直阻塞 | 超時退出 |
---|---|---|---|---|
入隊方法 |
add(e) |
offer(e) |
put(e) |
offer(e,time,unit) |
出隊方法 |
remove() |
poll() |
take() |
poll(time,unit) |
檢查方法 |
element() |
peek() |
不可用 |
不可用 |
從上表咱們能夠看出put()
和take()
方法當隊列滿或爲空的狀況下會一直阻塞,阻塞隊列會提供對這兩個操做的支持。bash
接下來我將列出 JDK
中對阻塞隊列的相關實現,並見到那挑選其中的某個實現進行源碼分析。數據結構
JDK
中阻塞隊列有如下實現:多線程
ArrayBlockingQueue :一個由數組結構組成的有界阻塞隊列。
LinkedBlockingQueue :一個由鏈表結構組成的有界阻塞隊列。
PriorityBlockingQueue :一個支持優先級排序的無界阻塞隊列。
DelayQueue:一個使用優先級隊列實現的無界阻塞隊列。
SynchronousQueue:一個不存儲元素的阻塞隊列。
LinkedTransferQueue:一個由鏈表結構組成的無界阻塞隊列。
LinkedBlockingDeque:一個由鏈表結構組成的雙向阻塞隊列。
複製代碼
對了,小編說說我對有界和無界的理解,我也不清楚對仍是不對,不對的話麻煩你評論告訴小編,灰常感謝!併發
從實現方面講:框架
有界
: 指的是實現裏頭持有的資源(數組)是有大小的,即容量是有限的
無界
: 指的是持有一個無界的鏈表
從訪問方式看:
無界
: 指的是不拒絕某些線程的訪問
有界
: 指的是拒絕某些線程的訪問
ArrayBlockingQueue.java
對於阻塞隊列的學習,咱們要時常在腦子裏模擬併發對其操做的場景。讓咱們先來看看ArrayBlockingQueue
中聲明的相關成員變量:
/** 存放隊列元素的數組 */
final Object[] items;
/** 下一次調用 take, poll, peek 或者 remove 時元素的下標,隊頭指針 */
int takeIndex;
/** 下一次調用 put, offer, 或者 add 方法時元素的下標,隊尾指針*/
int putIndex;
/** 隊列元素的大小,至關於ArrayList中的size
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.經典的雙條件算法
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** 判斷是否爲空的條件變量,用來表示隊列空與非空的狀態 */
private final Condition notEmpty;
/** 判斷是否滿了條件變量 用來表示隊列滿或沒滿的狀態 */
private final Condition notFull;
/**
* 當前活動迭代器的共享狀態,或者若是已知不存在,則返回null。 容許隊列操做更新迭代器狀態。
* Shared state for currently active iterators, or null if there
* are known not to be any. Allows queue operations to update
* iterator state.
*/
//在迭代器和它們的隊列之間共享數據,容許在刪除元素時修改隊列以更新迭代器
transient Itrs itrs = null;
複製代碼
對於存儲數據元素的字段items,爲何聲明爲Object[],而不聲明爲E[],能夠查看小編另一篇文章Java 集合 ArrayList 源代碼分析(帶着問題看源碼)
從上面能夠看出,ArrayBlockingQueue
擁有一個存儲元素的數組items
及其相關的出隊
、入隊
指針及隊列容量大小count
,這些都是最基本的屬性。再往下看能夠看出隊列中使用了經典的雙條件算法,即擁有兩個條件變量Condition
類型的變量,Condition
是JDK提供的在基本同步方法notify()、wait()、notifyAll()
的基礎上進行優化的工具類,它提供了代替wait(),notify()
等方法的相應版本await()、signal()
方法。通常來講,Condition
的使用通常結合一個鎖來實現,ArrayBlockingQueue
中使用了可重入鎖ReentrantLock
,即經典的一鎖雙條件
。
若是還不理解,想一想咱們在生產者和消費者文中代碼裏寫的,在調用notify(),wait()
等方法時必須先在synchnorized{}
同步塊下得到鎖,道理是同樣的,你調用await()、signal()
的時候也須要進行lock()
得到鎖)
並且Condition
和ReentrantLock
都是不可變的,final
修飾,多線程安全啦!
知道了其成員變量,咱們再來看看其相應的構造方法:
/**
* 使用給定的容量大小和默認的存取策略(FIFO)初始化一個ArrayBlockingQueue
*/
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
/**
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity and the specified access policy.
* 使用給定的容量大小和給定的存取策略初始化一個ArrayBlockingQueue
* @param capacity the capacity of this queue
* @param fair if {@code true} then queue accesses for threads blocked
* on insertion or removal, are processed in FIFO order;
* if {@code false} the access order is unspecified.(也就是下一步得到鎖的還不指定是誰)
* @throws IllegalArgumentException if {@code capacity < 1}
*/
public ArrayBlockingQueue(int capacity, boolean fair) {
/**
* 你容量不能爲負數吧!
*/
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
//你初始化ArrayBlockingQueue的時候也要初始化你的成員變量吧!一所雙條件很重要啊!
lock = new ReentrantLock(fair);
//Condition對象由鎖來進行建立
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
/**
* 經過給定的容量大小、存取策略,使用給定的Collection來初始化數據
* Creates an {@code ArrayBlockingQueue} with the given (fixed)
* capacity, the specified access policy and initially containing the
* elements of the given collection,
* added in traversal order of the collection's iterator. */ public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { /** * 這一步同上 */ this(capacity, fair); final ReentrantLock lock = this.lock; lock.lock(); // 這裏只是爲了可見性,而不是指相互排斥,想一想你剛建立這個對象,哪有什麼互相排斥嘛! try { int i = 0; try { for (E e : c) { /** * 先檢查後操做機制,是一種很好的編程規範 */ checkNotNull(e); items[i++] = e; } /** * 通常來講,c的大小是小於等於capacity的,不然報錯了 */ } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; putIndex = (i == capacity) ? 0 : i; } finally { lock.unlock(); } } 複製代碼
從其構造器能夠看出,咱們能夠在初始化時指定一個容量大小,也能夠經過傳入一個Collection
來初始化數據。同時咱們也能夠看出,在最後一個構造方法中使用了checkNotNull()
方法,其實這是一種頗有用的機制,優秀的框架通常都會這樣子去寫,好比Spring的Asserts.java
,這也是一種斷言機制,就是說咱們很肯定程序到達這一步必定是正確的,固然,若是不正確,那麼確定拋出異常啦!下面咱們看看各類入隊和出隊的操做吧!
入隊
put()
/**
* 典型的生產者嘛!插入一個元素到尾部,一直等到(阻塞)直到已經滿的隊列變爲非滿狀態
*/
public void put(E e) throws InterruptedException {
/**
* 先檢查後操做,若是是空,就拋出異常
*/
checkNotNull(e);
final ReentrantLock lock = this.lock;
/**
* 得到lock的鎖,除非當前線程被中斷了,也就是說當前的線程若是被中斷,咱們連鎖都得不到,還拋出可怕的異常
*/
lock.lockInterruptibly();
try {
/**
* 若是隊列滿了,確定得阻塞嘛!難道滿了還加?還阻不阻塞了
*/
while (count == items.length)
/**
* 當調用await方法後,當前線程會釋放lock鎖並進入Condition變量的等待隊列,而其餘線程調用signal方法後,通知正在Condition變量等待隊列的線程從await方法返回,而且在返回前已經得到了鎖。
*/
notFull.await();
//若是不滿,那麼就入隊
enqueue(e);
} finally {
lock.unlock();//解鎖
}
}
複製代碼
put()
方法很顯然就是典型生產者消費者模型中的生產者角色。只不過當滿了的時候是經過調用await()
的方法阻塞當前線程且釋放鎖,被阻塞的當前線程將進入Condition
對象提供的等待隊列中去排隊,直到有元素從阻塞隊列出隊時,會調用notFull.signal()
喚醒線程。
take()
public E take() throws InterruptedException {
/**
* 先加鎖
*/
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
/**
* 若是隊列是空,那取個啥子,直接阻塞
*/
while (count == 0)
notEmpty.await();
//出隊
return dequeue();
} finally {
lock.unlock();
}
}
複製代碼
能夠看出出隊的操做很是地簡單粗暴,下面咱們再看看兩個經常使用的內部方法:
enqueue()
/**
* 真正的入隊操做,能執行到此方法,說明你已經得到鎖了,且當前線程符合生產者消費者模型的要求(即put時未滿,take時非空)
*/
private void enqueue(E x) {
final Object[] items = this.items;
//還記得putIndex指的是什麼嗎?指的就是下一個能夠入隊的元素下標
items[putIndex] = x;
//改變相應的下標,這可能一眼看不懂,須要畫圖,實際上是一個循環隊列來着
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知阻塞的線程前來消費
notEmpty.signal();
}
複製代碼
dequeue()
/**
* 真正的出隊操做,能執行到此方法,說明你已經得到鎖了,且當前線程符合生產者消費者模型的要求(即put時未滿,take時非空)
*/
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
//置爲null
items[takeIndex] = null;
//改變相應的下標,這可能一眼看不懂,須要畫圖,實際上是一個循環隊列來着
if (++takeIndex == items.length)
takeIndex = 0;
count--;
//防止出隊致使其餘線程迭代失敗的操做
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
複製代碼
好了,源碼分析就到這裏啦!