多線程環境中,經過隊列能夠很容易實現數據共享,好比經典的「生產者」和「消費者」模型中,經過隊列能夠很便利地實現二者之間的數據共享。假設咱們有若干生產者線程,另外又有若干個消費者線程。若是生產者線程須要把準備好的數據共享給消費者線程,利用隊列的方式來傳遞數據,就能夠很方便地解決他們之間的數據共享問題。但若是生產者和消費者在某個時間段內,萬一發生數據處理速度不匹配的狀況呢?理想狀況下,若是生產者產出數據的速度大於消費者消費的速度,而且當生產出來的數據累積到必定程度的時候,那麼生產者必須暫停等待一下(阻塞生產者線程),以便等待消費者線程把累積的數據處理完畢,反之亦然。html
BlockingQueue很好地解決了上述問題,BlockingQueue即阻塞隊列,它是一個接口,它的實現類有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它們的區別主要體如今存儲結構上或對元素操做上的不一樣。java
public interface BlockingQueue<E> extends Queue<E> {
//往隊列尾部添加元素,若是BlockingQueue能夠容納,則返回true,不然拋出異常
boolean add(E e);
//移除元素,若是有這個元素則就回true,不然拋出異常
boolean remove(Object o);
//往隊列尾部添加元素,若是BlockingQueue能夠容納則返回true,不然返回false.
//若是是往限定了長度的隊列中設置值,推薦使用offer()方法。
boolean offer(E e);
//和上面的方法差很少,不過若是隊列滿了能夠阻塞等待一段時間
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
//取出頭部對象,若不能當即取出,則能夠等time參數規定的時間,取不到時返回null
E poll(long timeout, TimeUnit unit) throws InterruptedException;
//往隊列尾部添加元素,若是沒有空間,則調用此方法的線程被阻塞直到有空間再繼續.
void put(E e) throws InterruptedException;
//取出頭部對象,若BlockingQueue爲空,阻斷進入等待狀態直到Blocking有新的對象被加入爲止
E take() throws InterruptedException;
//剩餘容量,超出此容量,便沒法無阻塞地添加元素
int remainingCapacity();
//判斷隊列中是否擁有該值。
boolean contains(Object o);
//一次性從BlockingQueue獲取全部可用的數據對象,能夠提高獲取數據效率
int drainTo(Collection<? super E> c);
//和上面的方法差很少,不過限制了最大取出數量
int drainTo(Collection<? super E> c, int maxElements);
}
複製代碼
咱們以ArrayBlockingQueue
爲例分析下上述方法:數組
offer(E e)安全
public boolean offer(E e) {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
return false;
else {
enqueue(e);
return true;
}
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length) putIndex = 0;
count++;
notEmpty.signal();
}
複製代碼
offer操做如上,代碼比較簡單,可見阻塞隊列是經過可重入保證線程安全。enqueue
方法也說明了ArrayBlockingQueue
是經過數組的形式存儲數據的。若是隊列滿了直接會返回false,不會阻塞線程。bash
put(E e)多線程
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)//隊列滿了,一直阻塞在這裏
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
複製代碼
由於put方法在隊列已滿的狀況下會阻塞線程,take、poll等方法會調用dequeue方法出列,從而調用notFull.signal(),從而喚醒阻塞在put方法中線程去繼續進行入列操做:ui
take()this
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
複製代碼
poll(long timeout, TimeUnit unit)spa
從對頭取出一個元素:若是數組不空,出隊;若是數組已空且已經超時,返回null;若是數組已空則進入等待,直到被喚醒或超時:.net
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);////將時間轉換爲納秒
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {//隊列爲空
if (nanos <= 0L)
return null;
//阻塞指定時間,enqueue()方法會調用notEmpty.signal()喚醒進行poll操做的線程
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
複製代碼
雖然只講了阻塞隊列,但涉及了ReentrantLock、中斷、Condition等知識點,若是不清楚的話能夠看下下面的幾篇文章: