研究了一段時間框架,有點審美疲勞,今天講點輕鬆的,手寫一個阻塞隊列,實踐一把lock+condition。java
首先複習一下經典的 「等待通知」機制。編程
線程首先獲取互斥鎖,當線程要求的條件不知足時,釋放互斥鎖,進入等待狀態;當要求的條件知足時,通知等待的線程,從新獲取互斥鎖 --《極客時間-Java併發編程實戰》數組
在Java中實現 「等待通知」 機制通常有兩種方式,synchronized/Lock+Condition。多線程
synchronized同步原語(或稱:管程)配合wait()、notify()、notifyAll()就能夠實現「等待通知」機制。併發
機理是怎樣的呢?框架
當使用synchronized管程對某一塊臨界區進行加鎖,同一時刻,只能容許一個線程進入synchronized保護的臨界區中。ide
當該遠程進入臨界區以後,其餘的線程若是來訪問臨界區就須要進入等待隊列中進行等待。測試
這裏要注意,等待隊列與鎖是一一對應關係,每一個互斥鎖都有本身的獨立的等待隊列。this
Java對象的wait()方法就可以讓線程進入等待狀態,此時線程被阻塞。spa
當線程進入等待隊列時,會釋放當前持有的互斥鎖。當它釋放鎖以後,其餘的線程就有機會得到該互斥鎖並進入臨界區。
那如何通知知足條件的線程呢?
經過Java對象的notify()和notifyAll()方法就可以實現。當條件知足時調用notify(),會通知等待隊列中的線程,通知它 條件曾經知足過。
notify()只能保證在通知的那一時間點,條件是知足的。也就是,有可能被通知線程執行的時間點與通知的時間點是不相等的;即:線程執行的時候,條件已經不知足了(可能有其餘的線程知足了該條件而插隊)
另外,就算線程被通知而喚醒,在進入臨界區前依舊須要獲取互斥鎖,由於這把須要獲取的鎖在調用wait()的時候已經被釋放了。
須要注意的是:
wait()、notify()、notifyAll()被調用的前提是獲取到了響應的互斥鎖,也就是調用這三個方法的位置都是在 synchronized{} 內部。若是調用的位置在synchronized外部或者不是使用同一把互斥鎖,JVM會拋出 java.lang.IllegalMonitorStateException 異常。
關於synchronized實現 「等待-通知」 機制咱們就講到這裏。
經過Lock+Condition實現 「等待-通知」 機制與synchronized相似,咱們本文實現阻塞隊列BlockedQueue的方式就是經過Lock+Condition實現。
Condition 定義了等待/通知兩種類型的方法:await()/signal()/signalAll()。線程調用這些方法以前須要獲取Condition關聯的鎖。
Condition對象是由Lock對象經過newCondition()方法建立的,也就是說,Condition是依賴Lock對象的。
類比上文中講到的synchronized實現 「等待-通知」 機制,Lock/Condition涉及到的方法與synchronized方式涉及到的方法的語義是一一對應的,具體以下表:
瞭解並複習了 管程中的「等待/通知機制」,咱們開始實現阻塞隊列BlockedQueue。
在編寫過程當中參考了JUC中的ArrayBlockingQueue源碼實現。
public class BlockedQueue<T> {複製代碼
final Lock lock = new ReentrantLock();
// 條件變量:隊列不滿
final Condition notFull = lock.newCondition();
// 條件變量:隊列不空
final Condition notEmpty = lock.newCondition();複製代碼
// 阻塞單列最大長度
int capacity = 0;複製代碼
// 當前已經存在下標:入隊
int putIndex = 0;複製代碼
// 當前已經存在下標:出隊
int takeIndex = 0;複製代碼
// 元素總數
int elementsSize = 0;複製代碼
// 元素數組
Object[] items;複製代碼
// 構造方法
public BlockedQueue(int capacity) {
this.capacity = capacity;
items = new Object[capacity];
System.out.println("capacity=" + capacity + ",items.size=" + items.length);
}複製代碼
這段代碼中咱們聲明瞭阻塞隊列,支持泛型。聲明瞭須要的成員變量以及有參構造方法。構造方法中根據外界輸入的隊列最大長度初始化了內部的元素數組。
提早聲明並初始化了Lock(實現方式爲ReentrantLock可重入鎖),並在Lock基礎上初始化了兩個Condition條件變量,分別標記隊列不滿、隊列不空。
// 入隊
void enq(T x) {
lock.lock();
try {
// 隊列已滿
while (items.length == elementsSize) {
// 等待隊列不滿
notFull.await();
}
// 入隊操做...
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
++elementsSize;
// 入隊後, 通知可出隊
notEmpty.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
System.out.println(x.toString() + "--入隊完成");
}
}複製代碼
這段代碼爲入隊邏輯。
首先獲取可重入鎖,若是加鎖成功則進入臨界區邏輯,不然嘗試解鎖。
當隊列已經滿時,則進入阻塞狀態,等待隊列不滿。
若是隊列不滿則進行入隊,當前下標的元素即爲要入隊的元素,元素總長度增1。
// 出隊
T deq() {
lock.lock();
T x = null;
try {
// 隊列已空
while (items.length == 0) {
// 等待隊列不空
notEmpty.await();
}
// 出隊操做...
x = (T) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
elementsSize--;
// 出隊後,通知可入隊
notFull.signal();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
lock.unlock();
}
return x;
}複製代碼
這段代碼爲出隊邏輯。
首先獲取可重入鎖,若是加鎖成功則進入臨界區邏輯,不然嘗試解鎖。
當隊列已經空,則進入阻塞狀態,等待隊列不空。
若是隊列不空則進行出隊操做,先暫存當前下標的元素,並將當前下標的元素標記爲空(NULL);元素總長度減1,解鎖後返回當前已經出隊的元素。
public T get(int index) {
return (T) items[index];
}複製代碼
這段代碼爲獲取對應下標的元素,若是元素不存在則返回空。
開發完基本邏輯以後,咱們寫一個demo來測試一下BlockedQueue。
public static void main(String[] args) {
BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
for (int i = 0; i < 20; i++) {
blockedQueue.enq("snowalker:" + i);
}複製代碼
System.out.println("入隊結束:-------------------------");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}複製代碼
for (int i = 0; i < 20; i++) {
blockedQueue.deq();
}
System.out.println("出隊結束:-------------------------");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}複製代碼
}複製代碼
邏輯很好理解,咱們構造了一個BlockedQueue,添加了20個元素進行入隊。入隊以後遍歷元素,查看入隊結果。
接着進行20次出隊,並遍歷出隊後的結果。
運行結果以下:
capacity=20,items.size=20
入隊結束:-------------------------
snowalker:0
snowalker:1
snowalker:2
snowalker:3
snowalker:4
snowalker:5
snowalker:6
snowalker:7
snowalker:8
snowalker:9
snowalker:10
snowalker:11
snowalker:12
snowalker:13
snowalker:14
snowalker:15
snowalker:16
snowalker:17
snowalker:18
snowalker:19
出隊結束:-------------------------
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null複製代碼
能夠看到,進行了20次入隊以後元素共有20個;
進行了20次出隊操做以後,元素所有爲空,表示出隊成功。
咱們接着測試一下多線程併發操做下,BlockedQueue的表現。
BlockedQueue<String> blockedQueue = new BlockedQueue<>(20);
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch end = new CountDownLatch(2);複製代碼
Thread thread0 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("線程0準備完畢");
for (int i = 0; i < 10; i++) {
blockedQueue.enq("線程0-snowalker-" + i);
}
System.out.println("線程0入隊結束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
});複製代碼
Thread thread1 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("線程1準備完畢");
for (int i = 10; i < 20; i++) {
blockedQueue.enq("線程1-snowalker-" + i);
}
System.out.println("線程1入隊結束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
end.countDown();
}
}
});複製代碼
thread0.start();
thread1.start();
begin.countDown();
end.await();
System.out.println("主線程準備完畢!");
System.out.println("主線程遍歷開始!");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
System.out.println("Bingo!");
}複製代碼
咱們定義了兩個線程,每一個線程添加10個元素,經過閉鎖CountDownLatch進行併發添加,添加完成以後遍歷添加結果。打印以下:
capacity=20,items.size=20
線程0準備完畢
線程1準備完畢
線程0-snowalker-0--入隊完成
線程1-snowalker-10--入隊完成
線程0-snowalker-1--入隊完成
線程1-snowalker-11--入隊完成
線程0-snowalker-2--入隊完成
線程1-snowalker-12--入隊完成
線程0-snowalker-3--入隊完成
線程1-snowalker-13--入隊完成
線程0-snowalker-4--入隊完成
線程1-snowalker-14--入隊完成
線程0-snowalker-5--入隊完成
線程1-snowalker-15--入隊完成
線程1-snowalker-16--入隊完成
線程1-snowalker-17--入隊完成
線程1-snowalker-18--入隊完成
線程0-snowalker-6--入隊完成
線程1-snowalker-19--入隊完成
線程1入隊結束:-------------------------
線程0-snowalker-7--入隊完成
線程0-snowalker-8--入隊完成
線程0-snowalker-9--入隊完成
線程0入隊結束:-------------------------
主線程準備完畢!
主線程遍歷開始!
線程0-snowalker-0
線程1-snowalker-10
線程0-snowalker-1
線程1-snowalker-11
線程0-snowalker-2
線程1-snowalker-12
線程0-snowalker-3
線程1-snowalker-13
線程0-snowalker-4
線程1-snowalker-14
線程0-snowalker-5
線程1-snowalker-15
線程0-snowalker-6
線程1-snowalker-16
線程1-snowalker-17
線程1-snowalker-18
線程1-snowalker-19
線程0-snowalker-7
線程0-snowalker-8
線程0-snowalker-9
Bingo!複製代碼
能夠看到結果符合預期,咱們接着測試一下併發出隊,接着上面的添加結果進行併發出隊操做。
CountDownLatch begin = new CountDownLatch(1);
CountDownLatch dequeue = new CountDownLatch(2);
for (int i = 0; i < 20; i++) {
blockedQueue.enq("snowalker:" + i);
}複製代碼
Thread thread2 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("線程2準備完畢");
for (int i = 0; i <= 10; i++) {
blockedQueue.deq();
}
System.out.println("線程2出隊結束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
dequeue.countDown();
}
}
});複製代碼
Thread thread3 = new Thread(new Runnable() {
@Override
public void run() {
try {
begin.await();
System.out.println("線程3準備完畢");
for (int i = 0; i <= 10; i++) {
blockedQueue.deq();
}
System.out.println("線程3出隊結束:-------------------------");
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
dequeue.countDown();
}
}
});複製代碼
thread2.start();
thread3.start();
begin.countDown();
dequeue.await();
System.out.println("主線程準備完畢!");
System.out.println("主線程遍歷開始!");
for (int i = 0; i < 20; i++) {
System.out.println(blockedQueue.get(i));
}
System.out.println("Bingo!");
}複製代碼
咱們準備了20個元素入隊,而後併發進行出隊,等待兩個線程出隊完成以後,在主線程進行隊列元素的遍歷操做,結果以下:
capacity=20,items.size=20
snowalker:0--入隊完成
snowalker:1--入隊完成
snowalker:2--入隊完成
snowalker:3--入隊完成
snowalker:4--入隊完成
snowalker:5--入隊完成
snowalker:6--入隊完成
snowalker:7--入隊完成
snowalker:8--入隊完成
snowalker:9--入隊完成
snowalker:10--入隊完成
snowalker:11--入隊完成
snowalker:12--入隊完成
snowalker:13--入隊完成
snowalker:14--入隊完成
snowalker:15--入隊完成
snowalker:16--入隊完成
snowalker:17--入隊完成
snowalker:18--入隊完成
snowalker:19--入隊完成
線程2準備完畢
線程2出隊結束:-------------------------
線程3準備完畢
線程3出隊結束:-------------------------
主線程準備完畢!
主線程遍歷開始!
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
null
Bingo!複製代碼
結果如上圖所示,能夠看到併發出隊結果知足預期。
本文咱們利用JUC中的Lock+Condition管程實現了自定義BlockedQueue阻塞隊列的開發,並經過測試用例測試了併發條件下的出隊入隊,結果符合預期。
版權聲明: 原創不易,洗文可恥。除非註明,本博文章均爲原創,轉載請以連接形式標明本文地址。