LinkedBlockingQueue源碼解析

LinkedBlockingQueue源碼解析

什麼是LinkedBlockingQueue

LinkedBlockingQueue底層是由節點鏈表實現的定長阻塞隊列(阻塞表示若是沒有原始那麼獲取元素會阻塞當前線程)html

LinkedBlockingQueue用來幹嗎

LinkedBlockingQueue通常用於生產者消費者模型業務(排隊機制,先進先出)java

源碼解析

數據的存儲

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;
    /**
     * Linked list node class
     */
    static class Node<E> {//存儲數據的節點
        E item;
 
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;//鏈表的最大長度,若是不設置值默認爲Integer.MAX_VALUE

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);//統計數量線程安全

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;//頭節點

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;//尾節點

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();//tackLock

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();//tackLock條件不爲空

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();//putLock

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();//putLock條件沒滿
    public LinkedBlockingQueue() {
            this(Integer.MAX_VALUE);
    }
    public LinkedBlockingQueue(int capacity) {
    	if (capacity <= 0) throw new IllegalArgumentException();
    	this.capacity = capacity;
    	last = head = new Node<E>(null);//默認last=head=空節點
    }

數據的操做

addnode

public void put(E e) throws InterruptedException {
	if (e == null) throw new NullPointerException();//不能存儲空元素
	int c = -1;
	Node<E> node = new Node(e);//建立節點
	final ReentrantLock putLock = this.putLock;//得到putLock
	final AtomicInteger count = this.count;//獲取當前數量
	putLock.lockInterruptibly();//獲取鎖,若是有調用Thread.Interrupted()直接拋出異常
	try {
		
		while (count.get() == capacity) {//若是當前隊列以滿,進入等待狀態
			notFull.await();
		}
		enqueue(node);
		c = count.getAndIncrement();
		if (c + 1 < capacity)
			notFull.signal();
	} finally {
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();
}

public boolean offer(E e, long timeout, TimeUnit unit)	offer(e)相似
	throws InterruptedException {

	if (e == null) throw new NullPointerException();//不能存儲空元素
	long nanos = unit.toNanos(timeout);//裝換爲納秒
	int c = -1;
	final ReentrantLock putLock = this.putLock;
	final AtomicInteger count = this.count;
	putLock.lockInterruptibly();
	try {
		while (count.get() == capacity) {
			if (nanos <= 0)
				return false;
			nanos = notFull.awaitNanos(nanos);//等待一段時間
		}
		enqueue(new Node<E>(e));
		c = count.getAndIncrement();//遞增
		if (c + 1 < capacity)//若是未滿喚醒notFull.awit
			notFull.signal();
	} finally {
		putLock.unlock();
	}
	if (c == 0)
		signalNotEmpty();//喚醒notEmpty.await()
	return true;
}
private void enqueue(Node<E> node) {
	// assert putLock.isHeldByCurrentThread();
	// assert last.next == null;
	//拆分爲兩步 last.next = node,last = node
	//每次head.next=當前的last而後last.next指向node
	last = last.next = node;
	
}

remove安全

public boolean remove(Object o) {
	if (o == null) return false;
	fullyLock();//刪除數據時所有lock
	try {
		for (Node<E> trail = head, p = trail.next;
			 p != null;
			 trail = p, p = p.next) {
			if (o.equals(p.item)) {
				unlink(p, trail);
				return true;
			}
		}
		return false;
	} finally {
		fullyUnlock();
	}
}
void unlink(Node<E> p, Node<E> trail) {
	// assert isFullyLocked();
	// p.next is not changed, to allow iterators that are
	// traversing p to maintain their weak-consistency guarantee.
	p.item = null;
	trail.next = p.next;//先後元素執行,大年元素設置爲空
	if (last == p)
		last = trail;
	if (count.getAndDecrement() == capacity)//count獲取數量同時遞減(獲取數量爲遞減錢數量)
		notFull.signal();//喚醒 notFull.await()
}

getoracle

//獲取元素,消費,可能被中斷
public E take() throws InterruptedException {
	E x;
	int c = -1;
	final AtomicInteger count = this.count;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lockInterruptibly();//若是有調用Thread.Interrupted()拋出異常
	try {
		while (count.get() == 0) {
			notEmpty.await();//元素爲空進入等待狀態
		}
		x = dequeue();//
		c = count.getAndDecrement();
		if (c > 1)
			notEmpty.signal();
	} finally {
		takeLock.unlock();
	}
	if (c == capacity)
		signalNotFull();
	return x;
}
//獲取元素,消費
public E poll() {
	final AtomicInteger count = this.count;
	if (count.get() == 0)
		return null;
	E x = null;
	int c = -1;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	try {
		if (count.get() > 0) {
			x = dequeue();
			c = count.getAndDecrement();
			if (c > 1)
				notEmpty.signal();
		}
	} finally {
		takeLock.unlock();
	}
	if (c == capacity)
		signalNotFull();
	return x;
}
//查看元素
public E peek() {
	if (count.get() == 0)
		return null;
	final ReentrantLock takeLock = this.takeLock;
	takeLock.lock();
	try {
		Node<E> first = head.next;
		if (first == null)
			return null;
		else
			return first.item;
	} finally {
		takeLock.unlock();
	}
}
[null,aaa,bbb] queue
[null,bbb] delete after queue
去掉頭部null元素獲取aaa元素修改aaa元素的item=null
private E dequeue() {
	// assert takeLock.isHeldByCurrentThread();
	// assert head.item == null;
	Node<E> h = head;
	Node<E> first = h.next;//first第一個有值的節點
	h.next = h; // help GC
	head = first;
	E x = first.item;//獲取元素
	first.item = null;//設置爲空
	return x;
}

何時擴容

定長鏈表不支持擴容this

是否線程安全

線程安全線程

使用注意事項

  • 默認建立方式鏈表醉大長度爲Ineger.MAX_SIZE

引用

相關文章
相關標籤/搜索