ArrayBlockingQueue源碼解析

ArrayBlockingQueue源碼解析

什麼是ArrayBlockingQueue

ArrayBlockingQueue底層是由數組實現的定長阻塞隊列(阻塞表示若是沒有原始那麼獲取元素會阻塞當前線程)html

ArrayBlockingQueue用來幹嗎

ArrayBlockingQueue通常用於生產者消費者模型業務(排隊機制,先進先出)java

源碼解析

數據的存儲

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    private static final long serialVersionUID = -817911632652898426L;

    /** The queued items 存儲元素容器*/
    final Object[] items;

    /** items index for next take, poll, peek or remove 使用過的元素 */
    int takeIndex;

    /** items index for next put, offer, or add 添加過的元素 */
    int putIndex;

    /** Number of elements in the queue 當前元素數量 */
    int count;

數據的操做

add數組

public boolean add(E e) {
	return super.add(e);
}
super.add
public boolean add(E e) {
	if (offer(e))
		return true;
	else
		throw new IllegalStateException("Queue full");
}
public boolean offer(E e) {
	checkNotNull(e);//ArrayBlockingQueue不能存儲null對象
	final ReentrantLock lock = this.lock;//插入操做線程安全
	lock.lock();
	try {
		if (count == items.length)//若是當前count==items.length表示隊列已經忙了,不能插入
			return false;
		else {
			insert(e);//插入元素
			return true;
		}
	} finally {
		lock.unlock();
	}
}
private void insert(E x) {
	items[putIndex] = x;//第一次put爲0
	putIndex = inc(putIndex);//遞增
	++count;//數量遞增
	notEmpty.signal();//通知獲取原始方法能夠進行獲取
}
final int inc(int i) {//若是當前putIndex==items.length那麼putIndex從新從零開始
	return (++i == items.length) ? 0 : i;
}
//一樣爲添加元素,lock.lockInterruptibly若是檢測到有Thread.interrupted();會直接拋出異常
public void put(E e) throws InterruptedException {
	checkNotNull(e);
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();
	try {
		while (count == items.length)
			notFull.await();
		insert(e);
	} finally {
		lock.unlock();
	}
}

remove安全

public boolean remove(Object o) {
	if (o == null) return false;
	final Object[] items = this.items;
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		for (int i = takeIndex, k = count; k > 0; i = inc(i), k--) {
			if (o.equals(items[i])) {//從頭部開始遍歷元素判斷
				removeAt(i);
				return true;
			}
		}
		return false;
	} finally {
		lock.unlock();
	}
}
//queue size = 10 putSize = 5 tackSize = 0
//queue 1,2,3,4,5
removeAt 3
step1: removeAt != takeIndex
i = 
nexti = 4


void removeAt(int i) {
	final Object[] items = this.items;
	// if removing front item, just advance
	if (i == takeIndex) {
		items[takeIndex] = null;//引用設置爲空
		takeIndex = inc(takeIndex);//takeIndex++
	} else {
		// slide over all others up through putIndex.
		for (;;) {
			int nexti = inc(i);//>隊列的頭部  遞增(putIndex一個循環的0-n)
			if (nexti != putIndex) {//遞增後部位putIndex所有向前移動位置
				items[i] = items[nexti];
				i = nexti;
			} else {
				items[i] = null;//元素設置爲空
				putIndex = i;
				break;
			}
		}
	}
	--count;//元素遞減
	notFull.signal();//通知notFull.awit()
}

getoracle

public E poll() {//獲取隊列頭部元素,獲取後設置爲空
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return (count == 0) ? null : extract();//若是當前隊列爲空直接返回null,不爲空調用extract()
	} finally {
		lock.unlock();
	}
}
//獲取隊列頭部元素,獲取後設置爲空
//take獲取原始若是隊列爲空會進入阻塞狀態知道等到有添加元素纔會去返回
public E take() throws InterruptedException {
	final ReentrantLock lock = this.lock;
	lock.lockInterruptibly();//lock.lockInterruptibly若是檢測到有Thread.interrupted();會直接拋出異常
	try {
		while (count == 0)
			notEmpty.await();//若是沒有元素進入等待狀態,等待被喚醒
		return extract();
	} finally {
		lock.unlock();
	}
}
//peek查看隊列頭部元素
public E peek() {
	final ReentrantLock lock = this.lock;
	lock.lock();
	try {
		return (count == 0) ? null : itemAt(takeIndex);//若是元素爲空直接返回null,不爲空條用itemAt(takeIndex)
	} finally {
		lock.unlock();
	}
}
private E extract() {
	final Object[] items = this.items;
	E x = this.<E>cast(items[takeIndex]);//泛型轉換而且得到當前元素
	items[takeIndex] = null;//當前元素設置爲空
	takeIndex = inc(takeIndex);//獲取原始遞增
	--count;//隊列元素遞減
	notFull.signal();//通知notFull.await()能夠進行插入元素
	return x;//返回當前獲取原始
}
//獲取元素
final E itemAt(int i) {
	return this.<E>cast(items[i]);
}

何時擴容

定長隊列,不能進行擴容ide

是否線程安全

線程安全this

使用注意事項

  • ArrayBlockingQueue爲定長隊列
  • ArrayBlockingQueue的添加和獲取方法都有提供阻塞和非阻塞的根據須要使用

引用

相關文章
相關標籤/搜索