java集合框架 arrayblockingqueue應用分析

ArrayBlockingQueue是一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是在隊列中存在時間最長的元素
Queue  
------------ 
1.ArrayDeque, (數組雙端隊列) 
2.PriorityQueue, (優先級隊列) 
3.ConcurrentLinkedQueue, (基於鏈表的併發隊列) 
4.DelayQueue, (延期阻塞隊列)(阻塞隊列實現了BlockingQueue接口) 
5.ArrayBlockingQueue, (基於數組的併發阻塞隊列) 
6.LinkedBlockingQueue, (基於鏈表的FIFO阻塞隊列) 
7.LinkedBlockingDeque, (基於鏈表的FIFO雙端阻塞隊列) 
8.PriorityBlockingQueue, (帶優先級的無界阻塞隊列) 
9.SynchronousQueue (併發同步阻塞隊列) 
----------------------------------------------------- 
ArrayBlockingQueue 
是一個由數組支持的有界阻塞隊列。此隊列按 FIFO(先進先出)原則對元素進行排序。隊列的頭部 是在隊列中存在時間最長的元素。隊列的尾部 是在隊列中存在時間最短的元素。新元素插入到隊列的尾部,隊列獲取操做則是從隊列頭部開始得到元素。 
這是一個典型的「有界緩存區」,固定大小的數組在其中保持生產者插入的元素和使用者提取的元素。一旦建立了這樣的緩存區,就不能再增長其容量。試圖向已滿隊列中放入元素會致使操做受阻塞;試圖從空隊列中提取元素將致使相似阻塞。 
此類支持對等待的生產者線程和消費者線程進行排序的可選公平策略。默認狀況下,不保證是這種排序。然而,經過將公平性 (fairness) 設置爲 true 而構造的隊列容許按照 FIFO 順序訪問線程。公平性一般會下降吞吐量,但也減小了可變性和避免了「不平衡性」。 
複製代碼代碼以下:

public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { 
/** 隊列元素 數組 */ 
private final E[] items; 
/** 獲取、刪除元素時的索引(take, poll 或 remove操做) */ 
private int takeIndex; 
/** 添加元素時的索引(put, offer或 add操做) */ 
private int putIndex; 
/** 隊列元素的數目*/ 
private int count; 
/** 鎖 */ 
private final ReentrantLock lock; 
/** 獲取操做時的條件 */ 
private final Condition notEmpty; 
/** 插入操做時的條件 */ 
private final Condition notFull; 
//超出數組長度時,重設爲0 
final int inc(int i) { 
return (++i == items.length)? 0 : i; 

/** 
* 插入元素(在得到鎖的狀況下才調用) 
*/ 
private void insert(E x) { 
items[putIndex] = x; 
putIndex = inc(putIndex); 
++count; 
notEmpty.signal(); 

/** 
* 獲取並移除元素(在得到鎖的狀況下才調用) 
*/ 
private E extract() { 
final E[] items = this.items; 
E x = items[takeIndex]; 
items[takeIndex] = null; 
takeIndex = inc(takeIndex);//移到下一個位置 
--count; 
notFull.signal(); 
return x; 

/** 
* 刪除i位置的元素 
*/ 
void removeAt(int i) { 
final E[] items = this.items; 
// if removing front item, just advance 
if (i == takeIndex) { 
items[takeIndex] = null; 
takeIndex = inc(takeIndex); 
} else { 
// 把i後面的直到putIndex的元素都向前移動一個位置 
for (;;) { 
int nexti = inc(i); 
if (nexti != putIndex) { 
items[i] = items[nexti]; 
i = nexti; 
} else { 
items[i] = null; 
putIndex = i; 
break; 



--count; 
notFull.signal(); 

/** 
*構造方法,指定容量,默認策略(不是按照FIFO的順序訪問) 
*/ 
public ArrayBlockingQueue(int capacity) { 
this(capacity, false); 

/** 
*構造方法,指定容量及策略 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair) { 
if (capacity <= 0) 
throw new IllegalArgumentException(); 
this.items = (E[]) new Object[capacity]; 
lock = new ReentrantLock(fair); 
notEmpty = lock.newCondition(); 
notFull = lock.newCondition(); 

/** 
* 經過集合構造 
*/ 
public ArrayBlockingQueue(int capacity, boolean fair, 
Collection<? extends E> c) { 
this(capacity, fair); 
if (capacity < c.size()) 
throw new IllegalArgumentException(); 
for (Iterator<? extends E> it = c.iterator(); it.hasNext();) 
add(it.next()); 

/** 
* 插入元素到隊尾(super調用offer方法) 
* public boolean add(E e) { 
* if (offer(e)) 
* return true; 
* else 
* throw new IllegalStateException("Queue full"); 
* } 
* 將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量), 
* 在成功時返回 true,若是此隊列已滿,則拋出 IllegalStateException。 
*/ 
public boolean add(E e) { 
return super.add(e); 

/** 
* 將指定的元素插入到此隊列的尾部(若是當即可行且不會超過該隊列的容量), 
* 在成功時返回 true,若是此隊列已滿,則返回 false。 
*/ 
public boolean offer(E e) { 
if (e == null) throw new NullPointerException(); 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == items.length) 
return false; 
else { 
insert(e); 
return true; 

} finally { 
lock.unlock(); 


/** 
* 將指定的元素插入此隊列的尾部,若是該隊列已滿,則等待可用的空間。 
*/ 
public void put(E e) throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == items.length) 
notFull.await(); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 

insert(e); 
} finally { 
lock.unlock(); 


/** 
* 將指定的元素插入此隊列的尾部,若是該隊列已滿,則在到達指定的等待時間以前等待可用的空間。 
*/ 
public boolean offer(E e, long timeout, TimeUnit unit) 
throws InterruptedException { 
if (e == null) throw new NullPointerException(); 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != items.length) { 
insert(e); 
return true; 

if (nanos <= 0)//若是時間到了就返回 
return false; 
try { 
nanos = notFull.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notFull.signal(); // propagate to non-interrupted thread 
throw ie; 


} finally { 
lock.unlock(); 


//獲取並移除此隊列的頭,若是此隊列爲空,則返回 null。 
public E poll() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
if (count == 0) 
return null; 
E x = extract(); 
return x; 
} finally { 
lock.unlock(); 


//獲取並移除此隊列的頭部,在元素變得可用以前一直等待(若是有必要)。 
public E take() throws InterruptedException { 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
try { 
while (count == 0) 
notEmpty.await(); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 

E x = extract(); 
return x; 
} finally { 
lock.unlock(); 


//獲取並移除此隊列的頭部,在指定的等待時間前等待可用的元素(若是有必要)。 
public E poll(long timeout, TimeUnit unit) throws InterruptedException { 
long nanos = unit.toNanos(timeout); 
final ReentrantLock lock = this.lock; 
lock.lockInterruptibly(); 
try { 
for (;;) { 
if (count != 0) { 
E x = extract(); 
return x; 

if (nanos <= 0) 
return null; 
try { 
nanos = notEmpty.awaitNanos(nanos); 
} catch (InterruptedException ie) { 
notEmpty.signal(); // propagate to non-interrupted thread 
throw ie; 


} finally { 
lock.unlock(); 


//獲取但不移除此隊列的頭;若是此隊列爲空,則返回 null。 
public E peek() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return (count == 0) ? null : items[takeIndex]; 
} finally { 
lock.unlock(); 


/** 
* 返回此隊列中元素的數量。 
*/ 
public int size() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return count; 
} finally { 
lock.unlock(); 


/** 
*返回在無阻塞的理想狀況下(不存在內存或資源約束)此隊列能接受的其餘元素數量。 
*/ 
public int remainingCapacity() { 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
return items.length - count; 
} finally { 
lock.unlock(); 


/** 
* 今後隊列中移除指定元素的單個實例(若是存在)。 
*/ 
public boolean remove(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
for (;;) { 
if (k++ >= count) 
return false; 
if (o.equals(items[i])) { 
removeAt(i); 
return true; 

i = inc(i); 

} finally { 
lock.unlock(); 


/** 
* 若是此隊列包含指定的元素,則返回 true。 
*/ 
public boolean contains(Object o) { 
if (o == null) return false; 
final E[] items = this.items; 
final ReentrantLock lock = this.lock; 
lock.lock(); 
try { 
int i = takeIndex; 
int k = 0; 
while (k++ < count) { 
if (o.equals(items[i])) 
return true; 
i = inc(i); 

return false; 
} finally { 
lock.unlock(); 


…… 
相關文章
相關標籤/搜索