本文系《Java併發編程讀書筆記》java
阻塞隊列(BlockingQueue)是一個支持兩個附加操做的隊列。這兩個附加的操做支持阻塞的插入和移除方法。編程
阻塞隊列經常使用於生產者和消費者的場景,生產者是向隊列裏添加元素的線程,消費者是從隊列裏取元素的線程。阻塞隊列就是生產者用來存放元素、消費者用來獲取元素的容器。數組
在阻塞隊列不可用時,這兩個附加操做提供了4種處理方式,以下所示:緩存
拋出異常:當隊列滿時,若是再往隊列裏插入元素,會拋出IllegalArgumentException異常。當隊列空時,從隊列裏獲取元素會拋出NoSuchElementException異常。微信
返回特殊值:當往隊列插入元素時,會返回元素是否插入成功,成功返回true。若是是移除方法,則是從隊列裏取出一個元素,若是沒有則返回null
。多線程
一直阻塞:當阻塞隊列滿時,若是生產者線程往隊列裏put
元素,隊列會一直阻塞生產者線程,直到隊列可用或者響應中斷退出。當隊列空時,若是消費者從隊列裏take
元素,隊列會阻塞住消費者線程,直到隊列不爲空。併發
tips:若是是無界阻塞隊列,隊列不可能會出現滿的狀況,因此使用put或offer方法永遠不會被阻塞,並且使用offer方法時,該方法永遠返回true。函數
ArrayBlockingQueue是一個用數組實現的有界阻塞隊列。此隊列按照先進先出的原則對元素進行排序。構造方法以下:this
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
public ArrayBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
複製代碼
參數fair
用於設置線程是否公平訪問隊列。所謂公平訪問是指阻塞的線程,能夠按照阻塞的前後順序訪問隊列,即先阻塞線程先訪問隊列。非公平性是對先等待的線程是非公平的,當隊列可用時,阻塞的線程均可以爭奪訪問隊列的資格,有可能先阻塞的線程最後才訪問隊列。爲了保證公平性,一般會下降吞吐量。spa
LinkedBlockingQueue是一個用鏈表實現的有界阻塞隊列。此隊列的默認和最大長度爲Integer.MAX_VALUE
。此隊列按照先進先出的原則對元素進行排序。
PriorityBlockingQueue是一個支持優先級的無界阻塞隊列。默認狀況下元素採起天然順序升序排列。也能夠自定義類實現compareTo()
方法來指定元素排序規則,或者初始化PriorityBlockingQueue時,指定構造參數Comparator
來進行排序。須要注意的是不能保證同優先級元素的順序。
DelayQueue是一個支持延時獲取元素的無界阻塞隊列。隊列使用PriorityBlockingQueue來實現。隊列中的元素必須實現Delayed接口,在建立元素時能夠指定多久才能從隊列中獲取當前元素。只有在延遲期滿時才能從隊列中提取元素。
DelayQueue運用在如下應用場景:
DelayQueue隊列的元素必須實現Delayed接口。能夠參考ScheduledThreadPoolExecutor裏ScheduledFutureTask類的實現,步驟以下:
在對象建立的對象,初始化基本數據。使用time記錄當前對象延遲到何時能夠使用,使用sequenceNumber來標識元素在隊列中的前後順序。代碼以下:
private static final AtomicLong sequencer = new AtomicLong();
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
複製代碼
實現getDelay方法,該方法返回當前元素還須要延長多長時間,單位是納秒。代碼以下:
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), NANOSECONDS);
}
複製代碼
經過構造函數能夠看出延遲時間參數ns的單位是納秒,本身設計的時候最好使用納秒,實現getDelay()方法時能夠指定任意單位,一旦以秒或分做爲單位,而延時時間精確不到納秒就麻煩了。使用時注意當time小於當前時間時,getDelay會返回負數。
實現compareTo方法來指定元素的順序。例如,讓延時時間最長的放在隊列的末尾。代碼以下
public int compareTo(Delayed other) {
if (other == this) // compare zero if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber)
return -1;
else
return 1;
}
long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
複製代碼
延時阻塞隊列的實現很簡單,當消費者從隊列裏獲取元素時,若是元素沒有達到延時時間,就阻塞當前線程。
private Thread leader = null;
private final Condition available = lock.newCondition();
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
E first = q.peek();
if (first == null)
available.await();
else {
long delay = first.getDelay(NANOSECONDS);
if (delay <= 0)
return q.poll();
first = null; // don't retain ref while waiting
if (leader != null)
available.await();
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
複製代碼
SynchronousQueue是一個不存儲元素的阻塞隊列。每個put操做必須等待一個take操做,不然不能繼續添加元素。構造函數以下:
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
複製代碼
SynchronousQueue能夠當作是一個傳球手,負責把生產者線程處理的數據直接傳遞給消費者線程。隊列自己不存儲任何元素,很是適合傳遞性場景。SynchronousQueue的吞吐量高於LinkedBlockingQueue和ArrayBlockingQueue。
LinkedTransferQueue是一個由鏈表結構組成的無界阻塞TransferQueue隊列。相對於其餘阻塞隊列,LinkedTransferQueue多了tryTransfer和transfer方法。
transfer
方法
若是當前有消費者正在等待接收元素(消費者使用take()方法或帶時間限制的poll()方法時),transfer方法能夠把生產者傳入的元素馬上transfer(傳輸)給消費者。若是沒有消費者在等待接收元素,transfer方法會將元素存放在隊列的tail節點,並等到該元素被消費者了才返回。
tryTransfer
方法
tryTransfer方法時用來試探生產者傳入的元素是否能直接傳給消費者。若是沒有消費者等待接收元素,則返回fasle。和transfer方法的區別是tryTransfer方法不管消費者是否接收,方法當即返回,而transfer方法是必須等到消費者消費了才返回。
LinkedBlockingDeque是一個由鏈表結構組成的雙向阻塞隊列。所謂雙向隊列指的是能夠從隊列的兩端插入和移出元素。雙向隊列由於多了一個操做隊列的入口,在多線程同時入隊時,也就減小了一半的競爭。相比其餘的阻塞隊列,LinkedBlockingDeque多了addFirst
、addLast
、offerFirst
、offerLast
、peekFirst
和peekLast
等方法,以First單詞結尾的方法,表示插入、獲取(peek)或移除雙端隊列的第一個元素。以Last單詞結尾的方法,表示插入、獲取或移除雙向隊列的最後一個元素。
若是隊列是空的,消費者會一直等待,當生產者添加元素時,消費者是如何知道當前隊列有元素的呢?JDK使用通知模式實現的。所謂通知模式,就是當生產者往滿的隊列裏添加元素時會阻塞主生產者,當消息者消費了一個隊列中的元素後,會通知生產者當前隊列可用。
ArrayBlockingQueue使用Condition來實現,代碼以下:
private final Condition notEmpty;
private final Condition notFull;
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) // 隊列爲空時,阻塞當前消費者
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal(); // 隊列不爲空時,通知消息者獲取元素
}
複製代碼
若是讀完以爲有收穫的話,歡迎點贊、關注、加公衆號【牛覓技術】,查閱更多精彩歷史!!!