在多線程編程過程當中,爲了業務解耦和架構設計,常常會使用併發容器用於存儲多線程間的共享數據,這樣不只能夠保證線程安全,還能夠簡化各個線程操做。例如在「生產者-消費者」問題中,會使用阻塞隊列(BlockingQueue)做爲數據容器,關於BlockingQueue能夠看這篇文章。爲了加深對阻塞隊列的理解,惟一的方式是對其實驗原理進行理解,這篇文章就主要來看看ArrayBlockingQueue和LinkedBlockingQueue的實現原理。node
阻塞隊列最核心的功能是,可以可阻塞式的插入和刪除隊列元素。當前隊列爲空時,會阻塞消費數據的線程,直至隊列非空時,通知被阻塞的線程;當隊列滿時,會阻塞插入數據的線程,直至隊列未滿時,通知插入數據的線程(生產者線程)。那麼,多線程中消息通知機制最經常使用的是lock的condition機制,關於condition能夠看這篇文章的詳細介紹。那麼ArrayBlockingQueue的實現是否是也會採用Condition的通知機制呢?下面來看看。編程
ArrayBlockingQueue的主要屬性以下:數組
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty;
/** Condition for waiting puts */
private final Condition notFull;
複製代碼
從源碼中能夠看出ArrayBlockingQueue內部是採用數組進行數據存儲的(屬性items
),爲了保證線程安全,採用的是ReentrantLock lock
,爲了保證可阻塞式的插入刪除數據利用的是Condition,當獲取數據的消費者線程被阻塞時會將該線程放置到notEmpty等待隊列中,當插入數據的生產者線程被阻塞時,會將該線程放置到notFull等待隊列中。而notEmpty和notFull等中要屬性在構造方法中進行建立:安全
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
複製代碼
接下來,主要看看可阻塞式的put和take方法是怎樣實現的。數據結構
put(E e)
方法源碼以下:多線程
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若是當前隊列已滿,將線程移入到notFull等待隊列中
while (count == items.length)
notFull.await();
//知足插入數據的要求,直接進行入隊操做
enqueue(e);
} finally {
lock.unlock();
}
}
複製代碼
該方法的邏輯很簡單,當隊列已滿時(count == items.length
)將線程移入到notFull等待隊列中,若是當前知足插入數據的條件,就能夠直接調用enqueue(e)
插入數據元素。enqueue方法源碼爲:架構
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入數據
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消費者線程,當前隊列中有數據可供消費
notEmpty.signal();
}
複製代碼
enqueue方法的邏輯一樣也很簡單,先完成插入數據,即往數組中添加數據(items[putIndex] = x
),而後通知被阻塞的消費者線程,當前隊列中有數據可供消費(notEmpty.signal()
)。併發
take方法源碼以下:less
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若是隊列爲空,沒有數據,將消費者線程移入等待隊列中
while (count == 0)
notEmpty.await();
//獲取數據
return dequeue();
} finally {
lock.unlock();
}
}
複製代碼
take方法也主要作了兩步:1. 若是當前隊列爲空的話,則將獲取數據的消費者線程移入到等待隊列中;2. 若隊列不爲空則獲取數據,即完成出隊操做dequeue
。dequeue方法源碼爲:post
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
//獲取數據
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//通知被阻塞的生產者線程
notFull.signal();
return x;
}
複製代碼
dequeue方法也主要作了兩件事情:1. 獲取隊列中的數據,即獲取數組中的數據元素((E) items[takeIndex]
);2. 通知notFull等待隊列中的線程,使其由等待隊列移入到同步隊列中,使其可以有機會得到lock,並執行完成功退出。
從以上分析,能夠看出put和take方法主要是經過condition的通知機制來完成可阻塞式的插入數據和獲取數據。在理解ArrayBlockingQueue後再去理解LinkedBlockingQueue就很容易了。
LinkedBlockingQueue是用鏈表實現的有界阻塞隊列,當構造對象時爲指定隊列大小時,隊列默認大小爲Integer.MAX_VALUE
。從它的構造方法能夠看出:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
複製代碼
LinkedBlockingQueue的主要屬性有:
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
複製代碼
能夠看出與ArrayBlockingQueue主要的區別是,LinkedBlockingQueue在插入數據和刪除數據時分別是由兩個不一樣的lock(takeLock
和putLock
)來控制線程安全的,所以,也由這兩個lock生成了兩個對應的condition(notEmpty
和notFull
)來實現可阻塞的插入和刪除數據。而且,採用了鏈表的數據結構來實現隊列,Node結點的定義爲:
static class Node<E> {
E item;
/**
* One of:
* - the real successor Node
* - this Node, meaning the successor is head.next
* - null, meaning there is no successor (this is the last node)
*/
Node<E> next;
Node(E x) { item = x; }
}
複製代碼
接下來,咱們也一樣來看看put方法和take方法的實現。
put方法源碼爲:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
// Note: convention in all put/take/etc is to preset local var
// holding count negative to indicate failure unless set.
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
/*
* Note that count is used in wait guard even though it is
* not protected by lock. This works because count can
* only decrease at this point (all other puts are shut
* out by lock), and we (or some other waiting put) are
* signalled if it ever changes from capacity. Similarly
* for all other uses of count in other wait guards.
*/
//若是隊列已滿,則阻塞當前線程,將其移入等待隊列
while (count.get() == capacity) {
notFull.await();
}
//入隊操做,插入數據
enqueue(node);
c = count.getAndIncrement();
//若隊列知足插入數據的條件,則通知被阻塞的生產者線程
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
}
複製代碼
put方法的邏輯也一樣很容易理解,可見註釋。基本上和ArrayBlockingQueue的put方法同樣。take方法的源碼以下:
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
//當前隊列爲空,則阻塞當前線程,將其移入到等待隊列中,直至知足條件
while (count.get() == 0) {
notEmpty.await();
}
//移除隊頭元素,獲取數據
x = dequeue();
c = count.getAndDecrement();
//若是當前知足移除元素的條件,則通知被阻塞的消費者線程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
複製代碼
take方法的主要邏輯請見於註釋,也很容易理解。
相同點:ArrayBlockingQueue和LinkedBlockingQueue都是經過condition通知機制來實現可阻塞式插入和刪除元素,並知足線程安全的特性;
不一樣點:1. ArrayBlockingQueue底層是採用的數組進行實現,而LinkedBlockingQueue則是採用鏈表數據結構; 2. ArrayBlockingQueue插入和刪除數據,只採用了一個lock,而LinkedBlockingQueue則是在插入和刪除分別採用了putLock
和takeLock
,這樣能夠下降線程因爲線程沒法獲取到lock而進入WAITING狀態的可能性,從而提升了線程併發執行的效率。