ArrayBlockingQueue底層是由數組實現的定長阻塞隊列(阻塞表示若是沒有原始那麼獲取元素會阻塞當前線程)html
ArrayBlockingQueue通常用於生產者消費者模型業務(排隊機制,先進先出)java
public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable { private static final long serialVersionUID = -817911632652898426L; /** The queued items 存儲元素容器*/ final Object[] items; /** items index for next take, poll, peek or remove 使用過的元素 */ int takeIndex; /** items index for next put, offer, or add 添加過的元素 */ int putIndex; /** Number of elements in the queue 當前元素數量 */ int count;
add數組
public boolean add(E e) { return super.add(e); } super.add public boolean add(E e) { if (offer(e)) return true; else throw new IllegalStateException("Queue full"); } public boolean offer(E e) { checkNotNull(e);//ArrayBlockingQueue不能存儲null對象 final ReentrantLock lock = this.lock;//插入操做線程安全 lock.lock(); try { if (count == items.length)//若是當前count==items.length表示隊列已經忙了,不能插入 return false; else { insert(e);//插入元素 return true; } } finally { lock.unlock(); } } private void insert(E x) { items[putIndex] = x;//第一次put爲0 putIndex = inc(putIndex);//遞增 ++count;//數量遞增 notEmpty.signal();//通知獲取原始方法能夠進行獲取 } final int inc(int i) {//若是當前putIndex==items.length那麼putIndex從新從零開始 return (++i == items.length) ? 0 : i; } //一樣爲添加元素,lock.lockInterruptibly若是檢測到有Thread.interrupted();會直接拋出異常 public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == items.length) notFull.await(); insert(e); } finally { lock.unlock(); } }
remove安全
public boolean remove(Object o) { if (o == null) return false; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) { if (o.equals(items[i])) {//從頭部開始遍歷元素判斷 removeAt(i); return true; } } return false; } finally { lock.unlock(); } } //queue size = 10 putSize = 5 tackSize = 0 //queue 1,2,3,4,5 removeAt 3 step1: removeAt != takeIndex i = nexti = 4 void removeAt(int i) { final Object[] items = this.items; // if removing front item, just advance if (i == takeIndex) { items[takeIndex] = null;//引用設置爲空 takeIndex = inc(takeIndex);//takeIndex++ } else { // slide over all others up through putIndex. for (;;) { int nexti = inc(i);//>隊列的頭部 遞增(putIndex一個循環的0-n) if (nexti != putIndex) {//遞增後部位putIndex所有向前移動位置 items[i] = items[nexti]; i = nexti; } else { items[i] = null;//元素設置爲空 putIndex = i; break; } } } --count;//元素遞減 notFull.signal();//通知notFull.awit() }
getoracle
public E poll() {//獲取隊列頭部元素,獲取後設置爲空 final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : extract();//若是當前隊列爲空直接返回null,不爲空調用extract() } finally { lock.unlock(); } } //獲取隊列頭部元素,獲取後設置爲空 //take獲取原始若是隊列爲空會進入阻塞狀態知道等到有添加元素纔會去返回 public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly();//lock.lockInterruptibly若是檢測到有Thread.interrupted();會直接拋出異常 try { while (count == 0) notEmpty.await();//若是沒有元素進入等待狀態,等待被喚醒 return extract(); } finally { lock.unlock(); } } //peek查看隊列頭部元素 public E peek() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : itemAt(takeIndex);//若是元素爲空直接返回null,不爲空條用itemAt(takeIndex) } finally { lock.unlock(); } } private E extract() { final Object[] items = this.items; E x = this.<E>cast(items[takeIndex]);//泛型轉換而且得到當前元素 items[takeIndex] = null;//當前元素設置爲空 takeIndex = inc(takeIndex);//獲取原始遞增 --count;//隊列元素遞減 notFull.signal();//通知notFull.await()能夠進行插入元素 return x;//返回當前獲取原始 } //獲取元素 final E itemAt(int i) { return this.<E>cast(items[i]); }
定長隊列,不能進行擴容ide
線程安全this