public class ArrayBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable。ArrayBlockingQueue是BlockingQueue接口的一種實現,要了解它就必須清楚BlockingQueue的相關知識;java
在併發隊列上JDK提供了兩套實現,一個是以ConcurrentLinkedQueue爲表明的高性能隊列,一個是以BlockingQueue接口爲表明的阻塞隊列,不管哪一種都繼承自Queue接口!,BlockingQueue的類繼承關係以下:算法
BlockingQueue接口重要方法以下:數組
/** The queued items */ 以數組做爲數據結構 final Object[] items; /** items index for next take, poll, peek or remove */ 隊列中下一個將被取出值的下標 int takeIndex; /** items index for next put, offer, or add */ 隊列中下一個將被放入值的下標 int putIndex; /** Number of elements in the queue */ 數組元素數量 int count; /* * Concurrency control uses the classic two-condition algorithm 使用雙條件算法 * found in any textbook. */ /** Main lock guarding all access */ 使用重入鎖(獨佔鎖) final ReentrantLock lock; /** Condition for waiting takes */ take時候用於等待的條件 private final Condition notEmpty; /** Condition for waiting puts */ put時候用於等待的條件 private final Condition notFull; transient Itrs itrs = null;
/**數據結構
@throws IllegalArgumentException if {@code capacity < 1}
*/併發
public ArrayBlockingQueue(int capacity) { this(capacity, false); //調用public ArrayBlockingQueue(int capacity, boolean fair)構造方法,默認使用非公平鎖 }
/**app
public ArrayBlockingQueue(int capacity, boolean fair) { if (capacity <= 0) throw new IllegalArgumentException(); this.items = new Object[capacity]; //建立指定容量的數組 lock = new ReentrantLock(fair); //默認使用非公平鎖 notEmpty = lock.newCondition(); notFull = lock.newCondition(); }
/**less
of its elements are null
*/
//這個構造函數的核心就是c.size()與capacity的大小關係對比了
//若是c.size()>capacity那就會報錯,因此在初始化的時候要注意ide
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { this(capacity, fair); //先建立指定容量的數組,以便集合中的元素存放 //這種寫法咱們很常見,使用final表示引用不能改變,但又避免了直接使用成員變量 final ReentrantLock lock = this.lock; //對隊列直接修改操做,須要先獲取獨佔鎖 lock.lock(); // Lock only for visibility, not mutual exclusion try { int i = 0; try { for (E e : c) { checkNotNull(e); items[i++] = e; //下標從0開始存放 } } catch (ArrayIndexOutOfBoundsException ex) { throw new IllegalArgumentException(); } count = i; //將數組元素個數返回給全局變量 putIndex = (i == capacity) ? 0 : i; ////初始化入隊索引 } finally { lock.unlock(); //解鎖 } }
public boolean add(E e) { return super.add(e); }
//父類AbstractQueue的方法函數
public boolean add(E e) { if (offer(e)) //調用offer方法添加元素,不阻塞當前線程(等待) return true; else throw new IllegalStateException("Queue full"); }
offer(E e)方法,源碼以下:
//若是隊列能夠容納則當即返回成功,不然失敗,不阻塞當前線程,是官方推薦使用的方法源碼分析
public boolean offer(E e) { checkNotNull(e); //檢查元素是否爲空 final ReentrantLock lock = this.lock; lock.lock();//獲取獨佔鎖 try { if (count == items.length) //當前數組已滿,當即返回失敗 return false; else { enqueue(e); return true; } } finally { lock.unlock(); //解鎖 } }
//插入元素
private void enqueue(E x) { // assert lock.getHoldCount() == 1; 當前線程調用lock()的次數 // assert items[putIndex] == null; 當前位置沒值 final Object[] items = this.items; items[putIndex] = x; //在指定的位置插入元素 if (++putIndex == items.length) putIndex = 0; count++; //更新數組元素個數 notEmpty.signal();//通知被take方法讀取元素阻塞等待的線程(前提是該線程持有鎖) }
put(E e)方法,若是BlockQueue沒有空間, 則調用此方法的線程被阻斷直到BlockingQueue裏面有空間再繼續,其源碼以下:
public void put(E e) throws InterruptedException { checkNotNull(e); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //申請鎖的過程可被外界打斷(中斷響應,不會無限制等待) try { while (count == items.length) notFull.await(); //隊列已經滿了,則使用put的條件等待(此方法與object.wait()方法不一樣,不釋放鎖),之道有空閒空間繼續執行下一步enqueue(e); enqueue(e); } finally { lock.unlock(); //解鎖 } }
//offer(E,long,TimeUnit)會在等待一段時間後返回,可是等待的過程當中是能夠響應中斷的
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { checkNotNull(e); long nanos = unit.toNanos(timeout); final ReentrantLock lock = this.lock; lock.lockInterruptibly(); //響應中斷 try { while (count == items.length) { if (nanos <= 0) return false; nanos = notFull.awaitNanos(nanos); } enqueue(e); return true; } finally { lock.unlock(); } }
public E poll() { final ReentrantLock lock = this.lock; lock.lock(); try { return (count == 0) ? null : dequeue(); //數組不爲空,調用dequeue取出takeIndex下標位置上的元素,而且會喚醒等待notFull條件的線程 } finally { lock.unlock(); } }
public E take() throws InterruptedException { final ReentrantLock lock = this.lock; lock.lockInterruptibly(); try { while (count == 0) notEmpty.await(); //隊列爲空,則阻塞當前隊列 return dequeue(); //隊列有元素,則調用dequeue } finally { lock.unlock(); } }
E dequeue()閱讀:
private E dequeue() { // assert lock.getHoldCount() == 1; // assert items[takeIndex] != null; final Object[] items = this.items; @SuppressWarnings("unchecked") E x = (E) items[takeIndex]; //獲取takeIndex下標處的元素 items[takeIndex] = null; //將當前位置的元素設置爲null //這裏能夠看出這個數組是個環形數組,其實取元素,老是從隊列頭部開始,即items[0] if (++takeIndex == items.length) takeIndex = 0; count--; //更新數組元素個數 if (itrs != null) itrs.elementDequeued(); //修改迭代器參數 notFull.signal(); //通知阻塞在putIndex操做的線程 return x; }
public int drainTo(Collection<? super E> c) { return drainTo(c, Integer.MAX_VALUE); }
public int drainTo(Collection<? super E> c, int maxElements) { checkNotNull(c); if (c == this) //存放返回元素的集合不能使當前隊列 throw new IllegalArgumentException(); if (maxElements <= 0) return 0; final Object[] items = this.items; final ReentrantLock lock = this.lock; lock.lock(); try { int n = Math.min(maxElements, count); //要取出的元素不能大於數組元素個數 int take = takeIndex; int i = 0; try { while (i < n) { @SuppressWarnings("unchecked") E x = (E) items[take]; c.add(x); //將takerInex下標處的元素放入集合中 items[take] = null; //對應位置的元素設置爲null if (++take == items.length) take = 0; // 環形數組 i++; } return n; //返回獲取到的元素個數 } finally { // Restore invariants even if c.add() threw //更新狀態,即便操做失敗也要還原 if (i > 0) {// count -= i; takeIndex = take; if (itrs != null) { if (count == 0) itrs.queueIsEmpty(); else if (i > take) itrs.takeIndexWrapped(); } for (; i > 0 && lock.hasWaiters(notFull); i--) notFull.signal(); //喚醒阻塞在此條件(putIndex)的線程 } } } finally { lock.unlock(); } }