線程安全的隊列-ArrayBlockingQueue源碼分析

一,ArrayBlockingQueue源碼分析

ArrayBlockingQueue是隊列的一種,隊列的特色嘛,先出先出,然而這種隊列是一種線程安全阻塞式的隊列,爲何是阻塞式隊列?我想,這正好是我寫和分析這篇文章的內容所在。java

因爲本篇內容涉及的內容比較多,因此有些地方本身不會特意講的很詳細,可是足夠本身使本身明白了,通常文章出來的時候,若是連本身讀起來都費勁,或者有些不懂的地方,我想,這樣的文章,一種是寫做者本身遺漏了或者寫的有的時候含糊其辭了。web

可是,我不會,由於,個人文章基本上主要是寫給本身的,若是能夠幫助須要的人,本身仍是比較開心的,由於,大家或許也看到了,我以前寫的文章風格與別人不同,本身以爲我把當時的想法寫出來就能夠了,若是不完美也沒事,之後本身在改進就能夠了,我想這就是我與別的創做者不一樣的一點,我也不是很刻意追求閱讀量如何如何,固然了,若是大家關注我,或者分享我寫的內容,我仍是很感謝你的,哈哈,下面咱們分析這個隊列集合的源碼了。編程

二,方法分析

2.0,左右能夠滑動查看

2.1,構造函數

//默認必須給定隊列的容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//第二步
public ArrayBlockingQueue(int capacity, boolean fair) {
//固然了,容量不能小於等於0,由於隊列是用來裝填元素
//初始化容量爲0,沒有意義撒
if (capacity <= 0)
throw new IllegalArgumentException();
//建立一個容量爲capacity大小的數組空間賦值給成員變量items
this.items = new Object[capacity];
//建立一個非公平鎖的實例對象
//這裏若是對ReentrantLock不瞭解的話,能夠本身查看一下哈
//lock鎖與synchronized關鍵字的鎖的區別仍是要知道一下的
lock = new ReentrantLock(fair);
//下面的newCondition操做,就是爲了後面的線程間通訊作準備的
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}

上面的分析過程當中,咱們瞭解瞭如何實現一個鎖,以及線程間通訊的內容,這裏簡單說起下,日後看,本身會對這部分進行詳盡的說明的。數組

2.2,add()方法

public boolean add(E e) {
//調用共用的方法add
return super.add(e);
}
//第二步操做
public boolean add(E e) {
//複用offer()方法的實現邏輯
if (offer(e))
return true;
else
//若隊列添加失敗,說明隊列已經滿了,不可能裝填數據元素了
//此時拋出隊列已滿的異常便可
throw new IllegalStateException("Queue full");
}
//第三步操做
public boolean offer(E e) {
//這個隊列也是不能夠裝填元素爲null的元素的,因此須要進行檢查元素是否爲空的邏輯校驗
checkNotNull(e);
//獲取鎖實例對象
final ReentrantLock lock = this.lock;
//進行加鎖操做,因爲後面的大部分方法都會用到鎖,因此這裏能夠看出這是一個線程安全的隊列
lock.lock();
try {
//隊列的容量,在建立的時候就已經指定了,若是隊列的元素個數count和數組的空間相等了
//說明隊列已經沒有容量裝填數據元素了,此時返回false便可
if (count == items.length)
return false;
else {
//進行入隊列操做
enqueue(e);
return true;
}
} finally {
//釋放鎖的邏輯
lock.unlock();
}
}
//第四步操做
private static void checkNotNull(Object v) {
if (v == null)
throw new NullPointerException();
}
//第五步操做
private void enqueue(E x) {
//將實例變量items賦值給臨時變量items,主要也是編程中常見的寫法
final Object[] items = this.items;
//將元素x裝載到隊列的末尾,此時的putIndex能夠數組索引下標的,我我的理解
items[putIndex] = x;
//這裏爲啥要加這麼一句呢?我想這是由於數組空間滿了,又要從新開始了,因此這裏putIndex要置爲0
if (++putIndex == items.length)
putIndex = 0;
//count表示隊列的元素個數,是個成員變量,入隊列以後,count加一是必須的
count++;
//發出一個信號通知,說明隊列不空,有元素能夠從隊列進行獲取
//這裏主要是線程間通訊的,等下後面會介紹線程間通訊的
notEmpty.signal();
}

線程間通訊,你知道有哪一種方式嗎,後面本身會單獨介紹的,後面本身慢慢會介紹的,不要着急哦安全

2.3,peek()方法

public E peek() {
//加鎖lock.lock
final ReentrantLock lock = this.lock;
lock.lock();
try {
//根據數組的索引下標獲取指定位置的元素,此時元素並無出隊列,不一樣於後面要分析的poll方法
return itemAt(takeIndex);
} finally {
//解鎖
lock.unlock();
}
}
//第二步操做
//這是一個final關鍵字修飾的方法,fianl關鍵字修飾變量,方法,類的做用均可以去回顧一下的哈
final E itemAt(int i) {
//根據索引下標獲取指定位置元素,時間複雜度爲o(1)
return (E) items[i];
}

final修飾方法,若是一個類不容許其子類覆蓋某個方法,即,不容許被子類重寫,則能夠把這個方法聲明爲final方法微信

2.4,size()方法

public int size() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//直接返回隊列元素個數的實例變量count便可,是否是很簡單
//於此同時,這也是一個線程安全的方法
return count;
} finally {
lock.unlock();
}

2.5,contains()方法

public boolean contains(Object o) {
//由於這個隊列裏面不包含null元素,因此若元素o爲null,則直接返回false
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
//這是一個線程安全的方法
lock.lock();
try {
if (count > 0) {
//當隊列的元素個數增長時,此時的putIndex值是增長的
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//若元素o,等於數組的其中一個元素,則直接返回false
if (o.equals(items[i]))
return true;
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
//隊列裏面沒有元素,則直接返回false
return false;
} finally {
lock.unlock();
}
}

2.6,take()方法

public E take() throws InterruptedException {
//線程安全的方法
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//若是隊列的元素個數爲0,則此時須要等待,因此這裏是一個阻塞式隊列
//此時這裏就至關於一條指令一直在循環判斷count的值是否不等於0
while (count == 0)
notEmpty.await();
//進行出隊列操做
return dequeue();
} finally {
lock.unlock();
}
}
//第二步操做
private E dequeue() {
//默認takeIndex是從0開始的,若是出隊列了,takeIndex值就會增長一
final Object[] items = this.items;
E x = (E) items[takeIndex];
//出隊列以後,元素就須要置爲null了,等待gc在某個時刻進行回收不可達對象的回收
items[takeIndex] = null;
//若是takeIndex等於了數組空間的大小了,說明,隊列的元素個數已經取完了,此時須要重置takeIndex值爲0
if (++takeIndex == items.length)
takeIndex = 0;
//每取出一個數組元素,元素個數減一
count--;
if (itrs != null)
itrs.elementDequeued();
//發出一個信號通知,"隊列不滿,還能夠put操做的信號"
notFull.signal();
return x;
}

2.7,poll()方法

public E poll() {
//線程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
//若隊列的元素個數爲0,則隊列的元素是沒有的,就返回了null
//不然就執行出隊列操做,上面已經分析過了,這裏就不分析了
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}

2.8,clear()方法

public void clear() {
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//若是隊列存在元素大於0,直接執行下面的操做
if (k > 0) {
//putIndex的位置,就是須要移動到的位置
final int putIndex = this.putIndex;
int i = takeIndex;
do {
//循環將每一個元素值置爲null,等待gc在某個時刻觸發
items[i] = null;
if (++i == items.length)
i = 0;
} while (i != putIndex);
//這裏爲啥要賦值呢?思考一下
takeIndex = putIndex;
//元素個數置爲0
count = 0;
if (itrs != null)
itrs.queueIsEmpty();
//進行通知,此時隊列裏是能夠裝填元素了
for (; k > 0 && lock.hasWaiters(notFull); k--)
notFull.signal();
}
} finally {
lock.unlock();
}
}

2.9,toString()方法

public String toString() {
//線程安全的方法
final ReentrantLock lock = this.lock;
lock.lock();
try {
int k = count;
//隊列裏面的元素個數爲0,則返回"[]"
if (k == 0)
return "[]";

final Object[] items = this.items;
//使用StringBuilder方法進行拼接隊列的每個元素
StringBuilder sb = new StringBuilder();
sb.append('[');
for (int i = takeIndex; ; ) {
Object e = items[i];
sb.append(e == this ? "(this Collection)" : e);
if (--k == 0)
return sb.append(']').toString();
sb.append(',').append(' ');
if (++i == items.length)
i = 0;
}
} finally {
lock.unlock();
}
}

2.10,remainingCapacity()方法

public int remainingCapacity() {
//獲取lock實例對象
final ReentrantLock lock = this.lock;
lock.lock();
try {
//剩餘空間等於數組空間大小減去元素就是剩餘空間的大小
return items.length - count;
} finally {
lock.unlock();
}
}

三,總結一下

3.1,線程間通訊

基於Condition的await()和singal()方法來實現app

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/**
* @author pc
*/

public class ConditionTest {

public Lock lock = new ReentrantLock();
public Condition condition = lock.newCondition();

public static void main(String[] args) {
ConditionTest test = new ConditionTest();
ExecutorService executorService = Executors.newFixedThreadPool(2);
executorService.execute(() -> test.conditionWait());
executorService.execute(() -> test.conditionSignal());
}

public void conditionWait() {
lock.lock();
try {
System.out.println(Thread.currentThread().getName() + "拿到鎖了");
System.out.println(Thread.currentThread().getName() + "等待信號");
condition.await();
System.out.println(Thread.currentThread().getName() + "拿到信號");
} catch (Exception e) {

} finally {
lock.unlock();
}
}

public void conditionSignal() {
lock.lock();
try {
Thread.sleep(3000);
System.out.println(Thread.currentThread().getName() + "拿到鎖了");
condition.signal();
System.out.println(Thread.currentThread().getName() + "發出信號");
} catch (Exception e) {

} finally {
lock.unlock();
}
}

}

3.2,ArrayBlockingQueue總結

對於分析以後的ArrayBlockingQueue,咱們能夠獲得什麼,這裏本身總結一下,咱們學會了ReentrantLock鎖的使用,學會了線程間通訊的方式,學會了分析源碼的思路,與此同時也學會了與本身交流和思考的內容。編輯器


本文分享自微信公衆號 - WwpwW(gh_245290c1861a)。
若有侵權,請聯繫 support@oschina.cn 刪除。
本文參與「OSC源創計劃」,歡迎正在閱讀的你也加入,一塊兒分享。函數

相關文章
相關標籤/搜索