在多線程編程過程當中,爲了業務解耦和架構設計,常常會使用併發容器用於存儲多線程間的共享數據,這樣不只能夠保證線程安全,還能夠簡化各個線程操做。例如在「生產者-消費者」問題中,會使用阻塞隊列(BlockingQueue)做爲數據容器,關於BlockingQueue能夠看這篇文章。爲了加深對阻塞隊列的理解,惟一的方式是對其實驗原理進行理解,這篇文章就主要來看看ArrayBlockingQueue和LinkedBlockingQueue的實現原理。node
阻塞隊列最核心的功能是,可以可阻塞式的插入和刪除隊列元素。當前隊列爲空時,會阻塞消費數據的線程,直至隊列非空時,通知被阻塞的線程;當隊列滿時,會阻塞插入數據的線程,直至隊列未滿時,通知插入數據的線程(生產者線程)。那麼,多線程中消息通知機制最經常使用的是lock的condition機制,關於condition能夠看這篇文章的詳細介紹。那麼ArrayBlockingQueue的實現是否是也會採用Condition的通知機制呢?下面來看看。編程
ArrayBlockingQueue的主要屬性以下:數組
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
final ReentrantLock lock;
/** Condition for waiting takes */
private final Condition notEmpty; //消費者等待線程
/** Condition for waiting puts */
private final Condition notFull; //生產者等待線程
從源碼中能夠看出ArrayBlockingQueue內部是採用數組進行數據存儲的(屬性items
),爲了保證線程安全,採用的是ReentrantLock lock,爲了保證可阻塞式的插入刪除數據利用的是Condition,當獲取數據的消費者線程被阻塞時會將該線程放置到notEmpty等待隊列中,當插入數據的生產者線程被阻塞時,會將該線程放置到notFull等待隊列中。而notEmpty和notFull等中要屬性在構造方法中進行建立:安全
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
接下來,主要看看可阻塞式的put和take方法是怎樣實現的。數據結構
put(E e)
方法源碼以下:多線程
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若是當前隊列已滿,將線程移入到notFull等待隊列中
while (count == items.length)
notFull.await(); //移入生產者等待線程
//知足插入數據的要求,直接進行入隊操做
enqueue(e);
} finally {
lock.unlock();
}
}
該方法的邏輯很簡單,當隊列已滿時(count == items.length
)將線程移入到notFull等待隊列中,若是當前知足插入數據的條件,就能夠直接調用enqueue(e)
插入數據元素。enqueue方法源碼爲:架構
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//插入數據
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
//通知消費者線程,當前隊列中有數據可供消費
notEmpty.signal();
}
enqueue方法的邏輯一樣也很簡單,先完成插入數據,即往數組中添加數據(items[putIndex] = x
),而後通知被阻併發
塞的消費者線程,當前隊列中有數據可供消費(notEmpty.signal()
)。less
take方法源碼以下:高併發
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若是隊列爲空,沒有數據,將消費者線程移入等待隊列中
while (count == 0)
notEmpty.await();//移入消費者等待線程
//獲取數據
return dequeue();
} finally {
lock.unlock();
}
}
take方法也主要作了兩步:1. 若是當前隊列爲空的話,則將獲取數據的消費者線程移入到等待隊列中;2. 若隊列不爲空則獲取數據,即完成出隊操做dequeue
。dequeue方法源碼爲:
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
dequeue方法也主要作了兩件事情:1. 獲取隊列中的數據,即獲取數組中的數據元素((E) items[takeIndex]
);2. 通知notFull等待隊列中的線程,使其由等待隊列移入到同步隊列中,使其可以有機會得到lock,並執行完成功退出。
從以上分析,能夠看出put和take方法主要是經過condition的通知機制來完成可阻塞式的插入數據和獲取數據。在理解ArrayBlockingQueue後再去理解LinkedBlockingQueue就很容易了。
LinkedBlockingQueue是用鏈表實現的有界阻塞隊列,當構造對象時爲指定隊列大小時,隊列默認大小爲Integer.MAX_VALUE
。從它的構造方法能夠看出:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE);
}
LinkedBlockingQueue的主要屬性有:
/** Current number of elements */
private final AtomicInteger count = new AtomicInteger();
/**
* Head of linked list.
* Invariant: head.item == null
*/
transient Node<E> head;//頭
/**
* Tail of linked list.
* Invariant: last.next == null
*/
private transient Node<E> last; //尾
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock(); //消費者可重入鎖
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();//消費者線程
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock(); //生產者可重入鎖
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();//生產者線程
能夠看出與ArrayBlockingQueue主要的區別是,LinkedBlockingQueue在插入數據和刪除數據時分別是由兩個不一樣的lock(takeLock
和putLock
)來控制線程安全的,所以,也由這兩個lock生成了兩個對應的condition(notEmpty
和notFull
)來實現可阻塞的插入和刪除數據。而且,採用了鏈表的數據結構來實現隊列,這樣設計的目的是由於插入數據
是往隊尾進行的,刪除數據是往隊頭進行的,從而能夠下降獲取不到Lock鎖而進入Waiting狀態,提升併發
Node結點的定義爲:
static class Node<E> { E item; /** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */ Node<E> next; Node(E x) { item = x; } }
接下來,咱們也一樣來看看put方法和take方法的實現。
put方法源碼爲:
public void put(E e) throws InterruptedException { if (e == null) throw new NullPointerException(); // Note: convention in all put/take/etc is to preset local var // holding count negative to indicate failure unless set. int c = -1; Node<E> node = new Node<E>(e); final ReentrantLock putLock = this.putLock; final AtomicInteger count = this.count; putLock.lockInterruptibly();//與lock方法相似,優先考慮響應中斷 try { /* * Note that count is used in wait guard even though it is * not protected by lock. This works because count can * only decrease at this point (all other puts are shut * out by lock), and we (or some other waiting put) are * signalled if it ever changes from capacity. Similarly * for all other uses of count in other wait guards. */ //若是隊列已滿,則阻塞當前線程,將其移入等待隊列 while (count.get() == capacity) { notFull.await(); //生產者線程阻塞 } //入隊操做,向隊尾插入數據 enqueue(node); c = count.getAndIncrement();//索引後移 //若隊列知足插入數據的條件,則通知被阻塞的生產者線程 if (c + 1 < capacity) notFull.signal(); } finally { putLock.unlock(); } if (c == 0) signalNotEmpty(); }
put方法的邏輯也一樣很容易理解,可見註釋。基本上和ArrayBlockingQueue的put方法同樣。take方法的源碼以下:
public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { //當前隊列爲空,則阻塞當前線程,將其移入到等待隊列中,直至知足條件 while (count.get() == 0) { notEmpty.await(); } //移除隊頭元素,獲取數據 x = dequeue(); c = count.getAndDecrement(); //若是當前知足移除元素的條件,則通知被阻塞的消費者線程 if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }
take方法的主要邏輯請見於註釋,也很容易理解。
相同點:ArrayBlockingQueue和LinkedBlockingQueue都是經過condition通知機制來實現可阻塞式插入和刪除元素,並知足線程安全的特性;
不一樣點:1. ArrayBlockingQueue底層是採用的數組進行實現,而LinkedBlockingQueue則是採用鏈表數據結構;
ArrayBlockingQueue插入和刪除數據,只採用了一個lock,而LinkedBlockingQueue則是在插入和刪除分別採用了putLock
和`takeLock`,這樣能夠下降線程因爲線程沒法獲取到lock而進入WAITING狀態的可能性,從而提升了線程併發執行的效率